Class CdapIO

java.lang.Object
org.apache.beam.sdk.io.cdap.CdapIO

public class CdapIO extends Object
A CdapIO is a Transform for reading data from the source or writing data to the sink of a CDAP Plugin. It uses HadoopFormatIO for batch plugins and SparkReceiverIO for streaming plugins.

Read from Cdap Plugin Bounded Source

To configure the CdapIO source, you must specify the CDAP Plugin, the CDAP PluginConfig, and the key and value classes.

Plugin is the wrapper class for the CDAP Plugin. It contains the main information about the Plugin. An object of the Plugin class can be created with the Plugin.createBatch(Class, Class, Class) method, which requires the BatchSource class, the InputFormat class, and the InputFormatProvider class as parameters.

For more information about the InputFormat and InputFormatProvider, see HadoopFormatIO.
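The following is a minimal sketch of what such a provider can look like; the EmployeeInputFormat and EmployeeConfig names are taken from the example below, and the configuration key is a hypothetical illustration:

 import io.cdap.cdap.api.data.batch.InputFormatProvider;
 import java.util.HashMap;
 import java.util.Map;

 // Minimal sketch of an InputFormatProvider: it names the Hadoop InputFormat
 // class and supplies its configuration as a Map of strings.
 public class EmployeeInputFormatProvider implements InputFormatProvider {

   private final Map<String, String> conf = new HashMap<>();

   public EmployeeInputFormatProvider(EmployeeConfig config) {
     // "objectType" is a hypothetical configuration key for this example.
     conf.put("objectType", config.objectType);
   }

   @Override
   public String getInputFormatClassName() {
     return EmployeeInputFormat.class.getName();
   }

   @Override
   public Map<String, String> getInputFormatConfiguration() {
     return conf;
   }
 }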

Every CDAP Plugin has its own PluginConfig class with the fields necessary to configure the Plugin. You can set a Map of your parameters with the ConfigWrapper.withParams(Map) method, where each key is a field name.
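As an illustrative sketch, a PluginConfig along the lines of the EmployeeConfig used below might look like this (the field names and annotations are assumptions, not the actual class):

 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Macro;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.plugin.PluginConfig;

 // Hypothetical PluginConfig: each public field name is a key that can be set
 // via ConfigWrapper.withParams(Map).
 public class EmployeeConfig extends PluginConfig {

   @Name("objectType")
   @Description("Type of objects to read.")
   @Macro
   public String objectType;

   @Name("referenceName")
   @Description("Reference name used for metadata and lineage.")
   public String referenceName;
 }

With such a config, the params Map (TEST_EMPLOYEE_PARAMS_MAP in the examples) could then be, for instance, ImmutableMap.of("objectType", "employee", "referenceName", "employees").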

For example, to create a basic read() transform:


 Pipeline p = ...; // Create pipeline.

 // Create PluginConfig for specific plugin
 EmployeeConfig pluginConfig =
         new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

 // Read using CDAP batch plugin
 p.apply("ReadBatch",
 CdapIO.<String, String>read()
             .withCdapPlugin(
                 Plugin.createBatch(
                     EmployeeBatchSource.class,
                     EmployeeInputFormat.class,
                     EmployeeInputFormatProvider.class))
             .withPluginConfig(pluginConfig)
             .withKeyClass(String.class)
             .withValueClass(String.class));
 

Write to Cdap Plugin Bounded Sink

To configure the CdapIO sink, you must specify the CDAP Plugin, the CDAP PluginConfig, and the key and value classes, just as for read(). In addition, you must set the locks directory path with CdapIO.Write.withLocksDirPath(String). It is used to configure HDFSSynchronization for HadoopFormatIO. More information can be found in the HadoopFormatIO documentation.

To create an object of the Plugin class with the Plugin.createBatch(Class, Class, Class) method, you need to specify the BatchSink class, the OutputFormat class, and the OutputFormatProvider class.

For more information about the OutputFormat and OutputFormatProvider, see HadoopFormatIO.
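Analogously to the read side, an OutputFormatProvider only needs to name the Hadoop OutputFormat class and provide its configuration. A minimal sketch, using the names from the example below and a hypothetical empty configuration:

 import io.cdap.cdap.api.data.batch.OutputFormatProvider;
 import java.util.Collections;
 import java.util.Map;

 // Minimal sketch of an OutputFormatProvider for the sink side.
 public class EmployeeOutputFormatProvider implements OutputFormatProvider {

   @Override
   public String getOutputFormatClassName() {
     return EmployeeOutputFormat.class.getName();
   }

   @Override
   public Map<String, String> getOutputFormatConfiguration() {
     // Plugin-specific output settings would go here.
     return Collections.emptyMap();
   }
 }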

Example of write() usage:


 Pipeline p = ...; // Create pipeline.

 // Get or create data to write
 PCollection<KV<String, String>> input = p.apply(Create.of(data));

 // Create PluginConfig for specific plugin
 EmployeeConfig pluginConfig =
         new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

 // Write using CDAP batch plugin
 input.apply(
         "WriteBatch",
         CdapIO.<String, String>write()
             .withCdapPlugin(
                 Plugin.createBatch(
                     EmployeeBatchSink.class,
                     EmployeeOutputFormat.class,
                     EmployeeOutputFormatProvider.class))
             .withPluginConfig(pluginConfig)
             .withKeyClass(String.class)
             .withValueClass(String.class)
             .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
 p.run();
 

Read from Cdap Plugin Streaming Source

To configure the CdapIO source, you must specify the CDAP Plugin, the CDAP PluginConfig, and the key and value classes.

Optionally, you can pass pullFrequencySec, which is the delay in seconds between polls for new record updates; startOffset, which is the inclusive start offset from which reading should begin; and startPollTimeoutSec, which is the delay in seconds before polling starts.

Plugin is the wrapper class for the CDAP Plugin. It contains the main information about the Plugin. An object of the Plugin class can be created with the Plugin.createStreaming(Class, SerializableFunction, Class) method, which requires the StreamingSource class, getOffsetFn (a SerializableFunction that defines how to get a Long offset from a V record), and the Spark Receiver class as parameters.
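As an illustration, a getOffsetFn for a record type carrying its own numeric identifier could be the following sketch; the Employee type and its getId() accessor are assumptions:

 import org.apache.beam.sdk.transforms.SerializableFunction;

 // Hypothetical offset function: extracts a Long offset from each record.
 SerializableFunction<Employee, Long> getOffsetFn = employee -> employee.getId();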

Every CDAP Plugin has its own PluginConfig class with the fields necessary to configure the Plugin. You can set a Map of your parameters with the ConfigWrapper.withParams(Map) method, where each key is a field name.

For example, to create a basic read() transform:


 Pipeline p = ...; // Create pipeline.

 // Create PluginConfig for specific plugin
 EmployeeConfig pluginConfig =
         new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

 // Read using CDAP streaming plugin
 p.apply("ReadStreaming",
 CdapIO.<String, String>read()
             .withCdapPlugin(
                Plugin.createStreaming(
                     EmployeeStreamingSource.class,
                     Long::valueOf,
                     EmployeeReceiver.class))
             .withPluginConfig(pluginConfig)
             .withKeyClass(String.class)
             .withValueClass(String.class)
             .withPullFrequencySec(1L)
             .withStartPollTimeoutSec(2L)
             .withStartOffset(10L);
 
  • Constructor Details

    • CdapIO

      public CdapIO()
  • Method Details