@Internal public class SolaceIO extends java.lang.Object
PTransform to read and write from/to Solace event
broker.
Note: Internal use only; this API is beta and subject to change.
read() or read(TypeDescriptor,
SerializableFunction, SerializableFunction).
read() top-level methodThis method returns a PCollection of Solace.Record objects. It uses a default mapper
(Solace.SolaceRecordMapper.map(BytesXMLMessage)) to map from the received BytesXMLMessage from Solace, to the Solace.Record objects.
By default, it also uses a XMLMessage.getSenderTimestamp() for watermark
estimation. This SerializableFunction can be overridden with SolaceIO.Read.withTimestampFn(SerializableFunction) method.
When using this method, the Coders are inferred automatically.
read(TypeDescriptor, SerializableFunction, SerializableFunction)
top-level methodWith this method, the user can:
BytesXMLMessage and their output type and
read() method.
SolaceIO.Read.from(Solace.Queue)} or a topic (SolaceIO.Read.from(Solace.Topic))Regardless of the top-level read method choice, the user can specify whether to read from a
Queue - SolaceIO.Read.from(Solace.Queue), or a Topic SolaceIO.Read.from(Solace.Topic).
Note: when a user specifies to read from a Topic, the connector will create a matching Queue and a Subscription. The user must ensure that the SEMP API is reachable from the driver program and must provide credentials that have `write` permission to the SEMP Config API. The created Queue will be non-exclusive. The Queue will not be deleted when the pipeline is terminated.
Note: If the user specifies to read from a Queue, the driver program
will execute a call to the SEMP API to check if the Queue is `exclusive` or `non-exclusive`. The
user must ensure that the SEMP API is reachable from the driver program and provide credentials
with `read` permission to the SolaceIO.Read.withSempClientFactory(SempClientFactory).
read() methodThe minimal example - reading from an existing Queue, using the no-arg read()
method, with all the default configuration options.
PCollection<Solace.Record> events =
pipeline.apply(
SolaceIO.read()
.from(Queue.fromName("your-queue-name"))
.withSempClientFactory(
BasicAuthSempClientFactory.builder()
.host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
.username("semp-username")
.password("semp-password")
.vpnName("vpn-name")
.build())
.withSessionServiceFactory(
BasicAuthJcsmpSessionServiceFactory.builder()
.host("your-host-name")
// e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
.username("username")
.password("password")
.vpnName("vpn-name")
.build()));
read(TypeDescriptor, SerializableFunction,
SerializableFunction) methodWhen using this method you can specify a custom output PCollection type and a custom timestamp function.
{@code| Modifier and Type | Class and Description |
|---|---|
static class |
SolaceIO.Read<T> |
| Modifier and Type | Field and Description |
|---|---|
static SerializableFunction<Solace.Record,Instant> |
SENDER_TIMESTAMP_FUNCTION |
| Constructor and Description |
|---|
SolaceIO() |
| Modifier and Type | Method and Description |
|---|---|
static com.solacesystems.jcsmp.Destination |
convertToJcsmpDestination(Solace.Destination destination)
Convert to a JCSMP destination from a schema-enabled
Solace.Destination. |
static SolaceIO.Read<Solace.Record> |
read()
Create a
SolaceIO.Read transform, to read from Solace. |
static <T> SolaceIO.Read<T> |
read(TypeDescriptor<T> typeDescriptor,
SerializableFunction<com.solacesystems.jcsmp.BytesXMLMessage,T> parseFn,
SerializableFunction<T,Instant> timestampFn)
Create a
SolaceIO.Read transform, to read from Solace. |
public static final SerializableFunction<Solace.Record,Instant> SENDER_TIMESTAMP_FUNCTION
public static com.solacesystems.jcsmp.Destination convertToJcsmpDestination(Solace.Destination destination)
Solace.Destination.
This method returns a Destination, which may be either a Topic or a Queue
public static SolaceIO.Read<Solace.Record> read()
SolaceIO.Read transform, to read from Solace. The ingested records will be mapped to
the Solace.Record objects.public static <T> SolaceIO.Read<T> read(TypeDescriptor<T> typeDescriptor, SerializableFunction<com.solacesystems.jcsmp.BytesXMLMessage,T> parseFn, SerializableFunction<T,Instant> timestampFn)
SolaceIO.Read transform, to read from Solace. Specify a SerializableFunction to
map incoming BytesXMLMessage records, to the object of your choice. You also need to
specify a TypeDescriptor for your class and the timestamp function which returns an
Instant from the record.
The type descriptor will be used to infer a coder from CoderRegistry or Schema Registry. You can initialize a new TypeDescriptor in the following manner:
TypeDescriptor<T> typeDescriptor = TypeDescriptor.of(YourOutputType.class);