Class SolaceIO.Read<T>
- All Implemented Interfaces:
Serializable, HasDisplayData
- Enclosing class:
SolaceIO
-
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| `PCollection<T>` | `expand(PBegin input)` | Override this method to specify how this `PTransform` should be expanded on the given `InputT`. |
| `SolaceIO.Read<T>` | `from(Solace.Queue queue)` | Set the queue name to read from. |
| `SolaceIO.Read<T>` | `from(Solace.Topic topic)` | Set the topic name to read from. |
| `void` | `validate(@Nullable PipelineOptions options)` | Called before running the Pipeline to verify this transform is fully and correctly specified. |
| `SolaceIO.Read<T>` | `withDeduplicateRecords(boolean deduplicateRecords)` | Optional, default: false. |
| `SolaceIO.Read<T>` | `withMaxNumConnections(Integer maxNumConnections)` | Optional. |
| `SolaceIO.Read<T>` | `withSempClientFactory(SempClientFactory sempClientFactory)` | Set a factory that creates a `SempClient`. |
| `SolaceIO.Read<T>` | `withSessionServiceFactory(SessionServiceFactory sessionServiceFactory)` | Set a factory that creates a `SessionService`. |
| `SolaceIO.Read<T>` | `withTimestampFn(SerializableFunction<T, Instant> timestampFn)` | The timestamp function, used for estimating the watermark, mapping the record T to an `Instant`. |
| `SolaceIO.Read<T>` | `withWatermarkIdleDurationThreshold(Duration idleDurationThreshold)` | Optional. |

Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate
-
Method Details
-
from
Set the queue name to read from. Use this or the `from(Topic)` method.
from
Set the topic name to read from. Use this or the `from(Queue)` method.
withTimestampFn
The timestamp function, used for estimating the watermark, mapping the record T to an `Instant`.
Optional when using the no-arg `SolaceIO.read()` method. Defaults to `SolaceIO.SENDER_TIMESTAMP_FUNCTION`. When using the `SolaceIO.read(TypeDescriptor, SerializableFunction, SerializableFunction)` method, the function mapping from T to `Instant` has to be passed as an argument.
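As an illustration of what such a mapping looks like, here is a minimal JDK-only sketch. It is not the connector's code: `Message`, `senderTimestampOr`, and the fallback behaviour are hypothetical, and `java.util.function.Function` and `java.time.Instant` stand in for Beam's `SerializableFunction` and the `Instant` type it expects.

```java
import java.time.Instant;
import java.util.function.Function;

final class TimestampFnSketch {

  /** Hypothetical stand-in for a received record. */
  static final class Message {
    final String payload;
    final Long senderTimestampMillis; // null when the publisher did not set it

    Message(String payload, Long senderTimestampMillis) {
      this.payload = payload;
      this.senderTimestampMillis = senderTimestampMillis;
    }
  }

  /**
   * Builds a function that maps a message to its sender timestamp.
   * Using a caller-supplied fallback for missing timestamps is an assumption of this sketch.
   */
  static Function<Message, Instant> senderTimestampOr(Instant fallback) {
    return message ->
        message.senderTimestampMillis != null
            ? Instant.ofEpochMilli(message.senderTimestampMillis)
            : fallback;
  }

  public static void main(String[] args) {
    Function<Message, Instant> timestampFn = senderTimestampOr(Instant.EPOCH);
    System.out.println(timestampFn.apply(new Message("with timestamp", 1_700_000_000_000L)));
    System.out.println(timestampFn.apply(new Message("without timestamp", null)));
  }
}
```

In a real pipeline the equivalent function would be passed to `withTimestampFn`, operating on the record type T of the read transform.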
withMaxNumConnections
Optional. Sets the maximum number of connections to the broker. The actual number of sessions is determined by this and the number set by the runner. If not set, the number of sessions is determined by the runner. The number of connections created follows this logic: `numberOfConnections = min(maxNumConnections, desiredNumberOfSplits)`, where the `desiredNumberOfSplits` is set by the runner.
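The connection-count rule above can be written out as a small JDK-only sketch. The class and method names are hypothetical, and representing "not set" as `null` is an assumption of this example, not a statement about the connector's internals.

```java
final class ConnectionCountSketch {

  /**
   * @param maxNumConnections user-configured cap, or null when not set
   * @param desiredNumberOfSplits number of splits requested by the runner
   * @return the number of connections the rule yields
   */
  static int numberOfConnections(Integer maxNumConnections, int desiredNumberOfSplits) {
    if (maxNumConnections == null) {
      // No cap configured: the runner alone decides.
      return desiredNumberOfSplits;
    }
    return Math.min(maxNumConnections, desiredNumberOfSplits);
  }

  public static void main(String[] args) {
    System.out.println(numberOfConnections(5, 20));    // cap is lower than the runner's request
    System.out.println(numberOfConnections(50, 20));   // runner's request is lower than the cap
    System.out.println(numberOfConnections(null, 20)); // cap not set
  }
}
```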
withWatermarkIdleDurationThreshold
Optional. Denotes the duration for which the watermark can be idle. If there are no incoming messages for this ‘idle’ period of time, the watermark is set to a timestamp representing a time earlier than now by the ‘idle’ period of time. For example, if the ‘idle’ period is set to 30 seconds and no new data arrives for 30 seconds, the watermark will be set to `max(currentWatermark, now() - 30 seconds)`. The default watermark idle duration threshold is `SolaceIO.DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD`.
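One way to read the idle rule above, as a minimal sketch under stated assumptions: `advanceIfIdle` and its parameters are hypothetical, the way the last message time is tracked is assumed, and `java.time` types stand in for the time types Beam uses.

```java
import java.time.Duration;
import java.time.Instant;

final class IdleWatermarkSketch {

  /**
   * Returns the watermark after applying the idle rule:
   * if no message arrived for at least idleThreshold, the watermark becomes
   * max(currentWatermark, now - idleThreshold); otherwise it is unchanged.
   */
  static Instant advanceIfIdle(
      Instant currentWatermark, Instant lastMessageReceived, Instant now, Duration idleThreshold) {
    boolean idle = !lastMessageReceived.plus(idleThreshold).isAfter(now);
    if (!idle) {
      return currentWatermark;
    }
    Instant candidate = now.minus(idleThreshold);
    // max(currentWatermark, candidate): the watermark never moves backwards.
    return candidate.isAfter(currentWatermark) ? candidate : currentWatermark;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-01-01T00:01:00Z");
    Duration threshold = Duration.ofSeconds(30);
    Instant watermark = Instant.parse("2024-01-01T00:00:10Z");
    Instant lastMessage = Instant.parse("2024-01-01T00:00:20Z"); // 40 seconds before now
    System.out.println(advanceIfIdle(watermark, lastMessage, now, threshold));
  }
}
```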
withDeduplicateRecords
Optional, default: false. Set to deduplicate messages based on the `XMLMessage.getApplicationMessageId()` of the incoming `BytesXMLMessage`. If the field is null, then the `XMLMessage.getReplicationGroupMessageId()` will be used, which is always set by Solace.
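The key-selection rule above can be sketched with plain JDK types. This is an in-memory illustration only: `Message`, `dedupKey`, and `deduplicate` are hypothetical names, both IDs are modelled as strings for simplicity, and the connector's actual deduplication mechanism is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DedupKeySketch {

  /** Hypothetical stand-in for an incoming message. */
  static final class Message {
    final String applicationMessageId;      // optional, set by the publisher
    final String replicationGroupMessageId; // assumed to be always present

    Message(String applicationMessageId, String replicationGroupMessageId) {
      this.applicationMessageId = applicationMessageId;
      this.replicationGroupMessageId = replicationGroupMessageId;
    }
  }

  /** Prefer the application message ID; fall back to the replication group message ID. */
  static String dedupKey(Message message) {
    return message.applicationMessageId != null
        ? message.applicationMessageId
        : message.replicationGroupMessageId;
  }

  /** Keeps the first message seen for each key, preserving input order. */
  static List<Message> deduplicate(List<Message> messages) {
    Set<String> seen = new HashSet<>();
    List<Message> result = new ArrayList<>();
    for (Message message : messages) {
      if (seen.add(dedupKey(message))) {
        result.add(message);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Message> input = Arrays.asList(
        new Message("app-1", "rg-1"),
        new Message("app-1", "rg-2"), // same application message ID: duplicate
        new Message(null, "rg-3"),
        new Message(null, "rg-3"));   // same fallback ID: duplicate
    System.out.println(deduplicate(input).size());
  }
}
```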
withSempClientFactory
Set a factory that creates a `SempClient`.
The factory `create()` method is invoked in each instance of an `org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader<T>`. The created `SempClient` has to communicate with the broker management API. It must support operations such as:
- query for outstanding backlog bytes in a Queue,
- query for metadata such as access-type of a Queue,
- requesting creation of new Queues.
An existing implementation of the SempClientFactory is `BasicAuthSempClientFactory`, which connects to SEMP using the Basic Authentication method.
To use it, specify the credentials with the builder methods.
The format of the host is `[Protocol://]Host[:Port]`.

    .withSempClientFactory(
        BasicAuthSempClientFactory.builder()
            .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
            .username("username")
            .password("password")
            .vpnName("vpn-name")
            .build())
-
withSessionServiceFactory
Set a factory that creates a `SessionService`.
The factory `create()` method is invoked in each instance of an `org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader<T>`. The created `SessionService` has to be able to:
- initialize a connection with the broker,
- check liveness of the connection,
- close the connection,
- create a `MessageReceiver`.
The `BasicAuthJcsmpSessionServiceFactory` is an existing implementation of the `SessionServiceFactory` which implements Basic Authentication to Solace.
To use it, specify the credentials with the builder methods.
The host is the IPv4 or IPv6 address or host name of the appliance. IPv6 addresses must be enclosed in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". If connecting to a non-default port, it can be specified here using the "Host:Port" format. For example, "12.34.56.78:4444", or "[fe80::1]:4444".

    .withSessionServiceFactory(
        BasicAuthJcsmpSessionServiceFactory.builder()
            .host("your-host-name") // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
            .username("semp-username")
            .password("semp-password")
            .vpnName("vpn-name")
            .build())
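To make the host format concrete, the sketch below splits a "Host[:Port]" string, including bracketed IPv6 addresses, into its parts. It only illustrates the format described above; `HostPortSketch` and `parse` are hypothetical, and this is not how the Solace client library itself parses the value.

```java
final class HostPortSketch {

  /** Parsed result; port is null when the default port should be used. */
  static final class HostPort {
    final String host;
    final Integer port;

    HostPort(String host, Integer port) {
      this.host = host;
      this.port = port;
    }
  }

  static HostPort parse(String value) {
    if (value.startsWith("[")) {
      // Bracketed IPv6 address, optionally followed by ":port".
      int close = value.indexOf(']');
      if (close < 0) {
        throw new IllegalArgumentException("Missing ']' in IPv6 address: " + value);
      }
      String host = value.substring(1, close);
      String rest = value.substring(close + 1);
      if (rest.isEmpty()) {
        return new HostPort(host, null);
      }
      if (!rest.startsWith(":")) {
        throw new IllegalArgumentException("Expected ':' after ']': " + value);
      }
      return new HostPort(host, Integer.valueOf(rest.substring(1)));
    }
    // IPv4 address or host name, optionally followed by ":port".
    int colon = value.lastIndexOf(':');
    if (colon < 0) {
      return new HostPort(value, null);
    }
    return new HostPort(value.substring(0, colon), Integer.valueOf(value.substring(colon + 1)));
  }

  public static void main(String[] args) {
    for (String value : new String[] {"12.34.56.78", "[fe80::1]", "12.34.56.78:4444", "[fe80::1]:4444"}) {
      HostPort parsed = parse(value);
      System.out.println(value + " -> host=" + parsed.host + ", port=" + parsed.port);
    }
  }
}
```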
-
validate
Description copied from class: `PTransform`
Called before running the Pipeline to verify this transform is fully and correctly specified.
By default, does nothing.
- Overrides:
`validate` in class `PTransform<PBegin, PCollection<T>>`
-
expand
Description copied from class: `PTransform`
Override this method to specify how this `PTransform` should be expanded on the given `InputT`.
NOTE: This method should not be called directly. Instead, the `PTransform` should be applied to the `InputT` using the `apply` method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
`expand` in class `PTransform<PBegin, PCollection<T>>`
-