@Experimental(value=SOURCE_SINK) public class KafkaIO extends java.lang.Object
The KafkaIO source returns an unbounded collection of Kafka records as a PCollection<KafkaRecord<K, V>>. A KafkaRecord includes basic metadata such as
 topic-partition and offset, along with the key and value associated with the Kafka record.
 
Although most applications consume a single topic, the source can be configured to consume
 multiple topics or even a specific set of TopicPartitions.
 
To configure a Kafka source, you must specify at the minimum Kafka bootstrapServers, one or more topics to consume, and key and value deserializers. For example:
 pipeline
   .apply(KafkaIO.<Long, String>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      // The four settings above are required; the result is a PCollection<KafkaRecord<Long, String>>.
      // The remaining settings are optional:
      // you can further customize the KafkaConsumer used to read the records by adding more
      // settings for ConsumerConfig, e.g.:
      .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
      // set event times and watermark based on 'LogAppendTime'. To provide a custom
      // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
      // Use withCreateTime() with topics that have 'CreateTime' timestamps.
      .withLogAppendTime()
      // restrict reader to committed messages on Kafka (see method documentation).
      .withReadCommitted()
      // offset consumed by the pipeline can be committed back.
      .commitOffsetsInFinalize()
      // finally, if you don't need Kafka metadata, you can drop it.
      .withoutMetadata() // PCollection<KV<Long, String>>
   )
   .apply(Values.<String>create()) // PCollection<String>
    ...
 Kafka provides deserializers for common types in org.apache.kafka.common.serialization. In addition to deserializers, Beam runners need a Coder to materialize key and value objects if necessary. In most cases, you don't need to
 specify a Coder for the key and value in the resulting collection because the coders are
 inferred from the deserializer types. However, when coder inference fails, coders can be
 specified explicitly along with the deserializers using KafkaIO.Read.withKeyDeserializerAndCoder(Class, Coder) and KafkaIO.Read.withValueDeserializerAndCoder(Class, Coder). Note that Kafka messages are interpreted using
 the key and value deserializers.
 
Checkpointing is fully supported, and each split can resume from its previous checkpoint (to the
 extent supported by the runner). See KafkaUnboundedSource.split(int, PipelineOptions) for
 more details on splits and checkpoint support.
 
When the pipeline starts for the first time, or without any checkpoint, the source starts
 consuming from the latest offsets. You can override this behavior to consume from the
 beginning by setting the appropriate properties in ConsumerConfig, through KafkaIO.Read.updateConsumerProperties(Map). You can also enable offset auto_commit in Kafka to resume
 from the last committed offset.
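As a minimal sketch of the overrides described above, the helper below builds property maps using the standard Kafka consumer settings `auto.offset.reset`, `group.id`, and `enable.auto.commit`. The class and method names (`ConsumerOverrides`, `startFromEarliest`, `resumeFromCommitted`) are illustrative assumptions and not part of KafkaIO; the resulting map would be passed to KafkaIO.Read.updateConsumerProperties(Map).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of KafkaIO: builds consumer property overrides.
final class ConsumerOverrides {

  // With no checkpoint, start at the earliest available offset instead of the latest.
  static Map<String, Object> startFromEarliest() {
    Map<String, Object> props = new HashMap<>();
    props.put("auto.offset.reset", "earliest");
    return props;
  }

  // Enable Kafka offset auto-commit for the given consumer group so that a restart
  // without a checkpoint can resume from the last committed offset.
  static Map<String, Object> resumeFromCommitted(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put("group.id", groupId);
    props.put("enable.auto.commit", true);
    return props;
  }

  public static void main(String[] args) {
    System.out.println(startFromEarliest());
    System.out.println(resumeFromCommitted("my_beam_app_1"));
  }
}
```

The group name "my_beam_app_1" is the one used in the read example; any stable group id would serve the same purpose.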
 
In summary, KafkaIO.read follows the sequence below to set the initial offset:
 1. KafkaCheckpointMark provided by the runner;
 2. Consumer offset stored in Kafka when ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true;
 3. Start from the latest offset by default.
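The precedence in the list above can be modeled as a small pure function. This is only a sketch of the ordering as written here: `InitialOffsetSketch` and its parameters are hypothetical, and the real decision is made inside the KafkaIO reader and the Kafka consumer rather than in a helper like this.

```java
import java.util.OptionalLong;

// Hypothetical model of the three-step precedence; not KafkaIO code.
final class InitialOffsetSketch {

  static long resolve(
      OptionalLong checkpointOffset,  // offset from a KafkaCheckpointMark, if the runner has one
      OptionalLong committedOffset,   // offset stored in Kafka for the consumer group, if any
      boolean autoCommitEnabled,      // value of enable.auto.commit
      long latestOffset) {            // current end of the partition
    // 1. A checkpoint provided by the runner always wins.
    if (checkpointOffset.isPresent()) {
      return checkpointOffset.getAsLong();
    }
    // 2. Otherwise use the offset committed to Kafka, when auto-commit is enabled.
    if (autoCommitEnabled && committedOffset.isPresent()) {
      return committedOffset.getAsLong();
    }
    // 3. Otherwise fall back to the latest offset.
    return latestOffset;
  }

  public static void main(String[] args) {
    System.out.println(resolve(OptionalLong.of(42), OptionalLong.of(10), true, 100));
  }
}
```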
 
KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the
 values or native Kafka producer records using ProducerRecord. To configure a Kafka sink, you must specify at
 the minimum Kafka bootstrapServers, the topic to write to, and key and value
 serializers. For example:
 
 PCollection<KV<Long, String>> kvColl = ...;
 kvColl.apply(KafkaIO.<Long, String>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("results")
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(StringSerializer.class)
      // You can further customize KafkaProducer used to write the records by adding more
      // settings for ProducerConfig. e.g, to enable compression :
      .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
      // You can set the publish timestamp for the Kafka records.
      .withInputTimestamp() // element timestamp is used while publishing to Kafka
      // or you can also set a custom timestamp with a function.
      .withPublishTimestampFunction((elem, elemTs) -> ...)
      // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
      .withEOS(20, "eos-sink-group-id")
   );
 Often you might want to write just values without any keys to Kafka. Use values() to
 write records with a default empty (null) key:
 
 PCollection<String> strings = ...;
 strings.apply(KafkaIO.<Void, String>write()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withValueSerializer(StringSerializer.class) // just need serializer for value
     .values()
   );
 To write Kafka ProducerRecords directly, use writeRecords():
 
 PCollection<ProducerRecord<Long, String>> records = ...;
 records.apply(KafkaIO.<Long, String>writeRecords()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withKeySerializer(LongSerializer.class)
     .withValueSerializer(StringSerializer.class)
   );
 Most Kafka properties can be set in ConsumerConfig for the source or in ProducerConfig for the sink. For example, if you would like to enable offset auto commit (for
 external monitoring or other purposes), you can set "group.id",
 "enable.auto.commit", etc.
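 By default, event times are based on processing time. For topics with 'LogAppendTime' timestamps, event times and the watermark can be based on them with KafkaIO.Read.withLogAppendTime(). A custom timestamp
 policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.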
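The comments in the read example state that withProcessingTime() is the default and that withLogAppendTime() bases event times on 'LogAppendTime'. The sketch below models only that choice of event time. `EventTimeSketch`, its `Mode` enum, and its parameters are hypothetical and are not the TimestampPolicyFactory API, which additionally governs the watermark.

```java
// Hypothetical model of the event-time choice; not KafkaIO's timestamp policy API.
final class EventTimeSketch {

  enum Mode { PROCESSING_TIME, LOG_APPEND_TIME }

  // Returns the event time (epoch millis) that would be assigned to a record.
  static long eventTimeMillis(Mode mode, long logAppendTimeMillis, long nowMillis) {
    if (mode == Mode.LOG_APPEND_TIME) {
      // Use the timestamp the broker attached when the record was appended to the log.
      return logAppendTimeMillis;
    }
    // Default: use the wall-clock time at which the record is read.
    return nowMillis;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(eventTimeMillis(Mode.LOG_APPEND_TIME, now - 5_000L, now));
  }
}
```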
| Modifier and Type | Class and Description |
|---|---|
| static class | KafkaIO.Read<K,V>: A PTransform to read from Kafka topics. |
| static class | KafkaIO.TypedWithoutMetadata<K,V>: A PTransform to read from Kafka topics. |
| static class | KafkaIO.Write<K,V>: A PTransform to write to a Kafka topic with KVs. |
| static class | KafkaIO.WriteRecords<K,V>: A PTransform to write to a Kafka topic with ProducerRecords. |

| Modifier and Type | Method and Description |
|---|---|
| static <K,V> KafkaIO.Read<K,V> | read(): Creates an uninitialized KafkaIO.Read PTransform. |
| static KafkaIO.Read<byte[],byte[]> | readBytes(): A specific instance of uninitialized read() where key and values are bytes. |
| static <K,V> KafkaIO.Write<K,V> | write(): Creates an uninitialized KafkaIO.Write PTransform. |
| static <K,V> KafkaIO.WriteRecords<K,V> | writeRecords(): Creates an uninitialized KafkaIO.WriteRecords PTransform. |

public static KafkaIO.Read<byte[],byte[]> readBytes()
A specific instance of uninitialized read() where key and values are bytes. See
 read().

public static <K,V> KafkaIO.Read<K,V> read()
Creates an uninitialized KafkaIO.Read PTransform. Before use, basic Kafka configuration
 should be set with KafkaIO.Read.withBootstrapServers(String) and KafkaIO.Read.withTopics(List),
 along with key and value Deserializers. Optional settings include custom timestamp and
 watermark functions.

public static <K,V> KafkaIO.Write<K,V> write()
Creates an uninitialized KafkaIO.Write PTransform. Before use, Kafka configuration
 should be set with KafkaIO.Write.withBootstrapServers(String) and KafkaIO.Write.withTopic(java.lang.String) along
 with Serializers for (optional) key and values.

public static <K,V> KafkaIO.WriteRecords<K,V> writeRecords()
Creates an uninitialized KafkaIO.WriteRecords PTransform. Before use, Kafka
 configuration should be set with KafkaIO.WriteRecords.withBootstrapServers(String) and KafkaIO.WriteRecords.withTopic(java.lang.String) along with Serializers for (optional) key and values.