Class MqttIO

java.lang.Object
org.apache.beam.sdk.io.mqtt.MqttIO

public class MqttIO extends Object
An unbounded source and a sink for MQTT brokers.

Reading from an MQTT broker

The MqttIO source returns an unbounded PCollection containing MQTT message payloads (as byte[]).
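
Because the payloads arrive as raw byte[], a downstream step usually has to decode them. The following is a minimal sketch that uses only the JDK and assumes the publisher sent UTF-8 text; PayloadDecoding and decodeUtf8 are hypothetical names used for illustration and are not part of MqttIO:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of MqttIO.
final class PayloadDecoding {
  // Decodes a raw MQTT payload, ASSUMING the publisher encoded it as UTF-8 text.
  static String decodeUtf8(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }
}
```

In a pipeline, logic like this would typically live inside a transform applied to the PCollection of byte[].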

To configure an MQTT source, you have to provide an MQTT connection configuration including a ClientId, a ServerURI, a Topic pattern, and optionally a username and password to connect to the MQTT broker. The following example shows a minimal configuration of the source:


 pipeline.apply(
   MqttIO.read()
    .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
      "tcp://host:11883",
      "my_topic")));

 

Reading with Metadata from an MQTT broker

The readWithMetadata method extends the basic read method by returning a PCollection of MqttRecord elements. MqttRecord is a container class that holds both the topic name and the payload of each message. This allows you to implement business logic that differs depending on the topic from which the message was received.


 PCollection<MqttRecord> records = pipeline.apply(
   MqttIO.readWithMetadata()
    .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
      "tcp://host:11883",
      "my_topic_pattern")));

 

By using the topic information, you can apply different processing logic depending on the source topic, enhancing the flexibility of message processing.

Example


 pipeline
   .apply(MqttIO.readWithMetadata()
     .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
       "tcp://host:1883", "my_topic_pattern")))
   .apply(ParDo.of(new DoFn<MqttRecord, Void>() {
     @ProcessElement
     public void processElement(ProcessContext c) {
       MqttRecord record = c.element();
       String topic = record.getTopic();
       byte[] payload = record.getPayload();
       // Apply business logic based on the topic
       if (topic.equals("important_topic")) {
         // Special processing for important_topic
       }
     }
   }));
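
When the source subscribes with a topic pattern, each record carries the concrete topic of the message, so branching on a family of topics needs some matching logic rather than a single equals check. The following is a minimal sketch that uses only the JDK and assumes the pattern follows the standard MQTT wildcards (+ for exactly one level, # for all remaining levels). TopicFilters and matches are hypothetical names, not part of MqttIO, and the sketch ignores the special treatment of topics that begin with $:

```java
// Hypothetical helper, not part of MqttIO.
final class TopicFilters {
  // Returns true if the concrete topic matches the MQTT-style filter.
  static boolean matches(String filter, String topic) {
    String[] f = filter.split("/", -1);
    String[] t = topic.split("/", -1);
    for (int i = 0; i < f.length; i++) {
      if (f[i].equals("#")) {
        // '#' is only valid as the last level; it matches everything that remains,
        // including the parent level itself.
        return i == f.length - 1;
      }
      if (i >= t.length) {
        return false; // filter has more levels than the topic
      }
      if (!f[i].equals("+") && !f[i].equals(t[i])) {
        return false; // literal level differs
      }
    }
    return f.length == t.length; // no unmatched trailing topic levels
  }
}
```

Inside processElement, a call such as TopicFilters.matches("sensors/+/temperature", record.getTopic()) could then select the processing branch.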

 

Writing to an MQTT broker

The MqttIO sink supports writing byte[] payloads to a topic on an MQTT broker.

As with the read, configuring an MQTT sink requires an MQTT connection configuration with ServerURI, Topic, ...

MqttIO fully supports only QoS 1 (at least once). It is the only QoS level that can be guaranteed, because bundles may be retried.

For instance:


 pipeline
   .apply(...) // provide PCollection<byte[]>
   .apply(MqttIO.write()
     .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
       "tcp://host:11883",
       "my_topic")));

 

Dynamic writing to an MQTT broker

MqttIO also supports dynamic writing to multiple topics based on the data. You can specify a function to determine the target topic for each message. The following example demonstrates how to configure dynamic topic writing:


 pipeline
   .apply(...)  // Provide PCollection<InputT>
   .apply(
     MqttIO.<InputT>dynamicWrite()
       .withConnectionConfiguration(
         MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
       .withTopicFn(<Function to determine the topic dynamically>)
       .withPayloadFn(<Function to extract the payload>));
 

This dynamic writing capability allows for more flexible MQTT message routing based on the message content, enabling scenarios where messages are directed to different topics.
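
To make the two placeholders above more concrete, the following sketch shows what a topic function and a payload function might look like. It uses only the JDK so that it runs on its own: Reading, TOPIC_FN, and PAYLOAD_FN are hypothetical names, the topic layout is an assumption, and plain java.util.function.Function stands in for whatever function type withTopicFn and withPayloadFn actually expect:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch, not part of MqttIO.
final class DynamicTopicSketch {
  // Hypothetical element type standing in for InputT.
  static final class Reading {
    final String deviceId;
    final double celsius;

    Reading(String deviceId, double celsius) {
      this.deviceId = deviceId;
      this.celsius = celsius;
    }
  }

  // Derives the target topic from the element (assumed topic layout).
  static final Function<Reading, String> TOPIC_FN =
      r -> "sensors/" + r.deviceId + "/temperature";

  // Serializes the element to the bytes that would be published.
  static final Function<Reading, byte[]> PAYLOAD_FN =
      r -> Double.toString(r.celsius).getBytes(StandardCharsets.UTF_8);
}
```

In a real pipeline, equivalent lambdas would be passed to withTopicFn and withPayloadFn, presumably as serializable functions, since Beam ships them to workers.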