@Internal public class SolaceIO extends java.lang.Object
A PTransform to read from and write to a Solace event broker.
Note: Internal use only; this API is beta and subject to change.
To read from Solace, use read() or read(TypeDescriptor, SerializableFunction, SerializableFunction).
read() top-level method
This method returns a PCollection of Solace.Record objects. It uses a default mapper (Solace.SolaceRecordMapper.map(BytesXMLMessage)) to map the BytesXMLMessage received from Solace to Solace.Record objects.
By default, it also uses XMLMessage.getSenderTimestamp() for watermark estimation. This timestamp SerializableFunction can be overridden with the SolaceIO.Read.withTimestampFn(SerializableFunction) method.
When using this method, the Coders are inferred automatically.
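read(TypeDescriptor, SerializableFunction, SerializableFunction) top-level method
With this method, the user can define a custom mapping between BytesXMLMessage and their output type and supply their own timestamp function, rather than relying on the defaults of the read() method.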
Reading from a queue (SolaceIO.Read.from(Solace.Queue)) or a topic (SolaceIO.Read.from(Solace.Topic))
Regardless of the top-level read method choice, the user can specify whether to read from a Queue (SolaceIO.Read.from(Solace.Queue)) or a Topic (SolaceIO.Read.from(Solace.Topic)).
Note: when a user specifies to read from a Topic, the connector will create a matching Queue and a Subscription. The user must ensure that the SEMP API is reachable from the driver program and must provide credentials that have `write` permission to the SEMP Config API. The created Queue will be non-exclusive. The Queue will not be deleted when the pipeline is terminated.
Note: when a user specifies to read from a Queue, the driver program will call the SEMP API to check whether the Queue is `exclusive` or `non-exclusive`. The user must ensure that the SEMP API is reachable from the driver program and must provide credentials with `read` permission through SolaceIO.Read.withSempClientFactory(SempClientFactory).
read() method
The minimal example: reading from an existing Queue, using the no-arg read() method, with all the default configuration options.
PCollection<Solace.Record> events =
    pipeline.apply(
        SolaceIO.read()
            .from(Queue.fromName("your-queue-name"))
            .withSempClientFactory(
                BasicAuthSempClientFactory.builder()
                    .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
                    .username("semp-username")
                    .password("semp-password")
                    .vpnName("vpn-name")
                    .build())
            .withSessionServiceFactory(
                BasicAuthJcsmpSessionServiceFactory.builder()
                    .host("your-host-name")
                    // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
                    .username("username")
                    .password("password")
                    .vpnName("vpn-name")
                    .build()));
read(TypeDescriptor, SerializableFunction, SerializableFunction) method
When using this method you can specify a custom output PCollection type and a custom timestamp function.
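The shape of the two functions this method expects can be sketched with JDK types only. This is a minimal illustration, not the connector's code: `RawMessage` and `SimpleRecord` are hypothetical stand-ins (the real input type is BytesXMLMessage), `java.util.function.Function` stands in for SerializableFunction, `java.time.Instant` stands in for the Instant type used by the connector, and the fallback to `Instant.EPOCH` for a missing sender timestamp is an assumption made for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.function.Function;

final class CustomReadSketch {

    /** Hypothetical stand-in for a received broker message (not the JCSMP API). */
    static final class RawMessage {
        final byte[] payload;
        final Long senderTimestampMillis; // null when the sender did not set one

        RawMessage(byte[] payload, Long senderTimestampMillis) {
            this.payload = payload;
            this.senderTimestampMillis = senderTimestampMillis;
        }
    }

    /** Hypothetical custom output type that keeps only the fields the pipeline needs. */
    static final class SimpleRecord {
        final String payload;
        final Instant timestamp;

        SimpleRecord(String payload, Instant timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    /** Plays the role of parseFn: maps the raw message to the custom output type. */
    static final Function<RawMessage, SimpleRecord> PARSE_FN =
        msg -> new SimpleRecord(
            new String(msg.payload, StandardCharsets.UTF_8),
            // Assumption for this sketch: fall back to the epoch when no timestamp is set.
            msg.senderTimestampMillis != null
                ? Instant.ofEpochMilli(msg.senderTimestampMillis)
                : Instant.EPOCH);

    /** Plays the role of timestampFn: extracts the event time from the mapped record. */
    static final Function<SimpleRecord, Instant> TIMESTAMP_FN = rec -> rec.timestamp;

    public static void main(String[] args) {
        RawMessage msg = new RawMessage("hello".getBytes(StandardCharsets.UTF_8), 1_000L);
        SimpleRecord rec = PARSE_FN.apply(msg);
        System.out.println(rec.payload + " @ " + TIMESTAMP_FN.apply(rec));
    }
}
```

In a pipeline, functions of this shape would be passed as the second and third arguments of read(TypeDescriptor, SerializableFunction, SerializableFunction), together with a TypeDescriptor for the output class.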
Modifier and Type | Class and Description |
---|---|
static class | SolaceIO.Read<T> |

Modifier and Type | Field and Description |
---|---|
static SerializableFunction<Solace.Record,Instant> | SENDER_TIMESTAMP_FUNCTION |

Constructor and Description |
---|
SolaceIO() |

Modifier and Type | Method and Description |
---|---|
static com.solacesystems.jcsmp.Destination | convertToJcsmpDestination(Solace.Destination destination): Convert to a JCSMP destination from a schema-enabled Solace.Destination. |
static SolaceIO.Read<Solace.Record> | read(): Create a SolaceIO.Read transform, to read from Solace. |
static <T> SolaceIO.Read<T> | read(TypeDescriptor<T> typeDescriptor, SerializableFunction<com.solacesystems.jcsmp.BytesXMLMessage,T> parseFn, SerializableFunction<T,Instant> timestampFn): Create a SolaceIO.Read transform, to read from Solace. |

public static final SerializableFunction<Solace.Record,Instant> SENDER_TIMESTAMP_FUNCTION
public static com.solacesystems.jcsmp.Destination convertToJcsmpDestination(Solace.Destination destination)
Convert to a JCSMP destination from a schema-enabled Solace.Destination.
This method returns a Destination, which may be either a Topic or a Queue.
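The idea of turning a schema-friendly destination into either a Topic or a Queue can be sketched as follows. Every type here (`PlainDestination`, `Kind`, `BrokerDestination`, `BrokerTopic`, `BrokerQueue`) is a hypothetical stand-in invented for this illustration; none of them is the Beam or JCSMP class, and the real method may work differently.

```java
final class DestinationConversionSketch {

    /** Hypothetical tag saying which kind of destination a name refers to. */
    enum Kind { TOPIC, QUEUE }

    /** Hypothetical schema-friendly destination: a plain name plus a kind tag. */
    static final class PlainDestination {
        final String name;
        final Kind kind;

        PlainDestination(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    /** Hypothetical broker-side destination, implemented by a topic or a queue. */
    interface BrokerDestination {
        String getName();
    }

    static final class BrokerTopic implements BrokerDestination {
        private final String name;
        BrokerTopic(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    static final class BrokerQueue implements BrokerDestination {
        private final String name;
        BrokerQueue(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Picks the concrete broker-side type based on the kind tag. */
    static BrokerDestination convert(PlainDestination destination) {
        switch (destination.kind) {
            case TOPIC:
                return new BrokerTopic(destination.name);
            case QUEUE:
                return new BrokerQueue(destination.name);
            default:
                throw new IllegalArgumentException("Unsupported kind: " + destination.kind);
        }
    }
}
```

Callers of a conversion like this receive the common supertype and only need to inspect the concrete type when topic and queue handling differ.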
public static SolaceIO.Read<Solace.Record> read()
Create a SolaceIO.Read transform, to read from Solace. The ingested records will be mapped to Solace.Record objects.
public static <T> SolaceIO.Read<T> read(TypeDescriptor<T> typeDescriptor, SerializableFunction<com.solacesystems.jcsmp.BytesXMLMessage,T> parseFn, SerializableFunction<T,Instant> timestampFn)
Create a SolaceIO.Read transform, to read from Solace. Specify a SerializableFunction to map incoming BytesXMLMessage records to the object of your choice. You also need to specify a TypeDescriptor for your class and the timestamp function, which returns an Instant from the record.
The type descriptor will be used to infer a coder from CoderRegistry or Schema Registry. You can initialize a new TypeDescriptor in the following manner:
TypeDescriptor<T> typeDescriptor = TypeDescriptor.of(YourOutputType.class);