@Internal public final class UnboundedStreamingSolaceWriter extends UnboundedSolaceWriter
The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be reused across different threads (if the number of threads is higher than the number of sessions, which is probably the most common case).
The producer uses the JCSMP streaming mode to publish a single message at a time, processing the acks from this publication, and returning them as output of the DoFn.
There are no acks if the delivery mode is set to DIRECT.
This writer DoFn offers lower latency and lower throughput than UnboundedBatchedSolaceWriter
.
DoFn.AlwaysFetched, DoFn.BoundedPerElement, DoFn.BundleFinalizer, DoFn.Element, DoFn.FieldAccess, DoFn.FinishBundle, DoFn.FinishBundleContext, DoFn.GetInitialRestriction, DoFn.GetInitialWatermarkEstimatorState, DoFn.GetRestrictionCoder, DoFn.GetSize, DoFn.GetWatermarkEstimatorStateCoder, DoFn.Key, DoFn.MultiOutputReceiver, DoFn.NewTracker, DoFn.NewWatermarkEstimator, DoFn.OnTimer, DoFn.OnTimerContext, DoFn.OnTimerFamily, DoFn.OnWindowExpiration, DoFn.OnWindowExpirationContext, DoFn.OutputReceiver<T>, DoFn.ProcessContext, DoFn.ProcessContinuation, DoFn.ProcessElement, DoFn.RequiresStableInput, DoFn.RequiresTimeSortedInput, DoFn.Restriction, DoFn.Setup, DoFn.SideInput, DoFn.SplitRestriction, DoFn.StartBundle, DoFn.StartBundleContext, DoFn.StateId, DoFn.Teardown, DoFn.TimerFamily, DoFn.TimerId, DoFn.Timestamp, DoFn.TruncateRestriction, DoFn.UnboundedPerElement, DoFn.WatermarkEstimatorState, DoFn.WindowedContext
Constructor and Description |
---|
UnboundedStreamingSolaceWriter(SerializableFunction<Solace.Record,Destination> destinationFn,
SessionServiceFactory sessionServiceFactory,
DeliveryMode deliveryMode,
SolaceIO.SubmissionMode submissionMode,
int producersMapCardinality,
boolean publishLatencyMetrics) |
Modifier and Type | Method and Description |
---|---|
void |
finishBundle(DoFn.FinishBundleContext context) |
void |
processElement(KV<java.lang.Integer,Solace.Record> element,
Instant timestamp,
ValueState<java.lang.Integer> currentKeyState) |
addToCurrentBundle, createMessagesArray, createSingleMessage, getCurrentBundle, getCurrentBundleTimestamp, getDeliveryMode, getDestinationFn, getFailedLatencyMetric, getProducersMapCardinality, getPublishLatencyMetric, getSubmissionMode, publishResults, setCurrentBundleTimestamp, shouldPublishLatencyMetrics, solaceSessionServiceWithProducer, startBundle, teardown, updateProducerIndex
getAllowedTimestampSkew, getInputTypeDescriptor, getOutputTypeDescriptor, populateDisplayData, prepareForProcessing
public UnboundedStreamingSolaceWriter(SerializableFunction<Solace.Record,Destination> destinationFn, SessionServiceFactory sessionServiceFactory, DeliveryMode deliveryMode, SolaceIO.SubmissionMode submissionMode, int producersMapCardinality, boolean publishLatencyMetrics)
@DoFn.ProcessElement public void processElement(@DoFn.Element KV<java.lang.Integer,Solace.Record> element, @DoFn.Timestamp Instant timestamp, @DoFn.AlwaysFetched @DoFn.StateId(value="current_key") ValueState<java.lang.Integer> currentKeyState)
@DoFn.FinishBundle public void finishBundle(DoFn.FinishBundleContext context)