Class MqttIO
Reading from a MQTT broker
MqttIO source returns an unbounded PCollection containing MQTT message payloads (as byte[]).
To configure a MQTT source, you have to provide a MQTT connection configuration including a ClientId, a ServerURI, a Topic pattern, and optionally a username and password to connect to the MQTT broker. The following example illustrates various options for configuring the source:
pipeline.apply(
    MqttIO.read()
        .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
            "tcp://host:11883",
            "my_topic")));
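The Topic pattern mentioned above is handed to the broker as a subscription. Assuming it follows the standard MQTT wildcard rules ('+' for exactly one level, '#' for all remaining levels), the following minimal sketch shows which topic names such a pattern would select. TopicFilter and matches are hypothetical names used only for illustration; they are not part of MqttIO, and the sketch ignores the special handling of topics that start with '$'.

```java
/** Hypothetical helper (not part of MqttIO): tests a topic name against an MQTT topic filter. */
final class TopicFilter {
  private TopicFilter() {}

  /**
   * Returns true if the topic name matches the filter.
   * '+' matches exactly one topic level; '#' is only accepted as the last
   * level and matches the parent level plus any number of child levels.
   */
  static boolean matches(String filter, String topic) {
    // A limit of -1 keeps trailing empty levels, e.g. "a/" has two levels.
    String[] f = filter.split("/", -1);
    String[] t = topic.split("/", -1);
    for (int i = 0; i < f.length; i++) {
      if (f[i].equals("#")) {
        // Multi-level wildcard: valid only in the last position.
        return i == f.length - 1;
      }
      if (i >= t.length) {
        // The filter has more levels than the topic.
        return false;
      }
      if (!f[i].equals("+") && !f[i].equals(t[i])) {
        // A literal level that does not match.
        return false;
      }
    }
    // Without a trailing '#', both must have the same number of levels.
    return f.length == t.length;
  }

  public static void main(String[] args) {
    System.out.println(matches("sensors/+/temp", "sensors/kitchen/temp"));
    System.out.println(matches("sensors/#", "sensors/kitchen/oven/temp"));
  }
}
```

A check like this can be useful in tests to confirm that a pattern covers the topics you expect before pointing a pipeline at a real broker.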
Reading with Metadata from a MQTT broker
The readWithMetadata method extends the basic read method by returning a PCollection of MqttRecord elements instead of raw payloads. MqttRecord is a container class that holds both the topic name and the payload, so you can implement business logic that differs depending on the topic from which the message was received.
PCollection<MqttRecord> records = pipeline.apply(
    MqttIO.readWithMetadata()
        .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
            "tcp://host:11883",
            "my_topic_pattern")));
Because each record carries its topic, a single pipeline can apply different processing logic depending on the source topic.
Example
pipeline
    .apply(MqttIO.readWithMetadata()
        .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
            "tcp://host:1883", "my_topic_pattern")))
    .apply(ParDo.of(new DoFn<MqttRecord, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        MqttRecord record = c.element();
        String topic = record.getTopic();
        byte[] payload = record.getPayload();
        // Apply business logic based on the topic
        if (topic.equals("important_topic")) {
          // Special processing for important_topic
        }
      }
    }));
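One way to keep topic-dependent logic testable is to move it out of the DoFn into a plain static method that takes the topic and payload. The sketch below shows that idea using only the JDK. TopicRouter, handle, and the "ALERT"/"info" labels are hypothetical, and the payload is assumed to be UTF-8 text, which the source does not specify.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: per-topic business logic kept separate from the pipeline code. */
final class TopicRouter {
  private TopicRouter() {}

  /**
   * Decodes the payload (assumed to be UTF-8 text) and returns a label that
   * depends on the topic the message arrived on.
   */
  static String handle(String topic, byte[] payload) {
    String text = new String(payload, StandardCharsets.UTF_8);
    if (topic.equals("important_topic")) {
      // Special processing for important_topic
      return "ALERT: " + text;
    }
    // Default processing for every other topic
    return "info: " + text;
  }

  public static void main(String[] args) {
    byte[] payload = "pump offline".getBytes(StandardCharsets.UTF_8);
    System.out.println(handle("important_topic", payload));
    System.out.println(handle("other_topic", payload));
  }
}
```

Inside processElement, such a method would be called with record.getTopic() and record.getPayload(), so the branching can be unit tested without starting a pipeline or a broker.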
Writing to a MQTT broker
MqttIO sink supports writing byte[] to a topic on a MQTT broker.
To configure a MQTT sink, as for the read, you have to specify a MQTT connection configuration with ServerURI, Topic, ...
MqttIO only fully supports QoS 1 (at least once). It is the only QoS level that can be guaranteed, because bundles may be retried and their messages sent again.
For instance:
pipeline
    .apply(...) // provide PCollection<byte[]>
    .apply(
        MqttIO.write()
            .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
                "tcp://host:11883",
                "my_topic")));
Dynamic Writing to a MQTT Broker
MqttIO also supports dynamic writing to multiple topics based on the data. You can specify a function to determine the target topic for each message. The following example demonstrates how to configure dynamic topic writing:
pipeline
    .apply(...) // Provide PCollection<InputT>
    .apply(
        MqttIO.<InputT>dynamicWrite()
            .withConnectionConfiguration(
                MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
            .withTopicFn(<Function to determine the topic dynamically>)
            .withPayloadFn(<Function to extract the payload>));
This dynamic writing capability allows for more flexible MQTT message routing based on the message content, enabling scenarios where messages are directed to different topics.
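To make the two placeholder functions above more concrete, here is a minimal sketch of what a topic function and a payload function might look like for a simple element type. It uses java.util.function.Function as a stand-in so that it runs with the JDK alone. In a real pipeline the lambdas would be passed to withTopicFn and withPayloadFn, whose exact parameter type should be checked in the Beam Javadoc. Reading, TOPIC_FN, PAYLOAD_FN, and the topic layout are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical example of per-element topic and payload functions. */
final class DynamicWriteFns {
  private DynamicWriteFns() {}

  /** Hypothetical input element: one measurement from one device. */
  static final class Reading {
    final String deviceId;
    final double value;

    Reading(String deviceId, double value) {
      this.deviceId = deviceId;
      this.value = value;
    }
  }

  /** Chooses the target topic from the element's content. */
  static final Function<Reading, String> TOPIC_FN =
      r -> "devices/" + r.deviceId + "/readings";

  /** Extracts the bytes to publish; here the value as UTF-8 text. */
  static final Function<Reading, byte[]> PAYLOAD_FN =
      r -> Double.toString(r.value).getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) {
    Reading r = new Reading("d1", 21.5);
    System.out.println(TOPIC_FN.apply(r));
    System.out.println(new String(PAYLOAD_FN.apply(r), StandardCharsets.UTF_8));
  }
}
```

Keeping the two functions free of side effects means that a retried bundle yields the same topic and payload for the same element.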
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | MqttIO.ConnectionConfiguration | A POJO describing a MQTT connection.
static class | MqttIO.Read | A PTransform to read from a MQTT broker.
static class | MqttIO.Write | A PTransform to write and send a message to a MQTT server.
Method Summary
Modifier and Type | Method
static <InputT> MqttIO.Write<InputT> | dynamicWrite()
static MqttIO.Read<byte[]> | read()
static MqttIO.Read<MqttRecord> | readWithMetadata()
static MqttIO.Write<byte[]> | write()
Method Details
read
readWithMetadata
write
dynamicWrite