Class AmqpIO
It is also possible to use the AMQP 1.0 protocol via the Apache Qpid JMS connection factory and the Apache Beam JmsIO.
Binding AMQP and receiving messages
AmqpIO.Read can bind an AMQP listener endpoint and receive messages. It can
also connect to an AMQP broker (such as Apache Qpid or Apache ActiveMQ).
AmqpIO.Read returns an unbounded PCollection of Message
containing the received messages.
To configure an AMQP source, you have to provide a list of addresses from which it will receive
messages. An address has the following form:
[amqp[s]://][user[:password]@]domain[/[name]] where domain can be one of
host | host:port | ip | ip:port | name. NB: the ~ character makes the source bind an AMQP
listener instead of connecting to a remote broker. For instance, amqp://~0.0.0.0:1234 will
bind an AMQP listener on all network interfaces on port 1234.
The following example illustrates how to configure an AMQP source:
pipeline.apply(AmqpIO.read()
    .withAddresses(Collections.singletonList("amqp://host:1234")));
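The address form described above can be made concrete with a small parser. This is only an illustrative sketch using the Java standard library: the class AmqpAddressSketch, its fields, and the regular expression are hypothetical and are not part of AmqpIO, which handles addresses internally. It also assumes that the ~ listener marker directly precedes the domain, as in the amqp://~0.0.0.0:1234 example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of AmqpIO) that splits an address into its parts. */
final class AmqpAddressSketch {
  // Mirrors the form [amqp[s]://][user[:password]@]domain[/[name]].
  // Assumption: the '~' listener marker sits directly before the domain.
  private static final Pattern ADDRESS = Pattern.compile(
      "^(?:(amqps?)://)?"                   // optional scheme
          + "(?:([^:@/]+)(?::([^@/]*))?@)?" // optional user[:password]@
          + "(~)?"                          // optional listener marker
          + "([^/]+)"                       // domain: host, host:port, ip, ip:port or name
          + "(?:/(.*))?$");                 // optional /name

  final String scheme;    // "amqp", "amqps", or null when omitted
  final String user;      // null when omitted
  final String password;  // null when omitted
  final boolean listener; // true when the address starts its domain with '~'
  final String domain;    // domain without the '~' marker
  final String name;      // null when omitted

  private AmqpAddressSketch(Matcher m) {
    this.scheme = m.group(1);
    this.user = m.group(2);
    this.password = m.group(3);
    this.listener = m.group(4) != null;
    this.domain = m.group(5);
    this.name = m.group(6);
  }

  /** Parses an address, or throws IllegalArgumentException if it does not match the form. */
  static AmqpAddressSketch parse(String address) {
    Matcher m = ADDRESS.matcher(address);
    if (!m.matches()) {
      throw new IllegalArgumentException("Not a valid AMQP address: " + address);
    }
    return new AmqpAddressSketch(m);
  }
}
```

With this sketch, AmqpAddressSketch.parse("amqp://~0.0.0.0:1234") reports listener as true and domain as 0.0.0.0:1234, while AmqpAddressSketch.parse("amqp://host:1234") reports listener as false and domain as host:1234.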
Sending messages to an AMQP endpoint
AmqpIO provides a sink to send PCollection elements as messages.
As with AmqpIO.Read, AmqpIO.Write requires a list of addresses to which the messages are
sent. The following example illustrates how to configure AmqpIO.Write:
pipeline
    .apply(...) // provide PCollection<Message>
    .apply(AmqpIO.write());
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | AmqpIO.Read | A PTransform to read/receive messages using AMQP 1.0 protocol.
static class | AmqpIO.Write | A PTransform to send messages using AMQP 1.0 protocol.
Method Summary
Method Details
read
write