public class KafkaResourceManager extends TestContainerResourceManager<GenericContainer<?>> implements org.apache.beam.it.common.ResourceManager
The class supports multiple topic names per server object. A topic is created if one has not been created already.
The topic name is formed using testId. The topic name will be "{testId}-{ISO8601 time, microsecond precision}", with additional formatting.
The class is thread-safe.
Modifier and Type | Class and Description |
---|---|
static class |
KafkaResourceManager.Builder
Builder for
KafkaResourceManager . |
HOST_IP, port
Modifier and Type | Method and Description |
---|---|
<K,V> KafkaConsumer<K,V> |
buildConsumer(Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
Build a
KafkaConsumer for the given serializer and deserializers. |
static KafkaResourceManager.Builder |
builder(java.lang.String testId) |
<K,V> KafkaProducer<K,V> |
buildProducer(Serializer<K> keySerializer,
Serializer<V> valueSerializer)
Build a
KafkaProducer for the given serializer and deserializers. |
void |
cleanupAll()
Deletes all created resources and cleans up the Kafka client, making the manager object
unusable.
|
java.lang.String |
createTopic(java.lang.String topicName,
int partitions)
Creates a kafka topic.
|
java.lang.String |
getBootstrapServers()
Returns the kafka bootstrap server connection string.
|
java.util.Set<java.lang.String> |
getTopicNames()
Returns a list of names of the topics that this kafka manager will operate in.
|
getDockerImageName, getHost, getPort
public static KafkaResourceManager.Builder builder(java.lang.String testId)
public java.lang.String getBootstrapServers()
public java.util.Set<java.lang.String> getTopicNames()
public java.lang.String createTopic(java.lang.String topicName, int partitions) throws KafkaResourceManagerException
Note: Implementations may do topic creation here, if one does not already exist.
topicName
- Topic name to associate with the given kafka instance.partitions
- Number of partitions on the topic.KafkaResourceManagerException
- if there is an error creating the kafka topic.public <K,V> KafkaProducer<K,V> buildProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer)
KafkaProducer
for the given serializer and deserializers.public <K,V> KafkaConsumer<K,V> buildConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
KafkaConsumer
for the given serializer and deserializers.public void cleanupAll() throws KafkaResourceManagerException
cleanupAll
in interface org.apache.beam.it.common.ResourceManager
cleanupAll
in class TestContainerResourceManager<GenericContainer<?>>
KafkaResourceManagerException
- if there is an error deleting the Kafka resources.