public abstract static class KafkaIO.WriteRecords<K,V> extends PTransform<PCollection<org.apache.kafka.clients.producer.ProducerRecord<K,V>>,PDone>
A PTransform to write to a Kafka topic with ProducerRecords. See KafkaIO for more information on usage and configuration.
Constructor and Description |
---|
WriteRecords() |
Modifier and Type | Method and Description |
---|---|
PDone | expand(PCollection<org.apache.kafka.clients.producer.ProducerRecord<K,V>> input) Override this method to specify how this PTransform should be expanded on the given InputT. |
void | populateDisplayData(DisplayData.Builder builder) Register display data for the given transform or component. |
KafkaIO.WriteRecords<K,V> | updateProducerProperties(java.util.Map<java.lang.String,java.lang.Object> configUpdates) Adds the given producer properties, overriding old values of properties with the same key. |
void | validate(PipelineOptions options) Called before running the Pipeline to verify this transform is fully and correctly specified. |
KafkaIO.WriteRecords<K,V> | withBootstrapServers(java.lang.String bootstrapServers) Returns a new KafkaIO.WriteRecords transform with the Kafka producer pointing to bootstrapServers. |
KafkaIO.WriteRecords<K,V> | withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,? extends org.apache.kafka.clients.consumer.Consumer<?,?>> consumerFactoryFn) When exactly-once semantics are enabled (see withEOS(int, String)), the sink needs to fetch previously stored state from the Kafka topic. |
KafkaIO.WriteRecords<K,V> | withEOS(int numShards, java.lang.String sinkGroupId) Provides exactly-once semantics while writing to Kafka, which enables applications with end-to-end exactly-once guarantees on top of exactly-once semantics within Beam pipelines. |
KafkaIO.WriteRecords<K,V> | withInputTimestamp() The timestamp for each record being published is set to the timestamp of the element in the pipeline. |
KafkaIO.WriteRecords<K,V> | withKeySerializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Serializer<K>> keySerializer) Sets a Serializer for serializing the key (if any) to bytes. |
KafkaIO.WriteRecords<K,V> | withProducerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,org.apache.kafka.clients.producer.Producer<K,V>> producerFactoryFn) Sets a custom function to create the Kafka producer. |
KafkaIO.WriteRecords<K,V> | withPublishTimestampFunction(KafkaPublishTimestampFunction<org.apache.kafka.clients.producer.ProducerRecord<K,V>> timestampFunction) Deprecated. Use ProducerRecords to set the publish timestamp. |
KafkaIO.WriteRecords<K,V> | withTopic(java.lang.String topic) Sets the default Kafka topic to write to. |
KafkaIO.WriteRecords<K,V> | withValueSerializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Serializer<V>> valueSerializer) Sets a Serializer for serializing the value to bytes. |
Methods inherited from class PTransform: compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, toString
public KafkaIO.WriteRecords<K,V> withBootstrapServers(java.lang.String bootstrapServers)
Returns a new KafkaIO.WriteRecords transform with the Kafka producer pointing to bootstrapServers.
public KafkaIO.WriteRecords<K,V> withTopic(java.lang.String topic)
Sets the default Kafka topic to write to. Use ProducerRecords to set the topic name per published record.
public KafkaIO.WriteRecords<K,V> withKeySerializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Serializer<K>> keySerializer)
Sets a Serializer for serializing the key (if any) to bytes.
A key is optional while writing to Kafka. Note that when a key is set, its hash is used to determine the partition in Kafka (see ProducerRecord for more details).
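To illustrate how a key's hash can determine the partition, here is a minimal sketch. The `partitionFor` helper is hypothetical and uses `Arrays.hashCode` as a stand-in hash; Kafka's own partitioner uses a different hash function, so real partition numbers will differ. The point is only that equal serialized keys map to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: maps serialized key bytes to a partition index. */
final class KeyPartitionSketch {

  /**
   * Hypothetical helper (not part of KafkaIO or Kafka): picks a partition from the
   * hash of the serialized key. Arrays.hashCode is a stand-in for Kafka's real hash.
   */
  static int partitionFor(byte[] serializedKey, int numPartitions) {
    // floorMod keeps the result in [0, numPartitions) even for negative hashes.
    return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
  }

  public static void main(String[] args) {
    byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
    int first = partitionFor(key, 8);
    int second = partitionFor(key.clone(), 8);
    // Equal keys always land on the same partition.
    System.out.println("same partition: " + (first == second));
  }
}
```

Records without a key are not covered by this sketch.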
public KafkaIO.WriteRecords<K,V> withValueSerializer(java.lang.Class<? extends org.apache.kafka.common.serialization.Serializer<V>> valueSerializer)
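Sets a Serializer for serializing the value to bytes.
public KafkaIO.WriteRecords<K,V> updateProducerProperties(java.util.Map<java.lang.String,java.lang.Object> configUpdates)
Adds the given producer properties, overriding old values of properties with the same key.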
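The override behavior of updateProducerProperties can be pictured as a map merge. The sketch below is an assumption about the semantics described here, not KafkaIO's implementation: the `withUpdates` helper is hypothetical, and the broker address is a placeholder. The keys `bootstrap.servers`, `retries`, and `acks` are standard Kafka producer settings.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: merge semantics for producer property updates. */
final class ProducerConfigSketch {

  /** Hypothetical helper: returns a copy of current with updates applied on top. */
  static Map<String, Object> withUpdates(
      Map<String, Object> current, Map<String, Object> updates) {
    Map<String, Object> merged = new HashMap<>(current);
    merged.putAll(updates); // for a shared key, the new value replaces the old one
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> current = new HashMap<>();
    current.put("bootstrap.servers", "broker-1:9092"); // placeholder address
    current.put("retries", 3);

    Map<String, Object> updates = new HashMap<>();
    updates.put("retries", 5);   // overrides the existing value
    updates.put("acks", "all");  // added as a new property

    System.out.println(withUpdates(current, updates));
  }
}
```

Properties that are not mentioned in the update map keep their previous values.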
public KafkaIO.WriteRecords<K,V> withProducerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,org.apache.kafka.clients.producer.Producer<K,V>> producerFactoryFn)
Sets a custom function to create the Kafka producer. The default is KafkaProducer.
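public KafkaIO.WriteRecords<K,V> withInputTimestamp()
The timestamp for each record being published is set to the timestamp of the element in the pipeline. This is equivalent to withPublishTimestampFunction((e, ts) -> ts).
NOTE: Kafka's retention policies are based on message timestamps. Records published with timestamps older than the Kafka cluster's log.retention.hours may be deleted by Kafka shortly after being published.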
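Because Kafka's time-based retention looks at message timestamps, it can help to check how old an element's timestamp is before publishing it with that timestamp. The following is a minimal sketch of that comparison only. The `olderThanRetention` helper is hypothetical, and the 168-hour retention is an assumed value for illustration; check the actual `log.retention.hours` of the target cluster.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: compares a record timestamp against a retention window. */
final class RetentionSketch {

  /** Hypothetical helper: true if the timestamp falls outside the retention window. */
  static boolean olderThanRetention(Instant recordTimestamp, Instant now, Duration retention) {
    return recordTimestamp.isBefore(now.minus(retention));
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-01-10T00:00:00Z");
    Duration retention = Duration.ofHours(168); // assumed log.retention.hours value

    Instant recent = Instant.parse("2024-01-09T00:00:00Z");
    Instant old = Instant.parse("2023-12-01T00:00:00Z");

    System.out.println("recent at risk: " + olderThanRetention(recent, now, retention));
    System.out.println("old at risk: " + olderThanRetention(old, now, retention));
  }
}
```

A pipeline that replays historical data would see many elements in the second category.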
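@Deprecated public KafkaIO.WriteRecords<K,V> withPublishTimestampFunction(KafkaPublishTimestampFunction<org.apache.kafka.clients.producer.ProducerRecord<K,V>> timestampFunction)
Deprecated. Use ProducerRecords to set the publish timestamp.
NOTE: Kafka's retention policies are based on message timestamps. Records published with timestamps older than the Kafka cluster's log.retention.hours may be deleted by Kafka shortly after being published.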
public KafkaIO.WriteRecords<K,V> withEOS(int numShards, java.lang.String sinkGroupId)
Provides exactly-once semantics while writing to Kafka, which enables applications with end-to-end exactly-once guarantees on top of exactly-once semantics within Beam pipelines.
Beam runners typically provide exactly-once semantics for the results of a pipeline, but not for side effects of user code in a transform. If a transform such as the Kafka sink writes to an external system, those writes might occur more than once. When EOS is enabled here, the sink transform ties together checkpointing semantics in compatible Beam runners and transactions in Kafka (version 0.11+) to ensure a record is written only once. Because the implementation relies on the runner's checkpoint semantics, not all runners are compatible. The sink throws an exception during initialization if the runner is not whitelisted. The Flink runner is one of the runners whose checkpoint semantics are not compatible with the current implementation (a solution is hoped for in the near future). The Dataflow runner and Spark runners are whitelisted as compatible.
Note on performance: The exactly-once sink involves two shuffles of the records. In addition to the cost of shuffling the records among workers, the records go through two serialization-deserialization cycles. Depending on the volume and the cost of serialization, the CPU cost might be noticeable. The CPU cost can be reduced by writing byte arrays (i.e. serializing records to bytes before writing to the Kafka sink).
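The general idea of committing sink state atomically with the written records can be sketched with an in-memory model. This is a conceptual illustration under stated assumptions, not KafkaIO's implementation: the `ExactlyOnceSketch` class, its per-shard sequence ids, and the in-memory "log" are all hypothetical stand-ins for Kafka transactions and the metadata stored under the sink group id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: skips replayed records using a committed per-shard sequence id. */
final class ExactlyOnceSketch {
  // Hypothetical stand-in for the state metadata kept on Kafka, one entry per shard.
  private final Map<Integer, Long> committedSeq = new HashMap<>();
  // Hypothetical stand-in for the Kafka topic.
  private final List<String> log = new ArrayList<>();

  /** Writes a batch whose first record has sequence id firstSeq within the shard. */
  void writeBatch(int shard, long firstSeq, List<String> records) {
    long committed = committedSeq.getOrDefault(shard, -1L);
    List<String> toAppend = new ArrayList<>();
    long seq = firstSeq;
    for (String record : records) {
      if (seq > committed) {
        toAppend.add(record); // not yet written by an earlier attempt
      }
      seq++;
    }
    // Models one transaction: records and the new sequence id are updated together.
    log.addAll(toAppend);
    committedSeq.put(shard, Math.max(committed, seq - 1));
  }

  List<String> written() {
    return new ArrayList<>(log);
  }

  public static void main(String[] args) {
    ExactlyOnceSketch sink = new ExactlyOnceSketch();
    sink.writeBatch(0, 0, Arrays.asList("a", "b"));
    sink.writeBatch(0, 0, Arrays.asList("a", "b")); // retried batch is skipped
    sink.writeBatch(0, 2, Arrays.asList("c"));
    System.out.println(sink.written());
  }
}
```

In this model a retried batch finds its sequence ids already committed and writes nothing, which is the property the sink aims for when a runner replays work after a failure.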
Parameters:
numShards - Sets sink parallelism. The state metadata stored on Kafka is stored across this many virtual partitions using sinkGroupId. A good rule of thumb is to set this to around the number of partitions in the Kafka topic.
sinkGroupId - The group id used to store a small amount of state as metadata on Kafka. It is similar to the consumer group id used with a KafkaConsumer. Each job should use a unique group id so that restarts/updates of the job preserve the state to ensure exactly-once semantics. The state is committed atomically with sink transactions on Kafka. See KafkaProducer.sendOffsetsToTransaction(Map, String) for more information. The sink performs multiple sanity checks during initialization to catch common mistakes so that it does not end up using state that does not seem to be written by the same job.
public KafkaIO.WriteRecords<K,V> withConsumerFactoryFn(SerializableFunction<java.util.Map<java.lang.String,java.lang.Object>,? extends org.apache.kafka.clients.consumer.Consumer<?,?>> consumerFactoryFn)
When exactly-once semantics are enabled (see withEOS(int, String)), the sink needs to fetch previously stored state from the Kafka topic. Fetching the metadata requires a consumer. Similar to KafkaIO.Read.withConsumerFactoryFn(SerializableFunction), a factory function can be supplied if required in a specific case. The default is KafkaConsumer.
public PDone expand(PCollection<org.apache.kafka.clients.producer.ProducerRecord<K,V>> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by: expand in class PTransform<PCollection<org.apache.kafka.clients.producer.ProducerRecord<K,V>>,PDone>
public void validate(PipelineOptions options)
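Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified.
By default, does nothing.
Overrides: validate in class PTransform<PCollection<org.apache.kafka.clients.producer.ProducerRecord<K,V>>,PDone>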
public void populateDisplayData(DisplayData.Builder builder)
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
Specified by: populateDisplayData in interface HasDisplayData
Overrides: populateDisplayData in class PTransform<PCollection<org.apache.kafka.clients.producer.ProducerRecord<K,V>>,PDone>
Parameters:
builder - The builder to populate with display data.
See Also: HasDisplayData