Class UnboundedBatchedSolaceWriter
java.lang.Object
org.apache.beam.sdk.transforms.DoFn<KV<Integer,Solace.Record>,Solace.PublishResult>
org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter
org.apache.beam.sdk.io.solace.write.UnboundedBatchedSolaceWriter
- All Implemented Interfaces:
Serializable, HasDisplayData
This DoFn is responsible for writing to Solace in batch mode (holding messages until they are
published as a batch) and for emitting the corresponding output (success or failure; only for
persistent messages), so that the SolaceIO.Write connector can be composed with subsequent transforms in the pipeline.
The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be reused across different threads (if the number of threads is higher than the number of sessions, which is probably the most common case).
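The reuse of a fixed number of producers by a larger number of threads can be pictured with the minimal sketch below. It is not the Beam implementation: the class `ProducerPoolSketch`, the string stand-ins for producers, and the modulo mapping from an arbitrary integer index to a slot are all assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a bounded pool of "producers" shared by many callers.
// A String stands in for a real session/producer pair.
final class ProducerPoolSketch {
    private final int cardinality;
    private final ConcurrentHashMap<Integer, String> producers = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    ProducerPoolSketch(int cardinality) {
        if (cardinality <= 0) {
            throw new IllegalArgumentException("cardinality must be positive");
        }
        this.cardinality = cardinality;
    }

    // Maps any integer index onto one of `cardinality` slots (assumed policy),
    // creating the producer for that slot lazily and at most once.
    String producerFor(int index) {
        int slot = Math.floorMod(index, cardinality);
        return producers.computeIfAbsent(slot, s -> {
            created.incrementAndGet();
            return "producer-" + s;
        });
    }

    // Number of producers actually created so far; never exceeds the cardinality.
    int createdCount() {
        return created.get();
    }
}
```

With a cardinality of 3, any number of callers ends up sharing at most three producers, which mirrors the situation described above where threads outnumber sessions.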
The producer uses the JCSMP send multiple mode to publish a batch of messages together with a single API call. The acks from this publication are also processed in batch, and returned as the output of the DoFn.
The batch size is 50, and this is currently the maximum value supported by Solace.
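As a rough illustration of what a batch limit of 50 implies, the sketch below splits a list of pending messages into chunks that each fit in one batched publish call. The class `BatchingSketch` and the method `toBatches` are hypothetical names for this example; the actual writer builds its batches internally and sends them through JCSMP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: chunk pending messages into batches of at most 50.
final class BatchingSketch {
    // Batch size stated in the class description above.
    static final int MAX_BATCH_SIZE = 50;

    // Returns consecutive batches, preserving the original message order.
    static <T> List<List<T>> toBatches(List<T> messages) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < messages.size(); start += MAX_BATCH_SIZE) {
            int end = Math.min(start + MAX_BATCH_SIZE, messages.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(messages.subList(start, end)));
        }
        return batches;
    }
}
```

For example, 120 pending messages would yield three batches of 50, 50, and 20 messages, so three publish calls instead of 120.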
There are no acks if the delivery mode is set to DIRECT.
This writer DoFn offers higher throughput than UnboundedStreamingSolaceWriter but also higher latency.
-
Nested Class Summary
Nested classes/interfaces inherited from class org.apache.beam.sdk.transforms.DoFn
DoFn.AlwaysFetched, DoFn.BoundedPerElement, DoFn.BundleFinalizer, DoFn.Element, DoFn.FieldAccess, DoFn.FinishBundle, DoFn.FinishBundleContext, DoFn.GetInitialRestriction, DoFn.GetInitialWatermarkEstimatorState, DoFn.GetRestrictionCoder, DoFn.GetSize, DoFn.GetWatermarkEstimatorStateCoder, DoFn.Key, DoFn.MultiOutputReceiver, DoFn.NewTracker, DoFn.NewWatermarkEstimator, DoFn.OnTimer, DoFn.OnTimerContext, DoFn.OnTimerFamily, DoFn.OnWindowExpiration, DoFn.OnWindowExpirationContext, DoFn.OutputReceiver<T>, DoFn.ProcessContext, DoFn.ProcessContinuation, DoFn.ProcessElement, DoFn.RequiresStableInput, DoFn.RequiresTimeSortedInput, DoFn.Restriction, DoFn.Setup, DoFn.SideInput, DoFn.SplitRestriction, DoFn.StartBundle, DoFn.StartBundleContext, DoFn.StateId, DoFn.Teardown, DoFn.TimerFamily, DoFn.TimerId, DoFn.Timestamp, DoFn.TruncateRestriction, DoFn.UnboundedPerElement, DoFn.WatermarkEstimatorState, DoFn.WindowedContext
-
Constructor Summary
Constructors
UnboundedBatchedSolaceWriter(SerializableFunction<Solace.Record, Destination> destinationFn, SessionServiceFactory sessionServiceFactory, DeliveryMode deliveryMode, SolaceIO.SubmissionMode submissionMode, int producersMapCardinality, boolean publishLatencyMetrics)
-
Method Summary
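Modifier and Type, Method
void finishBundle(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.FinishBundleContext context)
void flushBundle(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.OnTimerContext context)
void processElement(KV<Integer, Solace.Record> element, Timer bundleFlusherTimer, Instant timestamp)
Methods inherited from class org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter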
addToCurrentBundle, createMessagesArray, createSingleMessage, getCurrentBundle, getCurrentBundleTimestamp, getDeliveryMode, getDestinationFn, getFailedLatencyMetric, getProducersMapCardinality, getPublishLatencyMetric, getSubmissionMode, publishResults, setCurrentBundleTimestamp, shouldPublishLatencyMetrics, solaceSessionServiceWithProducer, startBundle, teardown, updateProducerIndex
Methods inherited from class org.apache.beam.sdk.transforms.DoFn
getAllowedTimestampSkew, getInputTypeDescriptor, getOutputTypeDescriptor, populateDisplayData, prepareForProcessing
-
Constructor Details
-
UnboundedBatchedSolaceWriter
public UnboundedBatchedSolaceWriter(SerializableFunction<Solace.Record, Destination> destinationFn, SessionServiceFactory sessionServiceFactory, DeliveryMode deliveryMode, SolaceIO.SubmissionMode submissionMode, int producersMapCardinality, boolean publishLatencyMetrics)
-
-
Method Details
-
processElement
@ProcessElement public void processElement(@Element KV<Integer, Solace.Record> element, @TimerId("bundle_flusher") Timer bundleFlusherTimer, @Timestamp Instant timestamp)
-
finishBundle
@FinishBundle public void finishBundle(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.FinishBundleContext context) throws IOException
- Throws:
IOException
-
flushBundle
@OnTimer("bundle_flusher") public void flushBundle(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.OnTimerContext context) throws IOException
- Throws:
IOException
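The three methods above suggest a buffer-then-flush pattern: elements are accumulated as they arrive, and the accumulated bundle is published when either the bundle finishes or the "bundle_flusher" timer fires. The sketch below shows that pattern in plain Java. It is an assumption about the general shape, not the actual implementation; `BundleBufferSketch` and its method names are hypothetical, and strings stand in for Solace records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: buffer elements, publish them on either flush trigger.
final class BundleBufferSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> published = new ArrayList<>();

    // Analogous to processElement: hold the record instead of sending it.
    void processElement(String record) {
        buffer.add(record);
    }

    // Analogous to finishBundle: drain whatever is buffered.
    void finishBundle() {
        flush();
    }

    // Analogous to the timer callback: drain whatever is buffered.
    void onFlushTimer() {
        flush();
    }

    // Publishes the buffered records as one batch; a no-op when the buffer is empty.
    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        published.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Batches published so far, in publication order.
    List<List<String>> published() {
        return published;
    }
}
```

Making the flush a no-op on an empty buffer means the two triggers can fire in any order without publishing empty batches.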
-