@Experimental(value=SOURCE_SINK) public class RabbitMqIO extends java.lang.Object
Documentation in this module tends to describe interacting with a "queue" as distinct from interacting with an "exchange". AMQP doesn't technically work this way: messages are always published to an exchange, which routes them to queues. For readers, notes on reading from or writing to a "queue" imply that the "default exchange" is in use, operating as a "direct exchange". Notes on interacting with an "exchange" are more flexible and generally more applicable.
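To make the "queue means default exchange" point concrete, here is a minimal in-memory sketch of how a default exchange behaves: every declared queue is implicitly bound with a routing key equal to its own name. The class `DefaultExchangeSketch` and its methods are hypothetical and exist only for illustration; they are not part of RabbitMqIO or of any RabbitMQ client library.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of the AMQP default exchange (illustrative only, not a real broker).
final class DefaultExchangeSketch {
    // Queue name -> pending message bodies.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Declaring a queue implicitly binds it to the default exchange
    // under a routing key equal to the queue name.
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Direct-exchange routing: deliver only on an exact routing key match.
    // Returns false when no queue matches (this toy just reports the miss).
    boolean publish(String routingKey, String body) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(body);
        return true;
    }

    // Takes the next message from a queue, or null if none is available.
    String poll(String queueName) {
        Queue<String> source = queues.get(queueName);
        return source == null ? null : source.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("QUEUE");
        System.out.println(broker.publish("QUEUE", "hello"));
        System.out.println(broker.publish("OTHER", "dropped"));
        System.out.println(broker.poll("QUEUE"));
    }
}
```

In this model, "writing to a queue" is simply publishing with the queue name as the routing key, which is why the queue-oriented notes are a special case of the exchange-oriented ones.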
RabbitMqIO.Read returns an unbounded PCollection containing RabbitMQ message bodies (as byte[]) wrapped as RabbitMqMessage.
To configure a RabbitMQ source, you have to provide a RabbitMQ URI to connect to a RabbitMQ broker. The following example illustrates various options for configuring the source, reading from a named queue on the default exchange:
PCollection<RabbitMqMessage> messages = pipeline.apply(
    RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"));
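As a rough illustration of what the URI in the example above carries, the following sketch splits it into its parts using only `java.net.URI`. The class `AmqpUriSketch` and its `describe` method are hypothetical helpers, and this is not how RabbitMqIO or the RabbitMQ client parses the URI; it only shows which components (scheme, host, port, user) the string encodes.

```java
import java.net.URI;

// Illustrative only: inspects the components of an AMQP-style URI.
final class AmqpUriSketch {
    // Returns a summary of the connection target without echoing the password.
    static String describe(String uri) {
        URI parsed = URI.create(uri);
        String userInfo = parsed.getUserInfo();
        // The user info has the form "user:password"; keep only the user part.
        String user = userInfo == null ? "(none)" : userInfo.split(":", 2)[0];
        return "scheme=" + parsed.getScheme()
            + " host=" + parsed.getHost()
            + " port=" + parsed.getPort()
            + " user=" + user;
    }

    public static void main(String[] args) {
        System.out.println(describe("amqp://user:password@localhost:5672"));
    }
}
```

Because the URI embeds credentials, it is usually worth keeping it out of logs, which is why the sketch omits the password from its summary.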
Often one will want to read from an exchange. The exchange can be declared by Beam or can be pre-existing. The effect of the supplied routingKey depends on the exchange type. As examples:
// reading from a fanout (pubsub) exchange, declared (non-durable) by RabbitMqIO.
// Note the routingKey is 'null' as a fanout exchange publishes all messages to
// all queues, and the specified binding will be ignored
PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
.withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", null));
// reading from an existing topic exchange named 'EVENTS'
// this will use a dynamically-created, non-durable queue subscribing to all
// messages with a routing key beginning with 'users.'
PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
.withUri("amqp://user:password@localhost:5672").withExchange("EVENTS", "users.#"));
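The topic example above relies on AMQP topic matching, where keys are dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words. The sketch below is a small stand-alone matcher written under those rules; the class `TopicBindingSketch` is hypothetical and is not part of RabbitMqIO or the broker. Under these rules, `users.#` would also match the bare key `users`, not only keys beginning with `users.`.

```java
// Illustrative matcher for AMQP topic binding keys (not broker code).
final class TopicBindingSketch {
    // Returns true if routingKey satisfies bindingKey under topic rules.
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: succeed only if every word was consumed.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try each possible split.
            for (int k = j; k <= words.length; k++) {
                if (matches(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // A literal or '*' needs a word, but none remain.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return matches(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("users.#", "users.profile.updated"));
        System.out.println(matches("users.*", "users.profile.updated"));
    }
}
```

The recursive form favors readability over speed; it is meant as a way to reason about which routing keys a binding such as `users.#` would receive.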
RabbitMqIO.Write can send RabbitMqMessage to a RabbitMQ server queue or exchange.
As with RabbitMqIO.Read, RabbitMqIO.Write is configured with a RabbitMQ URI.
Examples
// Publishing to a named, non-durable exchange, declared by Beam:
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout"));
// Publishing to an existing exchange
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE"));
// Publishing to a named queue in the default exchange:
pipeline
.apply(...) // provide PCollection<RabbitMqMessage>
.apply(RabbitMqIO.write().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE"));
Modifier and Type | Class and Description |
---|---|
static class | RabbitMqIO.Read: A PTransform to consume messages from RabbitMQ server. |
static class | RabbitMqIO.Write: A PTransform to publish messages to a RabbitMQ server. |
Modifier and Type | Method and Description |
---|---|
static RabbitMqIO.Read | read() |
static RabbitMqIO.Write | write() |
public static RabbitMqIO.Read read()
public static RabbitMqIO.Write write()