public class MqttIO
extends java.lang.Object
MqttIO source returns an unbounded PCollection containing MQTT message payloads (as byte[]).
To configure an MQTT source, you have to provide an MQTT connection configuration including a ClientId, a ServerURI, a Topic pattern, and optionally a username and password to connect to the MQTT broker. The following example shows a source configured with a ServerURI and a Topic:
pipeline.apply(
    MqttIO.read()
        .withConnectionConfiguration(
            MqttIO.ConnectionConfiguration.create(
                "tcp://host:11883",
                "my_topic")));
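Because the source emits raw byte[] payloads, a downstream step usually has to decode them. The following is a minimal sketch of that decoding step in plain Java, assuming the publisher sends UTF-8 text; the class name PayloadDecoding and the method decodeUtf8 are hypothetical and not part of MqttIO.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of MqttIO: shows how a byte[] payload
// produced by the source could be turned into a String.
class PayloadDecoding {

    // Assumes the publisher encoded the message as UTF-8 text.
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for one element of the PCollection<byte[]>.
        byte[] payload = "temperature=21.5".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(payload)); // prints "temperature=21.5"
    }
}
```

In a pipeline, the same logic would typically live inside a transform applied to the output of the read; payloads in another encoding or a binary format need a matching decoder instead.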
MqttIO sink supports writing byte[] to a topic on an MQTT broker.
To configure an MQTT sink, as for the read, you have to specify an MQTT connection configuration with ServerURI, Topic, ...
MqttIO only fully supports QoS 1 (at least once). This is the only QoS level it can guarantee, because bundles may be retried and a retried bundle can send the same message again.
For instance:
pipeline
    .apply(...) // provide PCollection<byte[]>
    .apply(
        MqttIO.write()
            .withConnectionConfiguration(
                MqttIO.ConnectionConfiguration.create(
                    "tcp://host:11883",
                    "my_topic")));
MqttIO also supports dynamic writing to multiple topics based on the data. You can specify a function to determine the target topic for each message. The following example demonstrates how to configure dynamic topic writing:
pipeline
    .apply(...) // Provide PCollection<InputT>
    .apply(
        MqttIO.<InputT>dynamicWrite()
            .withConnectionConfiguration(
                MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
            .withTopicFn(<Function to determine the topic dynamically>)
            .withPayloadFn(<Function to extract the payload>));
This dynamic writing capability allows for more flexible MQTT message routing based on the message content, enabling scenarios where messages are directed to different topics.
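The two placeholder functions above can be illustrated with a minimal sketch in plain Java. It uses java.util.function.Function so that it runs without Beam on the classpath; in a real pipeline the functions passed to withTopicFn and withPayloadFn would need to be serializable (presumably Beam's SerializableFunction, which is an assumption to check against the method signatures). The SensorReading type, the "sensors/" topic prefix, and the class name DynamicTopicSketch are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch, not part of MqttIO: one way to derive a topic and a
// payload from each element before handing them to a dynamic write.
class DynamicTopicSketch {

    // Hypothetical element type standing in for InputT.
    static class SensorReading {
        final String deviceId;
        final double value;

        SensorReading(String deviceId, double value) {
            this.deviceId = deviceId;
            this.value = value;
        }
    }

    // Routes each reading to a per-device topic, e.g. "sensors/device-42".
    static final Function<SensorReading, String> TOPIC_FN =
        reading -> "sensors/" + reading.deviceId;

    // Serializes only the measured value as UTF-8 text.
    static final Function<SensorReading, byte[]> PAYLOAD_FN =
        reading -> Double.toString(reading.value).getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        SensorReading reading = new SensorReading("device-42", 21.5);
        String topic = TOPIC_FN.apply(reading);
        String payload = new String(PAYLOAD_FN.apply(reading), StandardCharsets.UTF_8);
        System.out.println(topic + " <- " + payload); // prints "sensors/device-42 <- 21.5"
    }
}
```

Keeping the topic function and the payload function separate means the routing key does not have to be repeated inside the message body.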
Modifier and Type | Class and Description |
---|---|
static class | MqttIO.ConnectionConfiguration: A POJO describing an MQTT connection. |
static class | MqttIO.Read: A PTransform to read from an MQTT broker. |
static class | MqttIO.Write<InputT>: A PTransform to write and send a message to an MQTT server. |

Modifier and Type | Method and Description |
---|---|
static <InputT> MqttIO.Write<InputT> | dynamicWrite() |
static MqttIO.Read | read() |
static MqttIO.Write<byte[]> | write() |
public static MqttIO.Read read()
public static MqttIO.Write<byte[]> write()
public static <InputT> MqttIO.Write<InputT> dynamicWrite()