Class KafkaIO.WriteRecords<K,V>
- All Implemented Interfaces:
Serializable,HasDisplayData
- Enclosing class:
KafkaIO
PTransform to write to a Kafka topic with ProducerRecord's. See KafkaIO for
more information on usage and configuration.- See Also:
-
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionexpand(PCollection<ProducerRecord<K, V>> input) Override this method to specify how thisPTransformshould be expanded on the givenInputT.abstract ErrorHandler<BadRecord, ?> abstract BadRecordRouterabstract @Nullable Class<? extends Serializer<K>> abstract intabstract @Nullable KafkaPublishTimestampFunction<ProducerRecord<K, V>> getTopic()abstract @Nullable Class<? extends Serializer<V>> abstract booleanisEOS()voidpopulateDisplayData(DisplayData.Builder builder) Register display data for the given transform or component.updateProducerProperties(Map<String, Object> configUpdates) Deprecated.as of version 2.13.voidvalidate(@Nullable PipelineOptions options) Called before running the Pipeline to verify this transform is fully and correctly specified.withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) withBootstrapServers(String bootstrapServers) Returns a newKafkaIO.Writetransform with Kafka producer pointing tobootstrapServers.withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) When exactly-once semantics are enabled (seewithEOS(int, String)), the sink needs to fetch previously stored state with Kafka topic.Provides exactly-once semantics while writing to Kafka, which enables applications with end-to-end exactly-once guarantees on top of exactly-once semantics within Beam pipelines.The timestamp for each record being published is set to timestamp of the element in the pipeline.withKeySerializer(Class<? extends Serializer<K>> keySerializer) Sets aSerializerfor serializing key (if any) to bytes.withProducerConfigUpdates(Map<String, Object> configUpdates) Update configuration for the producer.withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) Sets a custom function to create Kafka producer.withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) Deprecated.useProducerRecordsto set publish timestamp.Sets the default Kafka topic to write to.withValueSerializer(Class<? extends Serializer<V>> valueSerializer) Sets aSerializerfor serializing value to bytes.Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setDisplayData, setResourceHints, toString, validate
-
Constructor Details
-
WriteRecords
public WriteRecords()
-
-
Method Details
-
getTopic
-
getProducerConfig
-
getProducerFactoryFn
-
getKeySerializer
-
getValueSerializer
-
getPublishTimestampFunction
@Pure public abstract @Nullable KafkaPublishTimestampFunction<ProducerRecord<K,V>> getPublishTimestampFunction() -
isEOS
-
getSinkGroupId
-
getNumShards
-
getConsumerFactoryFn
-
getBadRecordRouter
-
getBadRecordErrorHandler
-
withBootstrapServers
Returns a newKafkaIO.Writetransform with Kafka producer pointing tobootstrapServers. -
withTopic
Sets the default Kafka topic to write to. UseProducerRecordsto set topic name per published record. -
withKeySerializer
Sets aSerializerfor serializing key (if any) to bytes.A key is optional while writing to Kafka. Note when a key is set, its hash is used to determine partition in Kafka (see
ProducerRecordfor more details). -
withValueSerializer
public KafkaIO.WriteRecords<K,V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) Sets aSerializerfor serializing value to bytes. -
updateProducerProperties
@Deprecated public KafkaIO.WriteRecords<K,V> updateProducerProperties(Map<String, Object> configUpdates) Deprecated.as of version 2.13. UsewithProducerConfigUpdates(Map)instead.Adds the given producer properties, overriding old values of properties with the same key. -
withProducerConfigUpdates
Update configuration for the producer. Note that the default producer properties will not be completely overridden. This method only updates the value which has the same key.By default, the producer uses the configuration from
DEFAULT_PRODUCER_PROPERTIES. -
withProducerFactoryFn
public KafkaIO.WriteRecords<K,V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) Sets a custom function to create Kafka producer. Primarily used for tests. Default isKafkaProducer -
withInputTimestamp
The timestamp for each record being published is set to timestamp of the element in the pipeline. This is equivalent towithPublishTimestampFunction((e, ts) -> ts).
NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is processing messages from the past, they might be deleted immediately by Kafka after being published if the timestamps are older than Kafka cluster'slog.retention.hours. -
withPublishTimestampFunction
@Deprecated public KafkaIO.WriteRecords<K,V> withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) Deprecated.useProducerRecordsto set publish timestamp.A function to provide timestamp for records being published.
NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is processing messages from the past, they might be deleted immediately by Kafka after being published if the timestamps are older than Kafka cluster'slog.retention.hours. -
withEOS
Provides exactly-once semantics while writing to Kafka, which enables applications with end-to-end exactly-once guarantees on top of exactly-once semantics within Beam pipelines. It ensures that records written to sink are committed on Kafka exactly once, even in the case of retries during pipeline execution even when some processing is retried. Retries typically occur when workers restart (as in failure recovery), or when the work is redistributed (as in an autoscaling event).Beam runners typically provide exactly-once semantics for results of a pipeline, but not for side effects from user code in transform. If a transform such as Kafka sink writes to an external system, those writes might occur more than once. When EOS is enabled here, the sink transform ties checkpointing semantics in compatible Beam runners and transactions in Kafka (version 0.11+) to ensure a record is written only once. As the implementation relies on runners checkpoint semantics, not all the runners are compatible. The sink throws an exception during initialization if the runner is not explicitly allowed. The Dataflow, Flink, and Spark runners are compatible.
Note on performance: Exactly-once sink involves two shuffles of the records. In addition to cost of shuffling the records among workers, the records go through 2 serialization-deserialization cycles. Depending on volume and cost of serialization, the CPU cost might be noticeable. The CPU cost can be reduced by writing byte arrays (i.e. serializing them to byte before writing to Kafka sink).
- Parameters:
numShards- Sets sink parallelism. The state metadata stored on Kafka is stored across this many virtual partitions usingsinkGroupId. A good rule of thumb is to set this to be around number of partitions in Kafka topic.sinkGroupId- The group id used to store small amount of state as metadata on Kafka. It is similar to consumer group id used with aKafkaConsumer. Each job should use a unique group id so that restarts/updates of job preserve the state to ensure exactly-once semantics. The state is committed atomically with sink transactions on Kafka. SeeKafkaProducer.sendOffsetsToTransaction(Map, String)for more information. The sink performs multiple sanity checks during initialization to catch common mistakes so that it does not end up using state that does not seem to be written by the same job.
-
withConsumerFactoryFn
public KafkaIO.WriteRecords<K,V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) When exactly-once semantics are enabled (seewithEOS(int, String)), the sink needs to fetch previously stored state with Kafka topic. Fetching the metadata requires a consumer. Similar toKafkaIO.Read.withConsumerFactoryFn(SerializableFunction), a factory function can be supplied if required in a specific case. The default isKafkaConsumer. -
withBadRecordErrorHandler
public KafkaIO.WriteRecords<K,V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) -
expand
Description copied from class:PTransformOverride this method to specify how thisPTransformshould be expanded on the givenInputT.NOTE: This method should not be called directly. Instead apply the
PTransformshould be applied to theInputTusing theapplymethod.Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expandin classPTransform<PCollection<ProducerRecord<K,V>>, PDone>
-
validate
Description copied from class:PTransformCalled before running the Pipeline to verify this transform is fully and correctly specified.By default, does nothing.
- Overrides:
validatein classPTransform<PCollection<ProducerRecord<K,V>>, PDone>
-
populateDisplayData
Description copied from class:PTransformRegister display data for the given transform or component.populateDisplayData(DisplayData.Builder)is invoked by Pipeline runners to collect display data viaDisplayData.from(HasDisplayData). Implementations may callsuper.populateDisplayData(builder)in order to register display data in the current namespace, but should otherwise usesubcomponent.populateDisplayData(builder)to use the namespace of the subcomponent.By default, does not register any display data. Implementors may override this method to provide their own display data.
- Specified by:
populateDisplayDatain interfaceHasDisplayData- Overrides:
populateDisplayDatain classPTransform<PCollection<ProducerRecord<K,V>>, PDone> - Parameters:
builder- The builder to populate with display data.- See Also:
-