public class KafkaResourceManager extends TestContainerResourceManager<GenericContainer<?>> implements org.apache.beam.it.common.ResourceManager
The class supports multiple topic names per server object. A topic is created if one has not been created already.
Topic names are derived from testId and take the form "{testId}-{ISO8601 time, microsecond precision}", with additional formatting.
The class is thread-safe.
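The naming scheme above can be sketched as follows. This is a minimal illustration, not the class's actual implementation: `TopicNameSketch`, `topicName`, the compact timestamp pattern, and the character replacement are all assumptions standing in for the unspecified "additional formatting".

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for illustration; not part of KafkaResourceManager. */
final class TopicNameSketch {
  // ISO 8601 basic-format UTC timestamp with six fractional digits (microseconds).
  // The exact pattern used by the real class is not documented; this one is assumed.
  private static final DateTimeFormatter MICROS =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss.SSSSSS").withZone(ZoneOffset.UTC);

  static String topicName(String testId, Instant now) {
    // Kafka topic names may contain only ASCII letters, digits, '.', '_' and '-',
    // so any other character in the test id is replaced (assumed formatting step).
    String safeId = testId.replaceAll("[^a-zA-Z0-9._-]", "-");
    return safeId + "-" + MICROS.format(now);
  }

  public static void main(String[] args) {
    // Prints a name built from the current time, so the output varies per run.
    System.out.println(topicName("my test", Instant.now()));
  }
}
```

Passing the `Instant` in as a parameter, rather than reading the clock inside the method, keeps the sketch deterministic under test.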
| Modifier and Type | Class and Description |
|---|---|
| static class | KafkaResourceManager.Builder - Builder for KafkaResourceManager. |

Fields inherited from class TestContainerResourceManager: HOST_IP, port

| Modifier and Type | Method and Description |
|---|---|
| <K,V> KafkaConsumer<K,V> | buildConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) - Builds a KafkaConsumer for the given key and value deserializers. |
| static KafkaResourceManager.Builder | builder(java.lang.String testId) |
| <K,V> KafkaProducer<K,V> | buildProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer) - Builds a KafkaProducer for the given key and value serializers. |
| void | cleanupAll() - Deletes all created resources and cleans up the Kafka client, making the manager object unusable. |
| java.lang.String | createTopic(java.lang.String topicName, int partitions) - Creates a Kafka topic. |
| java.lang.String | getBootstrapServers() - Returns the Kafka bootstrap server connection string. |
| java.util.Set<java.lang.String> | getTopicNames() - Returns the set of names of the topics that this Kafka manager will operate on. |

Methods inherited from class TestContainerResourceManager: getDockerImageName, getHost, getPort

public static KafkaResourceManager.Builder builder(java.lang.String testId)
public java.lang.String getBootstrapServers()

public java.util.Set<java.lang.String> getTopicNames()

public java.lang.String createTopic(java.lang.String topicName, int partitions) throws KafkaResourceManagerException
Note: Implementations may create the topic here if it does not already exist.
Parameters:
topicName - Topic name to associate with the given Kafka instance.
partitions - Number of partitions on the topic.
Throws:
KafkaResourceManagerException - if there is an error creating the Kafka topic.

public <K,V> KafkaProducer<K,V> buildProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer)
Builds a KafkaProducer for the given key and value serializers.

public <K,V> KafkaConsumer<K,V> buildConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
Builds a KafkaConsumer for the given key and value deserializers.

public void cleanupAll() throws KafkaResourceManagerException
Specified by: cleanupAll in interface org.apache.beam.it.common.ResourceManager
Overrides: cleanupAll in class TestContainerResourceManager<GenericContainer<?>>
Throws:
KafkaResourceManagerException - if there is an error deleting the Kafka resources.
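
As an illustration only, the lifecycle this page describes (a topic is created once, the manager reports its topic names as a set, and cleanupAll leaves the manager unusable) can be modeled in memory. `TopicRegistrySketch` below is a hypothetical stand-in that talks to no Kafka broker or container. It is not the KafkaResourceManager implementation, and the returned value, the partition check, and the exception types are assumptions.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model of the documented lifecycle; no Kafka involved. */
final class TopicRegistrySketch {
  private final Set<String> topics = new HashSet<>();
  private boolean closed = false;

  // synchronized keeps the closed check and the set update atomic across threads.
  synchronized String createTopic(String topicName, int partitions) {
    if (closed) {
      throw new IllegalStateException("manager was cleaned up and is unusable");
    }
    if (partitions < 1) {
      // Assumed validation; the real method's checks are not documented here.
      throw new IllegalArgumentException("partitions must be positive");
    }
    topics.add(topicName); // no effect if the topic was registered before
    return topicName; // assumed return value
  }

  synchronized Set<String> getTopicNames() {
    // Return a copy so callers cannot mutate the manager's internal state.
    return new HashSet<>(topics);
  }

  synchronized void cleanupAll() {
    topics.clear();
    closed = true;
  }
}
```

Calling `createTopic` twice with the same name leaves a single entry in `getTopicNames()`, and any `createTopic` call after `cleanupAll()` fails in this model.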