Class HadoopFormatIO
HadoopFormatIO is a Transform for reading data from any source or writing data to any
 sink which implements Hadoop InputFormat or OutputFormat. For example: Cassandra,
 Elasticsearch, HBase, Redis, Postgres etc. HadoopFormatIO has to make several performance
 trade-offs in connecting to InputFormat or OutputFormat, so if there is another
 Beam IO Transform specifically for connecting to your data source of choice, we would recommend
 using that one, but this IO Transform allows you to connect to many data sources/sinks that do
 not yet have a Beam IO Transform.
 Reading using Hadoop HadoopFormatIO
 You will need to pass a Hadoop Configuration with parameters specifying how the read
 will occur. Many properties of the Configuration are optional, and some are required for certain
 InputFormat classes, but the following properties must be set for all InputFormats:
 
 - mapreduce.job.inputformat.class: The InputFormat class used to connect to your data source of choice.
 - key.class: The key class returned by the InputFormat in mapreduce.job.inputformat.class.
 - value.class: The value class returned by the InputFormat in mapreduce.job.inputformat.class.
 {
   Configuration myHadoopConfiguration = new Configuration(false);
   // Set Hadoop InputFormat, key and value class in configuration
   myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
      MyDbInputFormatClass.class, InputFormat.class);
   myHadoopConfiguration.setClass("key.class", MyDbInputFormatKeyClass.class, Object.class);
   myHadoopConfiguration.setClass("value.class",
      MyDbInputFormatValueClass.class, Object.class);
 }
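
 The requirement that three properties are always present can be checked before a pipeline is built. The sketch below uses only the JDK and a plain Map rather than a Hadoop Configuration; the class RequiredReadProperties, its missing helper, and the class name com.example.MyDbInputFormat are hypothetical and are not part of Beam or Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check; not part of Beam or Hadoop. */
final class RequiredReadProperties {
  /** The three properties documented as mandatory for every InputFormat. */
  static final List<String> REQUIRED =
      Arrays.asList("mapreduce.job.inputformat.class", "key.class", "value.class");

  /** Returns the required property names that are absent or blank in the given settings. */
  static List<String> missing(Map<String, String> settings) {
    List<String> result = new ArrayList<>();
    for (String name : REQUIRED) {
      String value = settings.get(name);
      if (value == null || value.trim().isEmpty()) {
        result.add(name);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> settings = new HashMap<>();
    // Hypothetical class name, used only to populate the map.
    settings.put("mapreduce.job.inputformat.class", "com.example.MyDbInputFormat");
    settings.put("key.class", "java.lang.Long");
    // value.class was never set, so it is reported as missing.
    System.out.println("Missing: " + missing(settings));
  }
}
```

 With a real Configuration the same idea would apply to the values returned by its getters; that adaptation is left out here to keep the sketch free of external dependencies.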
 
 Check whether the key and value classes output by the InputFormat have a Beam Coder
 available. If not, use withKeyTranslation/withValueTranslation to specify a function that
 transforms instances of those classes into another class that is supported by a Beam Coder.
 These settings are optional, and you don't need to specify a translation for both key and
 value. If you specify a translation, make sure the K or V of the read transform matches the
 output type of the translation.
 
 Set the InputFormat key and value classes ("key.class" and "value.class") in the Hadoop
 Configuration to the classes the InputFormat actually produces. If they differ from the
 InputFormat's actual key or value class, decoding of the key/value objects may fail with an
 error like "unexpected extra bytes after decoding".
 
 Pipeline p = ...; // Create pipeline.
 // Read data only with Hadoop configuration.
 p.apply("read",
     HadoopFormatIO.<InputFormatKeyClass, InputFormatValueClass>read()
              .withConfiguration(myHadoopConfiguration));
 
 // Read data with configuration and key translation (Example scenario: Beam Coder is not
 // available for key class hence key translation is required.).
 SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
       new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
         public MyKeyClass apply(InputFormatKeyClass input) {
           // ...logic to transform InputFormatKeyClass to MyKeyClass
         }
 };
 
 
 p.apply("read",
     HadoopFormatIO.<MyKeyClass, InputFormatValueClass>read()
              .withConfiguration(myHadoopConfiguration)
              .withKeyTranslation(myOutputKeyType));
 
 // Read data with configuration and value translation (Example scenario: Beam Coder is not available for value class hence value translation is required.).
 SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
      new SimpleFunction<InputFormatValueClass, MyValueClass>() {
          public MyValueClass apply(InputFormatValueClass input) {
            // ...logic to transform InputFormatValueClass to MyValueClass
          }
  };
 
 
 p.apply("read",
     HadoopFormatIO.<InputFormatKeyClass, MyValueClass>read()
              .withConfiguration(myHadoopConfiguration)
              .withValueTranslation(myOutputValueType));
 
 Hadoop formats typically work with Writable data structures, which are mutable, and the input format reader reuses instances. To prevent elements from changing value after the read emits them, this IO clones each key and value read from the underlying Hadoop input format (unless its type is in the list of well-known immutable types). However, when the input format does not reuse key/value instances, or when translation functions already output immutable types, this cloning is a needless penalty. In these cases the IO can be instructed to skip key/value cloning.
 
 HadoopFormatIO.Read<InputFormatKeyClass, MyValueClass> read = ...
 p.apply("read", read
     .withSkipKeyClone(true)
     .withSkipValueClone(true));
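
 To make the instance-reuse problem concrete, here is a self-contained sketch that uses only the JDK. MutableText and ReusingReader are hypothetical stand-ins for a Writable and for a record reader that recycles a single instance; they are not Hadoop or Beam classes, and the copy method only plays the role of the clone described above.

```java
import java.util.ArrayList;
import java.util.List;

final class CloneDemo {
  /** Collects the record values, optionally copying each element before storing it. */
  static List<String> readAll(boolean cloneEachRecord, String... records) {
    ReusingReader reader = new ReusingReader(records);
    List<MutableText> emitted = new ArrayList<>();
    while (reader.hasNext()) {
      MutableText current = reader.next();
      emitted.add(cloneEachRecord ? current.copy() : current);
    }
    List<String> values = new ArrayList<>();
    for (MutableText text : emitted) {
      values.add(text.get());
    }
    return values;
  }

  public static void main(String[] args) {
    System.out.println(readAll(false, "a", "b", "c")); // [c, c, c]: every element aliases one object
    System.out.println(readAll(true, "a", "b", "c"));  // [a, b, c]: each element keeps its own value
  }
}

/** Hypothetical stand-in for a mutable Hadoop Writable. */
final class MutableText {
  private String value = "";

  void set(String newValue) {
    this.value = newValue;
  }

  String get() {
    return value;
  }

  /** Returns an independent copy of this instance. */
  MutableText copy() {
    MutableText copied = new MutableText();
    copied.set(value);
    return copied;
  }
}

/** Hypothetical reader that overwrites and returns the same instance for every record. */
final class ReusingReader {
  private final String[] records;
  private final MutableText reused = new MutableText();
  private int position = 0;

  ReusingReader(String... records) {
    this.records = records;
  }

  boolean hasNext() {
    return position < records.length;
  }

  MutableText next() {
    reused.set(records[position++]);
    return reused;
  }
}
```

 Skipping the copy is only safe when the reader hands out a fresh or immutable object per record, which is the situation the skip-clone options are meant for.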
 
 IMPORTANT! When DBInputFormat is used to read data from an RDBMS, Beam parallelizes the read
 by using the LIMIT and OFFSET clauses of the SQL query, so that different workers fetch
 different ranges of records (one range per split). To guarantee a stable order and a proper
 split of the results, order them by one or more keys (either PRIMARY or UNIQUE). This can be
 done during the configuration step, for example:
 
 Configuration conf = new Configuration();
 conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
 conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
 conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
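
 The role of the ORDER BY clause is easier to see when the per-split queries are written out. The following is a simplified sketch using only the JDK; it does not reproduce the SQL that DBInputFormat actually generates, and buildSplitQueries, the table name users, and the row count are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class LimitOffsetSplits {
  /**
   * Builds one query per split. Hypothetical helper for illustration only;
   * it is not the query builder used by DBInputFormat or Beam.
   */
  static List<String> buildSplitQueries(
      String table, String orderBy, long totalRows, int numSplits) {
    if (numSplits <= 0) {
      throw new IllegalArgumentException("numSplits must be positive");
    }
    List<String> queries = new ArrayList<>();
    long chunk = totalRows / numSplits;
    for (int i = 0; i < numSplits; i++) {
      long offset = i * chunk;
      // The last split also takes the remainder of the division.
      long limit = (i == numSplits - 1) ? totalRows - offset : chunk;
      queries.add(
          "SELECT id, name FROM " + table
              + " ORDER BY " + orderBy
              + " LIMIT " + limit + " OFFSET " + offset);
    }
    return queries;
  }

  public static void main(String[] args) {
    // Ten rows read by three workers: ranges of 3, 3 and 4 rows.
    for (String query : buildSplitQueries("users", "id ASC", 10, 3)) {
      System.out.println(query);
    }
  }
}
```

 Each worker runs its own query independently. Without an ORDER BY on a unique key, the database may return rows in a different order for each query, so the ranges could overlap or skip records.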
 
 Writing using Hadoop HadoopFormatIO
 You will need to pass a Hadoop Configuration with parameters specifying how the write
 will occur. Many properties of the Configuration are optional, and some are required for certain
 OutputFormat classes, but the following properties must be set for all OutputFormats:
 
 - mapreduce.job.id: The identifier of the write job, e.g. the end timestamp of the window.
 - mapreduce.job.outputformat.class: The OutputFormat class used to connect to your data sink of choice.
 - mapreduce.job.output.key.class: The key class passed to the OutputFormat in mapreduce.job.outputformat.class.
 - mapreduce.job.output.value.class: The value class passed to the OutputFormat in mapreduce.job.outputformat.class.
 - mapreduce.job.reduces: Number of reduce tasks. The value equals the number of write tasks that will be generated. This property is not required for a HadoopFormatIO.Write.PartitionedWriterBuilder.withoutPartitioning() write.
 - mapreduce.job.partitioner.class: Hadoop partitioner class used to distribute records among partitions. This property is not required for a HadoopFormatIO.Write.PartitionedWriterBuilder.withoutPartitioning() write.
 
 Each of these properties has a corresponding constant in this class, e.g. OUTPUT_FORMAT_CLASS_ATTR.
 For example:
 Configuration myHadoopConfiguration = new Configuration(false);
 // Set Hadoop OutputFormat, key and value class in configuration
 myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
    MyDbOutputFormatClass.class, OutputFormat.class);
 myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
    MyDbOutputFormatKeyClass.class, Object.class);
 myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
    MyDbOutputFormatValueClass.class, Object.class);
 myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
    MyPartitionerClass.class, Object.class);
 myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
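
 How mapreduce.job.partitioner.class and mapreduce.job.reduces work together can be pictured as a function from a key to a partition index in the range [0, reduces). The sketch below uses only the JDK and follows the common hash-modulo scheme; PartitionSketch, partitionFor and distribute are hypothetical names, and a real partitioner class may use entirely different logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of partitioning; not a Hadoop or Beam class. */
final class PartitionSketch {
  /** Maps a key to a partition index in [0, numPartitions) using its hash code. */
  static int partitionFor(Object key, int numPartitions) {
    // Masking the sign bit keeps the result non-negative for negative hash codes.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  /** Groups keys by the partition index they are assigned to. */
  static Map<Integer, List<String>> distribute(List<String> keys, int numPartitions) {
    Map<Integer, List<String>> byPartition = new TreeMap<>();
    for (String key : keys) {
      byPartition
          .computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
          .add(key);
    }
    return byPartition;
  }

  public static void main(String[] args) {
    // With mapreduce.job.reduces = 2 there are two partitions, hence two write tasks.
    System.out.println(distribute(Arrays.asList("alpha", "beta", "gamma", "delta"), 2));
  }
}
```

 Equal keys always land in the same partition, so all records for one key are written by the same write task.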
 
 You will need to set the OutputFormat key and value classes (i.e. "mapreduce.job.output.key.class"
 and "mapreduce.job.output.value.class") in the Hadoop Configuration so that they are equal to
 KeyT and ValueT. If they differ from the OutputFormat's actual key or value class, an
 IllegalArgumentException is thrown.
 
Batch writing
 // Data that we want to write
 PCollection<KV<Text, LongWritable>> boundedWordsCount = ...
 // Hadoop configuration for the write.
 // This is a partitioned write, so the Partitioner and the reducer count have to be set - see withPartitioning() javadoc
 Configuration myHadoopConfiguration = ...
 // Path to the directory with locks
 String locksDirPath = ...;
 boundedWordsCount.apply(
     "writeBatch",
     HadoopFormatIO.<Text, LongWritable>write()
         .withConfiguration(myHadoopConfiguration)
         .withPartitioning()
         .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
 
 Stream writing
    // Data that we want to write
   PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;
   // Transform that turns the data of one window into one Hadoop configuration
   PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
       configTransform = ...;
   unboundedWordsCount.apply(
       "writeStream",
       HadoopFormatIO.<Text, LongWritable>write()
           .withConfigurationTransform(configTransform)
           .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
 
Nested Class Summary
Nested Classes
Modifier and Type    Description
static class         Bounded source implementation for HadoopFormatIO.
static class         A PTransform that reads from any data source which implements Hadoop InputFormat.
static class         A wrapper to allow Hadoop InputSplit to be serialized using Java's standard serialization mechanisms.
static class         A PTransform that writes to any data sink which implements Hadoop OutputFormat.
Field Summary
Fields
Modifier and Type      Field                       Description
static final String    JOB_ID                      MRJobConfig.ID.
static final String    NUM_REDUCES                 MRJobConfig.NUM_REDUCES.
static final String    OUTPUT_DIR                  MRJobConfig.MAPREDUCE_JOB_DIR.
static final String    OUTPUT_FORMAT_CLASS_ATTR    MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR.
static final String    OUTPUT_KEY_CLASS            MRJobConfig.OUTPUT_KEY_CLASS.
static final String    OUTPUT_VALUE_CLASS          MRJobConfig.OUTPUT_VALUE_CLASS.
static final String    PARTITIONER_CLASS_ATTR      MRJobConfig.PARTITIONER_CLASS_ATTR.
Constructor Summary
Constructors - 
Method Summary
Modifier and Type                                                      Method     Description
static <K,V> HadoopFormatIO.Read<K,V>                                  read()     Creates an uninitialized HadoopFormatIO.Read.
static <KeyT,ValueT> HadoopFormatIO.Write.WriteBuilder<KeyT,ValueT>    write()    Creates a HadoopFormatIO.Write.Builder for creation of the Write transformation.
- 
Field Details
OUTPUT_FORMAT_CLASS_ATTR
MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR.

OUTPUT_KEY_CLASS
MRJobConfig.OUTPUT_KEY_CLASS.

OUTPUT_VALUE_CLASS
MRJobConfig.OUTPUT_VALUE_CLASS.

NUM_REDUCES
MRJobConfig.NUM_REDUCES.

PARTITIONER_CLASS_ATTR
MRJobConfig.PARTITIONER_CLASS_ATTR.

JOB_ID
MRJobConfig.ID.

OUTPUT_DIR
MRJobConfig.MAPREDUCE_JOB_DIR.

Constructor Details
- 
HadoopFormatIO
public HadoopFormatIO() 
 - 
 - 
Method Details
- 
read
Creates an uninitialized HadoopFormatIO.Read. Before use, the Read must be initialized with HadoopFormatIO.Read.withConfiguration(Configuration), which specifies the source. A key/value translation may optionally be specified using HadoopFormatIO.Read.withKeyTranslation(org.apache.beam.sdk.transforms.SimpleFunction<?, K>) / HadoopFormatIO.Read.withValueTranslation(org.apache.beam.sdk.transforms.SimpleFunction<?, V>).
write
Creates a HadoopFormatIO.Write.Builder for creation of the Write transformation. Before the transformation is created, the chain of builders must be set.
Type Parameters:
 KeyT - Type of keys to be written.
 ValueT - Type of values to be written.
Returns:
 Write builder