@Experimental(value=SOURCE_SINK) public class KafkaIO extends java.lang.Object
The KafkaIO source returns an unbounded collection of Kafka records as
 PCollection<KafkaRecord<K, V>>. A KafkaRecord includes basic
 metadata such as the topic-partition and offset, along with the key and value associated
 with a Kafka record.
 
Although most applications consume a single topic, the source can be configured to consume
 multiple topics or even a specific set of TopicPartitions (see the sketch after the example below).
 
To configure a Kafka source, you must specify at a minimum Kafka bootstrapServers, one or more topics to consume, and key and value deserializers. For example:
  pipeline
    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)
       // The above four settings are required. This returns PCollection<KafkaRecord<Long, String>>.
       // The rest of the settings are optional:
       // You can further customize the KafkaConsumer used to read the records by adding more
       // settings for ConsumerConfig, e.g.:
       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
       // set event times and watermark based on LogAppendTime. To provide a custom
       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
       .withLogAppendTime()
       // restrict reader to committed messages on Kafka (see method documentation).
       .withReadCommitted()
       // offsets consumed by the pipeline can be committed back to Kafka.
       .commitOffsetsInFinalize()
       // finally, if you don't need Kafka metadata, you can drop it.
       .withoutMetadata() // PCollection<KV<Long, String>>
    )
    .apply(Values.<String>create()) // PCollection<String>
     ...
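If the source should be restricted to a specific set of partitions instead of whole topics, the topic setting can be replaced with an explicit partition list. A minimal sketch, assuming KafkaIO.Read.withTopicPartitions(List<TopicPartition>) is available in this version; the partition numbers are illustrative:

  pipeline
    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       // read only partitions 0 and 1 of "my_topic" (partition choice is illustrative)
       .withTopicPartitions(ImmutableList.of(
           new TopicPartition("my_topic", 0),
           new TopicPartition("my_topic", 1)))
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class));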
 Kafka provides deserializers for common types in
 org.apache.kafka.common.serialization. In addition to deserializers, Beam runners need a
 Coder to materialize key and value objects if necessary.
 In most cases, you don't need to specify a Coder for the key and value in the resulting
 collection because the coders are inferred from the deserializer types. However, when
 coder inference fails, they can be specified explicitly along with the deserializers using
 KafkaIO.Read.withKeyDeserializerAndCoder(Class, Coder) and
 KafkaIO.Read.withValueDeserializerAndCoder(Class, Coder). Note that Kafka messages are
 always interpreted with the key and value deserializers; the coders only control how the
 resulting objects are materialized within the pipeline.
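For example, if inference fails for either type, a Beam coder can be paired with the corresponding deserializer. A minimal sketch; the choice of VarLongCoder and StringUtf8Coder here is illustrative:

  pipeline
    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")
       // explicitly pair each deserializer with a Beam coder for materialization
       .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
       .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of()));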
 
Checkpointing is fully supported and each split can resume from its previous checkpoint
 (to the extent supported by the runner).
 See KafkaUnboundedSource.split(int, PipelineOptions) for more details on
 splits and checkpoint support.
 
When the pipeline starts for the first time, or without any checkpoint, the source starts
 consuming from the latest offsets. You can override this behavior to consume from the
 beginning by setting the appropriate properties in ConsumerConfig, through
 KafkaIO.Read.updateConsumerProperties(Map).
 You can also enable offset auto-commit in Kafka to resume from the last committed offsets.
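For example, consuming from the earliest available offsets can be requested through the standard consumer property "auto.offset.reset". A minimal sketch; the group id shown is illustrative:

  KafkaIO.<Long, String>read()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     // start from the beginning of the topic when there is no checkpoint or committed offset
     .updateConsumerProperties(ImmutableMap.of(
         "auto.offset.reset", "earliest",
         "group.id", "my_beam_app_1"));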
 
In summary, KafkaIO.read follows the sequence below to set the initial offset:
 1. KafkaCheckpointMark provided by the runner;
 2. Consumer offset stored in Kafka when ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true;
 3. Start from the latest offset by default.
 
The KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the values. To configure a Kafka sink, you must specify at a minimum Kafka bootstrapServers, the topic to write to, and key and value serializers. For example:
  PCollection<KV<Long, String>> kvColl = ...;
  kvColl.apply(KafkaIO.<Long, String>write()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("results")
       .withKeySerializer(LongSerializer.class)
       .withValueSerializer(StringSerializer.class)
       // you can further customize KafkaProducer used to write the records by adding more
       // settings for ProducerConfig. e.g, to enable compression :
       .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
       // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
       .withEOS(20, "eos-sink-group-id")
    );
 Often you might want to write just values without any keys to Kafka. Use values() to
 write records with a default empty (null) key:
 
  PCollection<String> strings = ...;
  strings.apply(KafkaIO.<Void, String>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("results")
      .withValueSerializer(StringSerializer.class) // just need serializer for value
      .values()
    );
 Most of the properties in ConsumerConfig for the source, or in
 ProducerConfig for the sink, can be set directly on KafkaIO. For example, if you would
 like to enable offset auto commit (for external monitoring or other purposes), you can set
 "group.id", "enable.auto.commit", etc.

 By default, record timestamps and the watermark are based on processing time. They can
 instead be based on Kafka's log-append time with KafkaIO.Read.withLogAppendTime().
 A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See
 KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.
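For instance, a custom policy could take event times from the timestamp embedded in each KafkaRecord while bounding how far the watermark may lag. A minimal sketch, assuming Beam's CustomTimestampPolicyWithLimitedDelay is available and takes a timestamp function, a maximum delay, and the previous watermark; the one-minute delay is illustrative:

  KafkaIO.<Long, String>read()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     .withTimestampPolicyFactory(
         (tp, previousWatermark) ->
             new CustomTimestampPolicyWithLimitedDelay<Long, String>(
                 // event time taken from the Kafka record timestamp (assumption)
                 record -> new Instant(record.getTimestamp()),
                 // allow the watermark to lag at most one minute (illustrative)
                 Duration.standardMinutes(1),
                 previousWatermark));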
| Modifier and Type | Class and Description |
|---|---|
| static class | KafkaIO.Read<K,V>: A PTransform to read from Kafka topics. |
| static class | KafkaIO.TypedWithoutMetadata<K,V>: A PTransform to read from Kafka topics. |
| static class | KafkaIO.Write<K,V>: A PTransform to write to a Kafka topic. |
| Modifier and Type | Method and Description |
|---|---|
| static <K,V> KafkaIO.Read<K,V> | read(): Creates an uninitialized KafkaIO.Read PTransform. |
| static KafkaIO.Read<byte[],byte[]> | readBytes(): Creates an uninitialized KafkaIO.Read PTransform. |
| static <K,V> KafkaIO.Write<K,V> | write(): Creates an uninitialized KafkaIO.Write PTransform. |
public static KafkaIO.Read<byte[],byte[]> readBytes()
Creates an uninitialized KafkaIO.Read PTransform that returns keys and values as raw byte[].
 Before use, basic Kafka configuration should be set with KafkaIO.Read.withBootstrapServers(String) and
 KafkaIO.Read.withTopics(List). Other optional settings include key and value
 Deserializers, and custom timestamp and watermark functions.

public static <K,V> KafkaIO.Read<K,V> read()
Creates an uninitialized KafkaIO.Read PTransform. Before use, basic Kafka
 configuration should be set with KafkaIO.Read.withBootstrapServers(String) and
 KafkaIO.Read.withTopics(List). Other optional settings include key and value
 Deserializers, and custom timestamp and watermark functions.

public static <K,V> KafkaIO.Write<K,V> write()
Creates an uninitialized KafkaIO.Write PTransform. Before use, Kafka configuration
 should be set with KafkaIO.Write.withBootstrapServers(String) and KafkaIO.Write.withTopic(java.lang.String),
 along with Serializers for the (optional) key and the values.
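As an illustration, readBytes() can be combined with a downstream DoFn that decodes the raw bytes. A minimal sketch, where the topic name and UTF-8 decoding are assumptions:

  pipeline
    .apply(KafkaIO.readBytes()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("raw_topic"))   // PCollection<KafkaRecord<byte[], byte[]>>
    .apply(ParDo.of(
        new DoFn<KafkaRecord<byte[], byte[]>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // decode the raw value bytes; UTF-8 is assumed here
            c.output(new String(c.element().getKV().getValue(), StandardCharsets.UTF_8));
          }
        }));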