Class SolaceIO.Read<T>

java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PBegin,PCollection<T>>
org.apache.beam.sdk.io.solace.SolaceIO.Read<T>
All Implemented Interfaces:
Serializable, HasDisplayData
Enclosing class:
SolaceIO

public static class SolaceIO.Read<T> extends PTransform<PBegin,PCollection<T>>
  • Method Details

    • from

      public SolaceIO.Read<T> from(Solace.Queue queue)
      Set the queue name to read from. Use this or the `from(Topic)` method.
    • from

      public SolaceIO.Read<T> from(Solace.Topic topic)
      Set the topic name to read from. Use this or the `from(Queue)` method.
    • withTimestampFn

      public SolaceIO.Read<T> withTimestampFn(SerializableFunction<T,Instant> timestampFn)
      The timestamp function, used for estimating the watermark. It maps a record T to an Instant.

      Optional when using the no-arg SolaceIO.read() method. Defaults to SolaceIO.SENDER_TIMESTAMP_FUNCTION. When using the SolaceIO.read(TypeDescriptor, SerializableFunction, SerializableFunction) method, the function mapping from T to Instant has to be passed as an argument.

    • withMaxNumConnections

      public SolaceIO.Read<T> withMaxNumConnections(Integer maxNumConnections)
      Optional. Sets the maximum number of connections to the broker. The actual number of sessions is determined by this and the number set by the runner. If not set, the number of sessions is determined by the runner. The number of connections created follows this logic: `numberOfConnections = min(maxNumConnections, desiredNumberOfSplits)`, where the `desiredNumberOfSplits` is set by the runner.
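
      The rule above can be sketched in plain Java. This is an illustration of the documented formula only, not the SolaceIO source; the class and method names are hypothetical, and a null `maxNumConnections` is assumed to stand for "not set".

```java
/** Illustrative sketch of the documented rule; not taken from the SolaceIO implementation. */
final class ConnectionCountSketch {

  /**
   * @param maxNumConnections value given to withMaxNumConnections, or null if it was never set
   *     (assumption: null models the "not set" case)
   * @param desiredNumberOfSplits number of splits requested by the runner
   * @return the number of connections according to the documented min() rule
   */
  static int numberOfConnections(Integer maxNumConnections, int desiredNumberOfSplits) {
    if (maxNumConnections == null) {
      // Not set: the runner alone decides.
      return desiredNumberOfSplits;
    }
    return Math.min(maxNumConnections, desiredNumberOfSplits);
  }

  public static void main(String[] args) {
    System.out.println(numberOfConnections(4, 10));    // cap is lower than the runner's request
    System.out.println(numberOfConnections(20, 10));   // runner's request is lower than the cap
    System.out.println(numberOfConnections(null, 10)); // no cap configured
  }
}
```

      Under this reading, the setting acts only as an upper bound: it can never raise the number of connections above what the runner asks for.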
    • withWatermarkIdleDurationThreshold

      public SolaceIO.Read<T> withWatermarkIdleDurationThreshold(Duration idleDurationThreshold)
      Optional. Denotes the duration for which the watermark can be idle. If no messages arrive during this 'idle' period, the watermark is set to a timestamp that is earlier than now by the idle period. For example, if the idle period is 30 seconds and no new data arrives for 30 seconds, the watermark is set to max(currentWatermark, now() - 30 seconds). The default watermark idle duration threshold is SolaceIO.DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD.
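
      A minimal sketch of the idle-watermark rule described above. It uses `java.time` types so that it is self-contained (Beam itself uses Joda-Time `Instant` and `Duration`), and the class, method, and parameter names are hypothetical. It is not the SolaceIO implementation.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative sketch of max(currentWatermark, now - idleThreshold); names are hypothetical. */
final class IdleWatermarkSketch {

  static Instant advanceIfIdle(
      Instant currentWatermark,
      Instant lastMessageReceived,
      Instant now,
      Duration idleThreshold) {
    // The source is considered idle once no message has arrived for at least the threshold.
    boolean idle = Duration.between(lastMessageReceived, now).compareTo(idleThreshold) >= 0;
    if (!idle) {
      return currentWatermark;
    }
    // Move the watermark forward to (now - threshold), but never backwards.
    Instant candidate = now.minus(idleThreshold);
    return candidate.isAfter(currentWatermark) ? candidate : currentWatermark;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-01-01T00:01:00Z");
    Instant watermark = Instant.parse("2024-01-01T00:00:00Z");
    Instant lastMessage = Instant.parse("2024-01-01T00:00:20Z");
    System.out.println(advanceIfIdle(watermark, lastMessage, now, Duration.ofSeconds(30)));
  }
}
```

      Taking the maximum keeps the watermark monotonic: an idle source can push it forward, but never pull it back.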
    • withDeduplicateRecords

      public SolaceIO.Read<T> withDeduplicateRecords(boolean deduplicateRecords)
      Optional, default: false. Set to deduplicate messages based on the XMLMessage.getApplicationMessageId() of the incoming BytesXMLMessage. If the field is null, then the XMLMessage.getReplicationGroupMessageId() will be used, which is always set by Solace.
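
      The fallback between the two identifiers can be sketched as follows. Plain strings stand in for the Solace ID types, and the class and method names are hypothetical; this only illustrates the selection and filtering logic described above, not how SolaceIO performs deduplication internally.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch of choosing a deduplication key; names and types are assumptions. */
final class DedupKeySketch {

  /** Prefers the application message ID and falls back to the replication group message ID. */
  static String dedupKey(String applicationMessageId, String replicationGroupMessageId) {
    return applicationMessageId != null ? applicationMessageId : replicationGroupMessageId;
  }

  /** Keeps the first occurrence of every key, preserving order. */
  static List<String> deduplicate(List<String> keys) {
    Set<String> seen = new HashSet<>();
    List<String> result = new ArrayList<>();
    for (String key : keys) {
      if (seen.add(key)) { // add() returns false when the key was already present
        result.add(key);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> keys = new ArrayList<>();
    keys.add(dedupKey("app-1", "rgmid-1"));
    keys.add(dedupKey(null, "rgmid-2")); // no application ID: fall back
    keys.add(dedupKey("app-1", "rgmid-3")); // duplicate application ID
    System.out.println(deduplicate(keys));
  }
}
```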
    • withSempClientFactory

      public SolaceIO.Read<T> withSempClientFactory(SempClientFactory sempClientFactory)
      Set a factory that creates a SempClient.

      The factory's `create()` method is invoked in each instance of an org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader<T>. The created SempClient has to communicate with the broker management API. It must support operations such as:

      • query for outstanding backlog bytes in a Queue,
      • query for metadata such as access-type of a Queue,
      • request creation of new Queues.

      An existing implementation of the SempClientFactory is BasicAuthSempClientFactory, which connects to SEMP using Basic Authentication.

      To use it, specify the credentials with the builder methods.

      The format of the host is `[Protocol://]Host[:Port]`.

      
       .withSempClientFactory(
               BasicAuthSempClientFactory.builder()
                     .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
                     .username("username")
                     .password("password")
                     .vpnName("vpn-name")
                     .build())
       
    • withSessionServiceFactory

      public SolaceIO.Read<T> withSessionServiceFactory(SessionServiceFactory sessionServiceFactory)
      Set a factory that creates a SessionService.

      The factory's `create()` method is invoked in each instance of an org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader<T>. The created SessionService has to be able to:

      • initialize a connection with the broker,
      • check liveness of the connection,
      • close the connection,
      • create a MessageReceiver.

      The BasicAuthJcsmpSessionServiceFactory is an existing implementation of the SessionServiceFactory that implements Basic Authentication to Solace.

      To use it, specify the credentials with the builder methods.

      The host is the IPv4 address, IPv6 address, or host name of the appliance. IPv6 addresses must be enclosed in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". To connect to a non-default port, specify it here using the "Host:Port" format. For example, "12.34.56.78:4444", or "[fe80::1]:4444".

      
       .withSessionServiceFactory(
               BasicAuthJcsmpSessionServiceFactory.builder()
                     .host("your-host-name") // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
                     .username("semp-username")
                     .password("semp-password")
                     .vpnName("vpn-name")
                     .build())
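
      To make the "Host:Port" format described above concrete, here is a small stand-alone sketch that splits such a string into its host and optional port. It illustrates the format only; the class and method names are hypothetical, and it does not claim to reproduce how the Solace client library parses the value.

```java
/** Illustrative parser for "Host" or "Host:Port", with IPv6 hosts in brackets; names are hypothetical. */
final class HostPortSketch {

  /** Returns a two-element array {host, port}; port is null when it is omitted. */
  static String[] split(String hostAndPort) {
    if (hostAndPort.startsWith("[")) {
      // Bracketed IPv6 literal: the colons inside the brackets are part of the address.
      int close = hostAndPort.indexOf(']');
      if (close < 0) {
        throw new IllegalArgumentException("Unclosed '[' in: " + hostAndPort);
      }
      String host = hostAndPort.substring(0, close + 1);
      String rest = hostAndPort.substring(close + 1);
      if (rest.isEmpty()) {
        return new String[] {host, null};
      }
      if (!rest.startsWith(":")) {
        throw new IllegalArgumentException("Expected ':' after ']' in: " + hostAndPort);
      }
      return new String[] {host, rest.substring(1)};
    }
    // IPv4 address or host name: at most one colon, separating host and port.
    int colon = hostAndPort.lastIndexOf(':');
    if (colon < 0) {
      return new String[] {hostAndPort, null};
    }
    return new String[] {hostAndPort.substring(0, colon), hostAndPort.substring(colon + 1)};
  }

  public static void main(String[] args) {
    for (String value : new String[] {"12.34.56.78", "[fe80::1]", "12.34.56.78:4444", "[fe80::1]:4444"}) {
      String[] parts = split(value);
      System.out.println(value + " -> host=" + parts[0] + ", port=" + parts[1]);
    }
  }
}
```

      The brackets are what make the IPv6 case unambiguous: without them, the port separator could not be told apart from the colons in the address.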
       
    • validate

      public void validate(@Nullable PipelineOptions options)
      Description copied from class: PTransform
      Called before running the Pipeline to verify this transform is fully and correctly specified.

      By default, does nothing.

      Overrides:
      validate in class PTransform<PBegin,PCollection<T>>
    • expand

      public PCollection<T> expand(PBegin input)
      Description copied from class: PTransform
      Override this method to specify how this PTransform should be expanded on the given InputT.

      NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

      Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

      Specified by:
      expand in class PTransform<PBegin,PCollection<T>>