Hadoop Input/Output Format IO
IMPORTANT! The previous implementation of Hadoop Input Format IO, called HadoopInputFormatIO, is deprecated starting from Apache Beam 2.10. Please use the current HadoopFormatIO, which supports both InputFormat and OutputFormat.
A HadoopFormatIO is a transform for reading data from any source or writing data to any sink that implements Hadoop’s InputFormat or OutputFormat, respectively. Examples include Cassandra, Elasticsearch, HBase, Redis, and Postgres.
HadoopFormatIO allows you to connect to many data sources/sinks that do not yet have a Beam IO transform. However, HadoopFormatIO has to make several performance trade-offs in connecting to InputFormat or OutputFormat. So, if there is another Beam IO transform for connecting specifically to your data source/sink of choice, we recommend you use that one.
Reading using HadoopFormatIO
You will need to pass a Hadoop Configuration with parameters specifying how the read will occur. Many properties of the Configuration are optional and some are required for certain InputFormat classes, but the following properties must be set for all InputFormat classes:
- mapreduce.job.inputformat.class - The InputFormat class used to connect to your data source of choice.
- key.class - The Key class returned by the InputFormat in mapreduce.job.inputformat.class.
- value.class - The Value class returned by the InputFormat in mapreduce.job.inputformat.class.
For example:
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
// (InputFormatClass, InputFormatKeyClass and InputFormatValueClass are placeholders for your own classes)
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass.class,
  InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass.class, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass.class, Object.class);
You will need to check whether the Key and Value classes output by the InputFormat have a Beam Coder available. If not, you can use withKeyTranslation or withValueTranslation to specify a method that transforms instances of those classes into another class that is supported by a Beam Coder. These settings are optional, and you don’t need to specify a translation for both the key and the value.
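A minimal way to perform this check, assuming the Beam Java SDK core is on the classpath, is to ask a CoderRegistry for a coder and treat CannotProvideCoderException as "no coder available". The class and method names below are illustrative; inside a pipeline you would normally query pipeline.getCoderRegistry() so that any custom coder registrations are taken into account.

```java
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.CoderRegistry;

// Hypothetical helper for checking coder availability before choosing translations.
public class CoderCheckSketch {
  // Returns true if the registry can infer a Beam Coder for the given class.
  static boolean hasCoder(CoderRegistry registry, Class<?> clazz) {
    try {
      registry.getCoder(clazz);
      return true;
    } catch (CannotProvideCoderException e) {
      // No coder could be inferred: a key or value translation is likely needed.
      return false;
    }
  }

  public static void main(String[] args) {
    // A default registry; in a real pipeline prefer pipeline.getCoderRegistry().
    CoderRegistry registry = CoderRegistry.createDefault();
    System.out.println("Long has a coder: " + hasCoder(registry, Long.class));
  }
}
```

Run it once for each of your InputFormat's key and value classes to decide which translations, if any, to configure.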
For example:
SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
  new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
    @Override
    public MyKeyClass apply(InputFormatKeyClass input) {
      // ...logic to transform InputFormatKeyClass to MyKeyClass and return the result
    }
  };
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
  new SimpleFunction<InputFormatValueClass, MyValueClass>() {
    @Override
    public MyValueClass apply(InputFormatValueClass input) {
      // ...logic to transform InputFormatValueClass to MyValueClass and return the result
    }
  };
Read data only with Hadoop configuration.
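As a minimal sketch of this case, assume Hadoop's TextInputFormat stands in for InputFormatClass. Its key class (LongWritable) and value class (Text) are Hadoop Writables, which we assume Beam can encode, so no translation is set. TextReadSketch, textConf and the input directory are hypothetical names; TextInputFormat additionally needs its input directory, set here through FileInputFormat.INPUT_DIR.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Hypothetical example: TextInputFormat is only a stand-in for your InputFormat.
public class TextReadSketch {
  // Sets the three properties required for every InputFormat, plus the
  // input directory that TextInputFormat itself requires.
  static Configuration textConf(String inputDir) {
    Configuration conf = new Configuration(false);
    conf.setClass("mapreduce.job.inputformat.class", TextInputFormat.class, InputFormat.class);
    conf.setClass("key.class", LongWritable.class, Object.class);
    conf.setClass("value.class", Text.class, Object.class);
    conf.set(FileInputFormat.INPUT_DIR, inputDir);
    return conf;
  }

  // No translation: output types are the InputFormat's own key and value classes.
  static PCollection<KV<LongWritable, Text>> read(Pipeline p, Configuration conf) {
    return p.apply("read", HadoopFormatIO.<LongWritable, Text>read().withConfiguration(conf));
  }
}
```

With TextInputFormat, each element is one line of input keyed by its byte offset in the file.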
Read data with configuration and key translation
For example, if a Beam Coder is not available for the Key class, key translation is required.
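A sketch of key translation, assuming conf is a hypothetical configuration for Hadoop's TextInputFormat (key class LongWritable, value class Text). The translation turns each LongWritable key into a java.lang.Long; KeyTranslationSketch and KEY_TRANSLATION are illustrative names.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Hypothetical example assuming a TextInputFormat-based configuration.
public class KeyTranslationSketch {
  // Converts the Hadoop key into a plain Long, which has a standard Beam Coder.
  static final SimpleFunction<LongWritable, Long> KEY_TRANSLATION =
      new SimpleFunction<LongWritable, Long>() {
        @Override
        public Long apply(LongWritable input) {
          return input.get();
        }
      };

  // The output key type is the translated type (Long); the value stays Text.
  static PCollection<KV<Long, Text>> read(Pipeline p, Configuration conf) {
    return p.apply(
        "read",
        HadoopFormatIO.<Long, Text>read()
            .withConfiguration(conf)
            .withKeyTranslation(KEY_TRANSLATION));
  }
}
```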
Read data with configuration and value translation
For example, if a Beam Coder is not available for the Value class, value translation is required.
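A sketch of value translation under the same kind of assumption: conf is a hypothetical TextInputFormat configuration (key class LongWritable, value class Text), and each Text value is converted into a java.lang.String. The class and field names are illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Hypothetical example assuming a TextInputFormat-based configuration.
public class ValueTranslationSketch {
  // Converts the Hadoop value into a String, which has a standard Beam Coder.
  static final SimpleFunction<Text, String> VALUE_TRANSLATION =
      new SimpleFunction<Text, String>() {
        @Override
        public String apply(Text input) {
          return input.toString();
        }
      };

  // The key keeps the InputFormat's key class; the value type is the translated String.
  static PCollection<KV<LongWritable, String>> read(Pipeline p, Configuration conf) {
    return p.apply(
        "read",
        HadoopFormatIO.<LongWritable, String>read()
            .withConfiguration(conf)
            .withValueTranslation(VALUE_TRANSLATION));
  }
}
```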
Read data with configuration, value translation and key translation
For example, if Beam Coders are available for neither the Key class nor the Value class of the InputFormat, both key and value translation are required.
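Combining both translations, a sketch could look like the following, again assuming conf is a hypothetical TextInputFormat configuration (key class LongWritable, value class Text). Both output types (Long and String) have standard Beam Coders; the class and field names are illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Hypothetical example assuming a TextInputFormat-based configuration.
public class KeyValueTranslationSketch {
  // Key translation: LongWritable -> Long.
  static final SimpleFunction<LongWritable, Long> KEY_TRANSLATION =
      new SimpleFunction<LongWritable, Long>() {
        @Override
        public Long apply(LongWritable input) {
          return input.get();
        }
      };

  // Value translation: Text -> String.
  static final SimpleFunction<Text, String> VALUE_TRANSLATION =
      new SimpleFunction<Text, String>() {
        @Override
        public String apply(Text input) {
          return input.toString();
        }
      };

  // Both output types are the translated types.
  static PCollection<KV<Long, String>> read(Pipeline p, Configuration conf) {
    return p.apply(
        "read",
        HadoopFormatIO.<Long, String>read()
            .withConfiguration(conf)
            .withKeyTranslation(KEY_TRANSLATION)
            .withValueTranslation(VALUE_TRANSLATION));
  }
}
```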
Examples for specific InputFormats
Cassandra - CqlInputFormat
To read data from Cassandra, use org.apache.cassandra.hadoop.cql3.CqlInputFormat, which needs the following properties to be set:
Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
Call Read transform as follows:
The CqlInputFormat key class is java.lang.Long, which has a Beam Coder. The CqlInputFormat value class is com.datastax.driver.core.Row, which does not have a Beam Coder. Rather than write a new coder, you can provide your own translation method, as follows:
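A sketch of the translation together with the read call, assuming the DataStax Java driver and Beam's HadoopFormatIO module are on the classpath and that cassandraConf is the configuration built above. The column name myColName is a placeholder, and reading it with getString assumes it is a text column; the class and field names are illustrative.

```java
import com.datastax.driver.core.Row;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;

// Hypothetical example; "myColName" is a placeholder column name.
public class CassandraReadSketch {
  // Translates a Row (no Beam Coder) into the String value of one column.
  static final SimpleFunction<Row, String> CASSANDRA_OUTPUT_VALUE_TYPE =
      new SimpleFunction<Row, String>() {
        @Override
        public String apply(Row row) {
          return row.getString("myColName");
        }
      };

  // The Long key already has a Beam Coder, so only the value is translated.
  static PCollection<KV<Long, String>> read(Pipeline p, Configuration cassandraConf) {
    return p.apply(
        "read",
        HadoopFormatIO.<Long, String>read()
            .withConfiguration(cassandraConf)
            .withValueTranslation(CASSANDRA_OUTPUT_VALUE_TYPE));
  }
}
```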
Elasticsearch - EsInputFormat
To read data from Elasticsearch, use EsInputFormat, which needs the following properties to be set:
Configuration elasticsearchConf = new Configuration();
elasticsearchConf.set("es.nodes", ElasticsearchHostIp);
elasticsearchConf.set("es.port", "9200");
elasticsearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticsearchConf.setClass("key.class", org.apache.hadoop.io.Text.class, Object.class);
elasticsearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable.class, Object.class);
elasticsearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat.class, InputFormat.class);
Call Read transform as follows:
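A sketch of the read call, assuming elasticsearchConf is the configuration built above and that the elasticsearch-hadoop library is on the classpath. No translation is set because both classes have Beam Coders; the class and method names are illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

// Hypothetical wrapper around the Elasticsearch read.
public class ElasticsearchReadSketch {
  // Builds the transform; Text and LinkedMapWritable are used as-is.
  static HadoopFormatIO.Read<Text, LinkedMapWritable> buildRead(Configuration elasticsearchConf) {
    return HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticsearchConf);
  }

  // Applies the transform to the pipeline.
  static PCollection<KV<Text, LinkedMapWritable>> read(Pipeline p, Configuration elasticsearchConf) {
    return p.apply("read", buildRead(elasticsearchConf));
  }
}
```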
The key class of org.elasticsearch.hadoop.mr.EsInputFormat is org.apache.hadoop.io.Text, and its value class is org.elasticsearch.hadoop.mr.LinkedMapWritable. Both key and value classes have Beam Coders.
HCatalog - HCatInputFormat
To read data using HCatalog, use org.apache.hive.hcatalog.mapreduce.HCatInputFormat, which needs the following properties to be set:
Configuration hcatConf = new Configuration();
hcatConf.setClass("mapreduce.job.inputformat.class", HCatInputFormat.class, InputFormat.class);
hcatConf.setClass("key.class", LongWritable.class, Object.class);
hcatConf.setClass("value.class", HCatRecord.class, Object.class);
hcatConf.set("hive.metastore.uris", "thrift://metastore-host:port");
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hcatConf, "my_database", "my_table", "my_filter");
Call Read transform as follows:
Amazon DynamoDB - DynamoDBInputFormat
To read data from Amazon DynamoDB, use org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.
DynamoDBInputFormat implements the older org.apache.hadoop.mapred.InputFormat interface. To make it compatible with HadoopFormatIO, which uses the newer abstract class org.apache.hadoop.mapreduce.InputFormat, a wrapper API is required that acts as an adapter between HadoopFormatIO and DynamoDBInputFormat (or, in general, any InputFormat implementing org.apache.hadoop.mapred.InputFormat).
The example below uses one such available wrapper API: https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java
Configuration dynamoDBConf = new Configuration();
Job job = Job.getInstance(dynamoDBConf);
com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper.setInputFormat(org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.class, job);
dynamoDBConf = job.getConfiguration();
dynamoDBConf.setClass("key.class", Text.class, WritableComparable.class);
dynamoDBConf.setClass("value.class", org.apache.hadoop.dynamodb.DynamoDBItemWritable.class, Writable.class);
dynamoDBConf.set("dynamodb.servicename", "dynamodb");
dynamoDBConf.set("dynamodb.input.tableName", "table_name");
dynamoDBConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
dynamoDBConf.set("dynamodb.regionid", "us-west-1");
dynamoDBConf.set("dynamodb.throughput.read", "1");
dynamoDBConf.set("dynamodb.throughput.read.percent", "1");
dynamoDBConf.set("dynamodb.version", "2011-12-05");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, "aws_access_key");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, "aws_secret_key");
Call Read transform as follows:
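A sketch of the read call, assuming dynamoDBConf is the configuration built above and that the emr-dynamodb-hadoop and elephant-bird libraries are on the classpath. Text and DynamoDBItemWritable are both Hadoop Writables; this sketch assumes Beam can encode them without a translation, which is worth verifying for your library versions. The class and method names are illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable;
import org.apache.hadoop.io.Text;

// Hypothetical wrapper around the DynamoDB read (via MapReduceInputFormatWrapper).
public class DynamoDBReadSketch {
  // Builds the transform; assumes both Writable classes can be encoded as-is.
  static HadoopFormatIO.Read<Text, DynamoDBItemWritable> buildRead(Configuration dynamoDBConf) {
    return HadoopFormatIO.<Text, DynamoDBItemWritable>read().withConfiguration(dynamoDBConf);
  }

  // Applies the transform to the pipeline.
  static PCollection<KV<Text, DynamoDBItemWritable>> read(Pipeline p, Configuration dynamoDBConf) {
    return p.apply("read", buildRead(dynamoDBConf));
  }
}
```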
Apache HBase - TableSnapshotInputFormat
To read data from an HBase table snapshot, use org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.
Reading from a table snapshot bypasses the HBase region servers and instead reads HBase data files directly from the filesystem.
This is useful for cases such as reading historical data or offloading work from the HBase cluster.
In some scenarios, this may prove faster than accessing content through the region servers using HBaseIO.
A table snapshot can be taken using the HBase shell or programmatically:
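A sketch of both options, assuming the HBase client library is on the classpath. The shell command has the form snapshot 'table', 'snapshot'; the programmatic variant uses the client Admin API. HBaseSnapshotSketch and its methods are illustrative, and takeSnapshot needs a reachable HBase cluster.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical helpers for creating a table snapshot.
public class HBaseSnapshotSketch {
  // Builds the equivalent HBase shell command, e.g. for `echo ... | hbase shell -n`.
  static String shellCommand(String tableName, String snapshotName) {
    return "snapshot '" + tableName + "', '" + snapshotName + "'";
  }

  // Takes the snapshot programmatically; requires a running HBase cluster.
  static void takeSnapshot(Configuration hbaseConf, String tableName, String snapshotName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
        Admin admin = connection.getAdmin()) {
      admin.snapshot(snapshotName, TableName.valueOf(tableName));
    }
  }
}
```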
A TableSnapshotInputFormat is configured as follows:
// Construct a typical HBase scan
Scan scan = new Scan();
scan.setCaching(1000);
scan.setBatch(1000);
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_1"));
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_2"));
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181");
hbaseConf.set("hbase.rootdir", "/hbase");
hbaseConf.setClass(
    "mapreduce.job.inputformat.class", TableSnapshotInputFormat.class, InputFormat.class);
hbaseConf.setClass("key.class", ImmutableBytesWritable.class, Writable.class);
// Result does not implement Writable in recent HBase versions, so check it against Object
hbaseConf.setClass("value.class", Result.class, Object.class);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));
// Make use of existing utility methods
Job job = Job.getInstance(hbaseConf); // creates internal clone of hbaseConf
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
hbaseConf = job.getConfiguration(); // extract the modified clone
Call Read transform as follows:
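A sketch of the read call, assuming hbaseConf is the configuration built above and that the HBase MapReduce library is on the classpath. ImmutableBytesWritable is a Hadoop Writable, but Result is not, so this sketch assumes a Beam coder for Result is available (for example, one registered by Beam's HBase IO module when it is on the classpath); if not, add a value translation. The class and method names are illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

// Hypothetical wrapper around the HBase snapshot read.
public class HBaseSnapshotReadSketch {
  // Builds the transform; assumes a coder for Result is registered.
  static HadoopFormatIO.Read<ImmutableBytesWritable, Result> buildRead(Configuration hbaseConf) {
    return HadoopFormatIO.<ImmutableBytesWritable, Result>read().withConfiguration(hbaseConf);
  }

  // Applies the transform to the pipeline.
  static PCollection<KV<ImmutableBytesWritable, Result>> read(Pipeline p, Configuration hbaseConf) {
    return p.apply("read", buildRead(hbaseConf));
  }
}
```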
Writing using HadoopFormatIO
You will need to pass a Hadoop Configuration with parameters specifying how the write will occur. Many properties of the Configuration are optional, and some are required for certain OutputFormat classes, but the following properties must be set for all OutputFormats:
- mapreduce.job.id - The identifier of the write job, for example the end timestamp of a window.
- mapreduce.job.outputformat.class - The OutputFormat class used to connect to your data sink of choice.
- mapreduce.job.output.key.class - The key class passed to the OutputFormat in mapreduce.job.outputformat.class.
- mapreduce.job.output.value.class - The value class passed to the OutputFormat in mapreduce.job.outputformat.class.
- mapreduce.job.reduces - The number of reduce tasks, which equals the number of write tasks that will be generated. This property is not required for a Write.PartitionedWriterBuilder#withoutPartitioning() write.
- mapreduce.job.partitioner.class - The Hadoop partitioner class used to distribute records among partitions. This property is not required for a Write.PartitionedWriterBuilder#withoutPartitioning() write.
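To illustrate how mapreduce.job.reduces and mapreduce.job.partitioner.class interact, the sketch below reimplements, in plain Java, the formula used by Hadoop's default HashPartitioner to pick a partition. It uses String keys for simplicity; Hadoop's Text computes its own hashCode, so actual assignments for Text keys will differ. PartitionSketch and its methods are illustrative, not part of HadoopFormatIO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of hash partitioning; not HadoopFormatIO code.
public class PartitionSketch {
  // Same formula as Hadoop's HashPartitioner: clear the sign bit, then take the
  // remainder modulo the number of reduce tasks (mapreduce.job.reduces).
  static int partitionFor(Object key, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  // Groups keys by the partition, and therefore the write task, they map to.
  static Map<Integer, List<String>> distribute(List<String> keys, int numReduceTasks) {
    Map<Integer, List<String>> byPartition = new TreeMap<>();
    for (String key : keys) {
      byPartition
          .computeIfAbsent(partitionFor(key, numReduceTasks), p -> new ArrayList<>())
          .add(key);
    }
    return byPartition;
  }

  public static void main(String[] args) {
    // "a" (hash 97) and "c" (hash 99) land in partition 1; "b" (hash 98) in partition 0.
    System.out.println(distribute(List.of("a", "b", "c"), 2)); // {0=[b], 1=[a, c]}
  }
}
```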
Note: All of the properties above have corresponding constants, for example HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR.
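A sketch of a write configuration built with these constants, assuming Hadoop's TextOutputFormat as a concrete stand-in OutputFormat and HashPartitioner as the partitioner. File-based OutputFormats such as TextOutputFormat also need an output directory, set here through Hadoop's FileOutputFormat.OUTDIR. WriteConfSketch, textWriteConf and the argument values are illustrative.

```java
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Hypothetical helper; TextOutputFormat stands in for your OutputFormat.
public class WriteConfSketch {
  static Configuration textWriteConf(String outputDir, String jobId, int reducers) {
    Configuration conf = new Configuration(false);
    conf.setClass(HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class, OutputFormat.class);
    // Must match the KeyT and ValueT of the write transform.
    conf.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, Text.class, Object.class);
    conf.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, LongWritable.class, Object.class);
    // Needed for a partitioned write only.
    conf.setClass(HadoopFormatIO.PARTITIONER_CLASS_ATTR, HashPartitioner.class, Partitioner.class);
    conf.setInt(HadoopFormatIO.NUM_REDUCES, reducers);
    conf.set(HadoopFormatIO.JOB_ID, jobId);
    // Output directory required by file-based OutputFormats.
    conf.set(FileOutputFormat.OUTDIR, outputDir);
    return conf;
  }
}
```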
For example:
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop OutputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
   MyDbOutputFormatClass.class, OutputFormat.class);
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
   MyDbOutputFormatKeyClass.class, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
   MyDbOutputFormatValueClass.class, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
   MyPartitionerClass.class, Object.class);
myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
You will need to set the OutputFormat key and value classes (i.e. “mapreduce.job.output.key.class” and “mapreduce.job.output.value.class”) in the Hadoop Configuration to the same types as KeyT and ValueT. If the configured key or value class differs from the OutputFormat’s actual key or value class, an IllegalArgumentException will be thrown.
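When records do not need to be distributed by a partitioner, the withoutPartitioning() branch of the write builder can be used, in which case mapreduce.job.reduces and mapreduce.job.partitioner.class are not required. A minimal sketch, assuming conf sets the remaining write properties for Text keys and LongWritable values; UnpartitionedWriteSketch, buildWrite and the transform name are illustrative.

```java
import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Hypothetical wrapper around an unpartitioned write.
public class UnpartitionedWriteSketch {
  // No partitioner or reducer count is needed on this branch.
  static HadoopFormatIO.Write<Text, LongWritable> buildWrite(Configuration conf, String locksDirPath) {
    return HadoopFormatIO.<Text, LongWritable>write()
        .withConfiguration(conf)
        .withoutPartitioning()
        .withExternalSynchronization(new HDFSSynchronization(locksDirPath));
  }

  // Applies the write to a bounded collection.
  static void write(PCollection<KV<Text, LongWritable>> data, Configuration conf, String locksDirPath) {
    data.apply("writeWithoutPartitioning", buildWrite(conf, locksDirPath));
  }
}
```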
Batch writing
// Data that we want to write
PCollection<KV<Text, LongWritable>> boundedWordsCount = ...;
// Hadoop configuration for write
// We have a partitioned write, so the Partitioner and reducer count have to be set - see withPartitioning() javadoc
Configuration myHadoopConfiguration = ...;
// Path to directory with locks
String locksDirPath = ...;
boundedWordsCount.apply(
    "writeBatch",
    HadoopFormatIO.<Text, LongWritable>write()
        .withConfiguration(myHadoopConfiguration)
        .withPartitioning()
        .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
Stream writing
// Data that we want to write
PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;
// Transform that converts the data of one window into one Hadoop configuration
PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
  configTransform = ...;
unboundedWordsCount.apply(
  "writeStream",
  HadoopFormatIO.<Text, LongWritable>write()
      .withConfigurationTransform(configTransform)
      .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
Last updated on 2025/10/20