Class KafkaIO.Read<K,V>
- All Implemented Interfaces:
Serializable, HasDisplayData
- Enclosing class:
KafkaIO
PTransform to read from Kafka topics. See KafkaIO for more information on
usage and configuration.
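A minimal usage sketch, assuming a Pipeline named pipeline; the broker addresses, topic name, and the Long/String key/value types below are illustrative placeholders rather than defaults:

    // Illustrative sketch: read Kafka records into a PCollection<KafkaRecord<Long, String>>.
    PCollection<KafkaRecord<Long, String>> records =
        pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker_1:9092,broker_2:9092")   // placeholder brokers
                .withTopic("my_topic")                                 // placeholder topic
                .withKeyDeserializer(LongDeserializer.class)           // org.apache.kafka.common.serialization
                .withValueDeserializer(StringDeserializer.class));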
Nested Class Summary
Nested Classes
- static class: Exposes KafkaIO.TypedWithoutMetadata as an external transform for cross-language usage.
- static interface
Field Summary
Fields
- static final Class<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read> AUTOVALUE_CLASS
- static final org.apache.beam.sdk.runners.PTransformOverride KAFKA_READ_OVERRIDE: A PTransformOverride for runners to swap KafkaIO.Read.ReadFromKafkaViaSDF to the legacy Kafka read if a runner does not have good support for executing unbounded Splittable DoFn.
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints -
Constructor Summary
Constructors
- Read()
Method Summary
Methods
Abstract accessors (the get* and is* methods listed under Method Details below) return the configured values. The configuration, lifecycle, and expansion methods are:
- commitOffsetsInFinalize(): Finalized offsets are committed to Kafka.
- expand(PBegin input): Override this method to specify how this PTransform should be expanded on the given InputT.
- externalWithMetadata()
- populateDisplayData(DisplayData.Builder builder): Register display data for the given transform or component.
- updateConsumerProperties(Map<String, Object> configUpdates): Deprecated as of version 2.13; use withConsumerConfigUpdates(Map) instead.
- withAllowDuplicates(Boolean allowDuplicates): Hints to the runner that it can relax exactly-once processing guarantees, allowing duplicates in at-least-once processing mode of Kafka inputs.
- withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler)
- withBootstrapServers(String bootstrapServers): Sets the bootstrap servers for the Kafka consumer.
- withCheckStopReadingFn(CheckStopReadingFn checkStopReadingFn): A custom CheckStopReadingFn that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
- withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> checkStopReadingFn): A custom SerializableFunction that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
- withConsumerConfigUpdates(Map<String, Object> configUpdates): Updates configuration for the backend main consumer.
- withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn): A factory to create a Kafka Consumer from consumer configuration.
- withConsumerPollingTimeout(long duration): Sets the timeout, in seconds, for Kafka consumer polling requests in the ReadFromKafkaDoFn.
- withCreateTime(Duration maxDelay): Sets the timestamp policy based on the KafkaTimestampType.CREATE_TIME timestamp of the records.
- withDynamicRead(Duration duration): Configures the KafkaIO to use WatchForKafkaTopicPartitions to detect and emit any newly available TopicPartition for ReadFromKafkaDoFn to consume during pipeline execution.
- withGCPApplicationDefaultCredentials(): Creates and sets the Application Default Credentials for a Kafka consumer.
- withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer): Sets a Kafka Deserializer to interpret key bytes read from Kafka.
- withKeyDeserializer(DeserializerProvider<K> deserializerProvider)
- withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder): Sets a Kafka Deserializer for interpreting key bytes read from Kafka, along with a Coder for helping the Beam runner materialize key objects at runtime if necessary.
- withKeyDeserializerProviderAndCoder(DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder)
- withLogAppendTime(): Sets the TimestampPolicy to TimestampPolicyFactory.LogAppendTimePolicy.
- withMaxNumRecords(long maxNumRecords): Similar to Read.Unbounded.withMaxNumRecords(long).
- withMaxReadTime(Duration maxReadTime): Similar to Read.Unbounded.withMaxReadTime(Duration).
- withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig): Sets additional configuration for the backend offset consumer.
- withOffsetDeduplication(Boolean offsetDeduplication): Hints to the runner to optimize the redistribute by minimizing the amount of data required for persistence as part of the redistribute operation.
- withoutMetadata(): Returns a PTransform for a PCollection of KV, dropping Kafka metadata.
- withProcessingTime(): Sets the TimestampPolicy to TimestampPolicyFactory.ProcessingTimePolicy (the default).
- withReadCommitted(): Sets "isolation_level" to "read_committed" in the Kafka consumer configuration.
- withRedistribute(): Sets a redistribute transform that hints to the runner to try to redistribute the work evenly.
- withRedistributeByRecordKey(Boolean redistributeByRecordKey)
- withRedistributeNumKeys(int redistributeNumKeys): Redistributes Kafka messages into a distinct number of keys for processing in subsequent steps.
- withStartReadTime(Instant startReadTime): Uses a timestamp to set the start offset.
- withStopReadTime(Instant stopReadTime): Uses a timestamp to set the stop offset.
- withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn): Deprecated as of version 2.4; use withTimestampPolicyFactory(TimestampPolicyFactory) instead.
- withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn): Deprecated as of version 2.4; use withTimestampPolicyFactory(TimestampPolicyFactory) instead.
- withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory): Provides a custom TimestampPolicyFactory to set event times and the watermark for each partition.
- withTopic(String topic): Sets the topic to read from.
- withTopicPartitions(List<TopicPartition> topicPartitions): Sets a list of partitions to read from.
- withTopicPattern(String topicPattern): Internally sets a Pattern of topics to read from.
- withTopics(List<String> topics): Sets a list of topics to read from.
- withTopicVerificationLogging(boolean logTopicVerification)
- withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer): Sets a Kafka Deserializer to interpret value bytes read from Kafka.
- withValueDeserializer(DeserializerProvider<V> deserializerProvider)
- withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder): Sets a Kafka Deserializer for interpreting value bytes read from Kafka, along with a Coder for helping the Beam runner materialize value objects at runtime if necessary.
- withValueDeserializerProviderAndCoder(DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder)
- withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn): Deprecated as of version 2.4; use withTimestampPolicyFactory(TimestampPolicyFactory) instead.
- withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn): Deprecated as of version 2.4; use withTimestampPolicyFactory(TimestampPolicyFactory) instead.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setDisplayData, setResourceHints, toString, validate, validate
-
Field Details
-
AUTOVALUE_CLASS
-
KAFKA_READ_OVERRIDE
A PTransformOverride for runners to swap KafkaIO.Read.ReadFromKafkaViaSDF to the legacy Kafka read if a runner does not have good support for executing unbounded Splittable DoFn.
-
-
Constructor Details
-
Read
public Read()
-
-
Method Details
-
getConsumerConfig
-
getTopics
-
getTopicPartitions
-
getTopicPattern
-
getKeyCoder
-
getValueCoder
-
getConsumerFactoryFn
-
getWatermarkFn
-
getMaxNumRecords
-
getMaxReadTime
-
getStartReadTime
-
getStopReadTime
-
isCommitOffsetsInFinalizeEnabled
-
isDynamicRead
-
isRedistributed
-
isAllowDuplicates
-
getRedistributeNumKeys
-
getOffsetDeduplication
-
getRedistributeByRecordKey
-
getWatchTopicPartitionDuration
-
getTimestampPolicyFactory
-
getOffsetConsumerConfig
-
getKeyDeserializerProvider
-
getValueDeserializerProvider
-
getCheckStopReadingFn
-
getBadRecordErrorHandler
-
getConsumerPollingTimeout
-
getLogTopicVerification
-
withBootstrapServers
Sets the bootstrap servers for the Kafka consumer. -
withTopic
Sets the topic to read from. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits. -
withTopics
Sets a list of topics to read from. All the partitions from each of the topics are read. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits. -
withTopicPartitions
Sets a list of partitions to read from. This allows reading only a subset of partitions for one or more topics when (if ever) needed. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
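For example, a sketch that pins the read to two partitions of a hypothetical topic (broker, topic name, and partition numbers are placeholders):

    // Read only partitions 0 and 1 of "my_topic".
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker_1:9092")
        .withTopicPartitions(
            Arrays.asList(
                new TopicPartition("my_topic", 0),   // org.apache.kafka.common.TopicPartition
                new TopicPartition("my_topic", 1)))
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);
-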
withRedistribute
Sets a redistribute transform that hints to the runner to try to redistribute the work evenly.
- Returns: an updated KafkaIO.Read transform.
-
withAllowDuplicates
Hints to the runner that it can relax exactly-once processing guarantees, allowing duplicates in at-least-once processing mode of Kafka inputs. Must be used with withRedistribute(). Not compatible with withOffsetDeduplication().
- Parameters: allowDuplicates - specifies whether to allow duplicates.
- Returns: an updated KafkaIO.Read transform.
-
withRedistributeNumKeys
Redistributes Kafka messages into a distinct number of keys for processing in subsequent steps. If unset, defaults to KafkaIO.DEFAULT_REDISTRIBUTE_NUM_KEYS. Use zero to disable bucketing into a distinct number of keys. Must be used with withRedistribute().
- Parameters: redistributeNumKeys - specifies the total number of keys for redistributing inputs.
- Returns: an updated KafkaIO.Read transform.
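A sketch combining the two settings; the key count of 32 is an arbitrary illustrative value and the broker/topic names are placeholders as above:

    // Redistribute the read across 32 keys; withRedistribute() must be set as well.
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker_1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withRedistribute()
        .withRedistributeNumKeys(32);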
-
withOffsetDeduplication
Hints to the runner to optimize the redistribute by minimizing the amount of data required for persistence as part of the redistribute operation. Must be used with withRedistribute(). Not compatible with withAllowDuplicates().
- Parameters: offsetDeduplication - specifies whether to enable offset-based deduplication.
- Returns: an updated KafkaIO.Read transform.
-
withRedistributeByRecordKey
-
withTopicPattern
Internally sets a Pattern of topics to read from. All the partitions from each of the matching topics are read. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits. -
withKeyDeserializer
Sets a Kafka Deserializer to interpret key bytes read from Kafka. In addition, Beam also needs a Coder to serialize and deserialize key objects at runtime. KafkaIO tries to infer a coder for the key based on the Deserializer class; if that fails, you can use withKeyDeserializerAndCoder(Class, Coder) to provide the key coder explicitly. -
withKeyDeserializerAndCoder
public KafkaIO.Read<K,V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder)
Sets a Kafka Deserializer for interpreting key bytes read from Kafka along with a Coder for helping the Beam runner materialize key objects at runtime if necessary. Use this method only if your pipeline doesn't work with plain withKeyDeserializer(Class).
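For example, a sketch that supplies the key coder explicitly; VarLongCoder is one possible choice for long keys, not a required pairing:

    // Long keys: deserialize with Kafka's LongDeserializer, encode in Beam with VarLongCoder.
    KafkaIO.<Long, String>read()
        .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
        .withValueDeserializer(StringDeserializer.class);
-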
withKeyDeserializer
-
withKeyDeserializerProviderAndCoder
public KafkaIO.Read<K,V> withKeyDeserializerProviderAndCoder(DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder) -
withValueDeserializer
Sets a Kafka Deserializer to interpret value bytes read from Kafka. In addition, Beam also needs a Coder to serialize and deserialize value objects at runtime. KafkaIO tries to infer a coder for the value based on the Deserializer class; if that fails, you can use withValueDeserializerAndCoder(Class, Coder) to provide the value coder explicitly. -
withValueDeserializerAndCoder
public KafkaIO.Read<K,V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder)
Sets a Kafka Deserializer for interpreting value bytes read from Kafka along with a Coder for helping the Beam runner materialize value objects at runtime if necessary. Use this method only if your pipeline doesn't work with plain withValueDeserializer(Class). -
withValueDeserializer
-
withValueDeserializerProviderAndCoder
public KafkaIO.Read<K,V> withValueDeserializerProviderAndCoder(DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) -
withConsumerFactoryFn
public KafkaIO.Read<K,V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn)
A factory to create a Kafka Consumer from consumer configuration. This is useful for supporting another version of the Kafka consumer. Default is KafkaConsumer.
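A minimal sketch of a factory; this one simply mirrors the default behavior of constructing an org.apache.kafka.clients.consumer.KafkaConsumer from the supplied configuration, whereas a real use would substitute a different consumer implementation or version:

    // The factory receives the consumer configuration map and returns a Consumer<byte[], byte[]>.
    KafkaIO.<Long, String>read()
        .withConsumerFactoryFn(config -> new KafkaConsumer<>(config));
-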
updateConsumerProperties
Deprecated. As of version 2.13, use withConsumerConfigUpdates(Map) instead. Update consumer configuration with new properties. -
withMaxNumRecords
Similar to Read.Unbounded.withMaxNumRecords(long). Mainly used for tests and demo applications. -
withStartReadTime
Uses a timestamp to set the start offset. It is only supported by Kafka client 0.10.1.0 onwards and message format versions after 0.10.0. Note that this takes priority over the start offset configuration ConsumerConfig.AUTO_OFFSET_RESET_CONFIG and any auto-committed offsets. This results in hard failures in either of the following two cases: 1. If one or more partitions do not contain any messages with a timestamp larger than or equal to the desired timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
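For example (the timestamp value and topic are placeholders):

    // Start reading at offsets whose record timestamps are at or after this instant.
    KafkaIO.<Long, String>read()
        .withTopic("my_topic")
        .withStartReadTime(Instant.parse("2024-01-01T00:00:00Z"));   // org.joda.time.Instant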
-
withStopReadTime
Uses a timestamp to set the stop offset. It is only supported by Kafka client 0.10.1.0 onwards and message format versions after 0.10.0. This results in hard failures in either of the following two cases: 1. If one or more partitions do not contain any messages with a timestamp larger than or equal to the desired timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
-
withMaxReadTime
Similar to Read.Unbounded.withMaxReadTime(Duration). Mainly used for tests and demo applications. -
withLogAppendTime
Sets the TimestampPolicy to TimestampPolicyFactory.LogAppendTimePolicy. The policy assigns Kafka's log append time (server-side ingestion time) to each record. The watermark for each Kafka partition is the timestamp of the last record read. If a partition is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'. In Kafka, log append time needs to be enabled for each topic, and all subsequent records will have their timestamp set to log append time. If a record does not have its timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous record timestamp or the latest watermark, whichever is larger.
The watermark for the entire source is the oldest of each partition's watermark. If one of the readers falls behind, possibly due to uneven distribution of records among Kafka partitions, it ends up holding the watermark for the entire source.
-
withProcessingTime
Sets the TimestampPolicy to TimestampPolicyFactory.ProcessingTimePolicy. This is the default timestamp policy. It assigns processing time to each record. Specifically, this is the timestamp when the record becomes 'current' in the reader. The watermark always advances to the current time. If server-side time (log append time) is enabled in Kafka, withLogAppendTime() is recommended over this. -
withCreateTime
Sets the timestamp policy based on the KafkaTimestampType.CREATE_TIME timestamp of the records. It is an error if a record's timestamp type is not KafkaTimestampType.CREATE_TIME. The timestamps within a partition are expected to be roughly monotonically increasing, with a cap on out-of-order delays (e.g. a 'max delay' of 1 minute). The watermark at any time is 'Min(now(), Max(event timestamp so far)) - max delay'. However, the watermark is never set in the future and is capped to 'now - max delay'. In addition, the watermark advances to 'now - max delay' when a partition is idle.
- Parameters: maxDelay - For any record in the Kafka partition, the timestamp of any subsequent record is expected to be after 'current record timestamp - maxDelay'.
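For example, a sketch that tolerates up to one minute of out-of-order CREATE_TIME timestamps (the delay is an illustrative choice):

    // Event-time policy based on the producer-assigned CREATE_TIME, with a 1-minute max delay.
    KafkaIO.<Long, String>read()
        .withCreateTime(Duration.standardMinutes(1));   // org.joda.time.Duration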
-
withTimestampPolicyFactory
public KafkaIO.Read<K,V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory)
Provides a custom TimestampPolicyFactory to set event times and the watermark for each partition. TimestampPolicyFactory.createTimestampPolicy(TopicPartition, Optional) is invoked for each partition when the reader starts.
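A sketch of a factory built on CustomTimestampPolicyWithLimitedDelay; the timestamp function and the one-minute max delay are illustrative choices, not defaults:

    // Assign event times from each record's Kafka timestamp, with a bounded out-of-order delay.
    KafkaIO.<Long, String>read()
        .withTimestampPolicyFactory(
            (tp, previousWatermark) ->
                new CustomTimestampPolicyWithLimitedDelay<Long, String>(
                    rec -> new Instant(rec.getTimestamp()),
                    Duration.standardMinutes(1),
                    previousWatermark));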
-
withTimestampFn2
@Deprecated public KafkaIO.Read<K,V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn)
Deprecated, as of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to assign a timestamp to a record. Default is processing timestamp. -
withWatermarkFn2
@Deprecated public KafkaIO.Read<K,V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn)
Deprecated, as of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to calculate the watermark after a record. Default is last record timestamp. -
-
withTimestampFn
@Deprecated public KafkaIO.Read<K,V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn)
Deprecated, as of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to assign a timestamp to a record. Default is processing timestamp. -
withWatermarkFn
@Deprecated public KafkaIO.Read<K,V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn)
Deprecated, as of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to calculate the watermark after a record. Default is last record timestamp. -
-
withReadCommitted
Sets "isolation_level" to "read_committed" in Kafka consumer configuration. This is ensures that the consumer does not read uncommitted messages. Kafka version 0.11 introduced transactional writes. Applications requiring end-to-end exactly-once semantics should only read committed messages. See JavaDoc forKafkaConsumerfor more description. -
commitOffsetsInFinalize
Finalized offsets are committed to Kafka. See UnboundedSource.CheckpointMark.finalizeCheckpoint(). It helps with minimizing gaps or duplicate processing of records while restarting a pipeline from scratch, but it does not provide hard processing guarantees. There could be a short delay to commit after UnboundedSource.CheckpointMark.finalizeCheckpoint() is invoked, as the reader might be blocked on reading from Kafka. Note that it is independent of the 'AUTO_COMMIT' Kafka consumer configuration. Usually either this or AUTO_COMMIT in the Kafka consumer is enabled, but not both.
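A sketch that enables offset commits; since offsets are tracked per consumer group, a group id (a placeholder value here) is supplied through the consumer configuration:

    // Commit finalized offsets back to Kafka under a (placeholder) consumer group.
    Map<String, Object> groupConfig = new HashMap<>();
    groupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-reader-group");   // org.apache.kafka.clients.consumer.ConsumerConfig

    KafkaIO.<Long, String>read()
        .withTopic("my_topic")
        .withConsumerConfigUpdates(groupConfig)
        .commitOffsetsInFinalize();
-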
withDynamicRead
Configures the KafkaIO to use WatchForKafkaTopicPartitions to detect and emit any newly available TopicPartition for ReadFromKafkaDoFn to consume during pipeline execution. The KafkaIO will regularly check the availability based on the given duration. If the duration is not specified (null), the default duration is 1 hour.
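For example, a sketch that re-checks partition availability once per hour, matching the documented default interval (the topic names are placeholders):

    // Watch the listed topics for newly added partitions every hour.
    KafkaIO.<Long, String>read()
        .withTopics(Arrays.asList("my_topic_a", "my_topic_b"))
        .withDynamicRead(Duration.standardHours(1));
-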
withOffsetConsumerConfigOverrides
Sets additional configuration for the backend offset consumer. It may be required for a secured Kafka cluster, especially when you see a WARN log message similar to 'exception while fetching latest offset for partition {}. will be retried'.
In KafkaIO.read(), two consumers actually run in the backend:
1. the main consumer, which reads data from Kafka;
2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest offset.
By default, the offset consumer inherits the configuration from the main consumer, with an auto-generated ConsumerConfig.GROUP_ID_CONFIG. This may not work in a secured Kafka cluster, which requires more configuration.
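A sketch for a secured cluster; the keys are standard Kafka consumer settings, and the values shown are placeholders:

    // Additional settings applied to the secondary offset consumer (placeholder values).
    Map<String, Object> offsetConsumerConfig = new HashMap<>();
    offsetConsumerConfig.put("security.protocol", "SASL_SSL");
    offsetConsumerConfig.put("sasl.mechanism", "PLAIN");
    offsetConsumerConfig.put("sasl.jaas.config", "<jaas configuration>");

    KafkaIO.<Long, String>read()
        .withOffsetConsumerConfigOverrides(offsetConsumerConfig);
-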
withConsumerConfigUpdates
Updates configuration for the backend main consumer. Note that the default consumer properties will not be completely overridden; this method only updates values that have the same key.
In KafkaIO.read(), two consumers actually run in the backend:
1. the main consumer, which reads data from Kafka;
2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest offset.
By default, the main consumer uses the configuration from KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES.
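For example, a sketch that overrides two standard consumer properties; the values are illustrative:

    // Override selected main-consumer properties; unspecified defaults are kept.
    Map<String, Object> configUpdates = new HashMap<>();
    configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configUpdates.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

    KafkaIO.<Long, String>read()
        .withConsumerConfigUpdates(configUpdates);
-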
withCheckStopReadingFn
A custom CheckStopReadingFn that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition. -
withCheckStopReadingFn
public KafkaIO.Read<K,V> withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> checkStopReadingFn)
A custom SerializableFunction that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
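A sketch using this overload; the topic-name prefix used to decide when to stop is hypothetical:

    // Stop reading any partition whose topic name starts with a hypothetical "retired-" prefix.
    SerializableFunction<TopicPartition, Boolean> stopFn =
        partition -> partition.topic().startsWith("retired-");

    KafkaIO.<Long, String>read()
        .withCheckStopReadingFn(stopFn);
-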
withBadRecordErrorHandler
-
withConsumerPollingTimeout
Sets the timeout, in seconds, for the Kafka consumer polling request in the ReadFromKafkaDoFn. A lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching any records. The default is 2 seconds. -
withGCPApplicationDefaultCredentials
Creates and sets the Application Default Credentials for a Kafka consumer. This allows the consumer to be authenticated with a Google Kafka Server using OAuth. -
withTopicVerificationLogging
-
withoutMetadata
Returns a PTransform for a PCollection of KV, dropping Kafka metadata.
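For example, a sketch that reads plain key/value pairs, assuming a Pipeline named pipeline and the same placeholder broker and topic as above:

    // Drop Kafka metadata and read a PCollection of KV<Long, String> directly.
    PCollection<KV<Long, String>> keyValues =
        pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker_1:9092")
                .withTopic("my_topic")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata());
-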
externalWithMetadata
-
expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method. Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PBegin, PCollection<KafkaRecord<K, V>>>
-
populateDisplayData
Description copied from class: PTransform
Register display data for the given transform or component. populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent. By default, does not register any display data. Implementors may override this method to provide their own display data.
- Specified by:
populateDisplayData in interface HasDisplayData
- Overrides: populateDisplayData in class PTransform<PBegin, PCollection<KafkaRecord<K, V>>>
- Parameters: builder - The builder to populate with display data.
-