@Experimental(value=SOURCE_SINK) public class KafkaIO extends java.lang.Object
The KafkaIO source returns an unbounded collection of Kafka records as PCollection<KafkaRecord<K, V>>. A KafkaRecord includes basic metadata such as
 the topic-partition and offset, along with the key and value associated with a Kafka record.
 
Although most applications consume a single topic, the source can be configured to consume
 multiple topics or even a specific set of TopicPartitions.
 
To configure a Kafka source, you must specify at the minimum Kafka bootstrapServers, one or more topics to consume, and key and value deserializers. For example:
 pipeline
   .apply(KafkaIO.<Long, String>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      // The above four settings are required. The read returns PCollection<KafkaRecord<Long, String>>.
      // Rest of the settings are optional :
      // you can further customize KafkaConsumer used to read the records by adding more
      // settings for ConsumerConfig. e.g :
      .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
      // set event times and watermark based on 'LogAppendTime'. To provide a custom
      // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
      // Use withCreateTime() with topics that have 'CreateTime' timestamps.
      .withLogAppendTime()
      // restrict reader to committed messages on Kafka (see method documentation).
      .withReadCommitted()
      // offset consumed by the pipeline can be committed back.
      .commitOffsetsInFinalize()
      // Specify a serializable function that determines whether to stop reading from a given
      // TopicPartition at runtime. Note that only ReadFromKafkaDoFn respects this signal.
      .withCheckStopReadingFn(new SerializableFunction<TopicPartition, Boolean>() {})
      // finally, if you don't need Kafka metadata, you can drop it.
      .withoutMetadata() // PCollection<KV<Long, String>>
   )
   .apply(Values.<String>create()) // PCollection<String>
    ...
 Kafka provides deserializers for common types in org.apache.kafka.common.serialization. In addition to deserializers, Beam runners need a Coder to materialize key and value objects if necessary. In most cases, you don't need to
 specify a Coder for key and value in the resulting collection because the coders are
 inferred from the deserializer types. However, in cases when coder inference fails, they can be
 specified explicitly along with the deserializers using KafkaIO.Read.withKeyDeserializerAndCoder(Class, Coder) and KafkaIO.Read.withValueDeserializerAndCoder(Class, Coder). Note that Kafka messages are interpreted using the
 key and value deserializers.
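 For example, a minimal sketch of specifying coders explicitly; the particular coder choices (BigEndianLongCoder and StringUtf8Coder) are assumptions made for illustration, not the only valid ones:

  pipeline
    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")
       // Same deserializers as above, with coders given explicitly in case inference fails.
       .withKeyDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
       .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of()));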
 
KafkaIO can also detect available TopicPartitions dynamically during pipeline execution and stop reading from TopicPartitions that no longer exist. It uses WatchKafkaTopicPartitionDoFn to emit any newly added TopicPartition and uses ReadFromKafkaDoFn to read from each KafkaSourceDescriptor. Dynamic read is able to handle
 2 scenarios: a topic or partition that is added or deleted while the pipeline is running, and a topic or partition that is removed and later added back. By providing a
 checkStopReadingFn, there are 2 more cases that dynamic read can handle: a topic or partition that is stopped, and a topic or partition that is stopped and later resumed.
 
 Race conditions may happen when a TopicPartition is removed (or stopped) and then added back, because both
 WatchKafkaTopicPartitionDoFn and ReadFromKafkaDoFn react to the signal from a removed/stopped TopicPartition but we cannot
 guarantee that both DoFns perform the related actions at the same time. Such a race can cause a re-added TopicPartition to fail to be emitted again, or can cause ReadFromKafkaDoFn to output duplicate records.
 
 Here is one example of failing to emit a newly added TopicPartition:
 
 1. WatchKafkaTopicPartitionDoFn is configured to update the current tracking set every 1 hour.
 2. TopicPartition A is tracked by WatchKafkaTopicPartitionDoFn at 10:00AM, and ReadFromKafkaDoFn starts to read from TopicPartition A immediately.
 3. Later, ReadFromKafkaDoFn notices that TopicPartition A has been stopped/removed, so it stops reading from it and returns ProcessContinuation.stop().
 4. The next time WatchKafkaTopicPartitionDoFn is invoked by the firing timer, it doesn't know that TopicPartition A has been stopped/removed. All it knows is that TopicPartition A is still an active TopicPartition, so it will not emit TopicPartition A again.
 
 Here is one example of producing duplicate records:
 1. ReadFromKafkaDoFn is processing TopicPartition A.
 2. ReadFromKafkaDoFn starts to process other TopicPartitions (an SDF-initiated checkpoint or runner-issued checkpoint happens).
 3. WatchKafkaTopicPartitionDoFn learns that TopicPartition A has been stopped/removed.
 4. WatchKafkaTopicPartitionDoFn learns that TopicPartition A has been added again and emits TopicPartition A again.
 5. ReadFromKafkaDoFn starts to process the resumed TopicPartition A, but at the same time ReadFromKafkaDoFn is also processing the newly emitted TopicPartition A.
 
 The following example shows how to configure a dynamic read:
 
 pipeline
   .apply(KafkaIO.<Long, String>read()
      // Configure dynamic read with a 1 hour interval: the pipeline will check the available
      // TopicPartitions and emit newly added ones every hour.
      .withDynamicRead(Duration.standardHours(1))
      .withCheckStopReadingFn(new SerializableFunction<TopicPartition, Boolean>() {})
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
   )
   .apply(Values.<String>create()) // PCollection<String>
    ...
 Checkpointing is fully supported and each split can resume from previous checkpoint (to the
 extent supported by runner). See KafkaUnboundedSource.split(int, PipelineOptions) for
 more details on splits and checkpoint support.
 
When the pipeline starts for the first time, or without any checkpoint, the source starts
 consuming from the latest offsets. You can override this behavior to consume from the
 beginning by setting the appropriate properties in ConsumerConfig through KafkaIO.Read.withConsumerConfigUpdates(Map). You can also enable offset auto commit in Kafka to resume
 from the last committed offsets.
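 For example, a minimal sketch of both options combined; the topic, brokers, group id, and property values are assumptions for illustration:

  pipeline
    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)
       // Start from the earliest available offset when there is no checkpoint or committed
       // offset, and let Kafka track committed offsets for this consumer group.
       .withConsumerConfigUpdates(ImmutableMap.<String, Object>of(
           ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
           ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app_1",
           ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)));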
 
In summary, KafkaIO.read follows the sequence below to set the initial offset:
 1. the KafkaCheckpointMark provided by the runner;
 2. the consumer offset stored in Kafka when ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true;
 3. the latest offset, by default.
 
Seeking to the initial offset is a blocking operation in the Kafka API, which can block forever with certain versions of the Kafka client library. This was resolved by KIP-266, which added the `default.api.timeout.ms` consumer config setting to control such timeouts. KafkaIO.read implements its own timeout so that it does not block forever when an older Kafka client is used. It recognizes the `default.api.timeout.ms` setting and honors the timeout value if it is passed in the consumer config.
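 For example, a minimal sketch of passing an explicit timeout, assuming a Kafka client version that supports `default.api.timeout.ms` (KIP-266); the 60-second value is an assumption for illustration:

  pipeline
    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)
       // Cap blocking consumer API calls (such as the initial seek) at 60 seconds.
       .withConsumerConfigUpdates(ImmutableMap.<String, Object>of(
           ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 60000)));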
If you want to deserialize the keys and/or values based on a schema available in Confluent
 Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
 for deserialization. A Coder will be inferred automatically based on the respective
 Deserializer.
 
For an Avro schema it will return a PCollection of KafkaRecords where the key
 and/or value is typed as GenericRecord. In this case, users
 don't need to specify key and/or value deserializers and coders since they will be set to KafkaAvroDeserializer and AvroCoder by default.
 
For example, in the snippet below the topic values are serialized with an Avro schema stored in Schema Registry,
 and the keys are typed as Long:
 
 PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
   .apply(KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL and value subject
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
    ...
 You can also pass properties to the schema registry client, which allows you to configure authentication:
 ImmutableMap<String, Object> csrConfig =
     ImmutableMap.<String, Object>builder()
         .put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,"USER_INFO")
         .put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,"<username>:<password>")
         .build();
 PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
   .apply(KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL, value subject and schema registry client configuration
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of("https://localhost:8081", "my_topic-value", null, csrConfig))
    ...
 KafkaIO.ReadSourceDescriptors is a PTransform that takes a PCollection of KafkaSourceDescriptor as input and outputs a PCollection of KafkaRecord. The core
 implementation is based on SplittableDoFn. For more details about the concept of SplittableDoFn, please refer to the blog post and design doc. The major difference from KafkaIO.Read is that KafkaIO.ReadSourceDescriptors doesn't require source descriptions (e.g., KafkaIO.Read.getTopicPartitions(), KafkaIO.Read.getTopics(), KafkaIO.Read.getStartReadTime(), etc.) during pipeline construction time. Instead, the
 pipeline can populate these source descriptions at runtime. For example, the pipeline can
 query Kafka topics from a BigQuery table and read these topics via KafkaIO.ReadSourceDescriptors.
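 For instance, here is a minimal sketch (not taken from the Beam documentation) of populating KafkaSourceDescriptors at runtime from a PCollection of topic names; the choice of partition 0, the broker addresses, and the Long/String types are assumptions for illustration, and KafkaSourceDescriptor.of is used in the same 4-argument form as the examples below:

  PCollection<String> topicNames = ...; // e.g. produced by an upstream BigQuery read
  topicNames
    .apply(MapElements
        .into(TypeDescriptor.of(KafkaSourceDescriptor.class))
        // Map each topic name to a descriptor for partition 0 of that topic.
        .via(topic -> KafkaSourceDescriptor.of(
            new TopicPartition(topic, 0),
            null,
            null,
            ImmutableList.of("broker_1:9092", "broker_2:9092"))))
    .apply(KafkaIO.<Long, String>readSourceDescriptors()
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));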
 Most Kafka consumer configurations are similar to KafkaIO.Read:
 
KafkaIO.ReadSourceDescriptors.getConsumerConfig() is the same as KafkaIO.Read.getConsumerConfig().
   KafkaIO.ReadSourceDescriptors.getConsumerFactoryFn() is the same as KafkaIO.Read.getConsumerFactoryFn().
   KafkaIO.ReadSourceDescriptors.getOffsetConsumerConfig() is the same as KafkaIO.Read.getOffsetConsumerConfig().
   KafkaIO.ReadSourceDescriptors.getKeyCoder() is the same as KafkaIO.Read.getKeyCoder().
   KafkaIO.ReadSourceDescriptors.getValueCoder() is the same as KafkaIO.Read.getValueCoder().
   KafkaIO.ReadSourceDescriptors.getKeyDeserializerProvider() is the same as KafkaIO.Read.getKeyDeserializerProvider().
   KafkaIO.ReadSourceDescriptors.getValueDeserializerProvider() is the same as KafkaIO.Read.getValueDeserializerProvider().
   KafkaIO.ReadSourceDescriptors.isCommitOffsetEnabled() has the same meaning as KafkaIO.Read.isCommitOffsetsInFinalizeEnabled().
 For example, to create a basic KafkaIO.ReadSourceDescriptors transform:
 
  pipeline
   .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1), null, null, null)))
   .apply(KafkaIO.readAll()
           .withBootstrapServers("broker_1:9092,broker_2:9092")
           .withKeyDeserializer(LongDeserializer.class)
           .withValueDeserializer(StringDeserializer.class));
 bootstrapServers can also be populated from the KafkaSourceDescriptor:
 
  pipeline
   .apply(Create.of(
     KafkaSourceDescriptor.of(
       new TopicPartition("topic", 1),
       null,
       null,
       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
   .apply(KafkaIO.readAll()
          .withKeyDeserializer(LongDeserializer.class)
          .withValueDeserializer(StringDeserializer.class));
 Apart from the configurations of the Kafka consumer, KafkaIO.ReadSourceDescriptors has some other configurations that are related to processing records.
KafkaIO.ReadSourceDescriptors.commitOffsets() enables committing offsets after processing each
 record. Note that if isolation.level is set to "read_committed" or ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG is set in the consumer config, KafkaIO.ReadSourceDescriptors.commitOffsets() will be ignored.
 
KafkaIO.ReadSourceDescriptors.withExtractOutputTimestampFn(SerializableFunction) is used to
 compute the output timestamp for a given KafkaRecord and controls the watermark
 advancement. There are three built-in types:
 
KafkaIO.ReadSourceDescriptors.withProcessingTime()
   KafkaIO.ReadSourceDescriptors.withCreateTime()
   KafkaIO.ReadSourceDescriptors.withLogAppendTime()
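 In addition to the built-in types, a custom function can be supplied via withExtractOutputTimestampFn(). Here is a minimal sketch (an assumption, not from the documentation) that uses each record's Kafka timestamp when present and falls back to processing time otherwise:

  KafkaIO.<Long, String>readSourceDescriptors()
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      // Use the record's Kafka timestamp as the output timestamp; if it is unset (negative),
      // fall back to the current processing time.
      .withExtractOutputTimestampFn(
          record -> record.getTimestamp() > 0
              ? new Instant(record.getTimestamp())
              : Instant.now());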
 For example, to create a KafkaIO.ReadSourceDescriptors transform with withProcessingTime() and commitOffsets():
 
  pipeline
  .apply(Create.of(
     KafkaSourceDescriptor.of(
       new TopicPartition("topic", 1),
       null,
       null,
       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
  .apply(KafkaIO.readAll()
           .withKeyDeserializer(LongDeserializer.class)
           .withValueDeserializer(StringDeserializer.class)
           .withProcessingTime()
           .commitOffsets());
 KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the
 values or native Kafka producer records using ProducerRecord. To configure a Kafka sink, you must specify at
 the minimum Kafka bootstrapServers, the topic to write to, and key and value
 serializers. For example:
 
 PCollection<KV<Long, String>> kvColl = ...;
 kvColl.apply(KafkaIO.<Long, String>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("results")
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(StringSerializer.class)
      // You can further customize KafkaProducer used to write the records by adding more
      // settings for ProducerConfig. e.g, to enable compression :
      .withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip"))
      // You can set the publish timestamp for the Kafka records.
      .withInputTimestamp() // element timestamp is used while publishing to Kafka
      // or you can also set a custom timestamp with a function.
      .withPublishTimestampFunction((elem, elemTs) -> ...)
      // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
      .withEOS(20, "eos-sink-group-id")
   );
 Often you might want to write just values without any keys to Kafka. Use values() to
 write records with a default empty (null) key:
 
 PCollection<String> strings = ...;
 strings.apply(KafkaIO.<Void, String>write()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withValueSerializer(StringSerializer.class) // just need serializer for value
     .values()
   );
 Also, if you want to write Kafka ProducerRecords, you should use writeRecords():
 
 PCollection<ProducerRecord<Long, String>> records = ...;
 records.apply(KafkaIO.<Long, String>writeRecords()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withKeySerializer(LongSerializer.class)
     .withValueSerializer(StringSerializer.class)
   );
 Most of the properties in ConsumerConfig (for the source) or ProducerConfig (for the sink) can be set through KafkaIO.Read.withConsumerConfigUpdates(Map) and KafkaIO.Write.withProducerConfigUpdates(Map). For example, if you would like to enable offset auto commit (for
 external monitoring or other purposes), you can set "group.id",
 "enable.auto.commit", etc.
 By default, record timestamps (event time) and the watermark are based on processing time (see withProcessingTime() above). Topics with Kafka server-side 'LogAppendTime' timestamps can use KafkaIO.Read.withLogAppendTime(), and topics with 'CreateTime' timestamps can use withCreateTime(). A custom timestamp
 policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.
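 Here is a minimal sketch of such a custom policy (an assumption for illustration), built with the CustomTimestampPolicyWithLimitedDelay helper, which uses the record timestamp as event time and bounds how far the watermark may lag behind it:

  KafkaIO.<Long, String>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .withTimestampPolicyFactory(
          (tp, previousWatermark) ->
              new CustomTimestampPolicyWithLimitedDelay<Long, String>(
                  // Event time is taken from the Kafka record timestamp.
                  record -> new Instant(record.getTimestamp()),
                  // Allow the watermark to lag the max observed timestamp by at most one minute.
                  Duration.standardMinutes(1),
                  previousWatermark));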
 | Modifier and Type | Class and Description |
|---|---|
| static class | KafkaIO.Read<K,V>: A PTransform to read from Kafka topics. |
| static class | KafkaIO.ReadSourceDescriptors<K,V>: A PTransform to read from KafkaSourceDescriptor. |
| static class | KafkaIO.TypedWithoutMetadata<K,V>: A PTransform to read from Kafka topics. |
| static class | KafkaIO.Write<K,V>: A PTransform to write to a Kafka topic with KVs. |
| static class | KafkaIO.WriteRecords<K,V>: A PTransform to write to a Kafka topic with ProducerRecords. |
 | Modifier and Type | Method and Description |
|---|---|
| static <K,V> KafkaIO.Read<K,V> | read(): Creates an uninitialized KafkaIO.Read PTransform. |
| static KafkaIO.Read<byte[],byte[]> | readBytes(): A specific instance of uninitialized read() where keys and values are bytes. |
| static <K,V> KafkaIO.ReadSourceDescriptors<K,V> | readSourceDescriptors(): Creates an uninitialized KafkaIO.ReadSourceDescriptors PTransform. |
| static <K,V> KafkaIO.Write<K,V> | write(): Creates an uninitialized KafkaIO.Write PTransform. |
| static <K,V> KafkaIO.WriteRecords<K,V> | writeRecords(): Creates an uninitialized KafkaIO.WriteRecords PTransform. |
public static KafkaIO.Read<byte[],byte[]> readBytes()
A specific instance of uninitialized read() where keys and values are bytes. See read().

public static <K,V> KafkaIO.Read<K,V> read()
Creates an uninitialized KafkaIO.Read PTransform. Before use, basic Kafka configuration should be set with KafkaIO.Read.withBootstrapServers(String) and KafkaIO.Read.withTopics(List). Other optional settings include key and value Deserializers, custom timestamp, and watermark functions.

public static <K,V> KafkaIO.ReadSourceDescriptors<K,V> readSourceDescriptors()
Creates an uninitialized KafkaIO.ReadSourceDescriptors PTransform. Different from KafkaIO.Read, setting up topics and bootstrapServers is not required during construction time, but the bootstrapServers can still be configured with KafkaIO.ReadSourceDescriptors.withBootstrapServers(String). Please refer to KafkaIO.ReadSourceDescriptors for more details.

public static <K,V> KafkaIO.Write<K,V> write()
Creates an uninitialized KafkaIO.Write PTransform. Before use, Kafka configuration should be set with KafkaIO.Write.withBootstrapServers(String) and KafkaIO.Write.withTopic(java.lang.String) along with Serializers for the (optional) key and the values.

public static <K,V> KafkaIO.WriteRecords<K,V> writeRecords()
Creates an uninitialized KafkaIO.WriteRecords PTransform. Before use, Kafka configuration should be set with KafkaIO.WriteRecords.withBootstrapServers(String) and KafkaIO.WriteRecords.withTopic(java.lang.String) along with Serializers for the (optional) key and the values.