public class CdapIO
extends java.lang.Object

CdapIO is a Transform for reading data from the source or writing data to the sink of a Cdap Plugin. It uses HadoopFormatIO for Batch and SparkReceiverIO for Streaming.

To configure a CdapIO batch source, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes.

Plugin is the wrapper class for the Cdap Plugin. It contains the main information about the plugin. A Plugin object can be created with the Plugin.createBatch(Class, Class, Class) method, which requires the following parameters:

- BatchSource class
- InputFormat class
- InputFormatProvider class

For more information about InputFormat and InputFormatProvider, see HadoopFormatIO.

Every Cdap Plugin has its own PluginConfig class with the fields necessary to configure the plugin. You can set the Map of your parameters with the ConfigWrapper.withParams(Map) method, where each key is a field name.
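
To make the "key is the field name" convention concrete, here is a minimal sketch in plain Java that copies map entries into the matching fields of a config object through reflection. It is not the ConfigWrapper implementation: `ConfigParamsSketch`, `EmployeeConfigSketch`, `populate`, and the field names `referenceName` and `objectType` are all hypothetical names chosen for illustration.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class ConfigParamsSketch {

  /** Hypothetical config class; stands in for a plugin's PluginConfig subclass. */
  public static class EmployeeConfigSketch {
    public String referenceName;
    public String objectType;
  }

  /** Copies each map entry into the field whose name equals the entry's key. */
  public static <T> T populate(T config, Map<String, Object> params) {
    for (Map.Entry<String, Object> entry : params.entrySet()) {
      try {
        // Looks only at fields declared directly on the config class.
        Field field = config.getClass().getDeclaredField(entry.getKey());
        field.setAccessible(true);
        field.set(config, entry.getValue());
      } catch (NoSuchFieldException e) {
        throw new IllegalArgumentException("Unknown config field: " + entry.getKey(), e);
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
    }
    return config;
  }

  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<>();
    params.put("referenceName", "employees");
    params.put("objectType", "employee");
    EmployeeConfigSketch config = populate(new EmployeeConfigSketch(), params);
    System.out.println(config.referenceName + " / " + config.objectType);
  }
}
```

A misspelled key fails fast here with an IllegalArgumentException. Whether ConfigWrapper behaves the same way for unknown keys is not stated in this documentation.
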

For example, to create a basic read() transform:

    Pipeline p = ...; // Create pipeline.

    // Create PluginConfig for specific plugin
    EmployeeConfig pluginConfig =
        new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

    // Read using CDAP batch plugin
    p.apply("ReadBatch",
        CdapIO.<String, String>read()
            .withCdapPlugin(
                Plugin.createBatch(
                    EmployeeBatchSource.class,
                    EmployeeInputFormat.class,
                    EmployeeInputFormatProvider.class))
            .withPluginConfig(pluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class));

To configure a CdapIO sink, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes, just as for read(). In addition, you must set the locks directory path with CdapIO.Write.withLocksDirPath(String). It is used to configure HDFSSynchronization for HadoopFormatIO. More information can be found in the HadoopFormatIO documentation.

To create a Plugin object for a sink with the Plugin.createBatch(Class, Class, Class) method, you need to specify the following parameters:

- BatchSink class
- OutputFormat class
- OutputFormatProvider class

For more information about OutputFormat and OutputFormatProvider, see HadoopFormatIO.

Example of write() usage:

    Pipeline p = ...; // Create pipeline.

    // Get or create data to write
    PCollection<KV<String, String>> input = p.apply(Create.of(data));

    // Create PluginConfig for specific plugin
    EmployeeConfig pluginConfig =
        new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

    // Write using CDAP batch plugin
    input.apply(
        "WriteBatch",
        CdapIO.<String, String>write()
            .withCdapPlugin(
                Plugin.createBatch(
                    EmployeeBatchSink.class,
                    EmployeeOutputFormat.class,
                    EmployeeOutputFormatProvider.class))
            .withPluginConfig(pluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class)
            .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
    p.run();

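
The write example passes `tmpFolder.getRoot().getAbsolutePath()` as the locks directory; `tmpFolder` is presumably a JUnit temporary-folder rule, although the snippet does not say so. For a local run outside a test harness, one possible way to obtain an equivalent absolute path is sketched below using only `java.nio.file`. The class name `LocksDirSketch`, the method `createLocalLocksDir`, and the `cdapio-locks` prefix are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LocksDirSketch {

  /** Creates a fresh local temp directory and returns its absolute path as a String. */
  public static String createLocalLocksDir() {
    try {
      Path dir = Files.createTempDirectory("cdapio-locks");
      // Best-effort cleanup on JVM exit; only succeeds if the directory is empty by then.
      dir.toFile().deleteOnExit();
      return dir.toAbsolutePath().toString();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // The returned String is what would be handed to withLocksDirPath(String).
    System.out.println(createLocalLocksDir());
  }
}
```

A local temporary directory is only a reasonable choice for single-machine runs. For a distributed run, the path most likely has to be reachable by every worker; the HadoopFormatIO documentation on HDFSSynchronization is the place to confirm that.
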
To configure a CdapIO streaming source, you must specify the Cdap Plugin, the Cdap PluginConfig, and the key and value classes.

Optionally, you can pass pullFrequencySec, the delay in seconds between polls for new records; startOffset, the inclusive offset from which reading should start; and startPollTimeoutSec, the delay in seconds before polling starts.
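
The three optional parameters can be read as a simple timing and filtering model. The sketch below is an interpretation of the descriptions above, not SparkReceiverIO's actual scheduling: it assumes polls take no time and that offsets are plain increasing numbers. `PollingParamsSketch`, `offsetsToRead`, and `pollTimeSec` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PollingParamsSketch {

  /** Keeps offsets >= startOffset, so the start offset itself is included. */
  public static List<Long> offsetsToRead(List<Long> available, long startOffset) {
    List<Long> result = new ArrayList<>();
    for (long offset : available) {
      if (offset >= startOffset) {
        result.add(offset);
      }
    }
    return result;
  }

  /** Seconds after start at which poll number n (0-based) begins, ignoring poll duration. */
  public static long pollTimeSec(long startPollTimeoutSec, long pullFrequencySec, int n) {
    return startPollTimeoutSec + n * pullFrequencySec;
  }

  public static void main(String[] args) {
    // Same values as the streaming example: startOffset 10, timeout 2 s, frequency 1 s.
    System.out.println(offsetsToRead(Arrays.asList(8L, 9L, 10L, 11L, 12L), 10L));
    for (int n = 0; n < 3; n++) {
      System.out.println("poll " + n + " at t=" + pollTimeSec(2L, 1L, n) + "s");
    }
  }
}
```
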

Plugin is the wrapper class for the Cdap Plugin. It contains the main information about the plugin. A Plugin object can be created with the Plugin.createStreaming(Class, SerializableFunction, Class) method, which requires the StreamingSource class, getOffsetFn (a SerializableFunction that defines how to get a Long offset from a V record), and the Spark Receiver class.
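
As an illustration of what an offset function might look like when the record is not itself a number, the sketch below parses the offset out of a made-up record layout. It uses `java.util.function.Function` as a stand-in for Beam's SerializableFunction so that it runs without Beam on the classpath. The `<offset>:<payload>` record format and the names `OffsetFnSketch` and `GET_OFFSET_FN` are assumptions for illustration only.

```java
import java.util.function.Function;

public class OffsetFnSketch {

  /** Hypothetical offset extractor for records shaped like "<offset>:<payload>". */
  public static final Function<String, Long> GET_OFFSET_FN =
      record -> Long.valueOf(record.substring(0, record.indexOf(':')));

  public static void main(String[] args) {
    // Extracts the numeric prefix before the first ':' as the offset.
    System.out.println(GET_OFFSET_FN.apply("42:{\"name\":\"Ada\"}"));
  }
}
```

By contrast, passing `Long::valueOf` directly as the offset function only works when the entire String record parses as a number.
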

Every Cdap Plugin has its own PluginConfig class with the fields necessary to configure the plugin. You can set the Map of your parameters with the ConfigWrapper.withParams(Map) method, where each key is a field name.

For example, to create a basic read() transform:

    Pipeline p = ...; // Create pipeline.

    // Create PluginConfig for specific plugin
    EmployeeConfig pluginConfig =
        new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

    // Read using CDAP streaming plugin
    p.apply("ReadStreaming",
        CdapIO.<String, String>read()
            .withCdapPlugin(
                Plugin.createStreaming(
                    EmployeeStreamingSource.class,
                    Long::valueOf,
                    EmployeeReceiver.class))
            .withPluginConfig(pluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class)
            .withPullFrequencySec(1L)
            .withStartPollTimeoutSec(2L)
            .withStartOffset(10L));

Modifier and Type | Class and Description |
---|---|
static class | CdapIO.Read<K,V>: A PTransform to read from CDAP source. |
static class | CdapIO.Write<K,V>: A PTransform to write to CDAP sink. |

Constructor and Description |
---|
CdapIO() |

Modifier and Type | Method and Description |
---|---|
static <K,V> CdapIO.Read<K,V> | read() |
static <K,V> CdapIO.Write<K,V> | write() |

public static <K,V> CdapIO.Read<K,V> read()
public static <K,V> CdapIO.Write<K,V> write()