Class SolaceIO
PTransform to read and write from/to Solace event
 broker.
 Note: Internal use only; this API is beta and subject to change.
Reading from Solace
To read from Solace, use theread() or read(TypeDescriptor, SerializableFunction, SerializableFunction).
 No-argument read() top-level method
 This method returns a PCollection of Solace.Record objects. It uses a default mapper
 (Solace.SolaceRecordMapper.map(BytesXMLMessage)) to map from the received BytesXMLMessage from Solace, to the Solace.Record objects.
 
By default, it also uses a XMLMessage.getSenderTimestamp() for watermark
 estimation. This SerializableFunction can be overridden with SolaceIO.Read.withTimestampFn(SerializableFunction) method.
 
When using this method, the Coders are inferred automatically.
Advanced read(TypeDescriptor, SerializableFunction, SerializableFunction)
 top-level method
 With this method, the user can:
- specify a custom output type for the PTransform (for example their own class consisting only of the relevant fields, optimized for their use-case), and
 - create a custom mapping between 
BytesXMLMessageand their output type and - specify what field to use for watermark estimation from their mapped field (for example, in
       this method the user can use a field which is encoded in the payload as a timestamp, which
       cannot be done with the 
read()method. 
Reading from a queue (SolaceIO.Read.from(Solace.Queue)} or a topic (SolaceIO.Read.from(Solace.Topic))
 Regardless of the top-level read method choice, the user can specify whether to read from a
 Queue - SolaceIO.Read.from(Solace.Queue), or a Topic SolaceIO.Read.from(Solace.Topic).
 
Note: when a user specifies to read from a Topic, the connector will create a matching Queue and a Subscription. The user must ensure that the SEMP API is reachable from the driver program and must provide credentials that have `write` permission to the SEMP Config API. The created Queue will be non-exclusive. The Queue will not be deleted when the pipeline is terminated.
Note: If the user specifies to read from a Queue, the driver program
 will execute a call to the SEMP API to check if the Queue is `exclusive` or `non-exclusive`. The
 user must ensure that the SEMP API is reachable from the driver program and provide credentials
 with `read` permission to the SolaceIO.Read.withSempClientFactory(SempClientFactory).
 
Usage example
The no-arg read() method
 The minimal example - reading from an existing Queue, using the no-arg read()
 method, with all the default configuration options.
 
 PCollection<Solace.Record> events =
   pipeline.apply(
     SolaceIO.read()
         .from(Queue.fromName("your-queue-name"))
         .withSempClientFactory(
                 BasicAuthSempClientFactory.builder()
                         .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
                         .username("semp-username")
                         .password("semp-password")
                         .vpnName("vpn-name")
                         .build())
         .withSessionServiceFactory(
                 BasicAuthJcsmpSessionServiceFactory.builder()
                         .host("your-host-name")
                               // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
                         .username("username")
                         .password("password")
                         .vpnName("vpn-name")
                         .build()));
 
 The advanced read(TypeDescriptor, SerializableFunction, SerializableFunction) method
 When using this method you can specify a custom output PCollection type and a custom timestamp function.
 {@literal @}DefaultSchema(JavaBeanSchema.class)
 public static class SimpleRecord {
    public String payload;
    public String messageId;
    public Instant timestamp;
    public SimpleRecord() {}
    public SimpleRecord(String payload, String messageId, Instant timestamp) {
        this.payload = payload;
        this.messageId = messageId;
        this.timestamp = timestamp;
    }
 }
 private static SimpleRecord toSimpleRecord(BytesXMLMessage record) {
    if (record == null) {
        return null;
    }
    return new SimpleRecord(
            new String(record.getBytes(), StandardCharsets.UTF_8),
            record.getApplicationMessageId(),
            record.getSenderTimestamp() != null
                    ? Instant.ofEpochMilli(record.getSenderTimestamp())
                    : Instant.now());
 }
 PCollection<SimpleRecord> events =
  pipeline.apply(
      SolaceIO.read(
                      TypeDescriptor.of(SimpleRecord.class),
                      record -> toSimpleRecord(record),
                      record -> record.timestamp)
              .from(Topic.fromName("your-topic-name"))
              .withSempClientFactory(...)
              .withSessionServiceFactory(...);
 
 Authentication
When reading from Solace, the user must use SolaceIO.Read.withSessionServiceFactory(SessionServiceFactory) to create a JCSMP session and SolaceIO.Read.withSempClientFactory(SempClientFactory) to authenticate to the SEMP API.
 
See SolaceIO.Read.withSessionServiceFactory(SessionServiceFactory) for session authentication.
 The connector provides implementation of the SessionServiceFactory using the Basic
 Authentication: BasicAuthJcsmpSessionServiceFactory.
 
For the authentication to the SEMP API (SolaceIO.Read.withSempClientFactory(SempClientFactory))
 the connector provides BasicAuthSempClientFactory to
 authenticate using the Basic Authentication.
 
Writing
To write to Solace, use write() with a PCollection<Solace.Record>. You can
 also use write(SerializableFunction) to specify a format function to convert the input
 type to Solace.Record.
 
Writing to a static topic or queue
The connector uses the Solace JCSMP API. The clients will write to a SMF topic to the port 55555 of the host. If you want to use a different port, specify it in the host property with the format "X.X.X.X:PORT".
Once you have a PCollection of Solace.Record, you can write to Solace using:
 
 PCollection<Solace.Record> solaceRecs = ...;
 PCollection<Solace.PublishResult> results =
         solaceRecs.apply(
                 "Write to Solace",
                 SolaceIO.write()
                         .to(SolaceIO.topicFromName("some-topic"))
                         .withSessionServiceFactory(
                            BasicAuthJcsmpSessionServiceFactory.builder()
                              .username("username")
                              .password("password")
                              .host("host:port")
                              .build()));
 
 The above code snippet will write to the VPN named "default", using 4 clients per worker (VM
 in Dataflow), and a maximum of 20 workers/VMs for writing (default values). You can change the
 default VPN name by setting the required JCSMP property in the session factory (in this case,
 with BasicAuthJcsmpSessionServiceFactory.vpnName()), the number of clients per worker
 with SolaceIO.Write.withNumberOfClientsPerWorker(int) and the number of parallel write clients
 using SolaceIO.Write.withNumShards(int).
 
Writing to dynamic destinations
To write to dynamic destinations, don't set theSolaceIO.Write.to(Solace.Queue) or SolaceIO.Write.to(Solace.Topic) property and make sure that all the Solace.Records have their
 destination field set to either a topic or a queue. You can do this prior to calling the write
 connector, or by using a format function and write(SerializableFunction).
 For instance, you can create a function like the following:
 // Generate Record with different destinations
 SerializableFunction<MyType, Solace.Record> formatFn =
    (MyType msg) -> {
       int queue = ... // some random number
       return Solace.Record.builder()
         .setDestination(Solace.Destination.builder()
                        .setName(String.format("q%d", queue))
                        .setType(Solace.DestinationType.QUEUE)
                        .build())
         .setMessageId(msg.getMessageId())
         .build();
 };
 
 And then use the connector as follows:
 
 // Ignore "to" method to use dynamic destinations
 SolaceOutput solaceResponses = msgs.apply("Write to Solace",
   SolaceIO.<MyType>write(formatFn)
        .withDeliveryMode(DeliveryMode.PERSISTENT)
        .withWriterType(SolaceIO.WriterType.STREAMING)
 ...
 
 Direct and persistent messages, and latency metrics
The connector can write either direct or persistent messages. The default mode is DIRECT.
The connector returns a PCollection of Solace.PublishResult, that you can use
 to get a confirmation of messages that have been published, or rejected, but only if it is
 publishing persistent messages.
 
If you are publishing persistent messages, then you can have some feedback about whether the
 messages have been published, and some publishing latency metrics. If the message has been
 published, Solace.PublishResult.getPublished() will be true. If it is false, it means
 that the message could not be published, and Solace.PublishResult.getError() will contain
 more details about why the message could not be published. To get latency metrics as well as the
 results, set the property SolaceIO.Write.publishLatencyMetrics().
 
Throughput and latency
This connector can work in two main modes: high latency or high throughput. The default mode
 favors high throughput over high latency. You can control this behavior with the methods SolaceIO.Write.withSubmissionMode(SubmissionMode) and SolaceIO.Write.withWriterType(WriterType).
 
The default mode works like the following options:
 PCollection<Solace.Record> solaceRecs = ...;
 PCollection<Solace.PublishResult> results =
         solaceRecs.apply(
                 "Write to Solace",
                 SolaceIO.write()
                         .to(SolaceIO.topicFromName("some-topic"))
                         .withSessionServiceFactory(
                            BasicAuthJcsmpSessionServiceFactory.builder()
                              .username("username")
                              .password("password")
                              .host("host:port")
                              .build())
                         .withSubmissionMode(SubmissionMode.HIGHER_THROUGHPUT)
                         .withWriterType(WriterType.BATCHED));
 
 SolaceIO.SubmissionMode.HIGHER_THROUGHPUT and SolaceIO.WriterType.BATCHED are the default
 values, and offer the higher possible throughput, and the lowest usage of resources in the runner
 side (due to the lower backpressure).
 
This connector writes bundles of 50 messages, using a bulk publish JCSMP method. This will increase the latency, since a message needs to "wait" until 50 messages are accumulated, before they are submitted to Solace.
For the lowest latency possible, use SolaceIO.SubmissionMode.LOWER_LATENCY and SolaceIO.WriterType.STREAMING.
 
 PCollection<Solace.PublishResult> results =
         solaceRecs.apply(
                 "Write to Solace",
                 SolaceIO.write()
                         .to(SolaceIO.topicFromName("some-topic"))
                         .withSessionServiceFactory(
                            BasicAuthJcsmpSessionServiceFactory.builder()
                              .username("username")
                              .password("password")
                              .host("host:port")
                              .build())
                         .withSubmissionMode(SubmissionMode.LOWER_LATENCY)
                         .withWriterType(WriterType.STREAMING));
 
 The streaming connector publishes each message individually, without holding up or batching before the message is sent to Solace. This will ensure the lowest possible latency, but it will offer a much lower throughput. The streaming connector does not use state and timers.
Both connectors uses state and timers to control the level of parallelism. If you are using Cloud Dataflow, it is recommended that you enable Streaming Engine to use this connector.
For full control over all the properties, use SolaceIO.SubmissionMode.CUSTOM. The connector
 will not override any property that you set, and you will have full control over all the JCSMP
 properties.
 
Authentication
When writing to Solace, the user must use SolaceIO.Write.withSessionServiceFactory(SessionServiceFactory) to create a JCSMP session.
 
See SolaceIO.Write.withSessionServiceFactory(SessionServiceFactory) for session authentication.
 The connector provides implementation of the SessionServiceFactory using basic
 authentication (BasicAuthJcsmpSessionServiceFactory), and another implementation using
 basic authentication but with a password stored as a secret in Google Cloud Secret Manager
 (GCPSecretSessionServiceFactory)
 
Connector retries
When the worker using the connector is created, the connector will attempt to connect to Solace.
If the client cannot connect to Solace for whatever reason, the connector will retry the connections using the following strategy. There will be a maximum of 4 retries. The first retry will be attempted 1 second after the first connection attempt. Every subsequent retry will multiply that time by a factor of two, with a maximum of 10 seconds.
If after those retries the client is still unable to connect to Solace, the connector will attempt to reconnect using the same strategy repeated for every single incoming message. If for some reason, there is a persistent issue that prevents the connection (e.g. client quota exhausted), you will need to stop your job manually, or the connector will keep retrying.
This strategy is applied to all the remote calls sent to Solace, either to connect, pull messages, push messages, etc.
- 
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classstatic enumstatic classstatic enum - 
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final intstatic final DeliveryModestatic final intstatic final Booleanstatic final SolaceIO.SubmissionModestatic final SolaceIO.WriterTypestatic final SerializableFunction<Solace.Record, Instant>  - 
Constructor Summary
Constructors - 
Method Summary
Modifier and TypeMethodDescriptionstatic DestinationconvertToJcsmpDestination(Solace.Destination destination) Convert to a JCSMP destination from a schema-enabledSolace.Destination.static SolaceIO.Read<Solace.Record> read()Create aSolaceIO.Readtransform, to read from Solace.static <T> SolaceIO.Read<T> read(TypeDescriptor<T> typeDescriptor, SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn, SerializableFunction<T, Instant> timestampFn) Create aSolaceIO.Readtransform, to read from Solace.static SolaceIO.Write<Solace.Record> write()Create aSolaceIO.Writetransform, to write to Solace usingSolace.Recordobjects.static <T> SolaceIO.Write<T> write(SerializableFunction<T, Solace.Record> formatFunction) Create aSolaceIO.Writetransform, to write to Solace with a custom type. 
- 
Field Details
- 
SENDER_TIMESTAMP_FUNCTION
 - 
DEFAULT_WRITER_NUM_SHARDS
public static final int DEFAULT_WRITER_NUM_SHARDS- See Also:
 
 - 
DEFAULT_WRITER_CLIENTS_PER_WORKER
public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER- See Also:
 
 - 
DEFAULT_WRITER_PUBLISH_LATENCY_METRICS
 - 
DEFAULT_WRITER_SUBMISSION_MODE
 - 
DEFAULT_WRITER_DELIVERY_MODE
 - 
DEFAULT_WRITER_TYPE
 
 - 
 - 
Constructor Details
- 
SolaceIO
public SolaceIO() 
 - 
 - 
Method Details
- 
convertToJcsmpDestination
Convert to a JCSMP destination from a schema-enabledSolace.Destination.This method returns a
Destination, which may be either aTopicor aQueue - 
read
Create aSolaceIO.Readtransform, to read from Solace. The ingested records will be mapped to theSolace.Recordobjects. - 
read
public static <T> SolaceIO.Read<T> read(TypeDescriptor<T> typeDescriptor, SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn, SerializableFunction<T, Instant> timestampFn) Create aSolaceIO.Readtransform, to read from Solace. Specify aSerializableFunctionto map incomingBytesXMLMessagerecords, to the object of your choice. You also need to specify aTypeDescriptorfor your class and the timestamp function which returns anInstantfrom the record.The type descriptor will be used to infer a coder from CoderRegistry or Schema Registry. You can initialize a new TypeDescriptor in the following manner:
TypeDescriptor<T> typeDescriptor = TypeDescriptor.of(YourOutputType.class); - 
write
Create aSolaceIO.Writetransform, to write to Solace with a custom type.If you are using a custom data class, the format function should return a
Solace.Recordcorresponding to your custom data class instance.If you are using this formatting function with dynamic destinations, you must ensure that you set the right value in the destination value of the
Solace.Recordmessages. - 
write
Create aSolaceIO.Writetransform, to write to Solace usingSolace.Recordobjects. 
 -