Class SolaceIO.Write<T>

java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PCollection<T>,SolaceOutput>
org.apache.beam.sdk.io.solace.SolaceIO.Write<T>
All Implemented Interfaces:
Serializable, HasDisplayData
Enclosing class:
SolaceIO

public abstract static class SolaceIO.Write<T> extends PTransform<PCollection<T>,SolaceOutput>

  • Constructor Details

    • Write

      public Write()
  • Method Details

    • to

      public SolaceIO.Write<T> to(Solace.Topic topic)
      Write to a Solace topic.

      The topic does not need to exist before launching the pipeline.

      This will write all records to the same topic, ignoring their destination field.

      Optional. If not specified, the connector will use dynamic destinations based on the destination field of Solace.Record.

    • to

      public SolaceIO.Write<T> to(Solace.Queue queue)
      Write to a Solace queue.

      The queue must exist prior to launching the pipeline.

      This will write all records to the same queue, ignoring their destination field.

      Optional. If not specified, the connector will use dynamic destinations based on the destination field of Solace.Record.
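
      For illustration, here is a minimal sketch of both styles. It assumes that SolaceIO.write() yields a Write<Solace.Record> and that Solace.Topic.fromName(String) is the topic factory method available in your Beam version; connection configuration (see withSessionServiceFactory(SessionServiceFactory)) is omitted for brevity.

      import org.apache.beam.sdk.io.solace.SolaceIO;
      import org.apache.beam.sdk.io.solace.data.Solace;
      import org.apache.beam.sdk.values.PCollection;

      class SolaceWriteExamples {
        // Fixed destination: every record goes to the same topic, ignoring its destination field.
        static void writeToFixedTopic(PCollection<Solace.Record> records) {
          records.apply("Write to topic",
              SolaceIO.write().to(Solace.Topic.fromName("some/topic/name")));
        }

        // Dynamic destinations: with no to(...) call, each record is routed to the
        // destination named in its own destination field.
        static void writeWithDynamicDestinations(PCollection<Solace.Record> records) {
          records.apply("Write with dynamic destinations", SolaceIO.write());
        }
      }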

    • withNumShards

      public SolaceIO.Write<T> withNumShards(int numShards)
      The number of workers used by the job to write to Solace.

      This is optional; the default value is 20.

      This is the maximum value that the job will use; depending on the amount of data, the actual number of writers may be lower. With the Dataflow runner, the connector will use at most this number of VMs for writing (but the job itself may use more VMs).

      Set this number taking into account the limit on the number of clients in your Solace cluster, and the need for performance when writing to Solace (more workers achieve higher throughput).

    • withNumberOfClientsPerWorker

      public SolaceIO.Write<T> withNumberOfClientsPerWorker(int numberOfClientsPerWorker)
      The number of clients that each worker will create.

      This is optional; the default number is 4.

      The number of clients is per worker: if there is more than one worker, the total number of clients will be this value multiplied by the number of workers. With the Dataflow runner, this is the number of clients created per VM. The clients are re-used across the different threads in the same worker.

      Set this number in combination with withNumShards(int) to ensure that the limit on the number of clients in your Solace cluster is not exceeded.

      Normally, using a higher number of clients with fewer workers achieves better throughput at a lower cost, since the workers are better utilized. A good rule of thumb is to set as many clients per worker as the worker has vCPUs.
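
      As a sketch of this sizing guidance (the values below are illustrative only, and Solace.Queue.fromName(String) is assumed to be the queue factory method in your Beam version):

      // Up to 20 writers with 4 clients each means at most 80 clients on the broker.
      // Keep this product within your Solace cluster's client limit; as a rule of
      // thumb, give each worker roughly one client per vCPU.
      SolaceIO.Write<Solace.Record> write =
          SolaceIO.write()
              .to(Solace.Queue.fromName("some-queue"))
              .withNumShards(20)
              .withNumberOfClientsPerWorker(4);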

    • withDeliveryMode

      public SolaceIO.Write<T> withDeliveryMode(DeliveryMode deliveryMode)
      Set the delivery mode. This is optional; the default value is DIRECT.

      For more details, see https://docs.solace.com/API/API-Developer-Guide/Message-Delivery-Modes.htm

    • publishLatencyMetrics

      public SolaceIO.Write<T> publishLatencyMetrics()
      Publish latency metrics using Beam metrics.

      Latency metrics are only available if withDeliveryMode(DeliveryMode) is set to PERSISTENT. In that mode, latency is measured for each message as the time difference between the message creation and the reception of the publishing confirmation.

      For the batched writer, the creation time is set for every message in a batch shortly before the batch is submitted, so the measured latency is very close to the actual publishing latency and does not include the time a message spends waiting for its batch to be submitted.

      This is optional; the default value is false (don't publish latency metrics).
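
      As a sketch combining both settings (DeliveryMode is the JCSMP com.solacesystems.jcsmp.DeliveryMode enum; the topic factory method is assumed as in the earlier sketch):

      // Latency metrics only take effect with PERSISTENT delivery; with DIRECT there is
      // no publishing confirmation to measure against.
      SolaceIO.Write<Solace.Record> write =
          SolaceIO.write()
              .to(Solace.Topic.fromName("some/topic/name"))
              .withDeliveryMode(DeliveryMode.PERSISTENT)
              .publishLatencyMetrics();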

    • withSubmissionMode

      public SolaceIO.Write<T> withSubmissionMode(SolaceIO.SubmissionMode submissionMode)
      This setting controls the JCSMP property MESSAGE_CALLBACK_ON_REACTOR. Optional.

      For full details, please check https://docs.solace.com/API/API-Developer-Guide/Java-API-Best-Practices.htm.

      The Solace JCSMP client libraries can dispatch messages using two different modes:

      One mode dispatches messages directly from the same thread that does the rest of the I/O work. It favors lower latency at the cost of throughput. Set this to LOWER_LATENCY to use that mode (MESSAGE_CALLBACK_ON_REACTOR set to true).

      The other mode uses a parallel thread to accumulate and dispatch messages. It favors higher throughput but also has higher latency. Set this to HIGHER_THROUGHPUT to use that mode. This is the default mode (MESSAGE_CALLBACK_ON_REACTOR set to false).

      If you prefer to have full control over all the JCSMP properties, set this to CUSTOM and extend the classes SessionServiceFactory and SessionService to control how the JCSMP sessions and producers used by the connector are created.

      This is optional; the default value is HIGHER_THROUGHPUT.
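
      For example, a latency-sensitive pipeline could opt out of the default as sketched below (assuming, as above, that SolaceIO.write() yields a Write<Solace.Record>):

      // Dispatch publish callbacks on the I/O (reactor) thread: lower latency, lower throughput.
      // The default, SolaceIO.SubmissionMode.HIGHER_THROUGHPUT, needs no explicit call.
      SolaceIO.Write<Solace.Record> write =
          SolaceIO.write()
              .withSubmissionMode(SolaceIO.SubmissionMode.LOWER_LATENCY);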

    • withWriterType

      public SolaceIO.Write<T> withWriterType(SolaceIO.WriterType writerType)
      Set the type of writer used by the connector. Optional.

      The Solace writer can use the JCSMP API in either streaming or batched mode.

      In streaming mode, the publishing latency will be lower, but so will the throughput.

      With the batched mode, messages are accumulated until a batch size of 50 is reached, or UnboundedBatchedSolaceWriter.ACKS_FLUSHING_INTERVAL_SECS seconds have elapsed since the first message in the batch was received. The accumulated messages are then sent to Solace in a single batch. This writer offers higher throughput but higher publishing latency, as messages can be held for up to UnboundedBatchedSolaceWriter.ACKS_FLUSHING_INTERVAL_SECS seconds before they are published.

      Notice that this is the message publishing latency, not the end-to-end latency. For very large scale pipelines, you will probably prefer the higher-throughput BATCHED writer, as with lower throughput messages will accumulate in the pipeline, and the end-to-end latency may actually be higher.

      This is optional; the default is the BATCHED writer.
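
      A sketch of selecting the streaming writer (the constant name STREAMING is an assumption here; check SolaceIO.WriterType in your Beam version):

      // Lower publishing latency at the cost of throughput; the default is the batched writer.
      SolaceIO.Write<Solace.Record> write =
          SolaceIO.write()
              .withWriterType(SolaceIO.WriterType.STREAMING);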

    • withSessionServiceFactory

      public SolaceIO.Write<T> withSessionServiceFactory(SessionServiceFactory factory)
      Set the provider used to obtain the properties to initialize a new session with the broker.

      This provider should define the destination host where the broker is listening, and all the properties related to authentication (basic auth, client certificate, etc.).
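
      A sketch of a basic-authentication configuration. It assumes that BasicAuthJcsmpSessionServiceFactory and its builder methods (host, username, password, vpnName) are available in your Beam version; for other authentication mechanisms, provide your own SessionServiceFactory implementation.

      // Assumption: BasicAuthJcsmpSessionServiceFactory exists with this builder in your
      // Beam version; adjust the host, credentials, and message VPN to your broker.
      SolaceIO.Write<Solace.Record> write =
          SolaceIO.write()
              .to(Solace.Queue.fromName("some-queue"))
              .withSessionServiceFactory(
                  BasicAuthJcsmpSessionServiceFactory.builder()
                      .host("broker-host:55555")
                      .username("username")
                      .password("password")
                      .vpnName("default")
                      .build());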

    • withErrorHandler

      public SolaceIO.Write<T> withErrorHandler(ErrorHandler<BadRecord,?> errorHandler)
      An optional error handler for handling records that failed to publish to Solace.

      If provided, this error handler will be invoked for each record that could not be successfully published. The error handler can implement custom logic for dealing with failed records, such as writing them to a dead-letter queue or logging them.

      If no error handler is provided, failed records will be ignored.
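
      A sketch using Beam's BadRecord error-handling framework. It assumes Pipeline#registerBadRecordErrorHandler is available in your Beam version, and deadLetterSink is a hypothetical user-provided PTransform<PCollection<BadRecord>, ?> (for example, a write to a dead-letter destination):

      // The error handler must be closed (here via try-with-resources) before the pipeline
      // is run; records that fail to publish are routed to deadLetterSink.
      try (ErrorHandler.BadRecordErrorHandler<?> handler =
              pipeline.registerBadRecordErrorHandler(deadLetterSink)) {
        records.apply(
            SolaceIO.write()
                .to(Solace.Topic.fromName("some/topic/name"))
                .withErrorHandler(handler));
      }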

    • expand

      public SolaceOutput expand(PCollection<T> input)
      Description copied from class: PTransform
      Override this method to specify how this PTransform should be expanded on the given InputT.

      NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

      Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

      Specified by:
      expand in class PTransform<PCollection<T>,SolaceOutput>
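
      In practice, do not call expand directly; apply the transform to a PCollection and let Beam expand it, as in this sketch (same assumptions as the earlier examples):

      SolaceOutput result =
          records.apply("Write to Solace",
              SolaceIO.write().to(Solace.Topic.fromName("some/topic/name")));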