Class RabbitMqIO

java.lang.Object
org.apache.beam.sdk.io.rabbitmq.RabbitMqIO

public class RabbitMqIO extends Object
An IO to publish or consume messages with a RabbitMQ broker.

Documentation in this module tends to distinguish between interacting with a "queue" and interacting with an "exchange". AMQP doesn't technically work this way. Notes on reading from or writing to a "queue" imply that the "default exchange" is in use, operating as a "direct exchange". Notes on interacting with an "exchange" are more flexible and more generally applicable.
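
The relationship between a "queue" and the "default exchange" can be illustrated with a small in-memory model. This is only a sketch of the AMQP routing idea, not of how RabbitMqIO or a RabbitMQ broker is implemented; the class DefaultExchangeSketch and all of its methods are hypothetical names introduced for this illustration. It assumes the standard AMQP behaviour that every declared queue is bound to the default exchange under its own name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the AMQP default (direct) exchange.
final class DefaultExchangeSketch {
  // binding key -> names of the queues bound under that key
  private final Map<String, List<String>> bindings = new HashMap<>();
  // queue name -> message bodies delivered to that queue
  private final Map<String, List<String>> queues = new HashMap<>();

  // Declaring a queue implicitly binds it under its own name.
  void declareQueue(String queue) {
    if (queues.putIfAbsent(queue, new ArrayList<>()) == null) {
      bindings.computeIfAbsent(queue, k -> new ArrayList<>()).add(queue);
    }
  }

  // A direct exchange delivers only where the routing key equals the binding key.
  void publish(String routingKey, String body) {
    for (String queue : bindings.getOrDefault(routingKey, Collections.emptyList())) {
      queues.get(queue).add(body);
    }
  }

  List<String> messagesIn(String queue) {
    return queues.getOrDefault(queue, Collections.emptyList());
  }

  public static void main(String[] args) {
    DefaultExchangeSketch exchange = new DefaultExchangeSketch();
    exchange.declareQueue("QUEUE");
    exchange.publish("QUEUE", "hello");   // "writing to a queue"
    exchange.publish("OTHER", "dropped"); // no queue bound under this key
    System.out.println(exchange.messagesIn("QUEUE"));
  }
}
```

In this model, "writing to queue QUEUE" is nothing more than publishing to the default exchange with the routing key "QUEUE".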

Consuming messages from RabbitMQ server

RabbitMqIO.Read returns an unbounded PCollection containing RabbitMQ message bodies (as byte[]) wrapped as RabbitMqMessage.

To configure a RabbitMQ source, you have to provide a RabbitMQ URI to connect to a RabbitMQ broker. The following example illustrates various options for configuring the source, reading from a named queue on the default exchange:


 PCollection<RabbitMqMessage> messages = pipeline.apply(
   RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"));
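
To make the parts of the connection URI used above explicit, the following sketch splits it with the JDK's java.net.URI. How RabbitMqIO itself parses the URI is not described here, so this is only an illustration of the URI's structure; the class AmqpUriSketch and its describe method are hypothetical names.

```java
import java.net.URI;

// Hypothetical helper that breaks an AMQP URI into its components.
final class AmqpUriSketch {
  static String describe(String amqpUri) {
    URI uri = URI.create(amqpUri);
    // User info has the form "user:password"; only the user name is reported.
    String userInfo = uri.getUserInfo();
    String user = userInfo == null ? "(none)" : userInfo.split(":", 2)[0];
    return "scheme=" + uri.getScheme()
        + " user=" + user
        + " host=" + uri.getHost()
        + " port=" + uri.getPort();
  }

  public static void main(String[] args) {
    System.out.println(describe("amqp://user:password@localhost:5672"));
  }
}
```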

 

Often one will want to read from an exchange. The exchange can be declared by Beam or can be pre-existing. The effect of the supplied routingKey depends on the exchange type. For example:


  // Reading from a fanout (pubsub) exchange, declared (non-durable) by RabbitMqIO.
  // Note the routingKey is 'null': a fanout exchange publishes all messages to
  // all queues, so any specified binding would be ignored.
  PCollection<RabbitMqMessage> fanoutMessages = pipeline.apply(RabbitMqIO.read()
      .withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", null));

  // Reading from an existing topic exchange named 'EVENTS'.
  // This will use a dynamically-created, non-durable queue subscribing to all
  // messages with a routing key beginning with 'users.'
  PCollection<RabbitMqMessage> userMessages = pipeline.apply(RabbitMqIO.read()
      .withUri("amqp://user:password@localhost:5672").withExchange("EVENTS", "users.#"));
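
The 'users.#' binding above relies on AMQP topic matching, in which routing keys are dot-separated words, '*' stands for exactly one word and '#' stands for zero or more words. The following is a minimal sketch of that matching rule in plain Java. It is not the broker's or RabbitMqIO's implementation; the class TopicPatternSketch and its methods are hypothetical, and edge cases such as empty routing keys are handled only naively.

```java
// Hypothetical illustration of AMQP topic-pattern matching.
final class TopicPatternSketch {
  static boolean matches(String pattern, String routingKey) {
    // Limit -1 keeps trailing empty words, so "users." is two words.
    return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
  }

  private static boolean matches(String[] pattern, int i, String[] key, int j) {
    if (i == pattern.length) {
      return j == key.length; // pattern exhausted: the key must be exhausted too
    }
    if (pattern[i].equals("#")) {
      // '#' may absorb zero or more of the remaining words.
      for (int next = j; next <= key.length; next++) {
        if (matches(pattern, i + 1, key, next)) {
          return true;
        }
      }
      return false;
    }
    if (j == key.length) {
      return false; // pattern still expects a word, but the key has none left
    }
    // '*' matches any single word; otherwise the words must be equal.
    if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
      return matches(pattern, i + 1, key, j + 1);
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(matches("users.#", "users.created"));
    System.out.println(matches("users.*", "users.created.eu"));
    System.out.println(matches("users.#", "orders.new"));
  }
}
```

A fanout exchange performs no such matching, which is why its routingKey can be left as null.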
 

Publishing messages to RabbitMQ server

RabbitMqIO.Write can send RabbitMqMessage instances to a RabbitMQ server queue or exchange.

As with RabbitMqIO.Read, RabbitMqIO.Write is configured with a RabbitMQ URI.

Examples


 // Publishing to a named, non-durable exchange, declared by Beam:
 pipeline
   .apply(...) // provide PCollection<RabbitMqMessage>
   .apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout"));

 // Publishing to an existing exchange
 pipeline
   .apply(...) // provide PCollection<RabbitMqMessage>
   .apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE"));

 // Publishing to a named queue in the default exchange:
 pipeline
   .apply(...) // provide PCollection<RabbitMqMessage>
   .apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"));