@Experimental(value=PORTABILITY) public abstract static class KafkaIO.ReadSourceDescriptors<K,V> extends PTransform<PCollection<KafkaSourceDescriptor>,PCollection<KafkaRecord<K,V>>>
PTransform
to read from KafkaSourceDescriptor
. See KafkaIO
for more
information on usage and configuration. See ReadFromKafkaDoFn
for more implementation
details.
During expansion, if isCommitOffsetEnabled()
is true
,
the transform will expand to:
PCollection<KafkaSourceDescriptor> --> ParDo(ReadFromKafkaDoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord>>) --> Reshuffle() --> Map(output KafkaRecord)
|
--> KafkaCommitOffset
. Note that this expansion is not supported when running with x-lang on Dataflow.name
Constructor and Description |
---|
ReadSourceDescriptors() |
Modifier and Type | Method and Description |
---|---|
KafkaIO.ReadSourceDescriptors<K,V> |
commitOffsets()
Enable committing record offset.
|
PCollection<KafkaRecord<K,V>> |
expand(PCollection<KafkaSourceDescriptor> input)
Override this method to specify how this
PTransform should be expanded on the given
InputT . |
static <K,V> KafkaIO.ReadSourceDescriptors<K,V> |
read() |
KafkaIO.ReadSourceDescriptors<K,V> |
withBootstrapServers(java.lang.String bootstrapServers)
Sets the bootstrap servers to use for the Kafka consumer if unspecified via
KafkaSourceDescriptor#getBootStrapServers()}.
|
KafkaIO.ReadSourceDescriptors<K,V> |
withConsumerConfigOverrides(java.util.Map<java.lang.String,java.lang.Object> consumerConfig)
Replaces the configuration for the main consumer.
|
KafkaIO.ReadSourceDescriptors<K,V> |
withConsumerConfigUpdates(java.util.Map<java.lang.String,java.lang.Object> configUpdates)
Updates configuration for the main consumer.
|
KafkaIO.ReadSourceDescriptors<K,V> |
withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,Consumer<byte[],byte[]>> consumerFactoryFn)
A factory to create Kafka
Consumer from consumer configuration. |
KafkaIO.ReadSourceDescriptors<K,V> |
withCreateTime()
Use the creation time of
KafkaRecord as the output timestamp. |
KafkaIO.ReadSourceDescriptors<K,V> |
withCreatWatermarkEstimatorFn(SerializableFunction<Instant,WatermarkEstimator<Instant>> fn)
A function to create a
WatermarkEstimator . |
KafkaIO.ReadSourceDescriptors<K,V> |
withExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K,V>,Instant> fn)
A function to calculate output timestamp for a given
KafkaRecord . |
KafkaIO.ReadSourceDescriptors<K,V> |
withKeyDeserializer(java.lang.Class<? extends Deserializer<K>> keyDeserializer)
Sets a Kafka
Deserializer to interpret key bytes read from Kafka. |
KafkaIO.ReadSourceDescriptors<K,V> |
withKeyDeserializerAndCoder(java.lang.Class<? extends Deserializer<K>> keyDeserializer,
Coder<K> keyCoder)
Sets a Kafka
Deserializer for interpreting key bytes read from Kafka along with a
Coder for helping the Beam runner materialize key objects at runtime if necessary. |
KafkaIO.ReadSourceDescriptors<K,V> |
withKeyDeserializerProvider(org.apache.beam.sdk.io.kafka.DeserializerProvider<K> deserializerProvider) |
KafkaIO.ReadSourceDescriptors<K,V> |
withLogAppendTime()
Use the log append time as the output timestamp.
|
KafkaIO.ReadSourceDescriptors<K,V> |
withManualWatermarkEstimator()
Use the
WatermarkEstimators.Manual as the watermark estimator. |
KafkaIO.ReadSourceDescriptors<K,V> |
withMonotonicallyIncreasingWatermarkEstimator()
Use the
WatermarkEstimators.MonotonicallyIncreasing as the watermark estimator. |
KafkaIO.ReadSourceDescriptors<K,V> |
withOffsetConsumerConfigOverrides(java.util.Map<java.lang.String,java.lang.Object> offsetConsumerConfig)
Set additional configuration for the offset consumer.
|
KafkaIO.ReadSourceDescriptors<K,V> |
withProcessingTime()
Use the processing time as the output timestamp.
|
KafkaIO.ReadSourceDescriptors<K,V> |
withReadCommitted()
Sets "isolation_level" to "read_committed" in Kafka consumer configuration.
|
KafkaIO.ReadSourceDescriptors<K,V> |
withValueDeserializer(java.lang.Class<? extends Deserializer<V>> valueDeserializer)
Sets a Kafka
Deserializer to interpret value bytes read from Kafka. |
KafkaIO.ReadSourceDescriptors<K,V> |
withValueDeserializerAndCoder(java.lang.Class<? extends Deserializer<V>> valueDeserializer,
Coder<V> valueCoder)
Sets a Kafka
Deserializer for interpreting value bytes read from Kafka along with a
Coder for helping the Beam runner materialize value objects at runtime if necessary. |
KafkaIO.ReadSourceDescriptors<K,V> |
withValueDeserializerProvider(org.apache.beam.sdk.io.kafka.DeserializerProvider<V> deserializerProvider) |
KafkaIO.ReadSourceDescriptors<K,V> |
withWallTimeWatermarkEstimator()
Use the
WatermarkEstimators.WallTime as the watermark estimator. |
compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, populateDisplayData, toString, validate
public static <K,V> KafkaIO.ReadSourceDescriptors<K,V> read()
public KafkaIO.ReadSourceDescriptors<K,V> withBootstrapServers(java.lang.String bootstrapServers)
public KafkaIO.ReadSourceDescriptors<K,V> withKeyDeserializerProvider(org.apache.beam.sdk.io.kafka.DeserializerProvider<K> deserializerProvider)
public KafkaIO.ReadSourceDescriptors<K,V> withValueDeserializerProvider(org.apache.beam.sdk.io.kafka.DeserializerProvider<V> deserializerProvider)
public KafkaIO.ReadSourceDescriptors<K,V> withKeyDeserializer(java.lang.Class<? extends Deserializer<K>> keyDeserializer)
Deserializer
to interpret key bytes read from Kafka.
In addition, Beam also needs a Coder
to serialize and deserialize key objects at
runtime. KafkaIO tries to infer a coder for the key based on the Deserializer
class,
however in case that fails, you can use withKeyDeserializerAndCoder(Class, Coder)
to
provide the key coder explicitly.
public KafkaIO.ReadSourceDescriptors<K,V> withValueDeserializer(java.lang.Class<? extends Deserializer<V>> valueDeserializer)
Deserializer
to interpret value bytes read from Kafka.
In addition, Beam also needs a Coder
to serialize and deserialize value objects at
runtime. KafkaIO tries to infer a coder for the value based on the Deserializer
class, however in case that fails, you can use withValueDeserializerAndCoder(Class,
Coder)
to provide the value coder explicitly.
public KafkaIO.ReadSourceDescriptors<K,V> withKeyDeserializerAndCoder(java.lang.Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder)
Deserializer
for interpreting key bytes read from Kafka along with a
Coder
for helping the Beam runner materialize key objects at runtime if necessary.
Use this method to override the coder inference performed within withKeyDeserializer(Class)
.
public KafkaIO.ReadSourceDescriptors<K,V> withValueDeserializerAndCoder(java.lang.Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder)
Deserializer
for interpreting value bytes read from Kafka along with a
Coder
for helping the Beam runner materialize value objects at runtime if necessary.
Use this method to override the coder inference performed within withValueDeserializer(Class)
.
public KafkaIO.ReadSourceDescriptors<K,V> withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,Consumer<byte[],byte[]>> consumerFactoryFn)
Consumer
from consumer configuration. This is useful for
supporting another version of Kafka consumer. Default is KafkaConsumer
.public KafkaIO.ReadSourceDescriptors<K,V> withConsumerConfigUpdates(java.util.Map<java.lang.String,java.lang.Object> configUpdates)
KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES
as the starting
configuration.
In ReadFromKafkaDoFn
, there're two consumers running in the backend:
See withConsumerConfigOverrides(java.util.Map<java.lang.String, java.lang.Object>)
for overriding the configuration instead of
updating it.
See withOffsetConsumerConfigOverrides(java.util.Map<java.lang.String, java.lang.Object>)
for configuring the secondary offset
consumer.
public KafkaIO.ReadSourceDescriptors<K,V> withExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K,V>,Instant> fn)
KafkaRecord
. The default value
is withProcessingTime()
.public KafkaIO.ReadSourceDescriptors<K,V> withCreatWatermarkEstimatorFn(SerializableFunction<Instant,WatermarkEstimator<Instant>> fn)
WatermarkEstimator
. The default value is WatermarkEstimators.MonotonicallyIncreasing
.public KafkaIO.ReadSourceDescriptors<K,V> withLogAppendTime()
public KafkaIO.ReadSourceDescriptors<K,V> withProcessingTime()
public KafkaIO.ReadSourceDescriptors<K,V> withCreateTime()
KafkaRecord
as the output timestamp.public KafkaIO.ReadSourceDescriptors<K,V> withWallTimeWatermarkEstimator()
WatermarkEstimators.WallTime
as the watermark estimator.public KafkaIO.ReadSourceDescriptors<K,V> withMonotonicallyIncreasingWatermarkEstimator()
WatermarkEstimators.MonotonicallyIncreasing
as the watermark estimator.public KafkaIO.ReadSourceDescriptors<K,V> withManualWatermarkEstimator()
WatermarkEstimators.Manual
as the watermark estimator.public KafkaIO.ReadSourceDescriptors<K,V> withReadCommitted()
KafkaConsumer
for more description.public KafkaIO.ReadSourceDescriptors<K,V> commitOffsets()
withReadCommitted()
or ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
is set together with commitOffsets()
,
commitOffsets()
will be ignored.public KafkaIO.ReadSourceDescriptors<K,V> withOffsetConsumerConfigOverrides(java.util.Map<java.lang.String,java.lang.Object> offsetConsumerConfig)
exception while fetching
latest offset for partition {}. will be retried
.
In ReadFromKafkaDoFn
, there are two consumers running in the backend:
By default, offset consumer inherits the configuration from main consumer, with an
auto-generated ConsumerConfig.GROUP_ID_CONFIG
. This may not work in a secured Kafka
which requires additional configuration.
See withConsumerConfigUpdates(java.util.Map<java.lang.String, java.lang.Object>)
for configuring the main consumer.
public KafkaIO.ReadSourceDescriptors<K,V> withConsumerConfigOverrides(java.util.Map<java.lang.String,java.lang.Object> consumerConfig)
In ReadFromKafkaDoFn
, there are two consumers running in the backend:
By default, main consumer uses the configuration from KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES
.
See withConsumerConfigUpdates(java.util.Map<java.lang.String, java.lang.Object>)
for updating the configuration instead of
overriding it.
public PCollection<KafkaRecord<K,V>> expand(PCollection<KafkaSourceDescriptor> input)
PTransform
PTransform
should be expanded on the given
InputT
.
NOTE: This method should not be called directly. Instead apply the PTransform
should
be applied to the InputT
using the apply
method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
expand
in class PTransform<PCollection<KafkaSourceDescriptor>,PCollection<KafkaRecord<K,V>>>