Class AmqpIO

java.lang.Object
org.apache.beam.sdk.io.amqp.AmqpIO

public class AmqpIO extends Object
AmqpIO supports the AMQP 1.0 protocol using the Apache Qpid Proton-J library.

It is also possible to use the AMQP 1.0 protocol via the Apache Qpid JMS connection factory and the Apache Beam JmsIO.

Binding AMQP and receiving messages

AmqpIO.Read can bind an AMQP listener endpoint and receive messages. It can also connect to an AMQP broker (such as Apache Qpid or Apache ActiveMQ).

AmqpIO.Read returns an unbounded PCollection of Message containing the received messages.

To configure an AMQP source, you have to provide a list of addresses from which it will receive messages. An address has the following form: [amqp[s]://][user[:password]@]domain[/[name]] where domain can be one of host | host:port | ip | ip:port | name. NB: the ~ character binds an AMQP listener instead of connecting to a remote broker. For instance, amqp://~0.0.0.0:1234 binds an AMQP listener on all network interfaces on port 1234.
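
The address form above can be made concrete with a small parser. The following is a minimal sketch that uses only the Java standard library. The class AmqpAddressSketch, its Parsed type, and the regular expression are illustrative assumptions; they are not part of AmqpIO or Proton-J, whose own parsing may differ. In particular, the sketch assumes the ~ marker sits directly in front of the domain, as in the example amqp://~0.0.0.0:1234.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, for illustration only; not part of AmqpIO. */
public class AmqpAddressSketch {

    /** Parts of an address of the form [amqp[s]://][user[:password]@]domain[/[name]]. */
    public static final class Parsed {
        public final String scheme;    // "amqp", "amqps", or null when omitted
        public final String user;      // null when omitted
        public final String password;  // null when omitted
        public final boolean listener; // true when the domain is prefixed with '~'
        public final String domain;    // host, host:port, ip, ip:port, or name
        public final String name;      // null when omitted

        Parsed(String scheme, String user, String password,
               boolean listener, String domain, String name) {
            this.scheme = scheme;
            this.user = user;
            this.password = password;
            this.listener = listener;
            this.domain = domain;
            this.name = name;
        }
    }

    // Assumed grammar: each group mirrors one bracketed part of the documented form.
    private static final Pattern ADDRESS = Pattern.compile(
        "^(?:(amqps?)://)?"                 // optional scheme
        + "(?:([^:@/]+)(?::([^@/]*))?@)?"   // optional user[:password]@
        + "(~)?"                            // optional listener marker (assumed position)
        + "([^/]+)"                         // domain
        + "(?:/(.*))?$");                   // optional /name

    /** Returns the parsed parts, or an empty Optional if the text does not match. */
    public static Optional<Parsed> parse(String address) {
        Matcher m = ADDRESS.matcher(address);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new Parsed(
            m.group(1), m.group(2), m.group(3),
            m.group(4) != null, m.group(5), m.group(6)));
    }

    public static void main(String[] args) {
        String[] samples = {
            "amqp://host:1234",
            "amqp://~0.0.0.0:1234",
            "amqps://alice:secret@broker.example.com:5671/queue1"
        };
        for (String sample : samples) {
            Parsed p = parse(sample)
                .orElseThrow(() -> new IllegalArgumentException(sample));
            String mode = p.listener ? "bind listener on " : "connect to ";
            System.out.println(sample + " -> " + mode + p.domain);
        }
    }
}
```

The listener flag is the only part that changes behavior in the description above: an address with ~ asks for a local listener, and any other address is a remote broker to connect to.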

The following example illustrates how to configure an AMQP source:


 pipeline.apply(AmqpIO.read()
   .withAddresses(Collections.singletonList("amqp://host:1234")))

 

Sending messages to an AMQP endpoint

AmqpIO provides a sink to send PCollection elements as messages.

As with AmqpIO.Read, AmqpIO.Write requires a list of addresses to send messages to. The following example illustrates how to configure AmqpIO.Write:


 pipeline
   .apply(...) // provide PCollection<Message>
   .apply(AmqpIO.write());