Class RabbitMqIO
Documentation in this module tends to reference interacting with a "queue" vs interacting with an "exchange". AMQP doesn't technically work this way. For readers, notes on reading from/writing to a "queue" implies the "default exchange" is in use, operating as a "direct exchange". Notes on interacting with an "exchange" are more flexible and are generally more applicable.
Consuming messages from RabbitMQ server
RabbitMqIO RabbitMqIO.Read returns an unbounded PCollection containing RabbitMQ
messages body (as byte[]) wrapped as RabbitMqMessage.
To configure a RabbitMQ source, you have to provide a RabbitMQ URI to connect to a
RabbitMQ broker. The following example illustrates various options for configuring the source,
reading from a named queue on the default exchange:
PCollection<RabbitMqMessage> messages = pipeline.apply(
RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"))
Often one will want to read from an exchange. The exchange can be declared by Beam or can be
pre-existing. The supplied routingKey has variable functionality depending on the
exchange type. As examples:
// reading from an fanout (pubsub) exchange, declared (non-durable) by RabbitMqIO.
// Note the routingKey is 'null' as a fanout exchange publishes all messages to
// all queues, and the specified binding will be ignored
PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
.withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", null));
// reading from an existing topic exchange named 'EVENTS'
// this will use a dynamically-created, non-durable queue subscribing to all
// messages with a routing key beginning with 'users.'
PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
.withUri("amqp://user:password@localhost:5672").withExchange("EVENTS", "users.#"));
Publishing messages to RabbitMQ server
RabbitMqIO RabbitMqIO.Write can send RabbitMqMessage to a RabbitMQ server queue
or exchange.
As for the RabbitMqIO.Read, the RabbitMqIO.Write is configured with a RabbitMQ URI.
Examples
// Publishing to a named, non-durable exchange, declared by Beam:
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout"));
// Publishing to an existing exchange
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE"));
// Publishing to a named queue in the default exchange:
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"));
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classAPTransformto consume messages from RabbitMQ server.static classAPTransformto publish messages to a RabbitMQ server. -
Method Summary
-
Method Details
-
read
-
write
-