@Experimental(value=SOURCE_SINK) public class CdapIO extends java.lang.Object
CdapIO provides transforms for reading data from a Cdap Plugin source and writing data to a Cdap Plugin sink. It uses HadoopFormatIO for batch plugins and SparkReceiverIO for streaming plugins.

To configure a CdapIO batch source, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes.
 
Plugin is a wrapper class for the Cdap Plugin and holds the main information about it. For a batch source, create a Plugin object with the Plugin.createBatch(Class, Class, Class) method, which takes the following parameters:
 
- BatchSource class
- InputFormat class
- InputFormatProvider class
 For more information about the InputFormat and InputFormatProvider, see HadoopFormatIO.
 
Every Cdap Plugin has its own PluginConfig class with the fields needed to configure the Plugin. You can set a Map of parameters with the ConfigWrapper.withParams(Map) method, where each key is a field name.
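To make the "key is the field name" rule concrete, here is a minimal plain-Java sketch that copies a parameter map onto a config object by reflection. It is an illustration under stated assumptions, not Beam's ConfigWrapper implementation: EmployeeLikeConfig and ParamsToConfig are hypothetical names, and real CDAP PluginConfig classes may carry annotations and validation that this sketch ignores.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical config class; its field names are the keys expected in the params map.
class EmployeeLikeConfig {
  String referenceName;
  String host;
  int port;
}

// Hypothetical helper illustrating the idea behind ConfigWrapper.withParams(Map):
// each map key names a field of the config class, and its value is assigned to that field.
// This is a sketch, not Beam's ConfigWrapper implementation.
class ParamsToConfig {
  static <T> T build(Class<T> configClass, Map<String, ?> params) {
    try {
      T config = configClass.getDeclaredConstructor().newInstance();
      for (Map.Entry<String, ?> entry : params.entrySet()) {
        // Looks up the field by name; throws NoSuchFieldException for unknown keys.
        Field field = configClass.getDeclaredField(entry.getKey());
        field.setAccessible(true);
        // Field.set unboxes wrapper values (e.g. Integer) for primitive fields (e.g. int).
        field.set(config, entry.getValue());
      }
      return config;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot build " + configClass.getName() + ": " + e, e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<>();
    params.put("referenceName", "employees");
    params.put("host", "localhost");
    params.put("port", 8080);
    EmployeeLikeConfig config = build(EmployeeLikeConfig.class, params);
    System.out.println(config.referenceName + " " + config.host + ":" + config.port); // prints "employees localhost:8080"
  }
}
```

In this sketch an unknown key fails fast with an IllegalArgumentException, which is one simple way to surface typos in parameter names early.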
 
For example, to create a basic batch read() transform:
 
```java
Pipeline p = Pipeline.create(); // Create pipeline.
// Create PluginConfig for specific plugin (Employee* are example plugin classes)
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
// Read using CDAP batch plugin
p.apply(
    "ReadBatch",
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createBatch(
                EmployeeBatchSource.class,
                EmployeeInputFormat.class,
                EmployeeInputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class));
p.run();
```

To configure a CdapIO sink, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes, just as for read(). In addition, you must set the locks directory path with CdapIO.Write.withLocksDirPath(String). This path is used to configure HDFSSynchronization for HadoopFormatIO; see the HadoopFormatIO documentation for details.
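HDFSSynchronization keeps its locks as files under this directory, which is why the path must be set. The plain-Java sketch below only illustrates the general lock-file idea with java.nio: creating a file is atomic, so the first caller to create it wins. LockDirSketch and the `.lock` naming are hypothetical, and this is not how HDFSSynchronization is actually implemented.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustration only: a "first creator wins" lock file in a shared directory.
// Hypothetical helper, not HDFSSynchronization's implementation.
class LockDirSketch {
  /** Returns true if this call created the lock file, i.e. acquired the lock. */
  static boolean tryAcquire(Path locksDir, String lockName) {
    try {
      Files.createDirectories(locksDir); // no-op if the directory already exists
      // createFile is atomic: it fails if the file already exists.
      Files.createFile(locksDir.resolve(lockName + ".lock"));
      return true;
    } catch (FileAlreadyExistsException e) {
      return false; // another caller already holds this lock
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Releases the lock by deleting its file. */
  static void release(Path locksDir, String lockName) {
    try {
      Files.deleteIfExists(locksDir.resolve(lockName + ".lock"));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path locksDir = Files.createTempDirectory("cdapio-locks");
    System.out.println(tryAcquire(locksDir, "job-setup")); // true
    System.out.println(tryAcquire(locksDir, "job-setup")); // false: already locked
    release(locksDir, "job-setup");
    System.out.println(tryAcquire(locksDir, "job-setup")); // true again
  }
}
```

Because every worker must see the same lock files, the locks directory has to be a location shared by all of them.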
 
For a batch sink, create the Plugin object with the same Plugin.createBatch(Class, Class, Class) method, passing the following parameters:
 
- BatchSink class
- OutputFormat class
- OutputFormatProvider class
 For more information about the OutputFormat and OutputFormatProvider, see HadoopFormatIO.
 
Example of write() usage:
 
```java
Pipeline p = Pipeline.create(); // Create pipeline.
// Get or create data to write; `data` is a List<KV<String, String>>
PCollection<KV<String, String>> input = p.apply(Create.of(data));
// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
// Write using CDAP batch plugin
input.apply(
    "WriteBatch",
    CdapIO.<String, String>write()
        .withCdapPlugin(
            Plugin.createBatch(
                EmployeeBatchSink.class,
                EmployeeOutputFormat.class,
                EmployeeOutputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        // Locks directory for HDFSSynchronization (tmpFolder is a JUnit TemporaryFolder here)
        .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
p.run();
```

To configure a CdapIO streaming source, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes.
 
Optionally, you can set pullFrequencySec, the delay in seconds between polls for new records, and startOffset, the inclusive offset from which reading starts.
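The semantics of these two optional settings can be illustrated without Beam. The sketch below assumes records carry increasing Long offsets and models the source as an in-memory list; OffsetPoller is a hypothetical class, not SparkReceiverIO's implementation. The first poll returns records at or after startOffset (inclusive), each later poll returns only records not yet returned, and the caller waits pullFrequencySec between polls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical poller illustrating startOffset (inclusive) and pullFrequencySec.
// Records are represented only by their Long offsets; this is not SparkReceiverIO code.
class OffsetPoller {
  private long nextOffset; // inclusive lower bound for the next poll

  OffsetPoller(long startOffset) {
    this.nextOffset = startOffset;
  }

  /** Returns offsets >= nextOffset, then moves nextOffset past the highest one returned. */
  List<Long> poll(List<Long> availableOffsets) {
    List<Long> batch = new ArrayList<>();
    for (long offset : availableOffsets) {
      if (offset >= nextOffset) {
        batch.add(offset);
      }
    }
    for (long offset : batch) {
      nextOffset = Math.max(nextOffset, offset + 1);
    }
    return batch;
  }

  public static void main(String[] args) throws InterruptedException {
    long startOffset = 10L;
    long pullFrequencySec = 1L;
    OffsetPoller poller = new OffsetPoller(startOffset);
    List<Long> source = new ArrayList<>(List.of(8L, 9L, 10L, 11L));
    System.out.println(poller.poll(source)); // [10, 11]: offset 10 is included
    TimeUnit.SECONDS.sleep(pullFrequencySec); // wait before polling again
    source.add(12L); // a new record arrives
    System.out.println(poller.poll(source)); // [12]
  }
}
```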
 
As with batch, Plugin wraps the Cdap Plugin and holds its main information. For a streaming source, create the Plugin object with the Plugin.createStreaming(Class, SerializableFunction, Class) method, which takes the StreamingSource class; getOffsetFn, a SerializableFunction that extracts a Long offset from a record of type V; and the Spark Receiver class.
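In the streaming example in this section each String record is itself the offset, so Long::valueOf serves as getOffsetFn. When the offset is embedded in a larger record, the function has to extract it. The sketch below assumes a hypothetical "<offset>,<payload>" string format and uses a local SerializableFn interface as a stand-in for Beam's SerializableFunction, so it compiles without Beam; the Serializable bound reflects that functions captured in a Beam transform must be serializable.

```java
import java.io.Serializable;
import java.util.function.Function;

// Local stand-in for Beam's SerializableFunction so this sketch compiles without Beam.
interface SerializableFn<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

class OffsetFnSketch {
  // Hypothetical record format "<offset>,<payload>", e.g. "42,Alice".
  // Parses everything before the first comma as the Long offset.
  static final SerializableFn<String, Long> GET_OFFSET_FN =
      record -> Long.valueOf(record.substring(0, record.indexOf(',')));

  public static void main(String[] args) {
    System.out.println(GET_OFFSET_FN.apply("42,Alice")); // 42
    // When the whole record is the offset, a method reference is enough.
    SerializableFn<String, Long> plainOffset = Long::valueOf;
    System.out.println(plainOffset.apply("10")); // 10
  }
}
```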
 
As with batch plugins, build the streaming plugin's PluginConfig with ConfigWrapper.withParams(Map), using field names as keys.
 
For example, to create a basic streaming read() transform:
 
```java
Pipeline p = Pipeline.create(); // Create pipeline.
// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
// Read using CDAP streaming plugin
p.apply(
    "ReadStreaming",
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                EmployeeStreamingSource.class,
                Long::valueOf, // getOffsetFn: each String record is parsed as its Long offset
                EmployeeReceiver.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        .withPullFrequencySec(1L)
        .withStartOffset(10L));
```

| Modifier and Type | Class | Description |
|---|---|---|
| static class | CdapIO.Read<K,V> | A PTransform to read from a CDAP source. |
| static class | CdapIO.Write<K,V> | A PTransform to write to a CDAP sink. |

| Constructor |
|---|
| CdapIO() |

| Modifier and Type | Method |
|---|---|
| static <K,V> CdapIO.Read<K,V> | read() |
| static <K,V> CdapIO.Write<K,V> | write() |

public static <K,V> CdapIO.Read<K,V> read()
public static <K,V> CdapIO.Write<K,V> write()