Class AmqpIO
It's also possible to use AMQP 1.0 protocol via Apache Qpid JMS connection factory and the Apache Beam JmsIO.
Binding AMQP and receive messages
The AmqpIO
AmqpIO.Read
can bind a AMQP listener endpoint and receive messages. It can
also connect to a AMPQ broker (such as Apache Qpid or Apache ActiveMQ).
AmqpIO
AmqpIO.Read
returns an unbounded PCollection
of Message
containing the received messages.
To configure a AMQP source, you have to provide a list of addresses where it will receive
messages. An address has the following form:
[amqp[s]://][user[:password]@]domain[/[name]]
where domain
can be one of
host | host:port | ip | ip:port | name
. NB: the ~
character allows to bind a AMQP
listener instead of connecting to a remote broker. For instance amqp://~0.0.0.0:1234
will
bind a AMQP listener on any network interface on the 1234 port number.
The following example illustrates how to configure a AMQP source:
pipeline.apply(AmqpIO.read()
.withAddresses(Collections.singletonList("amqp://host:1234")))
Sending messages to a AMQP endpoint
AmqpIO
provides a sink to send PCollection
elements as messages.
As for the AmqpIO.Read
, AmqpIO
AmqpIO.Write
requires a list of addresses where to
send messages. The following example illustrates how to configure the AmqpIO
AmqpIO.Write
:
pipeline
.apply(...) // provide PCollection<Message>
.apply(AmqpIO.write());
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic class
APTransform
to read/receive messages using AMQP 1.0 protocol.static class
APTransform
to send messages using AMQP 1.0 protocol. -
Method Summary
-
Method Details
-
read
-
write
-