Class RabbitMqIO
Documentation in this module tends to refer to interacting with a "queue" versus interacting with an "exchange". AMQP doesn't technically work this way: messages are always published to an exchange and consumed from a queue. Where these notes mention reading from or writing to a "queue", they imply that the "default exchange" is in use, operating as a "direct exchange". Notes on interacting with an "exchange" describe the more flexible and more generally applicable case.
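The relationship between a "queue" and the default exchange can be pictured with a toy in-memory model. This is only a sketch: DirectExchangeSketch and its methods are hypothetical names invented for illustration and are not part of Beam or the RabbitMQ client. It assumes the standard AMQP 0-9-1 behavior that the default exchange acts as a direct exchange on which every queue is bound under its own name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange (hypothetical; NOT the RabbitMQ client API).
final class DirectExchangeSketch {
    // Maps a binding key to the names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    // Models the default exchange: each queue is implicitly bound under its own name.
    static DirectExchangeSketch defaultExchange(String... queueNames) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        for (String name : queueNames) {
            exchange.bind(name, name);
        }
        return exchange;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = defaultExchange("QUEUE", "OTHER");
        // Publishing "to a queue" means publishing with the queue name as routing key.
        System.out.println(exchange.route("QUEUE"));
        System.out.println(exchange.route("missing"));
    }
}
```

In this model, writing to the queue "QUEUE" is the same as publishing to the default exchange with the routing key "QUEUE", which is why the queue-oriented options are the less general case.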
Consuming messages from RabbitMQ server
RabbitMqIO.Read returns an unbounded PCollection containing RabbitMQ message bodies (as byte[]) wrapped as RabbitMqMessage.
To configure a RabbitMQ source, you have to provide a RabbitMQ URI to connect to a RabbitMQ broker. The following example illustrates various options for configuring the source, reading from a named queue on the default exchange:
PCollection<RabbitMqMessage> messages = pipeline.apply(
    RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"));
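To make the pieces of the URI in the example above explicit, the following sketch decomposes it with the standard java.net.URI class. AmqpUriParts is a hypothetical helper written for illustration; it is not how RabbitMqIO or the RabbitMQ client necessarily parse the URI, and it ignores the optional virtual host path.

```java
import java.net.URI;

// Hypothetical helper for illustration; not part of Beam or the RabbitMQ client.
final class AmqpUriParts {
    final String scheme;
    final String user;     // null when the URI carries no credentials
    final String password; // null when the URI carries no password
    final String host;
    final int port;        // -1 when the URI does not specify a port

    private AmqpUriParts(String scheme, String user, String password, String host, int port) {
        this.scheme = scheme;
        this.user = user;
        this.password = password;
        this.host = host;
        this.port = port;
    }

    static AmqpUriParts parse(String uri) {
        URI parsed = URI.create(uri);
        String user = null;
        String password = null;
        String userInfo = parsed.getUserInfo(); // "user:password", or null if absent
        if (userInfo != null) {
            // Split only on the first ':' so a password may itself contain ':'.
            String[] parts = userInfo.split(":", 2);
            user = parts[0];
            password = parts.length > 1 ? parts[1] : null;
        }
        return new AmqpUriParts(parsed.getScheme(), user, password, parsed.getHost(), parsed.getPort());
    }

    public static void main(String[] args) {
        AmqpUriParts parts = parse("amqp://user:password@localhost:5672");
        System.out.println(parts.scheme + " " + parts.user + " " + parts.host + " " + parts.port);
    }
}
```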
Often one will want to read from an exchange. The exchange can be declared by Beam or can be pre-existing. The effect of the supplied routingKey depends on the exchange type. For example:
// Reading from a fanout (pubsub) exchange, declared (non-durable) by RabbitMqIO.
// Note the routingKey is 'null', as a fanout exchange publishes all messages to
// all queues, and the specified binding will be ignored.
PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
.withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", null));
// Reading from an existing topic exchange named 'EVENTS'.
// This will use a dynamically-created, non-durable queue subscribing to all
// messages with a routing key beginning with 'users.'
PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
.withUri("amqp://user:password@localhost:5672").withExchange("EVENTS", "users.#"));
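The way a topic binding such as "users.#" selects routing keys can be sketched in plain Java. The matcher below is an illustrative approximation of the AMQP topic rules ('*' stands for exactly one dot-separated word, '#' for zero or more words); TopicPatternSketch is a hypothetical name, and the broker's own implementation is not shown here. For comparison, a direct exchange requires an exact key match, and a fanout exchange ignores the key entirely.

```java
// Illustrative approximation of AMQP topic matching; not the broker's implementation.
final class TopicPatternSketch {

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("users.#", "users.created"));
        System.out.println(matches("users.#", "orders.created"));
        System.out.println(matches("users.*", "users.profile.updated"));
    }
}
```

Under these rules "users.#" also accepts the bare key "users", since '#' can match zero words; a binding of "users.*" would instead require exactly one word after the prefix.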
Publishing messages to RabbitMQ server
RabbitMqIO.Write can send RabbitMqMessage elements to a RabbitMQ server queue or exchange.
As with RabbitMqIO.Read, RabbitMqIO.Write is configured with a RabbitMQ URI.
Examples
// Publishing to a named, non-durable exchange, declared by Beam:
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout"));
// Publishing to an existing exchange
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE"));
// Publishing to a named queue in the default exchange:
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"));
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | RabbitMqIO.Read | A PTransform to consume messages from RabbitMQ server.
static class | RabbitMqIO.Write | A PTransform to publish messages to a RabbitMQ server.
Method Summary
Method Details
read
write