Class KafkaIO.WriteRecords<K,V>
- All Implemented Interfaces:
Serializable, HasDisplayData
- Enclosing class:
KafkaIO
A PTransform to write to a Kafka topic with ProducerRecords. See KafkaIO for more information on usage and configuration.
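For orientation, here is a minimal sketch of how this transform is typically applied. The broker addresses, topic name, and element types are illustrative assumptions, not values taken from this page.

  // Assumes: org.apache.beam.sdk.io.kafka.KafkaIO,
  // org.apache.kafka.clients.producer.ProducerRecord,
  // org.apache.kafka.common.serialization.LongSerializer,
  // org.apache.kafka.common.serialization.StringSerializer
  PCollection<ProducerRecord<Long, String>> records = ...; // built upstream in the pipeline

  records.apply(
      KafkaIO.<Long, String>writeRecords()
          .withBootstrapServers("broker_1:9092,broker_2:9092") // hypothetical brokers
          .withTopic("results")                                // hypothetical default topic
          .withKeySerializer(LongSerializer.class)
          .withValueSerializer(StringSerializer.class));
-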
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Constructor Summary
Constructors
WriteRecords()
-
Method Summary
Modifier and Type / Method / Description
PDone expand(PCollection<ProducerRecord<K, V>> input)
  Override this method to specify how this PTransform should be expanded on the given InputT.
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler()
abstract BadRecordRouter getBadRecordRouter()
abstract @Nullable SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn()
abstract @Nullable Class<? extends Serializer<K>> getKeySerializer()
abstract int getNumShards()
abstract Map<String, Object> getProducerConfig()
abstract @Nullable SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn()
abstract @Nullable KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction()
abstract @Nullable String getSinkGroupId()
abstract @Nullable String getTopic()
abstract @Nullable Class<? extends Serializer<V>> getValueSerializer()
abstract boolean isEOS()
void populateDisplayData(DisplayData.Builder builder)
  Register display data for the given transform or component.
KafkaIO.WriteRecords<K, V> updateProducerProperties(Map<String, Object> configUpdates)
  Deprecated. As of version 2.13, use withProducerConfigUpdates(Map) instead.
void validate(@Nullable PipelineOptions options)
  Called before running the Pipeline to verify this transform is fully and correctly specified.
KafkaIO.WriteRecords<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler)
KafkaIO.WriteRecords<K, V> withBootstrapServers(String bootstrapServers)
  Returns a new KafkaIO.Write transform with a Kafka producer pointing to bootstrapServers.
KafkaIO.WriteRecords<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn)
  When exactly-once semantics are enabled (see withEOS(int, String)), the sink needs to fetch previously stored state from the Kafka topic.
KafkaIO.WriteRecords<K, V> withEOS(int numShards, String sinkGroupId)
  Provides exactly-once semantics while writing to Kafka, which enables applications with end-to-end exactly-once guarantees on top of exactly-once semantics within Beam pipelines.
KafkaIO.WriteRecords<K, V> withInputTimestamp()
  The timestamp for each record being published is set to the timestamp of the element in the pipeline.
KafkaIO.WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer)
  Sets a Serializer for serializing the key (if any) to bytes.
KafkaIO.WriteRecords<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates)
  Updates configuration for the producer.
KafkaIO.WriteRecords<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn)
  Sets a custom function to create the Kafka producer.
KafkaIO.WriteRecords<K, V> withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction)
  Deprecated. Use ProducerRecords to set the publish timestamp.
KafkaIO.WriteRecords<K, V> withTopic(String topic)
  Sets the default Kafka topic to write to.
KafkaIO.WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer)
  Sets a Serializer for serializing the value to bytes.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setDisplayData, setResourceHints, toString, validate
-
Constructor Details
-
WriteRecords
public WriteRecords()
-
-
Method Details
-
getTopic
@Pure public abstract @Nullable String getTopic()
-
getProducerConfig
@Pure public abstract Map<String, Object> getProducerConfig()
-
getProducerFactoryFn
@Pure public abstract @Nullable SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn()
-
getKeySerializer
@Pure public abstract @Nullable Class<? extends Serializer<K>> getKeySerializer()
-
getValueSerializer
@Pure public abstract @Nullable Class<? extends Serializer<V>> getValueSerializer()
-
getPublishTimestampFunction
@Pure public abstract @Nullable KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction()
-
isEOS
@Pure public abstract boolean isEOS()
-
getSinkGroupId
@Pure public abstract @Nullable String getSinkGroupId()
-
getNumShards
@Pure public abstract int getNumShards()
-
getConsumerFactoryFn
@Pure public abstract @Nullable SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn()
-
getBadRecordRouter
@Pure public abstract BadRecordRouter getBadRecordRouter()
-
getBadRecordErrorHandler
@Pure public abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler()
-
withBootstrapServers
public KafkaIO.WriteRecords<K,V> withBootstrapServers(String bootstrapServers)
Returns a new KafkaIO.Write transform with a Kafka producer pointing to bootstrapServers.
-
withTopic
public KafkaIO.WriteRecords<K,V> withTopic(String topic)
Sets the default Kafka topic to write to. Use ProducerRecords to set the topic name per published record.
-
withKeySerializer
public KafkaIO.WriteRecords<K,V> withKeySerializer(Class<? extends Serializer<K>> keySerializer)
Sets a Serializer for serializing the key (if any) to bytes. A key is optional while writing to Kafka. Note that when a key is set, its hash is used to determine the partition in Kafka (see ProducerRecord for more details).
-
withValueSerializer
public KafkaIO.WriteRecords<K,V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer)
Sets a Serializer for serializing the value to bytes.
-
updateProducerProperties
@Deprecated public KafkaIO.WriteRecords<K,V> updateProducerProperties(Map<String, Object> configUpdates)
Deprecated. As of version 2.13, use withProducerConfigUpdates(Map) instead.
Adds the given producer properties, overriding old values of properties with the same key.
-
withProducerConfigUpdates
public KafkaIO.WriteRecords<K,V> withProducerConfigUpdates(Map<String, Object> configUpdates)
Updates configuration for the producer. Note that the default producer properties will not be completely overridden; this method only replaces the values of keys that appear in configUpdates. By default, the producer uses the configuration from DEFAULT_PRODUCER_PROPERTIES.
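A minimal sketch, assuming gzip compression and a retry override are the desired changes (ProducerConfig is from kafka-clients, ImmutableMap from Guava; broker, topic, and serializers repeat the illustrative values used above):

  // Assumes: org.apache.kafka.clients.producer.ProducerConfig,
  // com.google.common.collect.ImmutableMap
  KafkaIO.<Long, String>writeRecords()
      .withBootstrapServers("broker_1:9092")  // hypothetical broker
      .withTopic("results")                   // hypothetical topic
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(StringSerializer.class)
      .withProducerConfigUpdates(
          ImmutableMap.of(
              ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip", // enable compression
              ProducerConfig.RETRIES_CONFIG, 3));             // override retry count
-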
withProducerFactoryFn
public KafkaIO.WriteRecords<K,V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn)
Sets a custom function to create the Kafka producer. Primarily used for tests. Default is KafkaProducer. A test might, for example, inject Kafka's MockProducer through this hook, as in the sketch below.
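A sketch under the assumption that the test writes Long keys and String values with the standard Kafka serializers (MockProducer is provided by kafka-clients):

  // Assumes: org.apache.kafka.clients.producer.MockProducer
  KafkaIO.<Long, String>writeRecords()
      .withTopic("test-topic") // hypothetical topic
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(StringSerializer.class)
      .withProducerFactoryFn(
          config -> new MockProducer<>(
              true,                     // auto-complete each send
              new LongSerializer(),     // key serializer
              new StringSerializer())); // value serializer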
-
withInputTimestamp
public KafkaIO.WriteRecords<K,V> withInputTimestamp()
The timestamp for each record being published is set to the timestamp of the element in the pipeline. This is equivalent to withPublishTimestampFunction((e, ts) -> ts).
NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is processing messages from the past, they might be deleted immediately by Kafka after being published if the timestamps are older than the Kafka cluster's log.retention.hours.
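For illustration (broker and topic are the same hypothetical values as above):

  KafkaIO.<Long, String>writeRecords()
      .withBootstrapServers("broker_1:9092")
      .withTopic("results")
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(StringSerializer.class)
      .withInputTimestamp(); // publish each record with its element's Beam timestamp
-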
withPublishTimestampFunction
@Deprecated public KafkaIO.WriteRecords<K,V> withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction)
Deprecated. Use ProducerRecords to set the publish timestamp.
A function to provide the timestamp for records being published.
NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is processing messages from the past, they might be deleted immediately by Kafka after being published if the timestamps are older than the Kafka cluster's log.retention.hours.
-
withEOS
public KafkaIO.WriteRecords<K,V> withEOS(int numShards, String sinkGroupId)
Provides exactly-once semantics while writing to Kafka, which enables applications with end-to-end exactly-once guarantees on top of exactly-once semantics within Beam pipelines. It ensures that records written to the sink are committed to Kafka exactly once, even when some processing is retried during pipeline execution. Retries typically occur when workers restart (as in failure recovery), or when the work is redistributed (as in an autoscaling event).
Beam runners typically provide exactly-once semantics for the results of a pipeline, but not for side effects from user code in a transform. If a transform such as a Kafka sink writes to an external system, those writes might occur more than once. When EOS is enabled here, the sink transform ties checkpointing semantics in compatible Beam runners to transactions in Kafka (version 0.11+) to ensure a record is written only once. Because the implementation relies on the runner's checkpoint semantics, not all runners are compatible; the sink throws an exception during initialization if the runner is not explicitly allowed. The Dataflow, Flink, and Spark runners are compatible.
Note on performance: the exactly-once sink involves two shuffles of the records. In addition to the cost of shuffling the records among workers, the records go through two serialization-deserialization cycles. Depending on volume and cost of serialization, the CPU cost can be noticeable, and can be reduced by writing byte arrays (i.e., serializing values to bytes before writing to the Kafka sink).
- Parameters:
numShards - Sets sink parallelism. The state metadata stored on Kafka is spread across this many virtual partitions using sinkGroupId. A good rule of thumb is to set this to around the number of partitions in the Kafka topic.
sinkGroupId - The group id used to store a small amount of state as metadata on Kafka. It is similar to the consumer group id used with a KafkaConsumer. Each job should use a unique group id so that restarts/updates of the job preserve the state to ensure exactly-once semantics. The state is committed atomically with sink transactions on Kafka. See KafkaProducer.sendOffsetsToTransaction(Map, String) for more information. The sink performs multiple sanity checks during initialization to catch common mistakes so that it does not end up using state that does not seem to be written by the same job.
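A sketch of enabling the exactly-once sink; the shard count and group id below are illustrative choices, not recommendations from this page:

  KafkaIO.<Long, String>writeRecords()
      .withBootstrapServers("broker_1:9092")
      .withTopic("results")
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(StringSerializer.class)
      .withEOS(
          20,                    // numShards: roughly the topic's partition count
          "my-job-sink-group");  // sinkGroupId: keep unique per job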
-
withConsumerFactoryFn
public KafkaIO.WriteRecords<K,V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn)
When exactly-once semantics are enabled (see withEOS(int, String)), the sink needs to fetch previously stored state from the Kafka topic. Fetching the metadata requires a consumer. Similar to KafkaIO.Read.withConsumerFactoryFn(SerializableFunction), a factory function can be supplied if required in a specific case, as in the sketch below. The default is KafkaConsumer.
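For example, if the metadata consumer needs extra client settings, the factory can build the KafkaConsumer itself. The security property below is a hypothetical addition for illustration; the map passed in already carries the sink's consumer configuration:

  // Assumes: java.util.HashMap, org.apache.kafka.clients.consumer.KafkaConsumer
  KafkaIO.<Long, String>writeRecords()
      // ...bootstrap servers, topic, serializers, and withEOS(...) as above...
      .withConsumerFactoryFn(
          config -> {
            Map<String, Object> copy = new HashMap<>(config); // keep the sink's settings
            copy.put("security.protocol", "SSL");             // hypothetical extra setting
            return new KafkaConsumer<byte[], byte[]>(copy);
          });
-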
withBadRecordErrorHandler
public KafkaIO.WriteRecords<K,V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler)
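A sketch, assuming a Beam release where Pipeline.registerBadRecordErrorHandler is available; errorSinkTransform stands for a user-defined PTransform<PCollection<BadRecord>, ?> (hypothetical here) that persists failing records:

  // The handler routes BadRecords produced by the sink into errorSinkTransform.
  try (BadRecordErrorHandler<?> handler =
      pipeline.registerBadRecordErrorHandler(errorSinkTransform)) {
    records.apply(
        KafkaIO.<Long, String>writeRecords()
            .withBootstrapServers("broker_1:9092") // hypothetical broker
            .withTopic("results")                  // hypothetical topic
            .withKeySerializer(LongSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .withBadRecordErrorHandler(handler));
  }
-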
expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PCollection<ProducerRecord<K, V>>, PDone>
-
validate
Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified.
By default, does nothing.
- Overrides:
validate in class PTransform<PCollection<ProducerRecord<K, V>>, PDone>
-
populateDisplayData
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
- Specified by:
populateDisplayData in interface HasDisplayData
- Overrides:
populateDisplayData in class PTransform<PCollection<ProducerRecord<K, V>>, PDone>
- Parameters:
builder - The builder to populate with display data.
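As a sketch of the override pattern described above (the subclass and the "topic" display item are hypothetical):

  // Hypothetical subclass registering one extra display item.
  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder); // keep data registered by the parent transform
    builder.add(DisplayData.item("topic", "results")); // hypothetical display item
  }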