public abstract static class KafkaIO.Read<K,V> extends PTransform<PBegin,PCollection<KafkaRecord<K,V>>>

A PTransform to read from Kafka topics. See KafkaIO for more information on usage and configuration.
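For orientation, a minimal usage sketch. Broker addresses and the topic name are placeholders, and `p` is assumed to be an existing Pipeline; LongDeserializer and StringDeserializer are Kafka's stock deserializers from org.apache.kafka.common.serialization:

```java
// A minimal read, assuming an existing Pipeline p; broker addresses and
// the topic name below are placeholders.
PCollection<KV<Long, String>> records =
    p.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Drop KafkaRecord metadata to get a plain PCollection of KV.
        .withoutMetadata());
```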
| Constructor and Description |
| --- |
| Read() |
| Modifier and Type | Method and Description |
| --- | --- |
| KafkaIO.Read<K,V> | commitOffsetsInFinalize() - Finalized offsets are committed to Kafka. |
| PCollection<KafkaRecord<K,V>> | expand(PBegin input) - Override this method to specify how this PTransform should be expanded on the given InputT. |
| void | populateDisplayData(DisplayData.Builder builder) - Register display data for the given transform or component. |
| KafkaIO.Read<K,V> | updateConsumerProperties(java.util.Map<java.lang.String,java.lang.Object> configUpdates) - Update consumer configuration with new properties. |
| KafkaIO.Read<K,V> | withBootstrapServers(java.lang.String bootstrapServers) - Sets the bootstrap servers for the Kafka consumer. |
| KafkaIO.Read<K,V> | withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,org.apache.kafka.clients.consumer.Consumer<byte[],byte[]>> consumerFactoryFn) - A factory to create a Kafka Consumer from consumer configuration. |
| KafkaIO.Read<K,V> | withCreateTime(Duration maxDelay) - Sets the timestamps policy based on the KafkaTimestampType.CREATE_TIME timestamp of the records. |
| KafkaIO.Read<K,V> | withKeyDeserializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializer) - Sets a Kafka Deserializer to interpret key bytes read from Kafka. |
| KafkaIO.Read<K,V> | withKeyDeserializerAndCoder(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializer, Coder<K> keyCoder) - Sets a Kafka Deserializer for interpreting key bytes read from Kafka along with a Coder for helping the Beam runner materialize key objects at runtime if necessary. |
| KafkaIO.Read<K,V> | withLogAppendTime() - Sets the TimestampPolicy to TimestampPolicyFactory.LogAppendTimePolicy. |
| KafkaIO.Read<K,V> | withMaxNumRecords(long maxNumRecords) - Similar to Read.Unbounded.withMaxNumRecords(long). |
| KafkaIO.Read<K,V> | withMaxReadTime(Duration maxReadTime) - Similar to Read.Unbounded.withMaxReadTime(Duration). |
| PTransform<PBegin,PCollection<KV<K,V>>> | withoutMetadata() - Returns a PTransform for a PCollection of KV, dropping Kafka metadata. |
| KafkaIO.Read<K,V> | withProcessingTime() - Sets the TimestampPolicy to TimestampPolicyFactory.ProcessingTimePolicy. |
| KafkaIO.Read<K,V> | withReadCommitted() - Sets "isolation_level" to "read_committed" in the Kafka consumer configuration. |
| KafkaIO.Read<K,V> | withStartReadTime(Instant startReadTime) - Use timestamp to set up start offset. |
| KafkaIO.Read<K,V> | withTimestampFn(SerializableFunction<KV<K,V>,Instant> timestampFn) - Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
| KafkaIO.Read<K,V> | withTimestampFn2(SerializableFunction<KafkaRecord<K,V>,Instant> timestampFn) - Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
| KafkaIO.Read<K,V> | withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory) - Provide a custom TimestampPolicyFactory to set event times and watermark for each partition. |
| KafkaIO.Read<K,V> | withTopic(java.lang.String topic) - Sets the topic to read from. |
| KafkaIO.Read<K,V> | withTopicPartitions(java.util.List<org.apache.kafka.common.TopicPartition> topicPartitions) - Sets a list of partitions to read from. |
| KafkaIO.Read<K,V> | withTopics(java.util.List<java.lang.String> topics) - Sets a list of topics to read from. |
| KafkaIO.Read<K,V> | withValueDeserializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializer) - Sets a Kafka Deserializer to interpret value bytes read from Kafka. |
| KafkaIO.Read<K,V> | withValueDeserializerAndCoder(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializer, Coder<V> valueCoder) - Sets a Kafka Deserializer for interpreting value bytes read from Kafka along with a Coder for helping the Beam runner materialize value objects at runtime if necessary. |
| KafkaIO.Read<K,V> | withWatermarkFn(SerializableFunction<KV<K,V>,Instant> watermarkFn) - Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
| KafkaIO.Read<K,V> | withWatermarkFn2(SerializableFunction<KafkaRecord<K,V>,Instant> watermarkFn) - Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead. |
Methods inherited from class PTransform: getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, toString, validate
public KafkaIO.Read<K,V> withBootstrapServers(java.lang.String bootstrapServers)

Sets the bootstrap servers for the Kafka consumer.
public KafkaIO.Read<K,V> withTopic(java.lang.String topic)

Sets the topic to read from. See KafkaUnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
public KafkaIO.Read<K,V> withTopics(java.util.List<java.lang.String> topics)

Sets a list of topics to read from. See KafkaUnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
public KafkaIO.Read<K,V> withTopicPartitions(java.util.List<org.apache.kafka.common.TopicPartition> topicPartitions)

Sets a list of partitions to read from. See KafkaUnboundedSource.split(int, PipelineOptions) for a description of how the partitions are distributed among the splits.
public KafkaIO.Read<K,V> withKeyDeserializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializer)

Sets a Kafka Deserializer to interpret key bytes read from Kafka.

In addition, Beam also needs a Coder to serialize and deserialize key objects at runtime. KafkaIO tries to infer a coder for the key based on the Deserializer class; if that fails, you can use withKeyDeserializerAndCoder(Class, Coder) to provide the key coder explicitly.
public KafkaIO.Read<K,V> withKeyDeserializerAndCoder(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializer, Coder<K> keyCoder)

Sets a Kafka Deserializer for interpreting key bytes read from Kafka along with a Coder for helping the Beam runner materialize key objects at runtime if necessary. Use this method only if your pipeline doesn't work with plain withKeyDeserializer(Class).
public KafkaIO.Read<K,V> withValueDeserializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializer)

Sets a Kafka Deserializer to interpret value bytes read from Kafka.

In addition, Beam also needs a Coder to serialize and deserialize value objects at runtime. KafkaIO tries to infer a coder for the value based on the Deserializer class; if that fails, you can use withValueDeserializerAndCoder(Class, Coder) to provide the value coder explicitly.
public KafkaIO.Read<K,V> withValueDeserializerAndCoder(java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializer, Coder<V> valueCoder)

Sets a Kafka Deserializer for interpreting value bytes read from Kafka along with a Coder for helping the Beam runner materialize value objects at runtime if necessary. Use this method only if your pipeline doesn't work with plain withValueDeserializer(Class).
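As a hedged sketch of the explicit-coder variant: MyValue, MyValueDeserializer, and MyValueCoder below are hypothetical user-defined classes standing in for a type whose coder KafkaIO cannot infer:

```java
// MyValue, MyValueDeserializer, and MyValueCoder are hypothetical
// user-defined classes; the explicit Coder is supplied because
// inference from the Deserializer class fails for this type.
KafkaIO.<Long, MyValue>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializerAndCoder(MyValueDeserializer.class, new MyValueCoder());
```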
public KafkaIO.Read<K,V> withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,org.apache.kafka.clients.consumer.Consumer<byte[],byte[]>> consumerFactoryFn)

A factory to create a Kafka Consumer from the consumer configuration. This is useful for supporting another version of the Kafka consumer. The default is KafkaConsumer.

public KafkaIO.Read<K,V> updateConsumerProperties(java.util.Map<java.lang.String,java.lang.Object> configUpdates)

Update consumer configuration with new properties.
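For example, a sketch of overriding a couple of standard consumer properties; the keys come from Kafka's ConsumerConfig, the values are illustrative, and java.util.Map/HashMap are assumed imported:

```java
// Override standard consumer properties before reading.
Map<String, Object> updates = new HashMap<>();
updates.put(ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app");       // illustrative group id
updates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from earliest offsets

KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .updateConsumerProperties(updates);
```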
public KafkaIO.Read<K,V> withMaxNumRecords(long maxNumRecords)

Similar to Read.Unbounded.withMaxNumRecords(long). Mainly used for tests and demo applications.

public KafkaIO.Read<K,V> withStartReadTime(Instant startReadTime)
Use timestamp to set up start offset. Note that this takes priority over the start offset configuration ConsumerConfig.AUTO_OFFSET_RESET_CONFIG and any auto-committed offsets.

This results in hard failures in either of the following two cases:

1. If one or more partitions do not contain any messages with a timestamp larger than or equal to the desired timestamp.
2. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps.
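A brief sketch; Instant and Duration here are the org.joda.time types used throughout this API, and the one-hour lookback is an arbitrary example:

```java
// Start reading at records whose timestamps are at or after one hour ago.
KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withStartReadTime(Instant.now().minus(Duration.standardHours(1)));
```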
public KafkaIO.Read<K,V> withMaxReadTime(Duration maxReadTime)

Similar to Read.Unbounded.withMaxReadTime(Duration). Mainly used for tests and demo applications.
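A sketch of bounding a read for a test, combining this with withMaxNumRecords(long); the limits are arbitrary example values:

```java
// Make the read stop after 1000 records or 30 seconds, whichever
// comes first; useful in tests and demos.
KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withMaxNumRecords(1000)
    .withMaxReadTime(Duration.standardSeconds(30));
```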
public KafkaIO.Read<K,V> withLogAppendTime()

Sets the TimestampPolicy to TimestampPolicyFactory.LogAppendTimePolicy.
The policy assigns Kafka's log append time (server-side ingestion time) to each record. The watermark for each Kafka partition is the timestamp of the last record read. If a partition is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.

In Kafka, log append time needs to be enabled for each topic, after which all subsequent records will have their timestamp set to log append time. If a record does not have its timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous record's timestamp or the latest watermark, whichever is larger.

The watermark for the entire source is the oldest of the partitions' watermarks. If one of the readers falls behind, possibly due to uneven distribution of records among Kafka partitions, it ends up holding back the watermark for the entire source.
public KafkaIO.Read<K,V> withProcessingTime()

Sets the TimestampPolicy to TimestampPolicyFactory.ProcessingTimePolicy.
This is the default timestamp policy. It assigns processing time to each record. Specifically, this is the timestamp when the record becomes 'current' in the reader. The watermark always advances to current time. If server-side time (log append time) is enabled in Kafka, withLogAppendTime() is recommended over this.

public KafkaIO.Read<K,V> withCreateTime(Duration maxDelay)
Sets the timestamps policy based on the KafkaTimestampType.CREATE_TIME timestamp of the records. It is an error if a record's timestamp type is not KafkaTimestampType.CREATE_TIME. The timestamps within a partition are expected to be roughly monotonically increasing, with a cap on out-of-order delays (e.g. a 'max delay' of 1 minute). The watermark at any time is 'Min(now(), Max(event timestamp so far)) - max delay'; that is, the watermark is never set in the future and is capped to 'now - max delay'. In addition, the watermark advances to 'now - max delay' when a partition is idle.

Parameters:
maxDelay - For any record in the Kafka partition, the timestamp of any subsequent record is expected to be after 'current record timestamp - maxDelay'.
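To illustrate choosing among the three built-in policies, a sketch; each with* call returns a new, reconfigured Read, and the one-minute max delay is an arbitrary example value:

```java
KafkaIO.Read<Long, String> base = KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class);

// Pick exactly one timestamp policy for the read:
KafkaIO.Read<Long, String> processing = base.withProcessingTime(); // default: reader arrival time
KafkaIO.Read<Long, String> logAppend = base.withLogAppendTime();   // broker-side ingestion time
KafkaIO.Read<Long, String> create =
    base.withCreateTime(Duration.standardMinutes(1));              // CREATE_TIME, 1 min max delay
```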
public KafkaIO.Read<K,V> withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory)
Provide a custom TimestampPolicyFactory to set event times and watermark for each partition. TimestampPolicyFactory.createTimestampPolicy(TopicPartition, Optional) is invoked for each partition when the reader starts.
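As a rough sketch of a custom factory, assuming Beam's CustomTimestampPolicyWithLimitedDelay helper is available in this package; the lambda implements TimestampPolicyFactory.createTimestampPolicy(TopicPartition, Optional), and the one-minute delay bound is an example value:

```java
// The factory is invoked once per partition; here each record's event
// time is taken from its Kafka timestamp, with a bounded delay.
KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTimestampPolicyFactory(
        (tp, previousWatermark) ->
            new CustomTimestampPolicyWithLimitedDelay<>(
                record -> new Instant(record.getTimestamp()), // event-time extractor
                Duration.standardMinutes(1),                  // max out-of-order delay
                previousWatermark));
```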
@Deprecated
public KafkaIO.Read<K,V> withTimestampFn2(SerializableFunction<KafkaRecord<K,V>,Instant> timestampFn)
Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead.

@Deprecated
public KafkaIO.Read<K,V> withWatermarkFn2(SerializableFunction<KafkaRecord<K,V>,Instant> watermarkFn)

Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead.
See Also: withTimestampFn(SerializableFunction)

@Deprecated
public KafkaIO.Read<K,V> withTimestampFn(SerializableFunction<KV<K,V>,Instant> timestampFn)

Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead.

@Deprecated
public KafkaIO.Read<K,V> withWatermarkFn(SerializableFunction<KV<K,V>,Instant> watermarkFn)

Deprecated. As of version 2.4, use withTimestampPolicyFactory(TimestampPolicyFactory) instead.
See Also: withTimestampFn(SerializableFunction)
public KafkaIO.Read<K,V> withReadCommitted()

Sets "isolation_level" to "read_committed" in the Kafka consumer configuration. See KafkaConsumer for more description.

public KafkaIO.Read<K,V> commitOffsetsInFinalize()
Finalized offsets are committed to Kafka. See UnboundedSource.CheckpointMark.finalizeCheckpoint().

It helps with minimizing gaps or duplicate processing of records while restarting a pipeline from scratch, but it does not provide hard processing guarantees. There could be a short delay to commit after UnboundedSource.CheckpointMark.finalizeCheckpoint() is invoked, as the reader might be blocked on reading from Kafka. Note that it is independent of the 'AUTO_COMMIT' Kafka consumer configuration. Usually either this or AUTO_COMMIT in the Kafka consumer is enabled, but not both.
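A sketch combining this with withReadCommitted(); offset commits in Kafka need a consumer group id, supplied here via updateConsumerProperties, and the group id is a placeholder:

```java
// Commit finalized offsets back to Kafka under a consumer group id,
// and only read records that transactional producers have committed.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app"); // placeholder group id

KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .updateConsumerProperties(props)
    .withReadCommitted()
    .commitOffsetsInFinalize();
```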
public PTransform<PBegin,PCollection<KV<K,V>>> withoutMetadata()
Returns a PTransform for a PCollection of KV, dropping Kafka metadata.

public PCollection<KafkaRecord<K,V>> expand(PBegin input)
Description copied from class: PTransform

Override this method to specify how this PTransform should be expanded on the given InputT.

NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

Overrides:
expand in class PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
public void populateDisplayData(DisplayData.Builder builder)

Description copied from class: PTransform

Register display data for the given transform or component. populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.

By default, does not register any display data. Implementors may override this method to provide their own display data.

Specified by:
populateDisplayData in interface HasDisplayData
Overrides:
populateDisplayData in class PTransform<PBegin,PCollection<KafkaRecord<K,V>>>
Parameters:
builder - The builder to populate with display data.
See Also:
HasDisplayData