public class CdapIO
extends java.lang.Object
CdapIO is a transform for reading data from the source, or writing data to the sink, of a Cdap
Plugin. It uses HadoopFormatIO for batch plugins and SparkReceiverIO for streaming plugins.
To configure a CdapIO batch source, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes.
Plugin is the wrapper class for the Cdap Plugin and holds the main information about it.
For a batch source, create a Plugin object with the Plugin.createBatch(Class, Class, Class) method, which requires the following parameters:
BatchSource class
InputFormat class
InputFormatProvider class
For more information about the InputFormat and InputFormatProvider, see HadoopFormatIO.
Every Cdap Plugin has its own PluginConfig class with the fields needed to configure the
Plugin. You can set these fields by passing a Map of parameters to the ConfigWrapper.withParams(Map) method, where each key is a field name.
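The idea of a parameter map keyed by field name can be illustrated with plain reflection. The following is a minimal sketch that needs only the JDK; it is not ConfigWrapper's actual implementation, and SketchEmployeeConfig, ConfigParamsSketch, and fromParams are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical config class; real CDAP plugins define their own PluginConfig classes.
class SketchEmployeeConfig {
  String referenceName;
  int batchSize;
}

class ConfigParamsSketch {
  // Creates an instance of configClass and sets every field named by a map key.
  // Assumption: each key matches a field declared directly on configClass.
  static <T> T fromParams(Class<T> configClass, Map<String, Object> params) {
    try {
      Constructor<T> constructor = configClass.getDeclaredConstructor();
      constructor.setAccessible(true);
      T config = constructor.newInstance();
      for (Map.Entry<String, Object> entry : params.entrySet()) {
        Field field = configClass.getDeclaredField(entry.getKey()); // key is the field name
        field.setAccessible(true);
        field.set(config, entry.getValue()); // boxed values are unwrapped for primitive fields
      }
      return config;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot populate " + configClass.getName(), e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<>();
    params.put("referenceName", "employees");
    params.put("batchSize", 100);
    SketchEmployeeConfig config = fromParams(SketchEmployeeConfig.class, params);
    System.out.println(config.referenceName + " " + config.batchSize);
  }
}
```

In this sketch a key that does not match any declared field causes an IllegalArgumentException; how ConfigWrapper treats unknown keys is not covered here.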
For example, to create a basic read() transform:
Pipeline p = ...; // Create pipeline.
// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
// Read using CDAP batch plugin
p.apply("ReadBatch",
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createBatch(
                EmployeeBatchSource.class,
                EmployeeInputFormat.class,
                EmployeeInputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class));
To configure a CdapIO sink, you must specify the Cdap Plugin, the Cdap
PluginConfig, and the key and value classes, just as for read(). In addition, you must
set the locks directory path with CdapIO.Write.withLocksDirPath(String). This path is used to
configure HDFSSynchronization for HadoopFormatIO. For more information, see the
HadoopFormatIO documentation.
To create a Plugin object for a sink with the Plugin.createBatch(Class,
Class, Class) method, specify the following parameters:
BatchSink class
OutputFormat class
OutputFormatProvider class
For more information about the OutputFormat and OutputFormatProvider, see HadoopFormatIO.
Example of write() usage:
Pipeline p = ...; // Create pipeline.
// Get or create data to write
PCollection<KV<String, String>> input = p.apply(Create.of(data));
// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
// Write using CDAP batch plugin
input.apply(
    "WriteBatch",
    CdapIO.<String, String>write()
        .withCdapPlugin(
            Plugin.createBatch(
                EmployeeBatchSink.class,
                EmployeeOutputFormat.class,
                EmployeeOutputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
p.run();
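The tmpFolder object in the example above is not defined on this page; it looks like a test fixture such as a JUnit TemporaryFolder. As a minimal sketch of obtaining an absolute directory path for withLocksDirPath(String) with only the JDK, assuming a local run where a temporary directory is acceptable (LocksDirSketch, createLocksDir, and the "cdapio-locks-" prefix are hypothetical names chosen for illustration):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class LocksDirSketch {
  // Creates a fresh temporary directory and returns its absolute path as a String.
  static String createLocksDir() {
    try {
      Path dir = Files.createTempDirectory("cdapio-locks-"); // prefix is an arbitrary choice
      dir.toFile().deleteOnExit(); // best-effort cleanup; only removes the directory if it is empty
      return dir.toAbsolutePath().toString();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // The returned String is what would be passed to withLocksDirPath(String).
    System.out.println(createLocksDir());
  }
}
```

A local temporary directory presumably suits only single-machine runs. For a distributed runner, the locks directory most likely has to be on a file system that every worker can reach; check the HadoopFormatIO documentation for the exact requirements.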
To configure a CdapIO streaming source, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes.
Optionally, you can pass pullFrequencySec, the delay in seconds between polls
for new records; startOffset, the inclusive offset from which reading starts;
and startPollTimeoutSec, the delay in seconds before polling starts.
Plugin is the wrapper class for the Cdap Plugin and holds the main information about it.
For a streaming source, create a Plugin object with the Plugin.createStreaming(Class, SerializableFunction, Class) method, which requires the following parameters: the StreamingSource class; getOffsetFn, a SerializableFunction that defines how to get the Long offset from a V record; and the Spark
Receiver class.
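To make the getOffsetFn contract concrete, here is a minimal sketch that needs only the JDK. OffsetFn is a local stand-in for Beam's SerializableFunction, the "offset,payload" record layout is a hypothetical example, and fromOffset only illustrates what an inclusive start offset means; it is not how CdapIO or SparkReceiverIO applies the offset internally.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class OffsetFnSketch {
  // Local stand-in for a serializable function from a record of type V to its Long offset.
  interface OffsetFn<V> extends Function<V, Long>, Serializable {}

  // Hypothetical record layout "<offset>,<payload>": the offset is the text before the first comma.
  static final OffsetFn<String> CSV_OFFSET =
      record -> Long.parseLong(record.substring(0, record.indexOf(',')));

  // Keeps the records whose offset is greater than or equal to startOffset (inclusive start).
  static <V> List<V> fromOffset(List<V> records, OffsetFn<V> getOffsetFn, long startOffset) {
    List<V> result = new ArrayList<>();
    for (V record : records) {
      if (getOffsetFn.apply(record) >= startOffset) {
        result.add(record);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> records = Arrays.asList("9,alice", "10,bob", "11,carol");
    System.out.println(fromOffset(records, CSV_OFFSET, 10L));
  }
}
```

When the whole record is itself a numeric string, a method reference such as Long::valueOf is enough to serve as the offset function.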
Every Cdap Plugin has its own PluginConfig class with the fields needed to configure the
Plugin. You can set these fields by passing a Map of parameters to the ConfigWrapper.withParams(Map) method, where each key is a field name.
For example, to create a basic read() transform:
Pipeline p = ...; // Create pipeline.
// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
// Read using CDAP streaming plugin
p.apply("ReadStreaming",
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                EmployeeStreamingSource.class,
                Long::valueOf,
                EmployeeReceiver.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        .withPullFrequencySec(1L)
        .withStartPollTimeoutSec(2L)
        .withStartOffset(10L));
| Modifier and Type | Class and Description |
|---|---|
| static class | CdapIO.Read<K,V> A PTransform to read from CDAP source. |
| static class | CdapIO.Write<K,V> A PTransform to write to CDAP sink. |
| Constructor and Description |
|---|
| CdapIO() |
| Modifier and Type | Method and Description |
|---|---|
| static <K,V> CdapIO.Read<K,V> | read() |
| static <K,V> CdapIO.Write<K,V> | write() |
public static <K,V> CdapIO.Read<K,V> read()
public static <K,V> CdapIO.Write<K,V> write()