Class KafkaIO.WriteRecords<K,V>

java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PCollection<ProducerRecord<K,V>>,PDone>
org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords<K,V>
All Implemented Interfaces:
Serializable, HasDisplayData
Enclosing class:
KafkaIO

public abstract static class KafkaIO.WriteRecords<K,V> extends PTransform<PCollection<ProducerRecord<K,V>>,PDone>
A PTransform that writes ProducerRecords to a Kafka topic. See KafkaIO for more information on usage and configuration.
  • Constructor Details

    • WriteRecords

      public WriteRecords()
  • Method Details

    • getTopic

      @Pure public abstract @Nullable String getTopic()
    • getProducerConfig

      @Pure public abstract Map<String,Object> getProducerConfig()
    • getProducerFactoryFn

      @Pure public abstract @Nullable SerializableFunction<Map<String,Object>,Producer<K,V>> getProducerFactoryFn()
    • getKeySerializer

      @Pure public abstract @Nullable Class<? extends Serializer<K>> getKeySerializer()
    • getValueSerializer

      @Pure public abstract @Nullable Class<? extends Serializer<V>> getValueSerializer()
    • getPublishTimestampFunction

      @Pure public abstract @Nullable KafkaPublishTimestampFunction<ProducerRecord<K,V>> getPublishTimestampFunction()
    • isEOS

      @Pure public abstract boolean isEOS()
    • getSinkGroupId

      @Pure public abstract @Nullable String getSinkGroupId()
    • getNumShards

      @Pure public abstract int getNumShards()
    • getConsumerFactoryFn

      @Pure public abstract @Nullable SerializableFunction<Map<String,Object>,? extends Consumer<?,?>> getConsumerFactoryFn()
    • getBadRecordRouter

      @Pure public abstract BadRecordRouter getBadRecordRouter()
    • getBadRecordErrorHandler

      @Pure public abstract ErrorHandler<BadRecord,?> getBadRecordErrorHandler()
    • withBootstrapServers

      public KafkaIO.WriteRecords<K,V> withBootstrapServers(String bootstrapServers)
      Returns a new KafkaIO.WriteRecords transform with the Kafka producer pointing to bootstrapServers.
    • withTopic

      public KafkaIO.WriteRecords<K,V> withTopic(String topic)
      Sets the default Kafka topic to write to. To set the topic name per published record, set it on the individual ProducerRecord.
    • withKeySerializer

      public KafkaIO.WriteRecords<K,V> withKeySerializer(Class<? extends Serializer<K>> keySerializer)
      Sets a Serializer for serializing the key (if any) to bytes.

      A key is optional when writing to Kafka. When a key is set, its hash is used to determine the partition in Kafka (see ProducerRecord for more details).
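      The key-to-partition relationship described above can be pictured with a small sketch. It is illustrative only: the class and method names are hypothetical, and Arrays.hashCode is a stand-in, since Kafka's default partitioner applies its own hash function to the serialized key bytes. The point is the property that equal key bytes always map to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: maps serialized key bytes to a partition index. */
final class KeyPartitionSketch {

  // Stand-in hash (assumption): Kafka's default partitioner uses its own hash
  // function over the serialized key bytes, not Arrays.hashCode.
  static int partitionFor(byte[] serializedKey, int numPartitions) {
    int hash = Arrays.hashCode(serializedKey);
    // Clear the sign bit so the modulo result is never negative.
    return (hash & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
    // Equal key bytes select the same partition on every call.
    System.out.println(partitionFor(key, 6) == partitionFor(key.clone(), 6));
  }
}
```

      Because the partition depends on the serialized bytes, changing the key serializer can change which partition a given key lands on.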

    • withValueSerializer

      public KafkaIO.WriteRecords<K,V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer)
      Sets a Serializer for serializing the value to bytes.
    • updateProducerProperties

      @Deprecated public KafkaIO.WriteRecords<K,V> updateProducerProperties(Map<String,Object> configUpdates)
      Deprecated.
      as of version 2.13. Use withProducerConfigUpdates(Map) instead.
      Adds the given producer properties, overriding old values of properties with the same key.
    • withProducerConfigUpdates

      public KafkaIO.WriteRecords<K,V> withProducerConfigUpdates(Map<String,Object> configUpdates)
      Updates the configuration for the producer. The default producer properties are not completely overridden: this method only replaces the values of properties whose keys appear in configUpdates.

      By default, the producer uses the configuration from DEFAULT_PRODUCER_PROPERTIES.
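      The update semantics can be sketched with plain maps. This assumes the merge behaves like Map.putAll on a copy of the current configuration; the class name and the default values shown are hypothetical and are not the actual contents of DEFAULT_PRODUCER_PROPERTIES.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: overlay semantics for producer config updates. */
final class ProducerConfigMergeSketch {

  // Overlays updates on a copy of the current config: matching keys are replaced,
  // new keys are added, and every other existing entry is kept.
  static Map<String, Object> merge(Map<String, Object> current, Map<String, Object> updates) {
    Map<String, Object> merged = new HashMap<>(current);
    merged.putAll(updates);
    return merged;
  }

  public static void main(String[] args) {
    // Hypothetical defaults; not the actual contents of DEFAULT_PRODUCER_PROPERTIES.
    Map<String, Object> defaults = new HashMap<>();
    defaults.put("retries", 3);
    defaults.put("acks", "all");

    Map<String, Object> updates = new HashMap<>();
    updates.put("retries", 10); // replaces the existing value
    updates.put("linger.ms", 50); // added alongside the existing entries

    System.out.println(merge(defaults, updates));
  }
}
```

      In this sketch "acks" survives the update untouched, which is the behavior the description calls "not completely overridden".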

    • withProducerFactoryFn

      public KafkaIO.WriteRecords<K,V> withProducerFactoryFn(SerializableFunction<Map<String,Object>,Producer<K,V>> producerFactoryFn)
      Sets a custom function to create the Kafka producer. Primarily used for tests. The default is KafkaProducer.
    • withInputTimestamp

      public KafkaIO.WriteRecords<K,V> withInputTimestamp()
      Sets the timestamp of each published record to the timestamp of the element in the pipeline. This is equivalent to withPublishTimestampFunction((e, ts) -> ts).
      NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is processing messages from the past, Kafka might delete them immediately after they are published if their timestamps are older than the Kafka cluster's log.retention.hours.
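      The retention concern comes down to simple arithmetic, sketched below. The class and method names are hypothetical, and the 168-hour window is an assumed log.retention.hours value. Actual deletion is decided by the broker on whole log segments, so this only shows when a record's timestamp already falls outside the window.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: checks a record timestamp against a retention window. */
final class RetentionCheckSketch {

  // True when the record timestamp is already older than (now - retention).
  static boolean olderThanRetention(Instant recordTimestamp, Instant now, Duration retention) {
    return recordTimestamp.isBefore(now.minus(retention));
  }

  public static void main(String[] args) {
    Duration retention = Duration.ofHours(168); // assumed log.retention.hours value
    Instant now = Instant.parse("2024-01-10T00:00:00Z");
    Instant replayed = Instant.parse("2024-01-01T00:00:00Z"); // 9 days before 'now'
    Instant recent = Instant.parse("2024-01-09T00:00:00Z"); // 1 day before 'now'

    System.out.println(olderThanRetention(replayed, now, retention)); // prints true
    System.out.println(olderThanRetention(recent, now, retention)); // prints false
  }
}
```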
    • withPublishTimestampFunction

      @Deprecated public KafkaIO.WriteRecords<K,V> withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K,V>> timestampFunction)
      Deprecated.
      Use ProducerRecord to set the publish timestamp.
      A function to provide the timestamp for records being published.
      NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is processing messages from the past, Kafka might delete them immediately after they are published if their timestamps are older than the Kafka cluster's log.retention.hours.
    • withEOS

      public KafkaIO.WriteRecords<K,V> withEOS(int numShards, String sinkGroupId)
      Provides exactly-once semantics while writing to Kafka, which enables applications with end-to-end exactly-once guarantees on top of exactly-once semantics within Beam pipelines. It ensures that records written to the sink are committed on Kafka exactly once, even when some processing is retried during pipeline execution. Retries typically occur when workers restart (as in failure recovery), or when the work is redistributed (as in an autoscaling event).

      Beam runners typically provide exactly-once semantics for the results of a pipeline, but not for side effects from user code in transforms. If a transform such as the Kafka sink writes to an external system, those writes might occur more than once. When EOS is enabled here, the sink transform ties checkpointing semantics in compatible Beam runners to transactions in Kafka (version 0.11+) to ensure a record is written only once. Because the implementation relies on the runner's checkpoint semantics, not all runners are compatible. The sink throws an exception during initialization if the runner is not explicitly allowed. The Dataflow, Flink, and Spark runners are compatible.

      Note on performance: The exactly-once sink involves two shuffles of the records. In addition to the cost of shuffling the records among workers, the records go through two serialization-deserialization cycles. Depending on the volume and cost of serialization, the CPU cost might be noticeable. The CPU cost can be reduced by writing byte arrays (i.e. serializing records to bytes before writing to the Kafka sink).

      Parameters:
      numShards - Sets sink parallelism. The state metadata stored on Kafka is stored across this many virtual partitions using sinkGroupId. A good rule of thumb is to set this to roughly the number of partitions in the Kafka topic.
      sinkGroupId - The group id used to store a small amount of state as metadata on Kafka. It is similar to the consumer group id used with a KafkaConsumer. Each job should use a unique group id so that restarts/updates of the job preserve the state to ensure exactly-once semantics. The state is committed atomically with sink transactions on Kafka. See KafkaProducer.sendOffsetsToTransaction(Map, String) for more information. The sink performs multiple sanity checks during initialization to catch common mistakes so that it does not end up using state that does not seem to be written by the same job.
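      One way to picture why committing state atomically with the written records prevents duplicates on retry is the toy in-memory model below. It is a conceptual sketch, not the sink's implementation: the class, its methods, and the per-shard sequence id are hypothetical stand-ins for the metadata stored under sinkGroupId, and a synchronized method stands in for a Kafka transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Conceptual model only; not Beam's or Kafka's implementation. */
final class ExactlyOnceSketch {
  // Stands in for the records committed to the topic.
  private final List<String> committedRecords = new ArrayList<>();
  // Stands in for the per-shard state stored under the sink group id.
  private final Map<Integer, Long> lastCommittedSeq = new HashMap<>();

  // Writes one batch for a shard. The records and the new sequence id are applied
  // together, mimicking a transaction that commits data and state atomically.
  // Returns false when the batch was already committed (a retry) and is skipped.
  synchronized boolean writeBatch(int shard, long sequenceId, List<String> records) {
    long last = lastCommittedSeq.getOrDefault(shard, -1L);
    if (sequenceId <= last) {
      return false;
    }
    committedRecords.addAll(records);
    lastCommittedSeq.put(shard, sequenceId);
    return true;
  }

  // Returns a copy of everything committed so far.
  synchronized List<String> committed() {
    return new ArrayList<>(committedRecords);
  }
}
```

      In this model a restarted worker that replays batch 0 for shard 0 finds the stored sequence id already at 0 and writes nothing, so the retry does not duplicate records.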
    • withConsumerFactoryFn

      public KafkaIO.WriteRecords<K,V> withConsumerFactoryFn(SerializableFunction<Map<String,Object>,? extends Consumer<?,?>> consumerFactoryFn)
      When exactly-once semantics are enabled (see withEOS(int, String)), the sink needs to fetch previously stored state from the Kafka topic. Fetching the metadata requires a consumer. Similar to KafkaIO.Read.withConsumerFactoryFn(SerializableFunction), a factory function can be supplied if required in a specific case. The default is KafkaConsumer.
    • withBadRecordErrorHandler

      public KafkaIO.WriteRecords<K,V> withBadRecordErrorHandler(ErrorHandler<BadRecord,?> badRecordErrorHandler)
    • expand

      public PDone expand(PCollection<ProducerRecord<K,V>> input)
      Description copied from class: PTransform
      Override this method to specify how this PTransform should be expanded on the given InputT.

      NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

      Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

      Specified by:
      expand in class PTransform<PCollection<ProducerRecord<K,V>>,PDone>
    • validate

      public void validate(@Nullable PipelineOptions options)
      Description copied from class: PTransform
      Called before running the Pipeline to verify this transform is fully and correctly specified.

      By default, does nothing.

      Overrides:
      validate in class PTransform<PCollection<ProducerRecord<K,V>>,PDone>
    • populateDisplayData

      public void populateDisplayData(DisplayData.Builder builder)
      Description copied from class: PTransform
      Register display data for the given transform or component.

      populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.

      By default, does not register any display data. Implementors may override this method to provide their own display data.

      Specified by:
      populateDisplayData in interface HasDisplayData
      Overrides:
      populateDisplayData in class PTransform<PCollection<ProducerRecord<K,V>>,PDone>
      Parameters:
      builder - The builder to populate with display data.