Class RabbitMqIO.Read
- All Implemented Interfaces:
Serializable, HasDisplayData
- Enclosing class:
RabbitMqIO
A PTransform to consume messages from a RabbitMQ server.
-
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Constructor Summary
Constructors
Read()
-
Method Summary
Method: Description
expand: Override this method to specify how this PTransform should be expanded on the given InputT.
withExchange(String name, @Nullable String routingKey): In AMQP, messages are published to an exchange and routed to queues based on the exchange type and a queue binding.
withExchange(String name, String type, @Nullable String routingKey): In AMQP, messages are published to an exchange and routed to queues based on the exchange type and a queue binding.
withMaxNumRecords(long maxNumRecords): Define the max number of records received by the RabbitMqIO.Read.
withMaxReadTime(Duration maxReadTime): Define the max read time (duration) while the RabbitMqIO.Read will receive messages.
withQueue: If you want to directly consume messages from a specific queue, you just have to specify the queue name.
withQueueDeclare(boolean queueDeclare): You can "force" the declaration of a queue on the RabbitMQ broker.
withUseCorrelationId(boolean useCorrelationId): Toggles deduplication of messages based on the AMQP correlation-id property on incoming messages.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
-
Constructor Details
-
Read
public Read()
-
-
Method Details
-
withUri
-
withQueue
If you want to directly consume messages from a specific queue, you just have to specify the queue name. Optionally, you can declare the queue using withQueueDeclare(boolean).
-
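As an illustration of withUri and withQueue, the following is a minimal sketch of reading from a named queue. It assumes the RabbitMqIO.read() factory method and a String broker URI; the URI, the queue name, and the class name are hypothetical placeholders. It needs the Beam RabbitMQ IO module on the classpath and a reachable broker to actually run.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

class ReadFromQueueSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical broker URI and queue name; replace with your own.
    PCollection<RabbitMqMessage> messages =
        pipeline.apply(
            "ReadFromRabbitMq",
            RabbitMqIO.read()
                .withUri("amqp://guest:guest@localhost:5672")
                .withQueue("my-queue")
                // Cap the number of records so this sketch produces a bounded PCollection.
                .withMaxNumRecords(100));

    // Further transforms would be applied to `messages` here.
    pipeline.run().waitUntilFinish();
  }
}
```

The transform is handed to apply rather than expanded by hand, which matches the note on expand further down this page.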
withQueueDeclare
You can "force" the declaration of a queue on the RabbitMQ broker. Exchanges and queues are the high-level building blocks of AMQP. These must be "declared" (created) before they can be used. Declaring either type of object ensures that one of that name and of the specified properties exists, creating it if necessary.
NOTE: When declaring a queue or exchange that already exists, the properties specified in the declaration must match those of the existing queue or exchange. That is, if you declare a queue to be non-durable but a durable queue already exists with the same name, the declaration will fail. When declaring a queue, RabbitMqIO will declare it to be non-durable.
- Parameters:
queueDeclare - If true, RabbitMqIO will declare a non-durable queue. If another application created the queue, this is not required and should be set to false.
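The declaration rule in the NOTE above can be modelled in a few lines. This is only a conceptual sketch: QueueDeclareSketch is a hypothetical in-memory stand-in for a broker, not part of Beam or of the RabbitMQ client, and it tracks durability as the only queue property.

```java
import java.util.HashMap;
import java.util.Map;

class QueueDeclareSketch {
  // Hypothetical stand-in for broker state: queue name -> durable flag.
  private final Map<String, Boolean> queues = new HashMap<>();

  /**
   * Declares a queue: creates it if it does not exist, succeeds if it exists
   * with the same durability, and fails if the durability differs.
   */
  void declare(String name, boolean durable) {
    Boolean existing = queues.putIfAbsent(name, durable);
    if (existing != null && existing != durable) {
      throw new IllegalStateException(
          "Queue '" + name + "' already exists with durable=" + existing);
    }
  }

  boolean exists(String name) {
    return queues.containsKey(name);
  }
}
```

Under this model, a durable queue created by another application makes a later non-durable declaration fail, which is the situation where queueDeclare should be left at false.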
-
withExchange
In AMQP, messages are published to an exchange and routed to queues based on the exchange type and a queue binding. Most exchange types utilize the routingKey to determine which queues to deliver messages to. It is incumbent upon the developer to understand the paradigm in place to determine whether to declare a queue, what the appropriate binding should be, and what routingKey will be in use.
This function should be used if the Beam pipeline will be responsible for declaring the exchange. As a result of calling this function, exchangeDeclare will be set to true and the resulting exchange will be non-durable and of the supplied type. If an exchange with the given name already exists but is durable or is of another type, exchange declaration will fail.
To use an exchange without declaring it, especially for cases when the exchange is shared with other applications or already exists, use withExchange(String, String) instead.
-
withExchange
In AMQP, messages are published to an exchange and routed to queues based on the exchange type and a queue binding. Most exchange types utilize the routingKey to determine which queues to deliver messages to. It is incumbent upon the developer to understand the paradigm in place to determine whether to declare a queue, what the appropriate binding should be, and what routingKey will be in use.
This function should be used if the Beam pipeline will be using an exchange that has already been declared or when using an exchange shared by other applications, such as an events bus or pubsub. As a result of calling this function, exchangeDeclare will be set to false.
-
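The two withExchange overloads described above can be contrasted side by side. This is a hedged sketch assuming the RabbitMqIO.read() factory method; the broker URI, exchange names, exchange type, and routing keys are hypothetical values chosen for illustration.

```java
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;

class ExchangeReadSketch {
  // Hypothetical broker URI; replace with your own.
  private static final String URI = "amqp://guest:guest@localhost:5672";

  /** Three-argument form: the pipeline declares a non-durable exchange of the given type. */
  static RabbitMqIO.Read declaringExchange() {
    return RabbitMqIO.read()
        .withUri(URI)
        .withExchange("pipeline-events", "topic", "orders.*");
  }

  /** Two-argument form: the exchange already exists or is shared, so it is not declared. */
  static RabbitMqIO.Read existingExchange() {
    return RabbitMqIO.read()
        .withUri(URI)
        .withExchange("shared-events", "orders.created");
  }
}
```

The three-argument form is the safer choice only when no other application owns the exchange, since a declaration that conflicts with an existing durable exchange or one of another type fails.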
withMaxNumRecords
Define the max number of records received by the RabbitMqIO.Read. When this max number of records is lower than Long.MAX_VALUE, the RabbitMqIO.Read will provide a bounded PCollection.
-
withMaxReadTime
Define the max read time (duration) while the RabbitMqIO.Read will receive messages. When this max read time is not null, the RabbitMqIO.Read will provide a bounded PCollection.
-
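Taken together, the withMaxNumRecords and withMaxReadTime descriptions imply a simple rule for when the read is bounded. The sketch below only restates that rule as a predicate; BoundednessSketch and isBounded are hypothetical names, and java.time.Duration stands in for the Duration type of the real method so that the example has no dependencies.

```java
import java.time.Duration;

class BoundednessSketch {
  /**
   * Returns true when either limit is set: a record cap below Long.MAX_VALUE
   * or a non-null maximum read time.
   */
  static boolean isBounded(long maxNumRecords, Duration maxReadTime) {
    return maxNumRecords < Long.MAX_VALUE || maxReadTime != null;
  }
}
```

With neither limit set, that is, Long.MAX_VALUE records and a null read time, the predicate is false and the read stays unbounded.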
withUseCorrelationId
Toggles deduplication of messages based on the AMQP correlation-id property on incoming messages.
When set to true, all read messages will require the AMQP correlation-id property to be set.
When set to false, the correlation-id property will not be used by the Reader and no automatic deduplication will occur.
-
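The effect of withUseCorrelationId can be pictured with a small standalone model. This is a conceptual sketch only: the class, the Message type, and the read method are hypothetical, and the real deduplication is carried out by the reader and the runner, so its scope and timing may differ from this simplified "keep the first occurrence" behaviour.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CorrelationIdDedupSketch {
  /** Hypothetical minimal message: a correlation id (may be null) and a payload. */
  static final class Message {
    final String correlationId;
    final String body;

    Message(String correlationId, String body) {
      this.correlationId = correlationId;
      this.body = body;
    }
  }

  /** With the flag on, ids are required and repeats are dropped; with it off, nothing is filtered. */
  static List<Message> read(List<Message> incoming, boolean useCorrelationId) {
    if (!useCorrelationId) {
      return new ArrayList<>(incoming);
    }
    Set<String> seen = new HashSet<>();
    List<Message> out = new ArrayList<>();
    for (Message m : incoming) {
      if (m.correlationId == null) {
        throw new IllegalArgumentException("correlation-id is required when deduplicating");
      }
      // Set.add returns false for an id that was already seen.
      if (seen.add(m.correlationId)) {
        out.add(m);
      }
    }
    return out;
  }
}
```

The model makes the trade-off visible: enabling the flag removes repeated deliveries but obliges every publisher to set a correlation-id.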
expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PBegin, PCollection<RabbitMqMessage>>
-