Class KafkaIO.ReadSourceDescriptors<K,V>
- All Implemented Interfaces: Serializable, HasDisplayData
- Enclosing class: KafkaIO

A PTransform to read from KafkaSourceDescriptor. See KafkaIO for more information on usage and configuration. See ReadFromKafkaDoFn for more implementation details.
During expansion, if isCommitOffsetEnabled() is true, the transform will expand to:

PCollection<KafkaSourceDescriptor> --> ParDo(ReadFromKafkaDoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord>>) --> Map(output KafkaRecord)
                                                                 |
                                                                 --> KafkaCommitOffset

Note that this expansion is not supported when running with x-lang on Dataflow.
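A minimal usage sketch, not part of the original Javadoc: the topic, partition, and broker address are placeholders, and KafkaSourceDescriptor.of is assumed to take the topic-partition, optional start/stop offsets and read times, and a list of bootstrap servers. Depending on the Beam version, Create may need an explicit coder for KafkaSourceDescriptor.

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Pipeline p = Pipeline.create();

// One descriptor per topic-partition to read; the nulls mean "no explicit
// start/stop offset or time".
PCollection<KafkaSourceDescriptor> descriptors =
    p.apply(
        Create.of(
            KafkaSourceDescriptor.of(
                new TopicPartition("my-topic", 0),
                null, null, null, null,
                Collections.singletonList("broker:9092"))));

// Apply the transform described on this page.
PCollection<KafkaRecord<Long, String>> records =
    descriptors.apply(
        KafkaIO.ReadSourceDescriptors.<Long, String>read()
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));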
- Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform:
annotations, displayData, name, resourceHints
- Constructor Summary
Constructors: ReadSourceDescriptors()
- Method Summary
- commitOffsets(): Enable committing record offsets.
- expand(PCollection<KafkaSourceDescriptor> input): Override this method to specify how this PTransform should be expanded on the given InputT.
- static <K,V> KafkaIO.ReadSourceDescriptors<K,V> read()
- withAllowDuplicates
- withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)
- withBootstrapServers(String bootstrapServers): Sets the bootstrap servers to use for the Kafka consumer if unspecified via KafkaSourceDescriptor#getBootStrapServers().
- withCheckStopReadingFn(@Nullable CheckStopReadingFn checkStopReadingFn): A custom CheckStopReadingFn that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
- withCheckStopReadingFn(@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn): A custom SerializableFunction that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
- withConsumerConfigOverrides(Map<String, Object> consumerConfig): Replaces the configuration for the main consumer.
- withConsumerConfigUpdates(Map<String, Object> configUpdates): Updates the configuration for the main consumer.
- withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn): A factory to create a Kafka Consumer from the consumer configuration.
- withConsumerPollingTimeout(long duration): Sets the timeout, in seconds, for Kafka consumer polling requests in the ReadFromKafkaDoFn.
- withCreateTime(): Use the creation time of the KafkaRecord as the output timestamp.
- withCreatWatermarkEstimatorFn(SerializableFunction<Instant, WatermarkEstimator<Instant>> fn): A function to create a WatermarkEstimator.
- withExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn): A function to calculate the output timestamp for a given KafkaRecord.
- withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer): Sets a Kafka Deserializer to interpret key bytes read from Kafka.
- withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder): Sets a Kafka Deserializer for interpreting key bytes read from Kafka, along with a Coder that helps the Beam runner materialize key objects at runtime if necessary.
- withKeyDeserializerProvider(@Nullable DeserializerProvider<K> deserializerProvider)
- withKeyDeserializerProviderAndCoder(@Nullable DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder)
- withLogAppendTime(): Use the log append time as the output timestamp.
- withManualWatermarkEstimator(): Use WatermarkEstimators.Manual as the watermark estimator.
- withMonotonicallyIncreasingWatermarkEstimator(): Use WatermarkEstimators.MonotonicallyIncreasing as the watermark estimator.
- withOffsetConsumerConfigOverrides(@Nullable Map<String, Object> offsetConsumerConfig): Sets additional configuration for the offset consumer.
- withProcessingTime(): Use the processing time as the output timestamp.
- withReadCommitted(): Sets "isolation_level" to "read_committed" in the Kafka consumer configuration.
- withRedistribute(): Enable Redistribute.
- withRedistributeNumKeys(int redistributeNumKeys)
- withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer): Sets a Kafka Deserializer to interpret value bytes read from Kafka.
- withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder): Sets a Kafka Deserializer for interpreting value bytes read from Kafka, along with a Coder that helps the Beam runner materialize value objects at runtime if necessary.
- withValueDeserializerProvider(@Nullable DeserializerProvider<V> deserializerProvider)
- withValueDeserializerProviderAndCoder(@Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder)
- withWallTimeWatermarkEstimator(): Use WatermarkEstimators.WallTime as the watermark estimator.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform:
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
- Constructor Details
ReadSourceDescriptors
public ReadSourceDescriptors()
- Method Details
- read
public static <K,V> KafkaIO.ReadSourceDescriptors<K,V> read()
- withBootstrapServers
public KafkaIO.ReadSourceDescriptors<K,V> withBootstrapServers(String bootstrapServers)
Sets the bootstrap servers to use for the Kafka consumer if unspecified via KafkaSourceDescriptor#getBootStrapServers().
- withKeyDeserializer
public KafkaIO.ReadSourceDescriptors<K,V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer)
Sets a Kafka Deserializer to interpret key bytes read from Kafka.
In addition, Beam also needs a Coder to serialize and deserialize key objects at runtime. KafkaIO tries to infer a coder for the key based on the Deserializer class; if that fails, use withKeyDeserializerAndCoder(Class, Coder) to provide the key coder explicitly.
- withKeyDeserializerAndCoder
public KafkaIO.ReadSourceDescriptors<K,V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder)
Sets a Kafka Deserializer for interpreting key bytes read from Kafka, along with a Coder that helps the Beam runner materialize key objects at runtime if necessary.
Use this method to override the coder inference performed within withKeyDeserializer(Class).
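A hedged sketch of supplying the coder explicitly, using the SDK's built-in VarLongCoder for a Long key (the generic types are illustrative):

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// The explicit coder takes precedence over the inference performed by
// withKeyDeserializer(Class).
KafkaIO.ReadSourceDescriptors<Long, String> reader =
    KafkaIO.ReadSourceDescriptors.<Long, String>read()
        .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
        .withValueDeserializer(StringDeserializer.class);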
- withKeyDeserializerProvider
public KafkaIO.ReadSourceDescriptors<K,V> withKeyDeserializerProvider(@Nullable DeserializerProvider<K> deserializerProvider)
- withKeyDeserializerProviderAndCoder
public KafkaIO.ReadSourceDescriptors<K,V> withKeyDeserializerProviderAndCoder(@Nullable DeserializerProvider<K> deserializerProvider, Coder<K> keyCoder)
- withValueDeserializer
public KafkaIO.ReadSourceDescriptors<K,V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer)
Sets a Kafka Deserializer to interpret value bytes read from Kafka.
In addition, Beam also needs a Coder to serialize and deserialize value objects at runtime. KafkaIO tries to infer a coder for the value based on the Deserializer class; if that fails, use withValueDeserializerAndCoder(Class, Coder) to provide the value coder explicitly.
- withValueDeserializerAndCoder
public KafkaIO.ReadSourceDescriptors<K,V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder)
Sets a Kafka Deserializer for interpreting value bytes read from Kafka, along with a Coder that helps the Beam runner materialize value objects at runtime if necessary.
Use this method to override the coder inference performed within withValueDeserializer(Class).
- withValueDeserializerProvider
public KafkaIO.ReadSourceDescriptors<K,V> withValueDeserializerProvider(@Nullable DeserializerProvider<V> deserializerProvider)
- withValueDeserializerProviderAndCoder
public KafkaIO.ReadSourceDescriptors<K,V> withValueDeserializerProviderAndCoder(@Nullable DeserializerProvider<V> deserializerProvider, Coder<V> valueCoder)
- withConsumerFactoryFn
public KafkaIO.ReadSourceDescriptors<K,V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn)
A factory to create a Kafka Consumer from the consumer configuration. This is useful for supporting another version of the Kafka consumer. The default is KafkaConsumer.
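A sketch of a factory function; this one just builds the standard KafkaConsumer from the supplied configuration, but the same hook can construct a wrapped or alternative Consumer implementation. The reader variable stands for a ReadSourceDescriptors under construction:

import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> factoryFn =
    config -> new KafkaConsumer<>(config); // equivalent to the default behavior

reader = reader.withConsumerFactoryFn(factoryFn);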
- withCheckStopReadingFn
public KafkaIO.ReadSourceDescriptors<K,V> withCheckStopReadingFn(@Nullable CheckStopReadingFn checkStopReadingFn)
A custom CheckStopReadingFn that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
- withCheckStopReadingFn
public KafkaIO.ReadSourceDescriptors<K,V> withCheckStopReadingFn(@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn)
A custom SerializableFunction that determines whether the ReadFromKafkaDoFn should stop reading from the given TopicPartition.
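For illustration, a SerializableFunction-based stop condition keyed on the topic name (the topic name is a placeholder; reader stands for a ReadSourceDescriptors under construction):

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.common.TopicPartition;

// Stop reading from every partition of a topic that has been retired.
SerializableFunction<TopicPartition, Boolean> stopFn =
    tp -> "retired-topic".equals(tp.topic());

reader = reader.withCheckStopReadingFn(stopFn);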
- withConsumerConfigUpdates
public KafkaIO.ReadSourceDescriptors<K,V> withConsumerConfigUpdates(Map<String, Object> configUpdates)
Updates the configuration for the main consumer. This method merges updates from the provided map with any prior updates, using KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES as the starting configuration.
In ReadFromKafkaDoFn, there are two consumers running in the backend:
- the main consumer, which reads data from Kafka;
- the secondary offset consumer, which is used to estimate the backlog by fetching the latest offset.
See withConsumerConfigOverrides(java.util.Map<java.lang.String, java.lang.Object>) for overriding the configuration instead of updating it.
See withOffsetConsumerConfigOverrides(java.util.Map<java.lang.String, java.lang.Object>) for configuring the secondary offset consumer.
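For example (illustrative keys and values; reader stands for a ReadSourceDescriptors under construction):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Map<String, Object> updates = new HashMap<>();
updates.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);       // cap records per poll
updates.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024); // 1 MiB socket buffer

reader = reader.withConsumerConfigUpdates(updates);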
- withExtractOutputTimestampFn
public KafkaIO.ReadSourceDescriptors<K,V> withExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn)
A function to calculate the output timestamp for a given KafkaRecord. The default is withProcessingTime().
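A sketch that uses the record's own Kafka timestamp (epoch milliseconds) as the element timestamp, assuming KafkaRecord.getTimestamp() returns that value:

import org.joda.time.Instant;

reader = reader.withExtractOutputTimestampFn(
    record -> new Instant(record.getTimestamp()));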
- withCreatWatermarkEstimatorFn
public KafkaIO.ReadSourceDescriptors<K,V> withCreatWatermarkEstimatorFn(SerializableFunction<Instant, WatermarkEstimator<Instant>> fn)
A function to create a WatermarkEstimator. The default is WatermarkEstimators.MonotonicallyIncreasing.
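A sketch that builds the default estimator explicitly, assuming the WatermarkEstimators.MonotonicallyIncreasing constructor takes the initial watermark; a custom WatermarkEstimator<Instant> implementation could be returned instead:

import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;

reader = reader.withCreatWatermarkEstimatorFn(
    watermark -> new WatermarkEstimators.MonotonicallyIncreasing(watermark));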
- withLogAppendTime
Use the log append time as the output timestamp.
- withProcessingTime
Use the processing time as the output timestamp.
- withRedistribute
Enable Redistribute.
- withAllowDuplicates
- withRedistributeNumKeys
- withCreateTime
Use the creation time of the KafkaRecord as the output timestamp.
- withWallTimeWatermarkEstimator
Use WatermarkEstimators.WallTime as the watermark estimator.
- withMonotonicallyIncreasingWatermarkEstimator
Use WatermarkEstimators.MonotonicallyIncreasing as the watermark estimator.
- withManualWatermarkEstimator
Use WatermarkEstimators.Manual as the watermark estimator.
- withReadCommitted
Sets "isolation_level" to "read_committed" in the Kafka consumer configuration. This ensures that the consumer does not read uncommitted messages. Kafka version 0.11 introduced transactional writes; applications requiring end-to-end exactly-once semantics should only read committed messages. See the KafkaConsumer JavaDoc for more details.
- commitOffsets
Enable committing record offsets. If ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG is set together with commitOffsets(), commitOffsets() will be ignored.
- withOffsetConsumerConfigOverrides
public KafkaIO.ReadSourceDescriptors<K,V> withOffsetConsumerConfigOverrides(@Nullable Map<String, Object> offsetConsumerConfig)
Sets additional configuration for the offset consumer. This may be required for a secured Kafka cluster, especially if you see WARN log messages like "exception while fetching latest offset for partition {}. will be retried".
In ReadFromKafkaDoFn, there are two consumers running in the backend:
- the main consumer, which reads data from Kafka;
- the secondary offset consumer, which is used to estimate the backlog by fetching the latest offset.
By default, the offset consumer inherits the configuration from the main consumer, with an auto-generated ConsumerConfig.GROUP_ID_CONFIG. This may not work in a secured Kafka cluster that requires additional configuration.
See withConsumerConfigUpdates(java.util.Map<java.lang.String, java.lang.Object>) for configuring the main consumer.
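An illustrative sketch for a SASL_SSL-secured cluster; the exact keys and values depend on the cluster setup, and the JAAS string below is a placeholder:

import java.util.HashMap;
import java.util.Map;

Map<String, Object> offsetConsumerConfig = new HashMap<>();
offsetConsumerConfig.put("security.protocol", "SASL_SSL");
offsetConsumerConfig.put("sasl.mechanism", "PLAIN");
offsetConsumerConfig.put("sasl.jaas.config", "..."); // supply the real JAAS config

reader = reader.withOffsetConsumerConfigOverrides(offsetConsumerConfig);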
- withConsumerConfigOverrides
public KafkaIO.ReadSourceDescriptors<K,V> withConsumerConfigOverrides(Map<String, Object> consumerConfig)
Replaces the configuration for the main consumer.
In ReadFromKafkaDoFn, there are two consumers running in the backend:
- the main consumer, which reads data from Kafka;
- the secondary offset consumer, which is used to estimate the backlog by fetching the latest offset.
By default, the main consumer uses the configuration from KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES.
See withConsumerConfigUpdates(java.util.Map<java.lang.String, java.lang.Object>) for updating the configuration instead of overriding it.
- withBadRecordErrorHandler
public KafkaIO.ReadSourceDescriptors<K,V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)
- withConsumerPollingTimeout
public KafkaIO.ReadSourceDescriptors<K,V> withConsumerPollingTimeout(long duration)
Sets the timeout, in seconds, for Kafka consumer polling requests in the ReadFromKafkaDoFn. A lower timeout optimizes for latency; increase the timeout if the consumer is not fetching any records. The default is 2 seconds.
- expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>>