public abstract static class KafkaIO.Read<K,V> extends PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
PTransform
to read from Kafka topics. See KafkaIO
for more information on
usage and configuration.Modifier and Type | Class and Description |
---|---|
static class |
KafkaIO.Read.External
Exposes
KafkaIO.TypedWithoutMetadata as an external transform for cross-language
usage. |
Modifier and Type | Field and Description |
---|---|
static org.apache.beam.sdk.runners.PTransformOverride |
KAFKA_READ_OVERRIDE
A
PTransformOverride for runners to swap ReadFromKafkaViaSDF to legacy Kafka
read if runners doesn't have a good support on executing unbounded Splittable DoFn. |
name, resourceHints
Constructor and Description |
---|
Read() |
Modifier and Type | Method and Description |
---|---|
KafkaIO.Read<K,V> |
commitOffsetsInFinalize()
Finalized offsets are committed to Kafka.
|
PCollection<KafkaRecord<K,V>> |
expand(PBegin input)
Override this method to specify how this
PTransform should be expanded on the given
InputT . |
void |
populateDisplayData(DisplayData.Builder builder)
Register display data for the given transform or component.
|
KafkaIO.Read<K,V> |
updateConsumerProperties(java.util.Map<java.lang.String,java.lang.Object> configUpdates)
Deprecated.
as of version 2.13. Use
withConsumerConfigUpdates(Map) instead |
KafkaIO.Read<K,V> |
withBootstrapServers(java.lang.String bootstrapServers)
Sets the bootstrap servers for the Kafka consumer.
|
KafkaIO.Read<K,V> |
withCheckStopReadingFn(SerializableFunction<TopicPartition,java.lang.Boolean> checkStopReadingFn)
A custom
SerializableFunction that determines whether the ReadFromKafkaDoFn
should stop reading from the given TopicPartition . |
KafkaIO.Read<K,V> |
withConsumerConfigUpdates(java.util.Map<java.lang.String,java.lang.Object> configUpdates)
Update configuration for the backend main consumer.
|
KafkaIO.Read<K,V> |
withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,Consumer<byte[],byte[]>> consumerFactoryFn)
A factory to create Kafka
Consumer from consumer configuration. |
KafkaIO.Read<K,V> |
withCreateTime(Duration maxDelay)
Sets the timestamps policy based on
KafkaTimestampType.CREATE_TIME timestamp of the
records. |
KafkaIO.Read<K,V> |
withDynamicRead(Duration duration)
Configure the KafkaIO to use
WatchKafkaTopicPartitionDoFn to detect and emit any new
available TopicPartition for ReadFromKafkaDoFn to consume during pipeline
execution time. |
KafkaIO.Read<K,V> |
withKeyDeserializer(java.lang.Class<? extends Deserializer<K>> keyDeserializer)
Sets a Kafka
Deserializer to interpret key bytes read from Kafka. |
KafkaIO.Read<K,V> |
withKeyDeserializer(DeserializerProvider<K> deserializerProvider) |
KafkaIO.Read<K,V> |
withKeyDeserializerAndCoder(java.lang.Class<? extends Deserializer<K>> keyDeserializer,
Coder<K> keyCoder)
Sets a Kafka
Deserializer for interpreting key bytes read from Kafka along with a
Coder for helping the Beam runner materialize key objects at runtime if necessary. |
KafkaIO.Read<K,V> |
withLogAppendTime()
|
KafkaIO.Read<K,V> |
withMaxNumRecords(long maxNumRecords)
Similar to
Read.Unbounded.withMaxNumRecords(long) . |
KafkaIO.Read<K,V> |
withMaxReadTime(Duration maxReadTime)
Similar to
Read.Unbounded.withMaxReadTime(Duration) . |
KafkaIO.Read<K,V> |
withOffsetConsumerConfigOverrides(java.util.Map<java.lang.String,java.lang.Object> offsetConsumerConfig)
Set additional configuration for the backend offset consumer.
|
PTransform<PBegin,PCollection<KV<K,V>>> |
withoutMetadata()
Returns a
PTransform for PCollection of KV , dropping Kafka metatdata. |
KafkaIO.Read<K,V> |
withProcessingTime()
|
KafkaIO.Read<K,V> |
withReadCommitted()
Sets "isolation_level" to "read_committed" in Kafka consumer configuration.
|
KafkaIO.Read<K,V> |
withStartReadTime(Instant startReadTime)
Use timestamp to set up start offset.
|
KafkaIO.Read<K,V> |
withStopReadTime(Instant stopReadTime)
Use timestamp to set up stop offset.
|
KafkaIO.Read<K,V> |
withTimestampFn(SerializableFunction<KV<K,V>,Instant> timestampFn)
Deprecated.
as of version 2.4. Use
withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
KafkaIO.Read<K,V> |
withTimestampFn2(SerializableFunction<KafkaRecord<K,V>,Instant> timestampFn)
Deprecated.
as of version 2.4. Use
withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
KafkaIO.Read<K,V> |
withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory)
Provide custom
TimestampPolicyFactory to set event times and watermark for each
partition. |
KafkaIO.Read<K,V> |
withTopic(java.lang.String topic)
Sets the topic to read from.
|
KafkaIO.Read<K,V> |
withTopicPartitions(java.util.List<TopicPartition> topicPartitions)
Sets a list of partitions to read from.
|
KafkaIO.Read<K,V> |
withTopics(java.util.List<java.lang.String> topics)
Sets a list of topics to read from.
|
KafkaIO.Read<K,V> |
withValueDeserializer(java.lang.Class<? extends Deserializer<V>> valueDeserializer)
Sets a Kafka
Deserializer to interpret value bytes read from Kafka. |
KafkaIO.Read<K,V> |
withValueDeserializer(DeserializerProvider<V> deserializerProvider) |
KafkaIO.Read<K,V> |
withValueDeserializerAndCoder(java.lang.Class<? extends Deserializer<V>> valueDeserializer,
Coder<V> valueCoder)
Sets a Kafka
Deserializer for interpreting value bytes read from Kafka along with a
Coder for helping the Beam runner materialize value objects at runtime if necessary. |
KafkaIO.Read<K,V> |
withWatermarkFn(SerializableFunction<KV<K,V>,Instant> watermarkFn)
Deprecated.
as of version 2.4. Use
withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
KafkaIO.Read<K,V> |
withWatermarkFn2(SerializableFunction<KafkaRecord<K,V>,Instant> watermarkFn)
Deprecated.
as of version 2.4. Use
withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setResourceHints, toString, validate
@Internal public static final org.apache.beam.sdk.runners.PTransformOverride KAFKA_READ_OVERRIDE
PTransformOverride
for runners to swap ReadFromKafkaViaSDF
to legacy Kafka
read if runners doesn't have a good support on executing unbounded Splittable DoFn.public KafkaIO.Read<K,V> withBootstrapServers(java.lang.String bootstrapServers)
public KafkaIO.Read<K,V> withTopic(java.lang.String topic)
See KafkaUnboundedSource.split(int, PipelineOptions)
for description of how the
partitions are distributed among the splits.
public KafkaIO.Read<K,V> withTopics(java.util.List<java.lang.String> topics)
See KafkaUnboundedSource.split(int, PipelineOptions)
for description of how the
partitions are distributed among the splits.
public KafkaIO.Read<K,V> withTopicPartitions(java.util.List<TopicPartition> topicPartitions)
See KafkaUnboundedSource.split(int, PipelineOptions)
for description of how the
partitions are distributed among the splits.
public KafkaIO.Read<K,V> withKeyDeserializer(java.lang.Class<? extends Deserializer<K>> keyDeserializer)
Deserializer
to interpret key bytes read from Kafka.
In addition, Beam also needs a Coder
to serialize and deserialize key objects at
runtime. KafkaIO tries to infer a coder for the key based on the Deserializer
class,
however in case that fails, you can use withKeyDeserializerAndCoder(Class, Coder)
to
provide the key coder explicitly.
public KafkaIO.Read<K,V> withKeyDeserializerAndCoder(java.lang.Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder)
Deserializer
for interpreting key bytes read from Kafka along with a
Coder
for helping the Beam runner materialize key objects at runtime if necessary.
Use this method only if your pipeline doesn't work with plain withKeyDeserializer(Class)
.
public KafkaIO.Read<K,V> withKeyDeserializer(DeserializerProvider<K> deserializerProvider)
public KafkaIO.Read<K,V> withValueDeserializer(java.lang.Class<? extends Deserializer<V>> valueDeserializer)
Deserializer
to interpret value bytes read from Kafka.
In addition, Beam also needs a Coder
to serialize and deserialize value objects at
runtime. KafkaIO tries to infer a coder for the value based on the Deserializer
class, however in case that fails, you can use withValueDeserializerAndCoder(Class,
Coder)
to provide the value coder explicitly.
public KafkaIO.Read<K,V> withValueDeserializerAndCoder(java.lang.Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder)
Deserializer
for interpreting value bytes read from Kafka along with a
Coder
for helping the Beam runner materialize value objects at runtime if necessary.
Use this method only if your pipeline doesn't work with plain withValueDeserializer(Class)
.
public KafkaIO.Read<K,V> withValueDeserializer(DeserializerProvider<V> deserializerProvider)
public KafkaIO.Read<K,V> withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,Consumer<byte[],byte[]>> consumerFactoryFn)
Consumer
from consumer configuration. This is useful for
supporting another version of Kafka consumer. Default is KafkaConsumer
.@Deprecated public KafkaIO.Read<K,V> updateConsumerProperties(java.util.Map<java.lang.String,java.lang.Object> configUpdates)
withConsumerConfigUpdates(Map)
insteadpublic KafkaIO.Read<K,V> withMaxNumRecords(long maxNumRecords)
Read.Unbounded.withMaxNumRecords(long)
. Mainly used
for tests and demo applications.public KafkaIO.Read<K,V> withStartReadTime(Instant startReadTime)
Note that this take priority over start offset configuration ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
and any auto committed offsets.
This results in hard failures in either of the following two cases : 1. If one or more partitions do not contain any messages with timestamp larger than or equal to desired timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
public KafkaIO.Read<K,V> withStopReadTime(Instant stopReadTime)
This results in hard failures in either of the following two cases : 1. If one or more partitions do not contain any messages with timestamp larger than or equal to desired timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
public KafkaIO.Read<K,V> withMaxReadTime(Duration maxReadTime)
Read.Unbounded.withMaxReadTime(Duration)
. Mainly
used for tests and demo applications.public KafkaIO.Read<K,V> withLogAppendTime()
TimestampPolicy
to TimestampPolicyFactory.LogAppendTimePolicy
. The
policy assigns Kafka's log append time (server side ingestion time) to each record. The
watermark for each Kafka partition is the timestamp of the last record read. If a partition
is idle, the watermark advances to couple of seconds behind wall time. Every record consumed
from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.
In Kafka, log append time needs to be enabled for each topic, and all the subsequent records wil have their timestamp set to log append time. If a record does not have its timestamp type set to 'LOG_APPEND_TIME' for any reason, it's timestamp is set to previous record timestamp or latest watermark, whichever is larger.
The watermark for the entire source is the oldest of each partition's watermark. If one of the readers falls behind possibly due to uneven distribution of records among Kafka partitions, it ends up holding the watermark for the entire source.
public KafkaIO.Read<K,V> withProcessingTime()
TimestampPolicy
to TimestampPolicyFactory.ProcessingTimePolicy
. This is
the default timestamp policy. It assigns processing time to each record. Specifically, this
is the timestamp when the record becomes 'current' in the reader. The watermark aways
advances to current time. If server side time (log append time) is enabled in Kafka, withLogAppendTime()
is recommended over this.public KafkaIO.Read<K,V> withCreateTime(Duration maxDelay)
KafkaTimestampType.CREATE_TIME
timestamp of the
records. It is an error if a record's timestamp type is not KafkaTimestampType.CREATE_TIME
. The timestamps within a partition are expected to be roughly
monotonically increasing with a cap on out of order delays (e.g. 'max delay' of 1 minute).
The watermark at any time is '(Min(now(), Max(event timestamp so far)) - max delay
)'.
However, watermark is never set in future and capped to 'now - max delay'. In addition,
watermark advanced to 'now - max delay' when a partition is idle.maxDelay
- For any record in the Kafka partition, the timestamp of any subsequent record
is expected to be after current record timestamp - maxDelay
.public KafkaIO.Read<K,V> withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory)
TimestampPolicyFactory
to set event times and watermark for each
partition. TimestampPolicyFactory.createTimestampPolicy(TopicPartition, Optional)
is
invoked for each partition when the reader starts.@Deprecated public KafkaIO.Read<K,V> withTimestampFn2(SerializableFunction<KafkaRecord<K,V>,Instant> timestampFn)
withTimestampPolicyFactory(TimestampPolicyFactory)
instead.@Deprecated public KafkaIO.Read<K,V> withWatermarkFn2(SerializableFunction<KafkaRecord<K,V>,Instant> watermarkFn)
withTimestampPolicyFactory(TimestampPolicyFactory)
instead.withTimestampFn(SerializableFunction)
@Deprecated public KafkaIO.Read<K,V> withTimestampFn(SerializableFunction<KV<K,V>,Instant> timestampFn)
withTimestampPolicyFactory(TimestampPolicyFactory)
instead.@Deprecated public KafkaIO.Read<K,V> withWatermarkFn(SerializableFunction<KV<K,V>,Instant> watermarkFn)
withTimestampPolicyFactory(TimestampPolicyFactory)
instead.withTimestampFn(SerializableFunction)
public KafkaIO.Read<K,V> withReadCommitted()
KafkaConsumer
for more description.public KafkaIO.Read<K,V> commitOffsetsInFinalize()
UnboundedSource.CheckpointMark.finalizeCheckpoint()
. It
helps with minimizing gaps or duplicate processing of records while restarting a pipeline
from scratch. But it does not provide hard processing guarantees. There could be a short
delay to commit after UnboundedSource.CheckpointMark.finalizeCheckpoint()
is invoked, as reader might
be blocked on reading from Kafka. Note that it is independent of 'AUTO_COMMIT' Kafka consumer
configuration. Usually either this or AUTO_COMMIT in Kafka consumer is enabled, but not both.public KafkaIO.Read<K,V> withDynamicRead(Duration duration)
WatchKafkaTopicPartitionDoFn
to detect and emit any new
available TopicPartition
for ReadFromKafkaDoFn
to consume during pipeline
execution time. The KafkaIO will regularly check the availability based on the given
duration. If the duration is not specified as null
, the default duration is 1 hour.public KafkaIO.Read<K,V> withOffsetConsumerConfigOverrides(java.util.Map<java.lang.String,java.lang.Object> offsetConsumerConfig)
In KafkaIO.read()
, there're two consumers running in the backend actually:
1. the main consumer, which reads data from kafka;
2. the secondary offset consumer, which is used to estimate backlog, by fetching latest
offset;
By default, offset consumer inherits the configuration from main consumer, with an
auto-generated ConsumerConfig.GROUP_ID_CONFIG
. This may not work in a secured Kafka
which requires more configurations.
public KafkaIO.Read<K,V> withConsumerConfigUpdates(java.util.Map<java.lang.String,java.lang.Object> configUpdates)
In KafkaIO.read()
, there're two consumers running in the backend actually:
1. the main consumer, which reads data from kafka;
2. the secondary offset consumer, which is used to estimate backlog, by fetching latest
offset;
By default, main consumer uses the configuration from KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES
.
public KafkaIO.Read<K,V> withCheckStopReadingFn(SerializableFunction<TopicPartition,java.lang.Boolean> checkStopReadingFn)
SerializableFunction
that determines whether the ReadFromKafkaDoFn
should stop reading from the given TopicPartition
.public PTransform<PBegin,PCollection<KV<K,V>>> withoutMetadata()
PTransform
for PCollection of KV
, dropping Kafka metatdata.public PCollection<KafkaRecord<K,V>> expand(PBegin input)
PTransform
PTransform
should be expanded on the given
InputT
.
NOTE: This method should not be called directly. Instead apply the PTransform
should
be applied to the InputT
using the apply
method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
expand
in class PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
public void populateDisplayData(DisplayData.Builder builder)
PTransform
populateDisplayData(DisplayData.Builder)
is invoked by Pipeline runners to collect
display data via DisplayData.from(HasDisplayData)
. Implementations may call super.populateDisplayData(builder)
in order to register display data in the current namespace,
but should otherwise use subcomponent.populateDisplayData(builder)
to use the namespace
of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
populateDisplayData
in interface HasDisplayData
populateDisplayData
in class PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
builder
- The builder to populate with display data.HasDisplayData