Class KafkaIO.Read<K,V>

java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
org.apache.beam.sdk.io.kafka.KafkaIO.Read<K,V>
All Implemented Interfaces:
Serializable, HasDisplayData
Enclosing class:
KafkaIO

public abstract static class KafkaIO.Read<K,V> extends PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
A PTransform to read from Kafka topics. See KafkaIO for more information on usage and configuration.
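
For illustration, a minimal unbounded read might be configured as in the sketch below. The broker addresses, topic name, and key/value types are placeholders, and the snippet assumes the standard Kafka LongDeserializer and StringDeserializer; it is a sketch, not a definitive recipe.

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  Pipeline pipeline = Pipeline.create();

  // Each element carries the key, value, and Kafka metadata (topic, partition, offset, timestamp).
  PCollection<KafkaRecord<Long, String>> records =
      pipeline.apply(
          KafkaIO.<Long, String>read()
              .withBootstrapServers("broker-1:9092,broker-2:9092") // placeholder brokers
              .withTopic("my-topic")                               // placeholder topic
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(StringDeserializer.class));
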
  • Field Details

    • AUTOVALUE_CLASS

      public static final Class<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read> AUTOVALUE_CLASS
    • KAFKA_READ_OVERRIDE

      @Internal public static final org.apache.beam.sdk.runners.PTransformOverride KAFKA_READ_OVERRIDE
      A PTransformOverride for runners to swap KafkaIO.Read.ReadFromKafkaViaSDF to the legacy Kafka read if the runner does not have good support for executing unbounded Splittable DoFn.
  • Constructor Details

    • Read

      public Read()
  • Method Details

    • getConsumerConfig

      @Pure public abstract Map<String,Object> getConsumerConfig()
    • getTopics

      @Pure public abstract @Nullable List<String> getTopics()
    • getTopicPartitions

      @Pure public abstract @Nullable List<TopicPartition> getTopicPartitions()
    • getTopicPattern

      @Pure public abstract @Nullable Pattern getTopicPattern()
    • getKeyCoder

      @Pure public abstract @Nullable Coder<K> getKeyCoder()
    • getValueCoder

      @Pure public abstract @Nullable Coder<V> getValueCoder()
    • getConsumerFactoryFn

      @Pure public abstract SerializableFunction<Map<String,Object>,Consumer<byte[],byte[]>> getConsumerFactoryFn()
    • getWatermarkFn

      @Pure public abstract @Nullable SerializableFunction<KafkaRecord<K,V>,Instant> getWatermarkFn()
    • getMaxNumRecords

      @Pure public abstract long getMaxNumRecords()
    • getMaxReadTime

      @Pure public abstract @Nullable Duration getMaxReadTime()
    • getStartReadTime

      @Pure public abstract @Nullable Instant getStartReadTime()
    • getStopReadTime

      @Pure public abstract @Nullable Instant getStopReadTime()
    • isCommitOffsetsInFinalizeEnabled

      @Pure public abstract boolean isCommitOffsetsInFinalizeEnabled()
    • isDynamicRead

      @Pure public abstract boolean isDynamicRead()
    • isRedistributed

      @Pure public abstract boolean isRedistributed()
    • isAllowDuplicates

      @Pure public abstract boolean isAllowDuplicates()
    • getRedistributeNumKeys

      @Pure public abstract int getRedistributeNumKeys()
    • getOffsetDeduplication

      @Pure public abstract @Nullable Boolean getOffsetDeduplication()
    • getWatchTopicPartitionDuration

      @Pure public abstract @Nullable Duration getWatchTopicPartitionDuration()
    • getTimestampPolicyFactory

      @Pure public abstract TimestampPolicyFactory<K,V> getTimestampPolicyFactory()
    • getOffsetConsumerConfig

      @Pure public abstract @Nullable Map<String,Object> getOffsetConsumerConfig()
    • getKeyDeserializerProvider

      @Pure public abstract @Nullable DeserializerProvider<K> getKeyDeserializerProvider()
    • getValueDeserializerProvider

      @Pure public abstract @Nullable DeserializerProvider<V> getValueDeserializerProvider()
    • getCheckStopReadingFn

      @Pure public abstract @Nullable CheckStopReadingFn getCheckStopReadingFn()
    • getBadRecordErrorHandler

      @Pure public abstract @Nullable ErrorHandler<BadRecord,?> getBadRecordErrorHandler()
    • getConsumerPollingTimeout

      @Pure public abstract long getConsumerPollingTimeout()
    • getLogTopicVerification

      @Pure public abstract @Nullable Boolean getLogTopicVerification()
    • withBootstrapServers

      public KafkaIO.Read<K,V> withBootstrapServers(String bootstrapServers)
      Sets the bootstrap servers for the Kafka consumer.
    • withTopic

      public KafkaIO.Read<K,V> withTopic(String topic)
      Sets the topic to read from.

      See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.

    • withTopics

      public KafkaIO.Read<K,V> withTopics(List<String> topics)
      Sets a list of topics to read from. All the partitions from each of the topics are read.

      See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.

    • withTopicPartitions

      public KafkaIO.Read<K,V> withTopicPartitions(List<TopicPartition> topicPartitions)
      Sets a list of partitions to read from. This allows reading only a subset of partitions for one or more topics when (if ever) needed.

      See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
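
      As a sketch (topic name, partition numbers, and types are placeholders), an explicit subset of partitions can be assigned like this:

        import java.util.Arrays;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.LongDeserializer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        KafkaIO.Read<Long, String> read =
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092")          // placeholder broker
                .withTopicPartitions(
                    Arrays.asList(
                        new TopicPartition("my-topic", 0),      // read only partitions 0 and 1
                        new TopicPartition("my-topic", 1)))
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class);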

    • withRedistribute

      public KafkaIO.Read<K,V> withRedistribute()
      Sets a redistribute transform that hints to the runner to try to redistribute the work evenly.
    • withAllowDuplicates

      public KafkaIO.Read<K,V> withAllowDuplicates(Boolean allowDuplicates)
    • withRedistributeNumKeys

      public KafkaIO.Read<K,V> withRedistributeNumKeys(int redistributeNumKeys)
    • withOffsetDeduplication

      public KafkaIO.Read<K,V> withOffsetDeduplication(Boolean offsetDeduplication)
    • withTopicPattern

      public KafkaIO.Read<K,V> withTopicPattern(String topicPattern)
      Internally sets a Pattern of topics to read from. All the partitions from each of the matching topics are read.

      See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
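
      As a sketch (the broker and pattern are placeholders), all topics whose names match a regular expression can be read with:

        // KafkaIO.readBytes() returns a Read<byte[], byte[]> with byte-array deserializers preconfigured.
        KafkaIO.Read<byte[], byte[]> read =
            KafkaIO.readBytes()
                .withBootstrapServers("broker-1:9092")   // placeholder broker
                .withTopicPattern("events_.*");          // placeholder pattern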

    • withKeyDeserializer

      public KafkaIO.Read<K,V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer)
      Sets a Kafka Deserializer to interpret key bytes read from Kafka.

      In addition, Beam also needs a Coder to serialize and deserialize key objects at runtime. KafkaIO tries to infer a coder for the key based on the Deserializer class; however, if that fails, you can use withKeyDeserializerAndCoder(Class, Coder) to provide the key coder explicitly.

    • withKeyDeserializerAndCoder

      public KafkaIO.Read<K,V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder)
      Sets a Kafka Deserializer for interpreting key bytes read from Kafka along with a Coder for helping the Beam runner materialize key objects at runtime if necessary.

      Use this method only if your pipeline doesn't work with plain withKeyDeserializer(Class).
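
      A sketch with hypothetical names: MyKey is a Serializable key class and MyKeyDeserializer implements Deserializer<MyKey>; neither is part of this API. The coder is supplied explicitly because it cannot be inferred from the deserializer:

        KafkaIO.Read<MyKey, String> read =
            KafkaIO.<MyKey, String>read()
                .withBootstrapServers("broker-1:9092")   // placeholder broker
                .withTopic("my-topic")                   // placeholder topic
                .withKeyDeserializerAndCoder(
                    MyKeyDeserializer.class,             // hypothetical Deserializer<MyKey>
                    SerializableCoder.of(MyKey.class))   // explicit Coder<MyKey>
                .withValueDeserializer(StringDeserializer.class);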

    • withKeyDeserializer

      public KafkaIO.Read<K,V> withKeyDeserializer(DeserializerProvider<K> deserializerProvider)
    • withKeyDeserializerProviderAndCoder

      public KafkaIO.Read<K,V> withKeyDeserializerProviderAndCoder(DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder)
    • withValueDeserializer

      public KafkaIO.Read<K,V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer)
      Sets a Kafka Deserializer to interpret value bytes read from Kafka.

      In addition, Beam also needs a Coder to serialize and deserialize value objects at runtime. KafkaIO tries to infer a coder for the value based on the Deserializer class; however, if that fails, you can use withValueDeserializerAndCoder(Class, Coder) to provide the value coder explicitly.

    • withValueDeserializerAndCoder

      public KafkaIO.Read<K,V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder)
      Sets a Kafka Deserializer for interpreting value bytes read from Kafka along with a Coder for helping the Beam runner materialize value objects at runtime if necessary.

      Use this method only if your pipeline doesn't work with plain withValueDeserializer(Class).

    • withValueDeserializer

      public KafkaIO.Read<K,V> withValueDeserializer(DeserializerProvider<V> deserializerProvider)
    • withValueDeserializerProviderAndCoder

      public KafkaIO.Read<K,V> withValueDeserializerProviderAndCoder(DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder)
    • withConsumerFactoryFn

      public KafkaIO.Read<K,V> withConsumerFactoryFn(SerializableFunction<Map<String,Object>,Consumer<byte[],byte[]>> consumerFactoryFn)
      A factory to create a Kafka Consumer from the consumer configuration. This is useful for supporting another version of the Kafka consumer. Default is KafkaConsumer.
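
      A minimal sketch: this factory simply builds the standard KafkaConsumer from the supplied configuration (which mirrors the default behavior); a custom factory could adjust the configuration or return a different Consumer implementation. Here read stands for a previously configured KafkaIO.Read instance:

        read = read.withConsumerFactoryFn(
            (Map<String, Object> config) ->
                new KafkaConsumer<byte[], byte[]>(config));   // org.apache.kafka.clients.consumer.KafkaConsumer
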
    • updateConsumerProperties

      @Deprecated public KafkaIO.Read<K,V> updateConsumerProperties(Map<String,Object> configUpdates)
      Deprecated.
      as of version 2.13. Use withConsumerConfigUpdates(Map) instead
      Update consumer configuration with new properties.
    • withMaxNumRecords

      public KafkaIO.Read<K,V> withMaxNumRecords(long maxNumRecords)
      Similar to Read.Unbounded.withMaxNumRecords(long). Mainly used for tests and demo applications.
    • withStartReadTime

      public KafkaIO.Read<K,V> withStartReadTime(Instant startReadTime)
      Uses the given timestamp to set the start offset. This is supported only by Kafka Client 0.10.1.0 onwards and message format versions after 0.10.0.

      Note that this takes priority over the start offset configuration ConsumerConfig.AUTO_OFFSET_RESET_CONFIG and any auto-committed offsets.

      This results in hard failures in either of the following two cases: 1. One or more partitions do not contain any messages with a timestamp greater than or equal to the desired timestamp. 2. The message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.

    • withStopReadTime

      public KafkaIO.Read<K,V> withStopReadTime(Instant stopReadTime)
      Uses the given timestamp to set the stop offset. This is supported only by Kafka Client 0.10.1.0 onwards and message format versions after 0.10.0.

      This results in hard failures in either of the following two cases: 1. One or more partitions do not contain any messages with a timestamp greater than or equal to the desired timestamp. 2. The message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
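
      A sketch bounding the read to a time window with withStartReadTime and withStopReadTime (the timestamps are placeholders; read stands for a previously configured KafkaIO.Read instance):

        import org.joda.time.Instant;

        read = read
            .withStartReadTime(Instant.parse("2024-01-01T00:00:00.000Z"))
            .withStopReadTime(Instant.parse("2024-01-02T00:00:00.000Z"));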

    • withMaxReadTime

      public KafkaIO.Read<K,V> withMaxReadTime(Duration maxReadTime)
      Similar to Read.Unbounded.withMaxReadTime(Duration). Mainly used for tests and demo applications.
    • withLogAppendTime

      public KafkaIO.Read<K,V> withLogAppendTime()
      Sets TimestampPolicy to TimestampPolicyFactory.LogAppendTimePolicy. The policy assigns Kafka's log append time (server-side ingestion time) to each record. The watermark for each Kafka partition is the timestamp of the last record read. If a partition is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.

      In Kafka, log append time needs to be enabled for each topic, and all subsequent records will have their timestamp set to log append time. If a record does not have its timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous record's timestamp or the latest watermark, whichever is larger.

      The watermark for the entire source is the oldest of each partition's watermark. If one of the readers falls behind possibly due to uneven distribution of records among Kafka partitions, it ends up holding the watermark for the entire source.

    • withProcessingTime

      public KafkaIO.Read<K,V> withProcessingTime()
      Sets TimestampPolicy to TimestampPolicyFactory.ProcessingTimePolicy. This is the default timestamp policy. It assigns processing time to each record. Specifically, this is the timestamp when the record becomes 'current' in the reader. The watermark always advances to the current time. If server-side time (log append time) is enabled in Kafka, withLogAppendTime() is recommended over this.
    • withCreateTime

      public KafkaIO.Read<K,V> withCreateTime(Duration maxDelay)
      Sets the timestamp policy based on the KafkaTimestampType.CREATE_TIME timestamp of the records. It is an error if a record's timestamp type is not KafkaTimestampType.CREATE_TIME. The timestamps within a partition are expected to be roughly monotonically increasing, with a cap on out-of-order delays (e.g. a 'max delay' of 1 minute). The watermark at any time is '(Min(now(), Max(event timestamp so far)) - max delay)'; that is, the watermark is never set in the future and is capped to 'now - max delay'. In addition, the watermark advances to 'now - max delay' when a partition is idle.
      Parameters:
      maxDelay - For any record in the Kafka partition, the timestamp of any subsequent record is expected to be after current record timestamp - maxDelay.
    • withTimestampPolicyFactory

      public KafkaIO.Read<K,V> withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory)
      Provides a custom TimestampPolicyFactory to set event times and the watermark for each partition. TimestampPolicyFactory.createTimestampPolicy(TopicPartition, Optional) is invoked for each partition when the reader starts.
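
      A sketch using the CustomTimestampPolicyWithLimitedDelay policy from this package, assuming the Kafka record timestamp is a usable event time and out-of-order delays stay under one minute (read stands for a previously configured KafkaIO.Read instance):

        read = read.withTimestampPolicyFactory(
            (tp, previousWatermark) ->
                new CustomTimestampPolicyWithLimitedDelay<>(
                    rec -> new Instant(rec.getTimestamp()),   // event time taken from the Kafka record
                    Duration.standardMinutes(1),              // tolerated out-of-order delay
                    previousWatermark));
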
    • withTimestampFn2

      @Deprecated public KafkaIO.Read<K,V> withTimestampFn2(SerializableFunction<KafkaRecord<K,V>,Instant> timestampFn)
      Deprecated.
      A function to assign a timestamp to a record. Default is processing timestamp.
    • withWatermarkFn2

      @Deprecated public KafkaIO.Read<K,V> withWatermarkFn2(SerializableFunction<KafkaRecord<K,V>,Instant> watermarkFn)
      Deprecated.
      A function to calculate watermark after a record. Default is last record timestamp.
    • withTimestampFn

      @Deprecated public KafkaIO.Read<K,V> withTimestampFn(SerializableFunction<KV<K,V>,Instant> timestampFn)
      Deprecated.
      A function to assign a timestamp to a record. Default is processing timestamp.
    • withWatermarkFn

      @Deprecated public KafkaIO.Read<K,V> withWatermarkFn(SerializableFunction<KV<K,V>,Instant> watermarkFn)
      Deprecated.
      A function to calculate watermark after a record. Default is last record timestamp.
    • withReadCommitted

      public KafkaIO.Read<K,V> withReadCommitted()
      Sets "isolation_level" to "read_committed" in Kafka consumer configuration. This is ensures that the consumer does not read uncommitted messages. Kafka version 0.11 introduced transactional writes. Applications requiring end-to-end exactly-once semantics should only read committed messages. See JavaDoc for KafkaConsumer for more description.
    • commitOffsetsInFinalize

      public KafkaIO.Read<K,V> commitOffsetsInFinalize()
      Finalized offsets are committed to Kafka. See UnboundedSource.CheckpointMark.finalizeCheckpoint(). It helps with minimizing gaps or duplicate processing of records while restarting a pipeline from scratch, but it does not provide hard processing guarantees. There could be a short delay in committing after UnboundedSource.CheckpointMark.finalizeCheckpoint() is invoked, as the reader might be blocked on reading from Kafka. Note that this is independent of the 'AUTO_COMMIT' Kafka consumer configuration. Usually either this or AUTO_COMMIT in the Kafka consumer is enabled, but not both.
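
      A sketch enabling committed-read isolation together with offset commits on finalize; committing offsets typically requires a group.id in the consumer configuration (the group name is a placeholder, and read stands for a previously configured KafkaIO.Read instance):

        import java.util.Collections;
        import org.apache.kafka.clients.consumer.ConsumerConfig;

        read = read
            .withReadCommitted()
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-beam-group"))   // placeholder group id
            .commitOffsetsInFinalize();
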
    • withDynamicRead

      public KafkaIO.Read<K,V> withDynamicRead(Duration duration)
      Configures the KafkaIO to use WatchForKafkaTopicPartitions to detect and emit any newly available TopicPartition for ReadFromKafkaDoFn to consume during pipeline execution time. The KafkaIO will regularly check topic-partition availability based on the given duration. If no duration is given (i.e. it is null), a default duration of 1 hour is used.
    • withOffsetConsumerConfigOverrides

      public KafkaIO.Read<K,V> withOffsetConsumerConfigOverrides(Map<String,Object> offsetConsumerConfig)
      Sets additional configuration for the backend offset consumer. It may be required for a secured Kafka cluster, especially when you see a WARN log message similar to 'exception while fetching latest offset for partition {}. will be retried'.

      In KafkaIO.read(), there are actually two consumers running in the backend:
      1. the main consumer, which reads data from Kafka;
      2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest offset.

      By default, the offset consumer inherits the configuration from the main consumer, with an auto-generated ConsumerConfig.GROUP_ID_CONFIG. This may not work in a secured Kafka cluster that requires additional configuration.
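
      A sketch with illustrative security-related overrides (the keys are standard Kafka client settings, but the values are placeholders; read stands for a previously configured KafkaIO.Read instance):

        import java.util.HashMap;
        import java.util.Map;

        Map<String, Object> offsetConsumerConfig = new HashMap<>();
        offsetConsumerConfig.put("security.protocol", "SASL_SSL");   // placeholder security settings
        offsetConsumerConfig.put("sasl.mechanism", "PLAIN");

        read = read.withOffsetConsumerConfigOverrides(offsetConsumerConfig);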

    • withConsumerConfigUpdates

      public KafkaIO.Read<K,V> withConsumerConfigUpdates(Map<String,Object> configUpdates)
      Updates the configuration for the backend main consumer. Note that the default consumer properties will not be completely overridden; this method only overrides the values for the keys provided.

      In KafkaIO.read(), there are actually two consumers running in the backend:
      1. the main consumer, which reads data from Kafka;
      2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest offset.

      By default, the main consumer uses the configuration from KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES.
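
      A sketch overriding two standard consumer properties (the values are placeholders; read stands for a previously configured KafkaIO.Read instance):

        import java.util.HashMap;
        import java.util.Map;
        import org.apache.kafka.clients.consumer.ConsumerConfig;

        Map<String, Object> configUpdates = new HashMap<>();
        configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // start from the earliest offset
        configUpdates.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);           // example tuning value

        read = read.withConsumerConfigUpdates(configUpdates);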

    • withCheckStopReadingFn

      public KafkaIO.Read<K,V> withCheckStopReadingFn(CheckStopReadingFn checkStopReadingFn)
      A custom CheckStopReadingFn that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
    • withCheckStopReadingFn

      public KafkaIO.Read<K,V> withCheckStopReadingFn(SerializableFunction<TopicPartition,Boolean> checkStopReadingFn)
      A custom SerializableFunction that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
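
      A sketch using the SerializableFunction variant; the stop condition and topic prefix are placeholders (read stands for a previously configured KafkaIO.Read instance):

        import org.apache.kafka.common.TopicPartition;

        read = read.withCheckStopReadingFn(
            (TopicPartition tp) -> tp.topic().startsWith("retired_"));   // stop reading partitions of retired topics
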
    • withBadRecordErrorHandler

      public KafkaIO.Read<K,V> withBadRecordErrorHandler(ErrorHandler<BadRecord,?> badRecordErrorHandler)
    • withConsumerPollingTimeout

      public KafkaIO.Read<K,V> withConsumerPollingTimeout(long duration)
      Sets the timeout, in seconds, for the Kafka consumer polling request in the ReadFromKafkaDoFn. A lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching any records. The default is 2 seconds.
    • withGCPApplicationDefaultCredentials

      public KafkaIO.Read<K,V> withGCPApplicationDefaultCredentials()
      Creates and sets the Application Default Credentials for a Kafka consumer. This allows the consumer to be authenticated with a Google Kafka Server using OAuth.
    • withTopicVerificationLogging

      public KafkaIO.Read<K,V> withTopicVerificationLogging(boolean logTopicVerification)
    • withoutMetadata

      public PTransform<PBegin,PCollection<KV<K,V>>> withoutMetadata()
      Returns a PTransform for a PCollection of KV, dropping the Kafka metadata.
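
      A sketch that drops the Kafka metadata and yields plain key-value pairs (broker, topic, and types are placeholders; pipeline stands for an existing Pipeline):

        import org.apache.beam.sdk.values.KV;
        import org.apache.beam.sdk.values.PCollection;
        import org.apache.kafka.common.serialization.LongDeserializer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        PCollection<KV<Long, String>> keyValues =
            pipeline.apply(
                KafkaIO.<Long, String>read()
                    .withBootstrapServers("broker-1:9092")   // placeholder broker
                    .withTopic("my-topic")                   // placeholder topic
                    .withKeyDeserializer(LongDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata());
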
    • externalWithMetadata

      public PTransform<PBegin,PCollection<Row>> externalWithMetadata()
    • expand

      public PCollection<KafkaRecord<K,V>> expand(PBegin input)
      Description copied from class: PTransform
      Override this method to specify how this PTransform should be expanded on the given InputT.

      NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

      Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

      Specified by:
      expand in class PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
    • populateDisplayData

      public void populateDisplayData(DisplayData.Builder builder)
      Description copied from class: PTransform
      Register display data for the given transform or component.

      populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.

      By default, does not register any display data. Implementors may override this method to provide their own display data.

      Specified by:
      populateDisplayData in interface HasDisplayData
      Overrides:
      populateDisplayData in class PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
      Parameters:
      builder - The builder to populate with display data.