@Internal public class SolaceIO extends java.lang.Object
A PTransform to read from and write to a Solace event broker.
Note: Internal use only; this API is beta and subject to change.
To read from Solace, use read() or read(TypeDescriptor, SerializableFunction, SerializableFunction).
read() top-level method
This method returns a PCollection of Solace.Record objects. It uses a default mapper (Solace.SolaceRecordMapper.map(BytesXMLMessage)) to map the BytesXMLMessage received from Solace to Solace.Record objects.
By default, it also uses XMLMessage.getSenderTimestamp() for watermark estimation. This SerializableFunction can be overridden with the SolaceIO.Read.withTimestampFn(SerializableFunction) method.
When using this method, the Coders are inferred automatically.
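read(TypeDescriptor, SerializableFunction, SerializableFunction) top-level method
With this method, the user can specify a custom output type, define a custom mapping between BytesXMLMessage and their output type, and supply a custom timestamp function for watermark estimation instead of the default used by the read() method.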
Reading from a queue (SolaceIO.Read.from(Solace.Queue)) or a topic (SolaceIO.Read.from(Solace.Topic))
Regardless of the top-level read method choice, the user can specify whether to read from a Queue (SolaceIO.Read.from(Solace.Queue)) or a Topic (SolaceIO.Read.from(Solace.Topic)).
Note: when a user specifies to read from a Topic, the connector will create a matching Queue and a Subscription. The user must ensure that the SEMP API is reachable from the driver program and must provide credentials that have `write` permission to the SEMP Config API. The created Queue will be non-exclusive. The Queue will not be deleted when the pipeline is terminated.
Note: If the user specifies to read from a Queue, the driver program will call the SEMP API to check whether the Queue is `exclusive` or `non-exclusive`. The user must ensure that the SEMP API is reachable from the driver program and must supply credentials with `read` permission through SolaceIO.Read.withSempClientFactory(SempClientFactory).
read() method
The minimal example: reading from an existing Queue, using the no-arg read() method, with all the default configuration options.
PCollection<Solace.Record> events =
    pipeline.apply(
        SolaceIO.read()
            .from(Queue.fromName("your-queue-name"))
            .withSempClientFactory(
                BasicAuthSempClientFactory.builder()
                    .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
                    .username("semp-username")
                    .password("semp-password")
                    .vpnName("vpn-name")
                    .build())
            .withSessionServiceFactory(
                BasicAuthJcsmpSessionServiceFactory.builder()
                    .host("your-host-name")
                    // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
                    .username("username")
                    .password("password")
                    .vpnName("vpn-name")
                    .build()));
read(TypeDescriptor, SerializableFunction, SerializableFunction) method
When using this method, you can specify a custom output PCollection type and a custom timestamp function.
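The two functions passed to read(TypeDescriptor, SerializableFunction, SerializableFunction) could look roughly like the sketch below. To stay runnable without the Beam and Solace libraries, it relies on hypothetical stand-ins: `RawMessage` replaces BytesXMLMessage, `java.util.function.Function` replaces SerializableFunction, and `java.time.Instant` replaces the Instant type used by the connector. `SimpleRecord`, its fields, and the fallback to the current time are illustrative assumptions, not part of the API.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.function.Function;

final class CustomReadSketch {

    // Hypothetical stand-in for the message received from the broker
    // (BytesXMLMessage in the connector).
    static final class RawMessage {
        final byte[] bytes;
        final String applicationMessageId;
        final Long senderTimestamp; // epoch milliseconds; may be null

        RawMessage(byte[] bytes, String applicationMessageId, Long senderTimestamp) {
            this.bytes = bytes;
            this.applicationMessageId = applicationMessageId;
            this.senderTimestamp = senderTimestamp;
        }
    }

    // Illustrative custom output type holding only the fields of interest.
    static final class SimpleRecord {
        final String payload;
        final String messageId;
        final Instant timestamp;

        SimpleRecord(String payload, String messageId, Instant timestamp) {
            this.payload = payload;
            this.messageId = messageId;
            this.timestamp = timestamp;
        }
    }

    // Parse function: maps a raw message to the custom output type.
    static final Function<RawMessage, SimpleRecord> PARSE_FN =
        msg -> new SimpleRecord(
            new String(msg.bytes, StandardCharsets.UTF_8),
            msg.applicationMessageId,
            // Assumption: fall back to the current time when no sender timestamp is set.
            msg.senderTimestamp != null
                ? Instant.ofEpochMilli(msg.senderTimestamp)
                : Instant.now());

    // Timestamp function: returns the event time of an already mapped record.
    static final Function<SimpleRecord, Instant> TIMESTAMP_FN = rec -> rec.timestamp;

    public static void main(String[] args) {
        RawMessage msg =
            new RawMessage("hello".getBytes(StandardCharsets.UTF_8), "id-1", 1_000L);
        SimpleRecord rec = PARSE_FN.apply(msg);
        System.out.println(rec.payload + " " + rec.messageId + " " + TIMESTAMP_FN.apply(rec));
    }
}
```

In a real pipeline, the equivalent lambdas would be passed as the parseFn and timestampFn arguments, together with a TypeDescriptor for the output class.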
Modifier and Type | Class and Description |
---|---|
static class | SolaceIO.Read<T> |
static class | SolaceIO.SubmissionMode |
static class | SolaceIO.Write<T> |
static class | SolaceIO.WriterType |
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_WRITER_CLIENTS_PER_WORKER |
static DeliveryMode | DEFAULT_WRITER_DELIVERY_MODE |
static int | DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS |
static java.lang.Boolean | DEFAULT_WRITER_PUBLISH_LATENCY_METRICS |
static SolaceIO.SubmissionMode | DEFAULT_WRITER_SUBMISSION_MODE |
static SolaceIO.WriterType | DEFAULT_WRITER_TYPE |
static SerializableFunction<Solace.Record,Instant> | SENDER_TIMESTAMP_FUNCTION |
Constructor and Description |
---|
SolaceIO() |
Modifier and Type | Method and Description |
---|---|
static Destination | convertToJcsmpDestination(Solace.Destination destination): Convert to a JCSMP destination from a schema-enabled Solace.Destination. |
static SolaceIO.Read<Solace.Record> | read(): Create a SolaceIO.Read transform, to read from Solace. |
static <T> SolaceIO.Read<T> | read(TypeDescriptor<T> typeDescriptor, SerializableFunction<BytesXMLMessage,T> parseFn, SerializableFunction<T,Instant> timestampFn): Create a SolaceIO.Read transform, to read from Solace. |
static SolaceIO.Write<Solace.Record> | write(): Create a SolaceIO.Write transform, to write to Solace using Solace.Record objects. |
static <T> SolaceIO.Write<T> | write(SerializableFunction<T,Solace.Record> formatFunction): Create a SolaceIO.Write transform, to write to Solace with a custom type. |
public static final SerializableFunction<Solace.Record,Instant> SENDER_TIMESTAMP_FUNCTION
public static final int DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS
public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER
public static final java.lang.Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS
public static final SolaceIO.SubmissionMode DEFAULT_WRITER_SUBMISSION_MODE
public static final DeliveryMode DEFAULT_WRITER_DELIVERY_MODE
public static final SolaceIO.WriterType DEFAULT_WRITER_TYPE
public static Destination convertToJcsmpDestination(Solace.Destination destination)
Convert to a JCSMP destination from a schema-enabled Solace.Destination.
This method returns a Destination, which may be either a Topic or a Queue.
public static SolaceIO.Read<Solace.Record> read()
Create a SolaceIO.Read transform, to read from Solace. The ingested records will be mapped to Solace.Record objects.
public static <T> SolaceIO.Read<T> read(TypeDescriptor<T> typeDescriptor, SerializableFunction<BytesXMLMessage,T> parseFn, SerializableFunction<T,Instant> timestampFn)
Create a SolaceIO.Read transform, to read from Solace. Specify a SerializableFunction to map incoming BytesXMLMessage records to the object of your choice. You also need to specify a TypeDescriptor for your class and the timestamp function, which returns an Instant from the record.
The type descriptor will be used to infer a coder from CoderRegistry or Schema Registry. You can initialize a new TypeDescriptor in the following manner:
TypeDescriptor<T> typeDescriptor = TypeDescriptor.of(YourOutputType.class);
public static <T> SolaceIO.Write<T> write(SerializableFunction<T,Solace.Record> formatFunction)
Create a SolaceIO.Write transform, to write to Solace with a custom type.
If you are using a custom data class, the format function should return a Solace.Record corresponding to your custom data class instance.
If you are using this format function with dynamic destinations, you must ensure that the destination field of each Solace.Record is set to the correct value.
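The shape of such a format function can be sketched as follows. The sketch is dependency-free and therefore uses hypothetical stand-ins: `OutRecord` and `Destination` replace Solace.Record and Solace.Destination, `java.util.function.Function` replaces SerializableFunction, and the `Order` class and the "orders/<region>" topic naming are invented for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class FormatFunctionSketch {

    enum DestinationType { TOPIC, QUEUE }

    // Hypothetical stand-in for the destination carried by each outgoing record.
    static final class Destination {
        final String name;
        final DestinationType type;

        Destination(String name, DestinationType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Hypothetical stand-in for the record type written by the connector.
    static final class OutRecord {
        final String messageId;
        final byte[] payload;
        final Destination destination;

        OutRecord(String messageId, byte[] payload, Destination destination) {
            this.messageId = messageId;
            this.payload = payload;
            this.destination = destination;
        }
    }

    // Illustrative custom data class.
    static final class Order {
        final String id;
        final String region;
        final String body;

        Order(String id, String region, String body) {
            this.id = id;
            this.region = region;
            this.body = body;
        }
    }

    // Format function: builds one outgoing record per custom object and
    // derives the destination from the object's own fields.
    static final Function<Order, OutRecord> FORMAT_FN =
        order -> new OutRecord(
            order.id,
            order.body.getBytes(StandardCharsets.UTF_8),
            new Destination("orders/" + order.region, DestinationType.TOPIC));

    public static void main(String[] args) {
        OutRecord rec = FORMAT_FN.apply(new Order("42", "emea", "{\"qty\": 3}"));
        System.out.println(rec.destination.type + " " + rec.destination.name);
    }
}
```

Deriving the destination inside the format function keeps routing next to the data it depends on, which is the point of the dynamic-destination requirement above.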
public static SolaceIO.Write<Solace.Record> write()
Create a SolaceIO.Write transform, to write to Solace using Solace.Record objects.