@Experimental(value=SOURCE_SINK) public class KafkaIO extends java.lang.Object
The KafkaIO source returns an unbounded collection of Kafka records as a PCollection<KafkaRecord<K, V>>. A KafkaRecord includes basic metadata such as topic-partition and offset, along with the key and value associated with a Kafka record.
Although most applications consume a single topic, the source can be configured to consume multiple topics or even a specific set of TopicPartitions.
To configure a Kafka source, you must specify, at a minimum, the Kafka bootstrapServers, one or more topics to consume, and key and value deserializers. For example:
pipeline
  .apply(KafkaIO.<Long, String>read()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     // The four settings above are required. Returns PCollection<KafkaRecord<Long, String>>.

     // The rest of the settings are optional:

     // You can further customize the KafkaConsumer used to read the records by adding
     // more settings for ConsumerConfig, e.g.:
     .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))

     // custom function for calculating record timestamp (default is processing time)
     .withTimestampFn(new MyTimestampFunction())

     // custom function for watermark (default is record timestamp)
     .withWatermarkFn(new MyWatermarkFunction())

     // finally, if you don't need Kafka metadata, you can drop it
     .withoutMetadata() // PCollection<KV<Long, String>>
  )
  .apply(Values.<String>create()) // PCollection<String>
  ...
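The custom functions passed to withTimestampFn and withWatermarkFn are not shown in the example above. The following is a minimal sketch of the kind of logic they might contain, under loud assumptions: the class name TimestampFnSketch, the "<epoch millis>,<payload>" value layout, and the 5-second lag are all hypothetical, and java.util.function.Function and java.time.Instant are used only as stand-ins so the sketch runs without Beam on the classpath. The function and instant types that KafkaIO actually expects may differ.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

/** Illustrative stand-in; none of these names come from Beam or Kafka. */
final class TimestampFnSketch {

  /** Assumed value layout: "<epoch millis>,<payload>", e.g. "60000,hello". */
  static Instant eventTime(String value) {
    int comma = value.indexOf(',');
    if (comma < 0) {
      throw new IllegalArgumentException("No timestamp prefix in: " + value);
    }
    return Instant.ofEpochMilli(Long.parseLong(value.substring(0, comma)));
  }

  /** Watermark that trails the record's event time by a fixed 5 seconds (assumed lag). */
  static Instant watermark(String value) {
    return eventTime(value).minus(Duration.ofSeconds(5));
  }

  public static void main(String[] args) {
    // Plain java.util.function.Function is used here in place of Beam's function type.
    Function<String, Instant> timestampFn = TimestampFnSketch::eventTime;
    Function<String, Instant> watermarkFn = TimestampFnSketch::watermark;
    System.out.println(timestampFn.apply("60000,hello"));
    System.out.println(watermarkFn.apply("60000,hello"));
  }
}
```

Deriving the watermark from the event time minus a fixed lag is only one possible policy; it trades some latency for tolerance of slightly out-of-order records.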
Kafka provides deserializers for common types in org.apache.kafka.common.serialization. In addition to deserializers, Beam runners need a Coder to materialize key and value objects if necessary. In most cases, you don't need to specify a Coder for the key and value in the resulting collection because the coders are inferred from the deserializer types. However, when coder inference fails, coders can be specified explicitly along with the deserializers using KafkaIO.Read.withKeyDeserializerAndCoder(Class, Coder) and KafkaIO.Read.withValueDeserializerAndCoder(Class, Coder). Note that Kafka messages are interpreted using the key and value deserializers.
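To make "interpreted using key and value deserializers" concrete, here is a minimal sketch of what deserializing a Long key amounts to. It assumes the 8-byte big-endian layout used by Kafka's LongSerializer and LongDeserializer pair; the class LongBytesSketch is a hypothetical stand-in written against the JDK only, not the Kafka class itself.

```java
import java.nio.ByteBuffer;

/** Illustrative stand-in, not Kafka's LongDeserializer class. */
final class LongBytesSketch {

  /** Decodes an 8-byte big-endian payload into a Long; a null payload stays null. */
  static Long deserialize(byte[] data) {
    if (data == null) {
      return null;
    }
    if (data.length != Long.BYTES) {
      throw new IllegalArgumentException("Expected 8 bytes but got " + data.length);
    }
    // ByteBuffer reads in big-endian order by default.
    return ByteBuffer.wrap(data).getLong();
  }

  /** Encodes a long as 8 big-endian bytes, the inverse of deserialize. */
  static byte[] serialize(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  public static void main(String[] args) {
    byte[] wire = serialize(42L);
    System.out.println(deserialize(wire));
  }
}
```

If the bytes on the topic were written in a different format, the same payload would decode to a different value or fail, which is why the deserializer must match how the records were produced.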
Checkpointing is fully supported, and each split can resume from its previous checkpoint (to the extent supported by the runner). See UnboundedKafkaSource#split(int, PipelineOptions) for more details on splits and checkpoint support.
When the pipeline starts for the first time, or without any checkpoint, the source starts consuming from the latest offsets. You can override this behavior to consume from the beginning by setting the appropriate properties in ConsumerConfig through KafkaIO.Read.updateConsumerProperties(Map). You can also enable offset auto-commit in Kafka to resume from the last committed offset.
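As a rough illustration of the properties involved, the sketch below builds two property maps using standard Kafka consumer configuration keys: "auto.offset.reset" set to "earliest" to start from the beginning when no committed offset exists, and "group.id" with "enable.auto.commit" to turn on offset auto-commit. The class and method names (ConsumerPropertiesSketch, fromBeginning, withAutoCommit) are hypothetical helpers, not part of KafkaIO.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that assembles Kafka consumer properties. */
final class ConsumerPropertiesSketch {

  /** Properties asking the consumer to start at the earliest available offset. */
  static Map<String, Object> fromBeginning() {
    Map<String, Object> props = new HashMap<>();
    // Applies only when the consumer has no committed offset to resume from.
    props.put("auto.offset.reset", "earliest");
    return props;
  }

  /** Properties enabling offset auto-commit for the given consumer group. */
  static Map<String, Object> withAutoCommit(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put("group.id", groupId);
    props.put("enable.auto.commit", true);
    return props;
  }

  public static void main(String[] args) {
    System.out.println(fromBeginning());
    System.out.println(withAutoCommit("my_group"));
  }
}
```

Either map could then be handed to KafkaIO.Read.updateConsumerProperties(Map), for example `.updateConsumerProperties(ConsumerPropertiesSketch.fromBeginning())`; the group name "my_group" is a placeholder.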
In summary, KafkaIO.read uses the following sequence to set the initial offset:
1. The KafkaCheckpointMark provided by the runner;
2. The consumer offset stored in Kafka when ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true;
3. The latest offset, by default.
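The precedence described in the list above can be sketched as a small decision function. This is only an illustration of the ordering, not KafkaIO's implementation: the class InitialOffsetSketch, its parameters, and the use of OptionalLong are all assumptions made for the example.

```java
import java.util.OptionalLong;

/** Illustrative model of the initial-offset precedence; not KafkaIO code. */
final class InitialOffsetSketch {

  /**
   * Picks the starting offset for one partition.
   *
   * @param checkpointOffset offset carried by a runner-provided checkpoint, if any
   * @param autoCommitEnabled whether enable.auto.commit is true
   * @param committedOffset offset stored in Kafka for the consumer group, if any
   * @param latestOffset the partition's latest offset, used as the default
   */
  static long resolve(
      OptionalLong checkpointOffset,
      boolean autoCommitEnabled,
      OptionalLong committedOffset,
      long latestOffset) {
    // 1. A checkpoint from the runner always wins.
    if (checkpointOffset.isPresent()) {
      return checkpointOffset.getAsLong();
    }
    // 2. Otherwise use the offset committed to Kafka, when auto-commit is on.
    if (autoCommitEnabled && committedOffset.isPresent()) {
      return committedOffset.getAsLong();
    }
    // 3. Otherwise start from the latest offset.
    return latestOffset;
  }

  public static void main(String[] args) {
    System.out.println(resolve(OptionalLong.empty(), true, OptionalLong.of(20L), 30L));
  }
}
```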
The KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the values. To configure a Kafka sink, you must specify, at a minimum, the Kafka bootstrapServers, the topic to write to, and key and value serializers. For example:
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withKeySerializer(LongSerializer.class)
     .withValueSerializer(StringSerializer.class)

     // You can further customize the KafkaProducer used to write the records by adding
     // more settings for ProducerConfig, e.g. to enable compression:
     .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
  );
Often you might want to write just values without any keys to Kafka. Use values() to write records with a default empty (null) key:
PCollection<String> strings = ...;
strings.apply(KafkaIO.<Void, String>write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    .withValueSerializer(StringSerializer.class) // just need serializer for value
    .values()
  );
Kafka properties can be set in ConsumerConfig for the source or in ProducerConfig for the sink. For example, if you would like to enable offset auto-commit (for external monitoring or other purposes), you can set "group.id", "enable.auto.commit", etc.
A custom WatermarkFn can be provided with KafkaIO.Read.withWatermarkFn(SerializableFunction), and a custom TimestampFn with KafkaIO.Read.withTimestampFn(SerializableFunction). KafkaRecord.getTimestamp() reflects the timestamp provided by Kafka, if any; otherwise it is set to processing time.
Modifier and Type | Class and Description |
---|---|
static class | KafkaIO.Read<K,V>: A PTransform to read from Kafka topics. |
static class | KafkaIO.TypedWithoutMetadata<K,V>: A PTransform to read from Kafka topics. |
static class | KafkaIO.Write<K,V>: A PTransform to write to a Kafka topic. |
Modifier and Type | Method and Description |
---|---|
static <K,V> KafkaIO.Read<K,V> | read(): Creates an uninitialized KafkaIO.Read PTransform. |
static KafkaIO.Read<byte[],byte[]> | readBytes(): Creates an uninitialized KafkaIO.Read PTransform. |
static <K,V> KafkaIO.Write<K,V> | write(): Creates an uninitialized KafkaIO.Write PTransform. |
public static KafkaIO.Read<byte[],byte[]> readBytes()
Creates an uninitialized KafkaIO.Read PTransform. Before use, basic Kafka configuration should be set with KafkaIO.Read.withBootstrapServers(String) and KafkaIO.Read.withTopics(List). Other optional settings include key and value Deserializers, and custom timestamp and watermark functions.
public static <K,V> KafkaIO.Read<K,V> read()
Creates an uninitialized KafkaIO.Read PTransform. Before use, basic Kafka configuration should be set with KafkaIO.Read.withBootstrapServers(String) and KafkaIO.Read.withTopics(List), along with key and value Deserializers. Optional settings include custom timestamp and watermark functions.
public static <K,V> KafkaIO.Write<K,V> write()
Creates an uninitialized KafkaIO.Write PTransform. Before use, Kafka configuration should be set with KafkaIO.Write.withBootstrapServers(String) and KafkaIO.Write.withTopic(java.lang.String), along with Serializers for the (optional) key and the values.