Class MqttIO
Reading from an MQTT broker
The MqttIO source returns an unbounded PCollection containing MQTT message payloads (as
byte[]).
To configure an MQTT source, you have to provide an MQTT connection configuration including
a ClientId, a ServerURI, a Topic pattern, and optionally a username
and password to connect to the MQTT broker. The following example shows a basic
source configuration:
pipeline.apply(
    MqttIO.read()
        .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
            "tcp://host:11883",
            "my_topic")));
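The topic pattern deserves a short illustration. The sketch below assumes the pattern is handed to the broker as a standard MQTT topic filter, where `+` matches exactly one level and `#` matches all remaining levels. `TopicFilterSketch` is a hypothetical helper written for this illustration and is not part of MqttIO; it ignores the special handling of topics that begin with `$`.

```java
/** Hypothetical helper (not part of MqttIO): tests a topic name against an MQTT topic filter. */
final class TopicFilterSketch {
  private TopicFilterSketch() {}

  static boolean matches(String filter, String topic) {
    // Use limit -1 so trailing empty levels are kept: "a/" and "a" are different topics.
    String[] f = filter.split("/", -1);
    String[] t = topic.split("/", -1);
    for (int i = 0; i < f.length; i++) {
      if (f[i].equals("#")) {
        // Multi-level wildcard: only valid as the last level of the filter.
        // It matches everything that remains, including nothing ("a/#" matches "a").
        return i == f.length - 1;
      }
      if (i >= t.length) {
        return false; // the filter has more levels than the topic
      }
      if (!f[i].equals("+") && !f[i].equals(t[i])) {
        return false; // a literal level differs
      }
    }
    // Without a trailing "#", filter and topic must have the same number of levels.
    return f.length == t.length;
  }
}
```

For example, `TopicFilterSketch.matches("sensors/+/temperature", "sensors/kitchen/temperature")` is true, while the same filter does not match `sensors/kitchen/humidity`.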
Reading with Metadata from an MQTT broker
The readWithMetadata method extends the basic read method by returning a PCollection of
MqttRecord elements. Each MqttRecord carries both the topic name and the payload of a
message, so you can implement business logic that differs depending on the topic
from which the message was received.
PCollection<MqttRecord> records = pipeline.apply(
    MqttIO.readWithMetadata()
        .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
            "tcp://host:11883",
            "my_topic_pattern")));
The following example reads records with their metadata and branches on the topic of each one.
Example
pipeline
    .apply(MqttIO.readWithMetadata()
        .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
            "tcp://host:1883", "my_topic_pattern")))
    .apply(ParDo.of(new DoFn<MqttRecord, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        MqttRecord record = c.element();
        String topic = record.getTopic();
        byte[] payload = record.getPayload();
        // Apply business logic based on the topic
        if (topic.equals("important_topic")) {
          // Special processing for important_topic
        }
      }
    }));
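When more than a few topics need different handling, the if-chain in the example above can be replaced by a lookup table. The following is a minimal sketch in plain Java, not part of MqttIO: `TopicDispatchSketch` and its methods are hypothetical names, and the handlers return a String only so the result is easy to inspect. Inside a DoFn, such a table would have to be serializable or be built in a setup method.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper (not part of MqttIO): picks a handler by exact topic name. */
final class TopicDispatchSketch {
  private final Map<String, Function<byte[], String>> handlers = new LinkedHashMap<>();
  private final Function<byte[], String> fallback;

  /** The fallback handles every topic that has no registered handler. */
  TopicDispatchSketch(Function<byte[], String> fallback) {
    this.fallback = fallback;
  }

  /** Registers a handler for one topic and returns this object for chaining. */
  TopicDispatchSketch on(String topic, Function<byte[], String> handler) {
    handlers.put(topic, handler);
    return this;
  }

  /** Applies the handler registered for the topic, or the fallback if there is none. */
  String handle(String topic, byte[] payload) {
    return handlers.getOrDefault(topic, fallback).apply(payload);
  }
}
```

In the DoFn above, the body of processElement would then reduce to a single call such as `dispatch.handle(record.getTopic(), record.getPayload())`.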
Writing to an MQTT broker
The MqttIO sink supports writing byte[] to a topic on an MQTT broker.
To configure an MQTT sink, as for the read, you have to specify an MQTT connection configuration
with ServerURI, Topic, ...
MqttIO only fully supports QoS 1 (at least once). Because a bundle may be retried, a message can be published more than once, so at-least-once is the only delivery level that can be guaranteed.
For instance:
pipeline
    .apply(...) // provide PCollection<byte[]>
    .apply(MqttIO.write()
        .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
            "tcp://host:11883",
            "my_topic")));
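Since delivery is at least once, a subscriber downstream of this sink may see the same message twice. One common way to cope is to deduplicate on the receiving side. The sketch below assumes each message carries an application-level id, which MqttIO does not add for you; `SeenIdsSketch` is a hypothetical class, and the bounded memory means very old duplicates can slip through.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical subscriber-side helper: remembers the most recently used message ids. */
final class SeenIdsSketch {
  private final Map<String, Boolean> seen;

  SeenIdsSketch(final int capacity) {
    // Access-ordered LinkedHashMap that evicts the least recently used id
    // once more than `capacity` ids are stored.
    this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
        return size() > capacity;
      }
    };
  }

  /** Returns true the first time an id is seen, false while it is still remembered. */
  synchronized boolean firstTime(String messageId) {
    return seen.put(messageId, Boolean.TRUE) == null;
  }
}
```

A consumer would call `firstTime(id)` for each incoming message and skip processing when it returns false.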
Dynamic Writing to an MQTT Broker
MqttIO also supports dynamic writing to multiple topics based on the data. You specify one function that determines the target topic for each element and another that extracts the payload to send. The following example demonstrates how to configure dynamic topic writing:
pipeline
    .apply(...) // Provide PCollection<InputT>
    .apply(
        MqttIO.<InputT>dynamicWrite()
            .withConnectionConfiguration(
                MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
            .withTopicFn(<Function to determine the topic dynamically>)
            .withPayloadFn(<Function to extract the payload>));
Dynamic writing routes each message to a topic chosen from its content, so a single transform can publish to many topics.
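To make the two placeholders in the example above concrete, here is a hedged sketch of what a topic function and a payload function could look like. `Reading` is a hypothetical element type standing in for InputT, and the topic layout is invented for illustration. The sketch uses `java.util.function.Function` only to stay self-contained; MqttIO would likely expect Beam's own serializable function type in its place.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical stand-ins for the functions passed to withTopicFn and withPayloadFn. */
final class DynamicWriteFnsSketch {
  private DynamicWriteFnsSketch() {}

  /** Hypothetical element type playing the role of InputT. */
  static final class Reading {
    final String deviceId;
    final double celsius;

    Reading(String deviceId, double celsius) {
      this.deviceId = deviceId;
      this.celsius = celsius;
    }
  }

  /** Chooses the target topic from the element: one topic per device. */
  static final Function<Reading, String> TOPIC_FN =
      r -> "sensors/" + r.deviceId + "/temperature";

  /** Extracts the bytes to publish: the temperature as UTF-8 text. */
  static final Function<Reading, byte[]> PAYLOAD_FN =
      r -> Double.toString(r.celsius).getBytes(StandardCharsets.UTF_8);
}
```

With these definitions, a reading from device `kitchen` would be published to `sensors/kitchen/temperature`, with the temperature rendered as text in the payload.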
Nested Class Summary
Modifier and Type    Class                            Description
static class         MqttIO.ConnectionConfiguration   A POJO describing an MQTT connection.
static class         MqttIO.Read                      A PTransform to read from an MQTT broker.
static class         MqttIO.Write                     A PTransform to write and send a message to an MQTT server.
Method Summary
Modifier and Type                       Method
static <InputT> MqttIO.Write<InputT>    dynamicWrite()
static MqttIO.Read<byte[]>              read()
static MqttIO.Read<MqttRecord>          readWithMetadata()
static MqttIO.Write<byte[]>             write()