public abstract static class SolaceIO.Write<T> extends PTransform<PCollection<T>,SolaceOutput>
Modifier and Type | Field and Description |
---|---|
static TupleTag<BadRecord> | FAILED_PUBLISH_TAG |
static TupleTag<Solace.PublishResult> | SUCCESSFUL_PUBLISH_TAG |
annotations, displayData, name, resourceHints
Constructor and Description |
---|
Write() |
Modifier and Type | Method and Description |
---|---|
SolaceOutput | expand(PCollection<T> input) Override this method to specify how this PTransform should be expanded on the given InputT. |
SolaceIO.Write<T> | publishLatencyMetrics() Publish latency metrics using Beam metrics. |
SolaceIO.Write<T> | to(Solace.Queue queue) Write to a Solace queue. |
SolaceIO.Write<T> | to(Solace.Topic topic) Write to a Solace topic. |
SolaceIO.Write<T> | withDeliveryMode(DeliveryMode deliveryMode) Set the delivery mode. |
SolaceIO.Write<T> | withErrorHandler(ErrorHandler<BadRecord,?> errorHandler) An optional error handler for handling records that failed to publish to Solace. |
SolaceIO.Write<T> | withNumberOfClientsPerWorker(int numberOfClientsPerWorker) The number of clients that each worker will create. |
SolaceIO.Write<T> | withNumShards(int numShards) The number of workers used by the job to write to Solace. |
SolaceIO.Write<T> | withSessionServiceFactory(SessionServiceFactory factory) Set the provider used to obtain the properties to initialize a new session in the broker. |
SolaceIO.Write<T> | withSubmissionMode(SolaceIO.SubmissionMode submissionMode) This setting controls the JCSMP property MESSAGE_CALLBACK_ON_REACTOR. |
SolaceIO.Write<T> | withWriterType(SolaceIO.WriterType writerType) Set the type of writer used by the connector. |
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
public static final TupleTag<Solace.PublishResult> SUCCESSFUL_PUBLISH_TAG
public SolaceIO.Write<T> to(Solace.Topic topic)
The topic does not need to exist before launching the pipeline.
This will write all records to the same topic, ignoring their destination field.
Optional. If not specified, the connector will use dynamic destinations based on the destination field of Solace.Record.
public SolaceIO.Write<T> to(Solace.Queue queue)
The queue must exist prior to launching the pipeline.
This will write all records to the same queue, ignoring their destination field.
Optional. If not specified, the connector will use dynamic destinations based on the destination field of Solace.Record.
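The precedence described for both to(...) overloads can be illustrated with a small, library-independent sketch. DestinationSketch and resolve are hypothetical names and are not part of SolaceIO; the sketch only assumes that a statically configured destination, when present, overrides the destination field carried by each record.

```java
import java.util.Optional;

/** Hypothetical illustration of static vs. dynamic destinations; not SolaceIO code. */
final class DestinationSketch {
  private DestinationSketch() {}

  /**
   * Returns the static destination if one was configured (as with to(...));
   * otherwise falls back to the destination carried by the record itself.
   */
  static String resolve(Optional<String> staticDestination, String recordDestination) {
    return staticDestination.orElse(recordDestination);
  }

  public static void main(String[] args) {
    // Static destination configured: the record's own destination is ignored.
    System.out.println(resolve(Optional.of("orders"), "per-record-destination"));
    // No static destination: the record's destination field is used.
    System.out.println(resolve(Optional.empty(), "per-record-destination"));
  }
}
```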
public SolaceIO.Write<T> withNumShards(int numShards)
This is optional; the default value is 20.
This is the maximum number of writers the job will use; depending on the amount of data, the actual number of writers may be lower. With the Dataflow runner, the connector will use at most this number of VMs in the job (but the job itself may use more VMs).
Set this number taking into account the limit on the number of clients in your Solace cluster, and the need for performance when writing to Solace (more workers will achieve higher throughput).
public SolaceIO.Write<T> withNumberOfClientsPerWorker(int numberOfClientsPerWorker)
This is optional; the default number is 4.
The number of clients is per worker. If there is more than one worker, the number of clients will be multiplied by the number of workers. With the Dataflow runner, this will be the number of clients created per VM. The clients will be re-used across different threads in the same worker.
Set this number in combination with withNumShards(int), to ensure that the limit on the number of clients in your Solace cluster is not exceeded.
Normally, using a higher number of clients with fewer workers will achieve better throughput at a lower cost, since the workers are better utilized. A good rule of thumb is to set as many clients per worker as the worker has vCPUs.
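The interaction between withNumShards(int) and withNumberOfClientsPerWorker(int) comes down to a simple product. The following is a minimal sketch for checking a configuration against a broker limit, assuming that every writing worker opens the full number of clients. ClientBudgetSketch, maxClients, fitsWithin, and the broker limit of 100 are hypothetical and not part of SolaceIO.

```java
/** Hypothetical helper for sizing the writer against a broker client limit; not SolaceIO code. */
final class ClientBudgetSketch {
  private ClientBudgetSketch() {}

  /** Upper bound on clients: each of numShards workers opens clientsPerWorker clients. */
  static long maxClients(int numShards, int clientsPerWorker) {
    if (numShards <= 0 || clientsPerWorker <= 0) {
      throw new IllegalArgumentException("both values must be positive");
    }
    return (long) numShards * clientsPerWorker;
  }

  /** True when the upper bound stays within the (assumed) broker client limit. */
  static boolean fitsWithin(int numShards, int clientsPerWorker, long brokerClientLimit) {
    return maxClients(numShards, clientsPerWorker) <= brokerClientLimit;
  }

  public static void main(String[] args) {
    // Documented defaults: 20 shards and 4 clients per worker.
    System.out.println(maxClients(20, 4)); // prints 80
    // 100 is an illustrative broker limit, not a Solace default.
    System.out.println(fitsWithin(20, 4, 100)); // prints true
  }
}
```

With the documented defaults, the writer may therefore open up to 80 clients, which is the figure to compare against the cluster's limit.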
public SolaceIO.Write<T> withDeliveryMode(DeliveryMode deliveryMode)
For more details, see https://docs.solace.com/API/API-Developer-Guide/Message-Delivery-Modes.htm
public SolaceIO.Write<T> publishLatencyMetrics()
Latency metrics are only available if withDeliveryMode(DeliveryMode) is set to PERSISTENT. In that mode, latency is measured for each single message, as the time difference between the message creation and the reception of the publishing confirmation.
For the batched writer, the creation time is set for every message in a batch shortly before the batch is submitted. So the latency is very close to the actual publishing latency, and it does not take into account the time spent waiting for the batch to be submitted.
This is optional; the default value is false (don't publish latency metrics).
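The measurement described above (creation time versus confirmation time, per message) can be sketched without any Solace dependency. PublishLatencySketch and its methods are hypothetical names; the connector's actual bookkeeping may differ, and timestamps are passed in explicitly here so that the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-message latency tracker; not SolaceIO code. */
final class PublishLatencySketch {
  private final Map<String, Long> createdAtNanos = new HashMap<>();

  /** Records the creation time of a message (for a batch: shortly before submission). */
  void onCreated(String messageId, long nowNanos) {
    createdAtNanos.put(messageId, nowNanos);
  }

  /** Returns the latency in nanoseconds once the confirmation arrives, or -1 if the id is unknown. */
  long onConfirmed(String messageId, long nowNanos) {
    Long created = createdAtNanos.remove(messageId);
    return created == null ? -1L : nowNanos - created;
  }

  public static void main(String[] args) {
    PublishLatencySketch tracker = new PublishLatencySketch();
    tracker.onCreated("m1", 1_000L);
    System.out.println(tracker.onConfirmed("m1", 4_500L)); // prints 3500
  }
}
```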
public SolaceIO.Write<T> withSubmissionMode(SolaceIO.SubmissionMode submissionMode)
For full details, please check https://docs.solace.com/API/API-Developer-Guide/Java-API-Best-Practices.htm.
The Solace JCSMP client libraries can dispatch messages using three different modes:
One of the modes dispatches messages directly from the same thread that is doing the rest of I/O work. This mode favors lower latency but lower throughput. Set this to LOWER_LATENCY to use that mode (MESSAGE_CALLBACK_ON_REACTOR set to True).
Another mode uses a parallel thread to accumulate and dispatch messages. This mode favors higher throughput but also has higher latency. Set this to HIGHER_THROUGHPUT to use that mode. This is the default mode (MESSAGE_CALLBACK_ON_REACTOR set to False).
If you prefer to have full control over all the JCSMP properties, set this to CUSTOM, and override the classes SessionServiceFactory and SessionService to have full control on how to create the JCSMP sessions and producers used by the connector.
This is optional; the default value is HIGHER_THROUGHPUT.
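The relationship between the three modes and MESSAGE_CALLBACK_ON_REACTOR can be summarised in a short sketch. SubmissionModeSketch is a hypothetical stand-in for illustration only; it is not SolaceIO.SubmissionMode, and it assumes that CUSTOM leaves the property to the user-supplied session classes.

```java
import java.util.Optional;

/** Hypothetical mirror of the three submission modes; not the SolaceIO enum itself. */
enum SubmissionModeSketch {
  LOWER_LATENCY,
  HIGHER_THROUGHPUT,
  CUSTOM;

  /**
   * Value the mode implies for MESSAGE_CALLBACK_ON_REACTOR, or empty when
   * the caller configures all JCSMP properties (CUSTOM).
   */
  Optional<Boolean> callbackOnReactor() {
    switch (this) {
      case LOWER_LATENCY:
        return Optional.of(true); // dispatch from the I/O thread
      case HIGHER_THROUGHPUT:
        return Optional.of(false); // dispatch from a parallel thread (default)
      default:
        return Optional.empty(); // CUSTOM: nothing is set on the caller's behalf
    }
  }
}
```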
public SolaceIO.Write<T> withWriterType(SolaceIO.WriterType writerType)
The Solace writer can use JCSMP in either streaming or batched mode.
In streaming mode, the publishing latency will be lower, but the throughput will also be lower.
With the batched mode, messages are accumulated until a batch size of 50 is reached, or UnboundedBatchedSolaceWriter.ACKS_FLUSHING_INTERVAL_SECS seconds have elapsed since the first message in the batch was received. The 50 messages are sent to Solace in a single batch. This writer offers higher throughput but higher publishing latency, as messages can be held up for up to UnboundedBatchedSolaceWriter.ACKS_FLUSHING_INTERVAL_SECS (5) seconds until they are published.
Note that this is the message publishing latency, not the end-to-end latency. For very large scale pipelines, you will probably prefer to use the HIGHER_THROUGHPUT mode, because with lower throughput, messages will accumulate in the pipeline, and the end-to-end latency may actually be higher.
This is optional; the default is the BATCHED writer.
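The size-or-time flushing rule of the batched writer can be sketched as a plain buffer. BatchBufferSketch is hypothetical and is not the connector's writer; the batch size and wait time are constructor parameters rather than hard-coded values, and the clock is passed in explicitly so the rule is easy to trace.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical size-or-time batch buffer; not the connector's actual writer. */
final class BatchBufferSketch {
  private final int maxBatchSize;
  private final long maxWaitMillis;
  private final List<String> buffer = new ArrayList<>();
  private long firstMessageAtMillis;

  BatchBufferSketch(int maxBatchSize, long maxWaitMillis) {
    this.maxBatchSize = maxBatchSize;
    this.maxWaitMillis = maxWaitMillis;
  }

  /** Adds a message; returns the batch to publish if a flush is due, else an empty list. */
  List<String> add(String message, long nowMillis) {
    if (buffer.isEmpty()) {
      firstMessageAtMillis = nowMillis; // the timer starts at the first message of a batch
    }
    buffer.add(message);
    return flushIfDue(nowMillis);
  }

  /** Flushes when the batch is full or its first message has waited long enough. */
  List<String> flushIfDue(long nowMillis) {
    boolean full = buffer.size() >= maxBatchSize;
    boolean expired = !buffer.isEmpty() && nowMillis - firstMessageAtMillis >= maxWaitMillis;
    if (!full && !expired) {
      return new ArrayList<>();
    }
    List<String> batch = new ArrayList<>(buffer);
    buffer.clear();
    return batch;
  }
}
```

Under the figures given in the text, such a buffer would be constructed as new BatchBufferSketch(50, 5_000), with flushIfDue called periodically so that a partially filled batch is not held indefinitely.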
public SolaceIO.Write<T> withSessionServiceFactory(SessionServiceFactory factory)
This provider should define the destination host where the broker is listening, and all the properties related to authentication (basic authentication, client certificate, etc.).
public SolaceIO.Write<T> withErrorHandler(ErrorHandler<BadRecord,?> errorHandler)
If provided, this error handler will be invoked for each record that could not be successfully published. The error handler can implement custom logic for dealing with failed records, such as writing them to a dead-letter queue or logging them.
If no error handler is provided, failed records will be ignored.
public SolaceOutput expand(PCollection<T> input)
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
expand in class PTransform<PCollection<T>,SolaceOutput>