public class KafkaIO
extends java.lang.Object

Read from Kafka as UnboundedSource

The KafkaIO source returns an unbounded collection of Kafka records as PCollection<KafkaRecord<K, V>>. A KafkaRecord includes basic metadata such as the topic-partition and offset, along with the key and value associated with a Kafka record.

Although most applications consume a single topic, the source can be configured to consume multiple topics or even a specific set of TopicPartitions.
To configure a Kafka source, you must specify at the minimum Kafka bootstrapServers, one or more topics to consume, and key and value deserializers. For example:
pipeline
.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
// The above four are required configuration; the read returns PCollection<KafkaRecord<Long, String>>.
// The rest of the settings are optional:
// you can further customize the KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig, e.g.:
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
// set event times and watermark based on 'LogAppendTime'. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
// Use withCreateTime() with topics that have 'CreateTime' timestamps.
.withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offsets consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
// Specify a serializable function which can determine whether to stop reading from a given
// TopicPartition during runtime. Note that only {@link ReadFromKafkaDoFn} respects the
// signal.
.withCheckStopReadingFn(new SerializableFunction<TopicPartition, Boolean>() {})
// If you would like to send messages that fail to be parsed from Kafka to an alternate sink,
// use the error handler pattern as defined in {@link ErrorHandler}.
.withBadRecordErrorHandler(errorHandler)
// finally, if you don't need Kafka metadata, you can drop it.
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>
...
Kafka provides deserializers for common types in org.apache.kafka.common.serialization. In addition to deserializers, Beam runners need a Coder to materialize key and value objects if necessary. In most cases, you don't need to specify a Coder for the key and value in the resulting collection because the coders are inferred from the deserializer types. However, in cases where coder inference fails, they can be specified explicitly along with the deserializers using KafkaIO.Read.withKeyDeserializerAndCoder(Class, Coder) and KafkaIO.Read.withValueDeserializerAndCoder(Class, Coder). Note that Kafka messages are interpreted using key and value deserializers.
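For instance, if coder inference fails, the coders can be supplied explicitly together with the deserializers. A minimal sketch, assuming Long keys and String values (the choice of VarLongCoder and StringUtf8Coder here is illustrative):

pipeline
  .apply(KafkaIO.<Long, String>read()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("my_topic")
     // pair each deserializer with the coder used to materialize the decoded objects
     .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
     .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
  )
  ...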
Read From Kafka Dynamically

For a given Kafka bootstrap server, KafkaIO can also detect and read from available TopicPartitions dynamically, and stop reading from partitions that are no longer available. KafkaIO uses WatchForKafkaTopicPartitions to emit any newly added TopicPartition and uses ReadFromKafkaDoFn to read from each KafkaSourceDescriptor. Dynamic read handles two scenarios: a topic or partition being added, and a topic or partition being deleted. When a checkStopReadingFn is provided, there are two more cases that dynamic read can handle: a topic or partition being stopped, and a topic or partition being resumed.
A race condition can occur because both WatchForKafkaTopicPartitions and ReadFromKafkaDoFn react to the signal from a removed/stopped TopicPartition, but we cannot guarantee that both DoFns perform the related actions at the same time. The result is either that a re-added TopicPartition fails to be emitted again, or that duplicate records are produced.

Here is one example of failing to emit a newly added TopicPartition:
- WatchForKafkaTopicPartitions is configured to update the current tracking set every 1 hour.
- TopicPartition A is tracked by WatchForKafkaTopicPartitions at 10:00AM, and ReadFromKafkaDoFn starts to read from TopicPartition A immediately.
- Later, the TopicPartition is stopped/removed, so reading from it stops and ProcessContinuation.stop() is returned; the TopicPartition is then added back before the tracking set is refreshed.
- When WatchForKafkaTopicPartitions is next invoked by its firing timer, it doesn't know that TopicPartition A has been stopped/removed. All it knows is that TopicPartition A is still an active TopicPartition, so it will not emit TopicPartition A again.
Here is another example, in which duplicate records are produced:

- ReadFromKafkaDoFn is processing TopicPartition A.
- ReadFromKafkaDoFn starts to process other TopicPartitions (an SDF-initiated checkpoint or runner-issued checkpoint happens).
- WatchForKafkaTopicPartitions learns that TopicPartition A has been stopped/removed.
- WatchForKafkaTopicPartitions then learns that TopicPartition A has been added again and emits TopicPartition A again.
- ReadFromKafkaDoFn starts to process the resumed TopicPartition A, but at the same time ReadFromKafkaDoFn is also processing the newly emitted TopicPartition A.

To enable dynamic read, use withDynamicRead(Duration), where the Duration is the interval at which the set of available TopicPartitions is refreshed. For example:
pipeline
.apply(KafkaIO.<Long, String>read()
// Configure dynamic read with a 1 hour interval: the pipeline will check the available
// TopicPartitions and emit newly added ones every hour.
.withDynamicRead(Duration.standardHours(1))
.withCheckStopReadingFn(new SerializableFunction<TopicPartition, Boolean>() {})
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
)
.apply(Values.<String>create()) // PCollection<String>
...
Checkpointing is fully supported and each split can resume from its previous checkpoint (to the extent supported by the runner). See KafkaUnboundedSource.split(int, PipelineOptions) for more details on splits and checkpoint support.
When the pipeline starts for the first time, or without any checkpoint, the source starts consuming from the latest offsets. You can override this behavior to consume from the beginning by setting the appropriate properties in ConsumerConfig, through KafkaIO.Read.withConsumerConfigUpdates(Map). You can also enable offset auto-commit in Kafka to resume from the last committed offsets.

In summary, KafkaIO.read follows this sequence to set the initial offset:
1. the KafkaCheckpointMark provided by the runner;
2. the consumer offset stored in Kafka when ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true;
3. the latest offset, by default.

Seeking to the initial offset is a blocking operation in the Kafka API, which can block forever with certain versions of the Kafka client library. This is resolved by KIP-266, which provides the `default.api.timeout.ms` consumer config setting to control such timeouts. KafkaIO.read implements its own timeout so that it does not block forever when an older Kafka client is used. It does recognize the `default.api.timeout.ms` setting and will honor the timeout value if it is passed in the consumer config.
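For example, to start from the earliest available offsets when no checkpoint or committed offset exists, and to bound the blocking seek with the client-side timeout mentioned above, both properties can be passed through withConsumerConfigUpdates. A minimal sketch; the property values are illustrative:

pipeline
  .apply(KafkaIO.<Long, String>read()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     // read from the earliest offsets when there is no checkpoint or committed offset,
     // and cap blocking consumer API calls at 60 seconds
     .withConsumerConfigUpdates(ImmutableMap.of(
         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
         ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 60000))
  )
  ...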
If you want to deserialize the keys and/or values based on a schema available in Confluent Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it for deserialization. A Coder will be inferred automatically based on the respective Deserializer.

For an Avro schema it will return a PCollection of KafkaRecords where the key and/or value will be typed as GenericRecord. In this case, users don't need to specify key and/or value deserializers and coders, since they will default to KafkaAvroDeserializer and AvroCoder, respectively.

For example, the topic values below are serialized with an Avro schema stored in Schema Registry, and the keys are typed as Long:
PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
.apply(KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
// Use Confluent Schema Registry, specify schema registry URL and value subject
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
...
You can also pass properties to the schema registry client, allowing you to configure authentication:
ImmutableMap<String, Object> csrConfig =
ImmutableMap.<String, Object>builder()
.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,"USER_INFO")
.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,"<username>:<password>")
.build();
PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
.apply(KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
// Use Confluent Schema Registry, specify schema registry URL, value subject and schema registry client configuration
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("https://localhost:8081", "my_topic-value", null, csrConfig))
...
Read from Kafka as a DoFn

KafkaIO.ReadSourceDescriptors is the PTransform that takes a PCollection of KafkaSourceDescriptor as input and outputs a PCollection of KafkaRecord. The core implementation is based on SplittableDoFn. For more details about the concept of SplittableDoFn, please refer to the blog post and design doc. The major difference from KafkaIO.Read is that KafkaIO.ReadSourceDescriptors doesn't require source descriptions (e.g., KafkaIO.Read.getTopicPattern(), KafkaIO.Read.getTopicPartitions(), KafkaIO.Read.getTopics(), KafkaIO.Read.getStartReadTime(), etc.) at pipeline construction time. Instead, the pipeline can populate these source descriptions at runtime. For example, the pipeline can query Kafka topics from a BigQuery table and read these topics via KafkaIO.ReadSourceDescriptors.
Most Kafka consumer configurations are similar to KafkaIO.Read:

- KafkaIO.ReadSourceDescriptors.getConsumerConfig() is the same as KafkaIO.Read.getConsumerConfig().
- KafkaIO.ReadSourceDescriptors.getConsumerFactoryFn() is the same as KafkaIO.Read.getConsumerFactoryFn().
- KafkaIO.ReadSourceDescriptors.getOffsetConsumerConfig() is the same as KafkaIO.Read.getOffsetConsumerConfig().
- KafkaIO.ReadSourceDescriptors.getKeyCoder() is the same as KafkaIO.Read.getKeyCoder().
- KafkaIO.ReadSourceDescriptors.getValueCoder() is the same as KafkaIO.Read.getValueCoder().
- KafkaIO.ReadSourceDescriptors.getKeyDeserializerProvider() is the same as KafkaIO.Read.getKeyDeserializerProvider().
- KafkaIO.ReadSourceDescriptors.getValueDeserializerProvider() is the same as KafkaIO.Read.getValueDeserializerProvider().
- KafkaIO.ReadSourceDescriptors.isCommitOffsetEnabled() has the same meaning as KafkaIO.Read.isCommitOffsetsInFinalizeEnabled().
For example, to create a basic KafkaIO.ReadSourceDescriptors
transform:
pipeline
.apply(Create.of(
KafkaSourceDescriptor.of(
new TopicPartition("topic", 1),
null,
null,
null,
null,
null)))
.apply(
KafkaIO.<Long, String>readSourceDescriptors()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class));
Note that the bootstrapServers can also be populated from the KafkaSourceDescriptor:
pipeline
.apply(Create.of(
KafkaSourceDescriptor.of(
new TopicPartition("topic", 1),
null,
null,
null,
null,
ImmutableList.of("broker_1:9092", "broker_2:9092"))))
.apply(KafkaIO.<Long, String>readSourceDescriptors()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class));
Apart from the Kafka consumer configuration, KafkaIO.ReadSourceDescriptors has some other configurations related to processing records.
KafkaIO.ReadSourceDescriptors.commitOffsets() enables committing offsets after processing each record. Note that if ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG is set in the consumer config, KafkaIO.ReadSourceDescriptors.commitOffsets() will be ignored.
KafkaIO.ReadSourceDescriptors.withExtractOutputTimestampFn(SerializableFunction) is used to compute the output timestamp for a given KafkaRecord and controls the watermark advancement. There are three built-in types:

- KafkaIO.ReadSourceDescriptors.withProcessingTime()
- KafkaIO.ReadSourceDescriptors.withCreateTime()
- KafkaIO.ReadSourceDescriptors.withLogAppendTime()
For example, to create a KafkaIO.ReadSourceDescriptors
with this additional configuration:
pipeline
.apply(Create.of(
KafkaSourceDescriptor.of(
new TopicPartition("topic", 1),
null,
null,
null,
null,
ImmutableList.of("broker_1:9092", "broker_2:9092"))))
.apply(KafkaIO.<Long, String>readSourceDescriptors()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withProcessingTime()
.commitOffsets());
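Beyond the three built-in options, a custom function can be supplied to derive the output timestamp from each record. A minimal sketch, assuming the Kafka record timestamp carries a usable event time (the lambda is illustrative):

pipeline
  .apply(Create.of(
    KafkaSourceDescriptor.of(
      new TopicPartition("topic", 1),
      null,
      null,
      null,
      null,
      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
  .apply(KafkaIO.<Long, String>readSourceDescriptors()
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     // use the Kafka record's own timestamp (milliseconds) as the element timestamp
     .withExtractOutputTimestampFn(record -> new Instant(record.getTimestamp())));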
Writing to Kafka

KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the values, or native Kafka producer records using ProducerRecord. To configure a Kafka sink, you must specify at the minimum Kafka bootstrapServers, the topic to write to, and key and value serializers. For example:
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
// You can further customize KafkaProducer used to write the records by adding more
// settings for ProducerConfig. e.g, to enable compression :
.withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip"))
// You can set the publish timestamp for the Kafka records.
.withInputTimestamp() // element timestamp is used while publishing to Kafka
// or you can also set a custom timestamp with a function.
.withPublishTimestampFunction((elem, elemTs) -> ...)
// Optionally, records that fail to serialize can be sent to an error handler.
// See {@link ErrorHandler} for details of configuring a bad record error handler.
.withBadRecordErrorHandler(errorHandler)
// Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
.withEOS(20, "eos-sink-group-id")
);
To produce Avro values you can use the class KafkaAvroSerializer. To make this class work with write() and the withValueSerializer() method, make sure to erase the generic types by casting to (Class), as shown in the following example:
KafkaIO.<Long, String>write()
...
.withValueSerializer((Class)KafkaAvroSerializer.class)
.withProducerConfigUpdates( <Map with schema registry configuration details> )
...
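For instance, the producer config map typically needs at least the Schema Registry URL so the serializer can register and look up schemas. A minimal sketch; the URL is illustrative:

KafkaIO.<Long, GenericRecord>write()
  .withBootstrapServers("broker_1:9092,broker_2:9092")
  .withTopic("results")
  .withKeySerializer(LongSerializer.class)
  .withValueSerializer((Class) KafkaAvroSerializer.class)
  // point the Avro serializer at the schema registry
  .withProducerConfigUpdates(ImmutableMap.of(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"))
  ...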
Often you might want to write just values without any keys to Kafka. Use values() to write records with a default empty (null) key:
PCollection<String> strings = ...;
strings.apply(KafkaIO.<Void, String>write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withValueSerializer(StringSerializer.class) // just need serializer for value
.values()
);
Also, if you want to write Kafka ProducerRecords, then you should use writeRecords():
PCollection<ProducerRecord<Long, String>> records = ...;
records.apply(KafkaIO.<Long, String>writeRecords()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
);
Advanced Kafka Configuration

KafkaIO allows setting most of the properties in ConsumerConfig for the source or in ProducerConfig for the sink. For example, if you would like to enable offset auto-commit (for external monitoring or other purposes), you can set "group.id", "enable.auto.commit", etc.
Event Timestamps and Watermark

By default, the record timestamp (event time) is set to the processing time in the KafkaIO reader and the source watermark is the current wall time. If a topic has Kafka server-side ingestion timestamps enabled ('LogAppendTime'), they can be used with KafkaIO.Read.withLogAppendTime(). A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.
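As a sketch of a custom policy, assuming the Kafka record timestamp carries the event time and records may arrive up to one minute out of order, the factory can wrap the built-in CustomTimestampPolicyWithLimitedDelay:

pipeline
  .apply(KafkaIO.<Long, String>read()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     // use the record's own timestamp as event time; the watermark lags the maximum
     // observed timestamp by at most 1 minute
     .withTimestampPolicyFactory(
         (tp, previousWatermark) ->
             new CustomTimestampPolicyWithLimitedDelay<>(
                 record -> new Instant(record.getTimestamp()),
                 Duration.standardMinutes(1),
                 previousWatermark))
  )
  ...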
Modifier and Type | Class and Description
---|---
static class | KafkaIO.Read<K,V>: A PTransform to read from Kafka topics.
static class | KafkaIO.ReadSourceDescriptors<K,V>: A PTransform to read from KafkaSourceDescriptor.
static class | KafkaIO.TypedWithoutMetadata<K,V>: A PTransform to read from Kafka topics.
static class | KafkaIO.Write<K,V>: A PTransform to write to a Kafka topic with KVs.
static class | KafkaIO.WriteRecords<K,V>: A PTransform to write to a Kafka topic with ProducerRecords.
Modifier and Type | Method and Description
---|---
static <K,V> KafkaIO.Read<K,V> | read(): Creates an uninitialized KafkaIO.Read PTransform.
static KafkaIO.Read<byte[],byte[]> | readBytes(): A specific instance of uninitialized read() where keys and values are bytes.
static <K,V> KafkaIO.ReadSourceDescriptors<K,V> | readSourceDescriptors(): Creates an uninitialized KafkaIO.ReadSourceDescriptors PTransform.
static <K,V> KafkaIO.Write<K,V> | write(): Creates an uninitialized KafkaIO.Write PTransform.
static <K,V> KafkaIO.WriteRecords<K,V> | writeRecords(): Creates an uninitialized KafkaIO.WriteRecords PTransform.
public static KafkaIO.Read<byte[],byte[]> readBytes()

A specific instance of uninitialized read() where keys and values are bytes. See read().

public static <K,V> KafkaIO.Read<K,V> read()

Creates an uninitialized KafkaIO.Read PTransform. Before use, basic Kafka configuration should be set with KafkaIO.Read.withBootstrapServers(String) and KafkaIO.Read.withTopics(List). Other optional settings include key and value Deserializers, custom timestamp, and watermark functions.

public static <K,V> KafkaIO.ReadSourceDescriptors<K,V> readSourceDescriptors()

Creates an uninitialized KafkaIO.ReadSourceDescriptors PTransform. Unlike KafkaIO.Read, setting up topics and bootstrapServers is not required at construction time. The bootstrapServers can still be configured with KafkaIO.ReadSourceDescriptors.withBootstrapServers(String). Please refer to KafkaIO.ReadSourceDescriptors for more details.

public static <K,V> KafkaIO.Write<K,V> write()

Creates an uninitialized KafkaIO.Write PTransform. Before use, Kafka configuration should be set with KafkaIO.Write.withBootstrapServers(String) and KafkaIO.Write.withTopic(java.lang.String) along with Serializers for the (optional) key and the values.

public static <K,V> KafkaIO.WriteRecords<K,V> writeRecords()

Creates an uninitialized KafkaIO.WriteRecords PTransform. Before use, Kafka configuration should be set with KafkaIO.WriteRecords.withBootstrapServers(String) and KafkaIO.WriteRecords.withTopic(java.lang.String) along with Serializers for the (optional) key and the values.