Class InfluxDbIO

java.lang.Object
org.apache.beam.sdk.io.influxdb.InfluxDbIO

public class InfluxDbIO extends Object
IO to read from and write to InfluxDB.

Reading from InfluxDB

The InfluxDB source returns a bounded collection of String as a PCollection<String>. Each String follows the line protocol (https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/). To configure the InfluxDB source, you have to provide the connection URL, the credentials to connect to InfluxDB, and the database name.


 pipeline.apply(
   InfluxDbIO.read()
     .withDataSourceConfiguration(
       DataSourceConfiguration.create(
         StaticValueProvider.of(options.getInfluxDBURL()),
         StaticValueProvider.of(options.getInfluxDBUserName()),
         StaticValueProvider.of(options.getInfluxDBPassword())))
     .withDatabase("metrics")
     .withQuery("select * from metric"));
 

Writing to InfluxDB

The InfluxDB sink supports writing data (as line protocol) to InfluxDB. To configure an InfluxDB sink, you must specify a URL (InfluxDBURL), userName, password, and database.


 pipeline
     .apply(...)
     .apply(InfluxDbIO.write()
         .withDataSourceConfiguration(
             DataSourceConfiguration.create(
                 StaticValueProvider.of(options.getInfluxDBURL()),
                 StaticValueProvider.of(options.getInfluxDBUserName()),
                 StaticValueProvider.of(options.getInfluxDBPassword())))
         .withDatabase("metrics"));
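
Because the sink expects each element to be a line-protocol String, it may help to see how such a String can be assembled. The following is a minimal sketch, not part of InfluxDbIO: the class LineProtocolSketch and its methods are hypothetical helpers, and it assumes float-valued fields only and a timestamp in nanoseconds.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/** Hypothetical helper (not part of Beam) that formats one point as line protocol. */
public class LineProtocolSketch {

  /** Escapes commas and spaces, the special characters in a measurement name. */
  static String escapeMeasurement(String s) {
    return s.replace(",", "\\,").replace(" ", "\\ ");
  }

  /** Escapes commas, equals signs, and spaces in tag keys, tag values, and field keys. */
  static String escapeKey(String s) {
    return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
  }

  /**
   * Builds "measurement,tag=value field=value timestamp".
   * Tags and fields are sorted by key so the output is deterministic.
   * Assumption: all field values are finite doubles.
   */
  public static String toLine(
      String measurement,
      Map<String, String> tags,
      Map<String, Double> fields,
      long timestampNanos) {
    if (fields.isEmpty()) {
      throw new IllegalArgumentException("line protocol requires at least one field");
    }
    StringBuilder sb = new StringBuilder(escapeMeasurement(measurement));
    for (Map.Entry<String, String> tag : new TreeMap<>(tags).entrySet()) {
      sb.append(',')
          .append(escapeKey(tag.getKey()))
          .append('=')
          .append(escapeKey(tag.getValue()));
    }
    StringJoiner fieldSet = new StringJoiner(",");
    for (Map.Entry<String, Double> field : new TreeMap<>(fields).entrySet()) {
      fieldSet.add(escapeKey(field.getKey()) + "=" + field.getValue());
    }
    sb.append(' ').append(fieldSet.toString()).append(' ').append(timestampNanos);
    return sb.toString();
  }

  public static void main(String[] args) {
    String line =
        toLine("cpu", Map.of("host", "server01"), Map.of("usage", 0.64), 1556813561098000000L);
    System.out.println(line); // prints: cpu,host=server01 usage=0.64 1556813561098000000
  }
}
```

Strings built this way could be placed in a PCollection<String> (for example with Create.of(...)) before InfluxDbIO.write() is applied. String, integer, and boolean field values need additional formatting that this sketch does not cover.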

 

The source and sink also accept an optional configuration: withRetentionPolicy().