@Experimental(value=SOURCE_SINK) public class HadoopInputFormatIO extends java.lang.Object
HadoopInputFormatIO is a Transform for reading data from any source which implements
 Hadoop InputFormat; for example, Cassandra, Elasticsearch, HBase, Redis, Postgres, etc.
 HadoopInputFormatIO has to make several performance trade-offs in connecting to an InputFormat,
 so if there is another Beam IO Transform specifically for connecting to your data source of
 choice, we recommend using that one. This IO Transform nevertheless allows you to connect to
 many data sources that do not yet have a dedicated Beam IO Transform.
 You will need to pass a Hadoop Configuration with parameters specifying how the read
 will occur. Many properties of the Configuration are optional, and some are required for certain
 InputFormat classes, but the following properties must be set for all InputFormats:
 
 - mapreduce.job.inputformat.class: The InputFormat class used to connect to your data source
   of choice.
 - key.class: The key class returned by the InputFormat in mapreduce.job.inputformat.class.
 - value.class: The value class returned by the InputFormat in mapreduce.job.inputformat.class.
 
 // MyDbInputFormatClass, MyDbInputFormatKeyClass and MyDbInputFormatValueClass are
 // placeholders for your InputFormat class and its key and value classes.
 Configuration myHadoopConfiguration = new Configuration(false);
 // Set Hadoop InputFormat, key and value class in configuration
 myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
    MyDbInputFormatClass, InputFormat.class);
 myHadoopConfiguration.setClass("key.class", MyDbInputFormatKeyClass, Object.class);
 myHadoopConfiguration.setClass("value.class",
    MyDbInputFormatValueClass, Object.class);
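
 As a concrete illustration, a Configuration for reading from Cassandra through its
 CqlInputFormat might look roughly like the sketch below. The cassandra.input.* property names
 come from the Cassandra Hadoop connector, and cassandraHostIp is an assumed placeholder; this
 is an example of the pattern, not part of this API.

 Configuration cassandraConf = new Configuration(false);
 cassandraConf.set("cassandra.input.thrift.port", "9160");
 cassandraConf.set("cassandra.input.thrift.address", cassandraHostIp); // placeholder host IP
 cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
 cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
 cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
 // InputFormat, key and value classes as required by HadoopInputFormatIO.
 cassandraConf.setClass("mapreduce.job.inputformat.class",
     org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
 cassandraConf.setClass("key.class", java.lang.Long.class, Object.class);
 cassandraConf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);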
 
 You will need to check to see if the key and value classes output by the InputFormat
 have a Beam Coder available. If not, you can use withKeyTranslation/withValueTranslation
 to specify a method transforming instances of those classes into another class that is supported
 by a Beam Coder. These settings are optional and you don't need to specify translation
 for both key and value. If you specify a translation, you will need to make sure the K or V of
 the read transform matches the output type of the translation.
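
 One way to check whether a Coder is available is to ask the default CoderRegistry, as in the
 minimal sketch below; MyDbInputFormatValueClass is the placeholder class used above.

 // Check whether the default CoderRegistry can infer a Coder for the InputFormat's value class.
 try {
   Coder<MyDbInputFormatValueClass> coder =
       CoderRegistry.createDefault().getCoder(MyDbInputFormatValueClass.class);
   // A Coder is available; no value translation is needed.
 } catch (CannotProvideCoderException e) {
   // No Beam Coder is available; use withValueTranslation(...) to convert values into a
   // class that does have a Coder, or register a custom Coder.
 }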
 
You will need to set appropriate InputFormat key and value classes (i.e. "key.class" and
 "value.class") in the Hadoop Configuration. If the key or value class set in the Configuration
 does not match the InputFormat's actual key or value class, decoding the key/value objects may
 fail with an error such as "unexpected extra bytes after decoding". Hence, it is important to
 set the correct InputFormat key and value classes.
 
Reading using HadoopInputFormatIO
 Pipeline p = ...; // Create pipeline.
 // Read data only with Hadoop configuration.
 p.apply("read",
     HadoopInputFormatIO.<InputFormatKeyClass, InputFormatValueClass>read()
              .withConfiguration(myHadoopConfiguration));
 
 // Read data with configuration and key translation (example scenario: a Beam Coder is not
 // available for the key class, hence key translation is required).
 SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
       new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
         public MyKeyClass apply(InputFormatKeyClass input) {
           // ...logic to transform InputFormatKeyClass to MyKeyClass
         }
 };
 
 
 p.apply("read",
     HadoopInputFormatIO.<MyKeyClass, InputFormatValueClass>read()
              .withConfiguration(myHadoopConfiguration)
              .withKeyTranslation(myOutputKeyType));
 
 // Read data with configuration and value translation (example scenario: a Beam Coder is not
 // available for the value class, hence value translation is required).
 SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
      new SimpleFunction<InputFormatValueClass, MyValueClass>() {
          public MyValueClass apply(InputFormatValueClass input) {
            // ...logic to transform InputFormatValueClass to MyValueClass
          }
  };
 
 
 p.apply("read",
     HadoopInputFormatIO.<InputFormatKeyClass, MyValueClass>read()
              .withConfiguration(myHadoopConfiguration)
              .withValueTranslation(myOutputValueType));
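
 Both translations can also be applied on the same read. A sketch, assuming the translation
 functions defined above:

 p.apply("read",
     HadoopInputFormatIO.<MyKeyClass, MyValueClass>read()
              .withConfiguration(myHadoopConfiguration)
              .withKeyTranslation(myOutputKeyType)
              .withValueTranslation(myOutputValueType));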
 
 IMPORTANT! When using DBInputFormat to read data from an RDBMS, Beam parallelizes the read
 by using the LIMIT and OFFSET clauses of the SQL query, so that different workers fetch
 different ranges of records (splits). To guarantee a consistent ordering and a proper split of
 results, you need to order them by one or more keys (either PRIMARY or UNIQUE). This can be
 done during the configuration step, for example:
 
 Configuration conf = new Configuration();
 conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
 conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
 conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
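
 For completeness, a fuller sketch of the remaining DBInputFormat settings might look like the
 following. MyDbWritable (a user class implementing DBWritable), the JDBC driver, URL and
 credentials are assumed placeholders, not part of this API.

 // Continuing the configuration above: JDBC connection settings (placeholders).
 DBConfiguration.configureDB(conf,
     "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mydb", "user", "password");
 // DBInputFormat emits LongWritable keys and user-defined DBWritable values.
 conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, MyDbWritable.class, DBWritable.class);
 conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);
 conf.setClass("key.class", LongWritable.class, Object.class);
 conf.setClass("value.class", MyDbWritable.class, Object.class);

 p.apply("readFromDb",
     HadoopInputFormatIO.<LongWritable, MyDbWritable>read()
         .withConfiguration(conf));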
| Modifier and Type | Class and Description |
|---|---|
| static class | HadoopInputFormatIO.HadoopInputFormatBoundedSource<K,V>: Bounded source implementation for HadoopInputFormatIO. |
| static class | HadoopInputFormatIO.Read<K,V>: A PTransform that reads from any data source which implements Hadoop InputFormat. |
| static class | HadoopInputFormatIO.SerializableSplit: A wrapper to allow Hadoop InputSplit to be serialized using Java's standard serialization mechanisms. |
| Constructor and Description |
|---|
| HadoopInputFormatIO() |
| Modifier and Type | Method and Description |
|---|---|
| static <K,V> HadoopInputFormatIO.Read<K,V> | read(): Creates an uninitialized HadoopInputFormatIO.Read. |
public static <K,V> HadoopInputFormatIO.Read<K,V> read()
Creates an uninitialized HadoopInputFormatIO.Read. Before use, the Read must be
 initialized with HadoopInputFormatIO.Read.withConfiguration(Configuration), which
 specifies the source. A key/value translation may also optionally be specified using
 HadoopInputFormatIO.Read.withKeyTranslation(org.apache.beam.sdk.transforms.SimpleFunction<?, K>) /
 HadoopInputFormatIO.Read.withValueTranslation(org.apache.beam.sdk.transforms.SimpleFunction<?, V>).
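
 As a minimal usage sketch (using the placeholder classes and configuration from the examples
 above), the initialized Read expands to a PCollection of key/value pairs:

 PCollection<KV<InputFormatKeyClass, InputFormatValueClass>> records =
     p.apply("read",
         HadoopInputFormatIO.<InputFormatKeyClass, InputFormatValueClass>read()
             .withConfiguration(myHadoopConfiguration));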