Class CdapIO
CdapIO is a transform for reading data from a source, or writing data to a sink, of a CDAP plugin. It uses HadoopFormatIO for batch plugins and SparkReceiverIO for streaming plugins.
Read from Cdap Plugin Bounded Source
To configure a CdapIO source, you must specify the CDAP Plugin, its PluginConfig, and the key and value classes.
Plugin is the wrapper class for a CDAP plugin; it holds the main information about that plugin. A Plugin object can be created with the Plugin.createBatch(Class, Class, Class) method, which takes the following parameters:
- BatchSource class
- InputFormat class
- InputFormatProvider class
For more information about the InputFormat and InputFormatProvider, see HadoopFormatIO.
Every CDAP plugin has its own PluginConfig class containing the fields needed to configure the plugin. You can pass a Map of parameters with the ConfigWrapper.withParams(Map) method, where each key is a field name.
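The key-to-field convention can be illustrated with a small, self-contained sketch. It is not Beam's ConfigWrapper implementation and does not use Beam at all; it only assumes what the text states, that each map key names a field of the config class. DemoConfig, its fields, and ParamsBinder are hypothetical, and real CDAP config classes may involve annotations and type conversions that this sketch ignores.

```java
import java.lang.reflect.Field;
import java.util.Map;

/** Hypothetical config class; a real one would be the plugin's own PluginConfig subclass. */
class DemoConfig {
  String referenceName;
  Integer batchSize;
}

/** Hypothetical helper: copies each map entry into the field whose name equals the key. */
final class ParamsBinder {
  private ParamsBinder() {}

  static <T> T bind(Class<T> configClass, Map<String, Object> params) {
    try {
      T config = configClass.getDeclaredConstructor().newInstance();
      for (Map.Entry<String, Object> entry : params.entrySet()) {
        // The key must name a declared field, otherwise NoSuchFieldException is raised.
        Field field = configClass.getDeclaredField(entry.getKey());
        field.setAccessible(true);
        field.set(config, entry.getValue());
      }
      return config;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot bind params to " + configClass.getName(), e);
    }
  }
}
```

In this sketch, a misspelled key fails at runtime with an IllegalArgumentException rather than at compile time, which is the usual trade-off of binding configuration by field name.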
For example, to create a basic read() transform:
Pipeline p = ...; // Create pipeline.

// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

// Read using CDAP batch plugin
p.apply("ReadBatch",
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createBatch(
                EmployeeBatchSource.class,
                EmployeeInputFormat.class,
                EmployeeInputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class));
Write to Cdap Plugin Bounded Sink
To configure a CdapIO sink, specify the CDAP Plugin, its PluginConfig, and the key and value classes, just as for read(). In addition, you must set the locks directory path with CdapIO.Write.withLocksDirPath(String). This path is used to configure HDFSSynchronization for HadoopFormatIO; see the HadoopFormatIO documentation for details.
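The role of a locks directory can be pictured with a local-filesystem analogue. The sketch below is not HDFSSynchronization's code; it assumes only the general idea of file-based locking, where atomically creating a lock file in a shared directory lets exactly one worker claim a job-level task. JobLocks and the "<jobId>.lock" naming are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical lock helper: one lock file per job id inside a shared locks directory. */
final class JobLocks {
  private final Path locksDir;

  JobLocks(Path locksDir) {
    this.locksDir = locksDir;
  }

  /** Returns true only for the first caller, because createFile fails if the file already exists. */
  boolean tryAcquire(String jobId) {
    try {
      Files.createDirectories(locksDir);
      Files.createFile(lockFile(jobId)); // existence check and creation are one atomic step
      return true;
    } catch (FileAlreadyExistsException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Deletes the lock file so the job id can be claimed again. */
  void release(String jobId) {
    try {
      Files.deleteIfExists(lockFile(jobId));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private Path lockFile(String jobId) {
    return locksDir.resolve(jobId + ".lock");
  }
}
```

In this sketch, two writes that share one directory and one job id would contend for the same lock file, so each write gets a predictable result only when its locks directory is its own.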
To create the Plugin object with the Plugin.createBatch(Class, Class, Class) method, specify the following parameters:
- BatchSink class
- OutputFormat class
- OutputFormatProvider class
For more information about the OutputFormat and OutputFormatProvider, see HadoopFormatIO.
Example of write() usage:
Pipeline p = ...; // Create pipeline.

// Get or create data to write
PCollection<KV<String, String>> input = p.apply(Create.of(data));

// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

// Write using CDAP batch plugin
input.apply(
    "WriteBatch",
    CdapIO.<String, String>write()
        .withCdapPlugin(
            Plugin.createBatch(
                EmployeeBatchSink.class,
                EmployeeOutputFormat.class,
                EmployeeOutputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
p.run();
Read from Cdap Plugin Streaming Source
To configure a CdapIO streaming source, you must specify the CDAP Plugin, its PluginConfig, and the key and value classes. Optionally, you can also pass:
- pullFrequencySec: the delay in seconds between polls for new records;
- startOffset: the inclusive offset from which reading starts;
- startPollTimeoutSec: the delay in seconds before polling starts.
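The three optional parameters can be modelled in plain Java to make their meaning concrete. This is a sketch of the semantics described above, not SparkReceiverIO's implementation: it assumes the first poll happens startPollTimeoutSec seconds after start, later polls follow every pullFrequencySec seconds, and only records whose offset is at least startOffset are kept. PollingPlan is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the optional streaming read parameters. */
final class PollingPlan {
  private final long pullFrequencySec;
  private final long startPollTimeoutSec;
  private final long startOffset;

  PollingPlan(long pullFrequencySec, long startPollTimeoutSec, long startOffset) {
    this.pullFrequencySec = pullFrequencySec;
    this.startPollTimeoutSec = startPollTimeoutSec;
    this.startOffset = startOffset;
  }

  /** Seconds after start at which the first n polls happen: an initial delay, then a fixed interval. */
  List<Long> pollTimes(int n) {
    List<Long> times = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      times.add(startPollTimeoutSec + i * pullFrequencySec);
    }
    return times;
  }

  /** Keeps the offsets that would be read; startOffset itself is included. */
  List<Long> offsetsToRead(List<Long> available) {
    List<Long> kept = new ArrayList<>();
    for (long offset : available) {
      if (offset >= startOffset) {
        kept.add(offset);
      }
    }
    return kept;
  }
}
```

With the values used in this section's read() example (pullFrequencySec 1, startPollTimeoutSec 2, startOffset 10), this model polls at 2, 3 and 4 seconds and skips every offset below 10.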
Plugin is the wrapper class for a CDAP plugin; it holds the main information about that plugin. For streaming, a Plugin object can be created with the Plugin.createStreaming(Class, SerializableFunction, Class) method, which takes the following parameters:
- StreamingSource class
- getOffsetFn: a SerializableFunction that defines how to get a Long offset from a record of type V
- Spark Receiver class
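getOffsetFn must be a SerializableFunction because Beam serializes the functions a transform carries. The stand-alone sketch below defines OffsetFn, a hypothetical stand-in for Beam's SerializableFunction (an interface that is both a function and Serializable), so it compiles without Beam. It also assumes a hypothetical record format "offset,payload" and shows that a lambda targeting such an interface survives Java serialization and still extracts the offset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

/** Hypothetical stand-in for Beam's SerializableFunction<V, Long>. */
interface OffsetFn<V> extends Function<V, Long>, Serializable {}

final class OffsetFns {
  private OffsetFns() {}

  /** Hypothetical record format "offset,payload": the offset is the text before the first comma. */
  static final OffsetFn<String> CSV_OFFSET =
      record -> Long.valueOf(record.substring(0, record.indexOf(',')));

  /** Serializes and deserializes fn, roughly what happens when a pipeline ships it to workers. */
  @SuppressWarnings("unchecked")
  static <V> OffsetFn<V> roundTrip(OffsetFn<V> fn) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(fn);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (OffsetFn<V>) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

The Long::valueOf used in this section's read() example is the simplest case of the same idea: the whole String record value is parsed as its offset.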
Every CDAP plugin has its own PluginConfig class containing the fields needed to configure the plugin. You can pass a Map of parameters with the ConfigWrapper.withParams(Map) method, where each key is a field name.
For example, to create a basic read() transform:
Pipeline p = ...; // Create pipeline.

// Create PluginConfig for specific plugin
EmployeeConfig pluginConfig =
    new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

// Read using CDAP streaming plugin
p.apply("ReadStreaming",
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                EmployeeStreamingSource.class,
                Long::valueOf, // getOffsetFn: parses the String record value as its offset
                EmployeeReceiver.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        .withPullFrequencySec(1L)
        .withStartPollTimeoutSec(2L)
        .withStartOffset(10L));
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | CdapIO.Read<K,V> | A PTransform to read from a CDAP source.
static class | CdapIO.Write<K,V> | A PTransform to write to a CDAP sink.
Constructor Summary
Constructors
CdapIO()
Method Summary
Modifier and Type | Method
static <K,V> CdapIO.Read<K,V> | read()
static <K,V> CdapIO.Write<K,V> | write()
Constructor Details
CdapIO
public CdapIO()
Method Details
read
public static <K,V> CdapIO.Read<K,V> read()
write
public static <K,V> CdapIO.Write<K,V> write()