Class KafkaIO.Read<K,V>
- All Implemented Interfaces:
Serializable, HasDisplayData
- Enclosing class:
KafkaIO

PTransform to read from Kafka topics. See KafkaIO for more information on usage and configuration.
- See Also:
-
Nested Class Summary
Nested Classes
Modifier and Type / Class / Description
static class: Exposes KafkaIO.TypedWithoutMetadata as an external transform for cross-language usage.
static interface
-
Field Summary
Fields
Modifier and Type / Field / Description
static final Class<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read> AUTOVALUE_CLASS
static final org.apache.beam.sdk.runners.PTransformOverride KAFKA_READ_OVERRIDE: A PTransformOverride for runners to swap KafkaIO.Read.ReadFromKafkaViaSDF for the legacy Kafka read when a runner does not have good support for executing unbounded Splittable DoFn.
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Constructor Summary
Constructors
Read()
Method Summary
Modifier and Type / Method / Description

commitOffsetsInFinalize(): Finalized offsets are committed to Kafka.
expand(PBegin input): Override this method to specify how this PTransform should be expanded on the given InputT.

Abstract getters, listed by declared return type (see the Method Details below for the corresponding get*/is* methods):
abstract @Nullable ErrorHandler<BadRecord, ?>
abstract @Nullable CheckStopReadingFn
abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
abstract long
abstract @Nullable DeserializerProvider<K>
abstract long
abstract int
abstract TimestampPolicyFactory<K, V>
abstract @Nullable List<TopicPartition>
abstract @Nullable DeserializerProvider<V>
abstract @Nullable SerializableFunction<KafkaRecord<K, V>, Instant>
abstract boolean
abstract boolean
abstract boolean
abstract boolean

void populateDisplayData(DisplayData.Builder builder): Register display data for the given transform or component.
updateConsumerProperties(Map<String, Object> configUpdates): Deprecated. As of version 2.13.
withAllowDuplicates(Boolean allowDuplicates)
withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler)
withBootstrapServers(String bootstrapServers): Sets the bootstrap servers for the Kafka consumer.
withCheckStopReadingFn(CheckStopReadingFn checkStopReadingFn): A custom CheckStopReadingFn that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> checkStopReadingFn): A custom SerializableFunction that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
withConsumerConfigUpdates(Map<String, Object> configUpdates): Update configuration for the backend main consumer.
withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn): A factory to create a Kafka Consumer from the consumer configuration.
withConsumerPollingTimeout(long duration): Sets the timeout, in seconds, for Kafka consumer polling requests in the ReadFromKafkaDoFn.
withCreateTime(Duration maxDelay): Sets the timestamp policy based on the KafkaTimestampType.CREATE_TIME timestamp of the records.
withDynamicRead(Duration duration): Configures the KafkaIO to use WatchForKafkaTopicPartitions to detect and emit any newly available TopicPartition for ReadFromKafkaDoFn to consume during pipeline execution.
withGCPApplicationDefaultCredentials(): Creates and sets the Application Default Credentials for a Kafka consumer.
withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer): Sets a Kafka Deserializer to interpret key bytes read from Kafka.
withKeyDeserializer(DeserializerProvider<K> deserializerProvider)
withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder): Sets a Kafka Deserializer for interpreting key bytes read from Kafka along with a Coder for helping the Beam runner materialize key objects at runtime if necessary.
withKeyDeserializerProviderAndCoder(DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder)
withMaxNumRecords(long maxNumRecords): Similar to Read.Unbounded.withMaxNumRecords(long).
withMaxReadTime(Duration maxReadTime): Similar to Read.Unbounded.withMaxReadTime(Duration).
withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig): Set additional configuration for the backend offset consumer.
withOffsetDeduplication(Boolean offsetDeduplication)
withoutMetadata(): Returns a PTransform<PBegin, PCollection<KV<K, V>>>, dropping the Kafka metadata.
withReadCommitted(): Sets "isolation_level" to "read_committed" in the Kafka consumer configuration.
withRedistribute(): Sets a redistribute transform that hints to the runner to try to redistribute the work evenly.
withRedistributeNumKeys(int redistributeNumKeys)
withStartReadTime(Instant startReadTime): Use a timestamp to set up the start offset.
withStopReadTime(Instant stopReadTime): Use a timestamp to set up the stop offset.
withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn): Deprecated. As of version 2.4.
withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn): Deprecated. As of version 2.4.
withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory): Provide a custom TimestampPolicyFactory to set event times and watermarks for each partition.
withTopic(String topic): Sets the topic to read from.
withTopicPartitions(List<TopicPartition> topicPartitions): Sets a list of partitions to read from.
withTopicPattern(String topicPattern): Internally sets a Pattern of topics to read from.
withTopics(List<String> topics): Sets a list of topics to read from.
withTopicVerificationLogging(boolean logTopicVerification)
withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer): Sets a Kafka Deserializer to interpret value bytes read from Kafka.
withValueDeserializer(DeserializerProvider<V> deserializerProvider)
withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder): Sets a Kafka Deserializer for interpreting value bytes read from Kafka along with a Coder for helping the Beam runner materialize value objects at runtime if necessary.
withValueDeserializerProviderAndCoder(DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder)
withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn): Deprecated. As of version 2.4.
withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn): Deprecated. As of version 2.4.

Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setDisplayData, setResourceHints, toString, validate, validate
-
Field Details
-
AUTOVALUE_CLASS
-
KAFKA_READ_OVERRIDE
A PTransformOverride for runners to swap KafkaIO.Read.ReadFromKafkaViaSDF for the legacy Kafka read when a runner does not have good support for executing unbounded Splittable DoFn.
-
-
Constructor Details
-
Read
public Read()
-
-
Method Details
-
getConsumerConfig
-
getTopics
-
getTopicPartitions
-
getTopicPattern
-
getKeyCoder
-
getValueCoder
-
getConsumerFactoryFn
-
getWatermarkFn
-
getMaxNumRecords
-
getMaxReadTime
-
getStartReadTime
-
getStopReadTime
-
isCommitOffsetsInFinalizeEnabled
-
isDynamicRead
-
isRedistributed
-
isAllowDuplicates
-
getRedistributeNumKeys
-
getOffsetDeduplication
-
getWatchTopicPartitionDuration
-
getTimestampPolicyFactory
-
getOffsetConsumerConfig
-
getKeyDeserializerProvider
-
getValueDeserializerProvider
-
getCheckStopReadingFn
-
getBadRecordErrorHandler
-
getConsumerPollingTimeout
-
getLogTopicVerification
-
withBootstrapServers
Sets the bootstrap servers for the Kafka consumer. -
withTopic
Sets the topic to read from. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
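For orientation, a minimal end-to-end usage sketch follows; the bootstrap servers, topic name, and key/value types are placeholders rather than part of this API's documentation.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read records from a single topic; keys are longs, values are strings.
    PCollection<KafkaRecord<Long, String>> records =
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092,broker-2:9092") // placeholder brokers
                .withTopic("my_topic")                               // placeholder topic
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));

    p.run().waitUntilFinish();
  }
}
-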
withTopics
Sets a list of topics to read from. All the partitions from each of the topics are read. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits. -
withTopicPartitions
Sets a list of partitions to read from. This allows reading only a subset of partitions for one or more topics when (if ever) needed. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
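For illustration, a sketch restricting the read to specific partitions; the topic name and partition numbers are placeholders, and the Pipeline p plus the imports from the sketch under withTopic are assumed.

// additional imports: java.util.Arrays, org.apache.kafka.common.TopicPartition
PCollection<KafkaRecord<Long, String>> subset =
    p.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker-1:9092")
            // Read only partitions 0 and 1 of "my_topic".
            .withTopicPartitions(
                Arrays.asList(
                    new TopicPartition("my_topic", 0),
                    new TopicPartition("my_topic", 1)))
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
-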
withRedistribute
Sets a redistribute transform that hints to the runner to try to redistribute the work evenly. -
withAllowDuplicates
-
withRedistributeNumKeys
-
withOffsetDeduplication
-
withTopicPattern
Internally sets a Pattern of topics to read from. All the partitions from each of the matching topics are read. See UnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits. -
withKeyDeserializer
Sets a Kafka Deserializer to interpret key bytes read from Kafka. In addition, Beam also needs a Coder to serialize and deserialize key objects at runtime. KafkaIO tries to infer a coder for the key based on the Deserializer class; if that fails, you can use withKeyDeserializerAndCoder(Class, Coder) to provide the key coder explicitly. -
withKeyDeserializerAndCoder
public KafkaIO.Read<K,V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder)
Sets a Kafka Deserializer for interpreting key bytes read from Kafka along with a Coder for helping the Beam runner materialize key objects at runtime if necessary. Use this method only if your pipeline doesn't work with plain withKeyDeserializer(Class).
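Where coder inference fails, a sketch along these lines supplies the key coder explicitly; VarLongCoder is just one possible choice, and the broker, topic, and deserializers are placeholders.

// additional import: org.apache.beam.sdk.coders.VarLongCoder
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        // Provide the key coder explicitly when it cannot be inferred from the Deserializer.
        .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
        .withValueDeserializer(StringDeserializer.class);
-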
withKeyDeserializer
-
withKeyDeserializerProviderAndCoder
public KafkaIO.Read<K,V> withKeyDeserializerProviderAndCoder(DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder) -
withValueDeserializer
Sets a Kafka Deserializer to interpret value bytes read from Kafka. In addition, Beam also needs a Coder to serialize and deserialize value objects at runtime. KafkaIO tries to infer a coder for the value based on the Deserializer class; if that fails, you can use withValueDeserializerAndCoder(Class, Coder) to provide the value coder explicitly. -
withValueDeserializerAndCoder
public KafkaIO.Read<K,V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder)
Sets a Kafka Deserializer for interpreting value bytes read from Kafka along with a Coder for helping the Beam runner materialize value objects at runtime if necessary. Use this method only if your pipeline doesn't work with plain withValueDeserializer(Class). -
withValueDeserializer
-
withValueDeserializerProviderAndCoder
public KafkaIO.Read<K,V> withValueDeserializerProviderAndCoder(DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder) -
withConsumerFactoryFn
public KafkaIO.Read<K,V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn)
A factory to create a Kafka Consumer from the consumer configuration. This is useful for supporting another version of the Kafka consumer. Default is KafkaConsumer.
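A sketch of a custom factory; here it simply constructs the standard KafkaConsumer from the resolved configuration, which is where a different consumer implementation or extra setup could be plugged in. Broker and topic values are placeholders.

// additional import: org.apache.kafka.clients.consumer.KafkaConsumer
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // The factory receives the resolved consumer configuration and must return a Consumer<byte[], byte[]>.
        .withConsumerFactoryFn(config -> new KafkaConsumer<>(config));
-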
updateConsumerProperties
Deprecated. As of version 2.13. Use withConsumerConfigUpdates(Map) instead. Updates consumer configuration with new properties. -
withMaxNumRecords
Similar to Read.Unbounded.withMaxNumRecords(long). Mainly used for tests and demo applications. -
withStartReadTime
Use a timestamp to set up the start offset. It is only supported by Kafka client 0.10.1.0 onwards and requires a message format version after 0.10.0. Note that this takes priority over the start offset configuration ConsumerConfig.AUTO_OFFSET_RESET_CONFIG and any auto-committed offsets.
This results in hard failures in either of the following two cases: 1. If one or more partitions do not contain any messages with a timestamp larger than or equal to the desired timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
-
withStopReadTime
Use a timestamp to set up the stop offset. It is only supported by Kafka client 0.10.1.0 onwards and requires a message format version after 0.10.0.
This results in hard failures in either of the following two cases: 1. If one or more partitions do not contain any messages with a timestamp larger than or equal to the desired timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
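A sketch combining both bounds; the instants are placeholders and assume the broker and message-format requirements described above. Pipeline setup and imports are as in the sketch under withTopic.

// additional import: org.joda.time.Instant
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Roughly: read records from the start timestamp up to the stop timestamp.
        .withStartReadTime(Instant.parse("2024-01-01T00:00:00Z"))
        .withStopReadTime(Instant.parse("2024-01-02T00:00:00Z"));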
-
withMaxReadTime
Similar to Read.Unbounded.withMaxReadTime(Duration). Mainly used for tests and demo applications. -
withLogAppendTime
Sets TimestampPolicy to TimestampPolicyFactory.LogAppendTimePolicy. The policy assigns Kafka's log append time (server-side ingestion time) to each record. The watermark for each Kafka partition is the timestamp of the last record read. If a partition is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.
In Kafka, log append time needs to be enabled for each topic, and all subsequent records will have their timestamp set to log append time. If a record does not have its timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous record timestamp or the latest watermark, whichever is larger.
The watermark for the entire source is the oldest of each partition's watermarks. If one of the readers falls behind, possibly due to uneven distribution of records among Kafka partitions, it ends up holding back the watermark for the entire source.
-
withProcessingTime
Sets TimestampPolicy to TimestampPolicyFactory.ProcessingTimePolicy. This is the default timestamp policy. It assigns processing time to each record. Specifically, this is the timestamp when the record becomes 'current' in the reader. The watermark always advances to the current time. If server-side time (log append time) is enabled in Kafka, withLogAppendTime() is recommended over this. -
withCreateTime
Sets the timestamp policy based on the KafkaTimestampType.CREATE_TIME timestamp of the records. It is an error if a record's timestamp type is not KafkaTimestampType.CREATE_TIME. The timestamps within a partition are expected to be roughly monotonically increasing, with a cap on out-of-order delays (e.g. a 'max delay' of 1 minute). The watermark at any time is '(Min(now(), Max(event timestamp so far)) - max delay)'. However, the watermark is never set in the future and is capped to 'now - max delay'. In addition, the watermark advances to 'now - max delay' when a partition is idle.
- Parameters:
maxDelay - For any record in the Kafka partition, the timestamp of any subsequent record is expected to be after 'current record timestamp - maxDelay'.
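For example, a policy tolerating up to one minute of out-of-order CREATE_TIME timestamps could be configured as follows; the delay value, broker, and topic are illustrative.

// additional import: org.joda.time.Duration
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Event times come from each record's CREATE_TIME; the watermark lags by at most one minute.
        .withCreateTime(Duration.standardMinutes(1));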
-
withTimestampPolicyFactory
public KafkaIO.Read<K,V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory)
Provide a custom TimestampPolicyFactory to set event times and watermarks for each partition. TimestampPolicyFactory.createTimestampPolicy(TopicPartition, Optional) is invoked for each partition when the reader starts.
- See Also:
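As a sketch, the factory can be given as a lambda; this one builds the built-in CustomTimestampPolicyWithLimitedDelay and derives event times from the Kafka record timestamp, which is an illustrative assumption rather than the only option. Broker, topic, and delay are placeholders.

// additional imports: org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay,
//                     org.joda.time.Duration, org.joda.time.Instant
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // createTimestampPolicy(TopicPartition, Optional) is invoked once per partition.
        .withTimestampPolicyFactory(
            (tp, previousWatermark) ->
                new CustomTimestampPolicyWithLimitedDelay<Long, String>(
                    record -> new Instant(record.getTimestamp()), // illustrative: use the Kafka record timestamp
                    Duration.standardMinutes(2),
                    previousWatermark));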
-
withTimestampFn2
@Deprecated public KafkaIO.Read<K,V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn)
Deprecated. As of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to assign a timestamp to a record. Default is the processing timestamp. -
withWatermarkFn2
@Deprecated public KafkaIO.Read<K,V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn)
Deprecated. As of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to calculate the watermark after a record. Default is the last record timestamp.
- See Also:
-
withTimestampFn
@Deprecated public KafkaIO.Read<K,V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn)
Deprecated. As of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to assign a timestamp to a record. Default is the processing timestamp. -
withWatermarkFn
@Deprecated public KafkaIO.Read<K,V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn)
Deprecated. As of version 2.4. Use withTimestampPolicyFactory(TimestampPolicyFactory) instead. A function to calculate the watermark after a record. Default is the last record timestamp.
- See Also:
-
withReadCommitted
Sets "isolation_level" to "read_committed" in the Kafka consumer configuration. This ensures that the consumer does not read uncommitted messages. Kafka version 0.11 introduced transactional writes. Applications requiring end-to-end exactly-once semantics should only read committed messages. See the JavaDoc for KafkaConsumer for more details. -
commitOffsetsInFinalize
Finalized offsets are committed to Kafka. See UnboundedSource.CheckpointMark.finalizeCheckpoint(). It helps with minimizing gaps or duplicate processing of records while restarting a pipeline from scratch, but it does not provide hard processing guarantees. There could be a short delay in committing after UnboundedSource.CheckpointMark.finalizeCheckpoint() is invoked, as the reader might be blocked on reading from Kafka. Note that it is independent of the 'AUTO_COMMIT' Kafka consumer configuration. Usually either this or AUTO_COMMIT in the Kafka consumer is enabled, but not both.
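A sketch for reading only committed records while also committing finalized offsets back to Kafka; the group id is a placeholder, and as noted above either this or the consumer's AUTO_COMMIT should be enabled, not both.

// additional imports: java.util.Collections, org.apache.kafka.clients.consumer.ConsumerConfig
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Only committed (transactional) records are read.
        .withReadCommitted()
        // A group id is needed so finalized offsets can be committed back to Kafka.
        .withConsumerConfigUpdates(
            Collections.<String, Object>singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "my-reader-group"))
        .commitOffsetsInFinalize();
-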
withDynamicRead
Configures the KafkaIO to use WatchForKafkaTopicPartitions to detect and emit any newly available TopicPartition for ReadFromKafkaDoFn to consume during pipeline execution. The KafkaIO will regularly check availability based on the given duration. If the duration is not specified (i.e. null), a default duration of 1 hour is used.
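A sketch that watches a topic pattern and checks for new partitions every 30 minutes; the pattern and interval are illustrative.

// additional import: org.joda.time.Duration
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        // Match current and future topics named like "events-...".
        .withTopicPattern("events-.*")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Check for newly created TopicPartitions every 30 minutes while the pipeline runs.
        .withDynamicRead(Duration.standardMinutes(30));
-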
withOffsetConsumerConfigOverrides
Set additional configuration for the backend offset consumer. It may be required for a secured Kafka cluster, especially when you see a WARN log message similar to 'exception while fetching latest offset for partition {}. will be retried'.
In KafkaIO.read(), two consumers actually run in the backend:
1. the main consumer, which reads data from Kafka;
2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest offsets.
By default, the offset consumer inherits the configuration from the main consumer, with an auto-generated ConsumerConfig.GROUP_ID_CONFIG. This may not work for a secured Kafka cluster that requires more configuration. -
withConsumerConfigUpdates
Update configuration for the backend main consumer. Note that the default consumer properties will not be completely overridden; this method only updates values that have the same key.
In KafkaIO.read(), two consumers actually run in the backend:
1. the main consumer, which reads data from Kafka;
2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest offsets.
By default, the main consumer uses the configuration from KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES.
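A hedged sketch of both configuration hooks for a secured cluster; the property values are placeholders for whatever the cluster actually requires.

// additional imports: java.util.HashMap, java.util.Map
Map<String, Object> security = new HashMap<>();
security.put("security.protocol", "SASL_SSL"); // placeholder values
security.put("sasl.mechanism", "PLAIN");

KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(security)          // applied to the main consumer
        .withOffsetConsumerConfigOverrides(security); // applied to the backlog/offset consumer
-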
withCheckStopReadingFn
A custom CheckStopReadingFn that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition. -
withCheckStopReadingFn
public KafkaIO.Read<K,V> withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> checkStopReadingFn)
A custom SerializableFunction that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
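A sketch using the SerializableFunction overload; the stop condition (a single hard-coded partition) is purely illustrative, and the cast just selects this overload over the CheckStopReadingFn one.

// additional imports: org.apache.beam.sdk.transforms.SerializableFunction, org.apache.kafka.common.TopicPartition
KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Stop reading from partition 0 of "my_topic"; any side-effect-free predicate works here.
        .withCheckStopReadingFn(
            (SerializableFunction<TopicPartition, Boolean>)
                tp -> tp.topic().equals("my_topic") && tp.partition() == 0);
-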
withBadRecordErrorHandler
-
withConsumerPollingTimeout
Sets the timeout, in seconds, for Kafka consumer polling requests in the ReadFromKafkaDoFn. A lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching any records. The default is 2 seconds. -
withGCPApplicationDefaultCredentials
Creates and sets the Application Default Credentials for a Kafka consumer. This allows the consumer to be authenticated with a Google Kafka Server using OAuth. -
withTopicVerificationLogging
-
withoutMetadata
Returns a PTransform for a PCollection of KV, dropping the Kafka metadata.
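A sketch that drops the metadata to obtain a plain PCollection of key/value pairs; the Pipeline p and imports are as in the sketch under withTopic.

// additional import: org.apache.beam.sdk.values.KV
PCollection<KV<Long, String>> kvs =
    p.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker-1:9092")
            .withTopic("my_topic")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Yields KV<Long, String> instead of KafkaRecord<Long, String>.
            .withoutMetadata());
-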
externalWithMetadata
-
expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand
in class PTransform<PBegin, PCollection<KafkaRecord<K, V>>>
-
populateDisplayData
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
- Specified by:
populateDisplayData
in interface HasDisplayData
- Overrides:
populateDisplayData
in class PTransform<PBegin, PCollection<KafkaRecord<K, V>>>
- Parameters:
builder - The builder to populate with display data.
- See Also:
-