Class HCatalogIO

java.lang.Object
org.apache.beam.sdk.io.hcatalog.HCatalogIO

public class HCatalogIO extends Object
IO to read and write data using HCatalog.

WARNING: This package requires users to declare their own dependency on org.apache.hive:hive-exec and org.apache.hive.hcatalog. At the time of this Beam release, every released version of those packages had a transitive dependency on a version of log4j vulnerable to CVE-2021-44228. We strongly encourage users to pin a non-vulnerable version of log4j when using this package. See Issue #21426.

Reading using HCatalog

The HCatalog source supports reading HCatRecord elements from an HCatalog-managed source, for example Hive.

To configure an HCatalog source, you must specify a metastore URI and a table name. The database and a filter are optional parameters. For instance:


 Map<String, String> configProperties = new HashMap<>();
 configProperties.put("hive.metastore.uris","thrift://metastore-host:port");

 pipeline
   .apply(HCatalogIO.read()
       .withConfigProperties(configProperties)
       .withDatabase("default") //optional, assumes default if none specified
       .withTable("employee")
       .withFilter(filterString)); //optional, may be specified if the table is partitioned
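
The snippet above leaves filterString undefined. As a minimal sketch, the helper below assembles the metastore properties and a simple equality filter using only the Java standard library. The class and method names, the partition column load_date, and the column="value" filter shape are assumptions for illustration, and port 9083 is only an example value. Consult the HCatalog documentation for the filter grammar your version accepts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class; it is not part of HCatalogIO.
class HCatalogReadConfigSketch {

  // Builds the properties map intended for withConfigProperties(...).
  static Map<String, String> metastoreConfig(String host, int port) {
    Map<String, String> configProperties = new HashMap<>();
    configProperties.put("hive.metastore.uris", "thrift://" + host + ":" + port);
    return configProperties;
  }

  // Builds an equality filter on one partition column.
  // Assumed filter shape: column="value".
  static String equalsFilter(String partitionColumn, String value) {
    return partitionColumn + "=\"" + value + "\"";
  }

  public static void main(String[] args) {
    Map<String, String> configProperties = metastoreConfig("metastore-host", 9083);
    String filterString = equalsFilter("load_date", "2019-05-14");
    System.out.println(configProperties);
    System.out.println(filterString);
  }
}
```

The resulting map and string would then be passed to withConfigProperties(...) and withFilter(...) respectively.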
 

The HCatalog source also supports reading HCatRecord elements in unbounded mode. In unbounded mode, HCatalogIO continuously polls for new partitions and reads their data. If a termination condition is provided, it stops reading once the condition is met.


 pipeline
   .apply(HCatalogIO.read()
       .withConfigProperties(configProperties)
       .withDatabase("default") //optional, assumes default if none specified
       .withTable("employee")
       .withPollingInterval(Duration.millis(15000)) // poll for new partitions every 15 seconds
       .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000)))); //optional
 

Writing using HCatalog

The HCatalog sink supports writing HCatRecord elements to an HCatalog-managed data store, for example Hive.

To configure an HCatalog sink, you must specify a metastore URI and a table name. The database, partition, and batch size are optional parameters. The destination table must already exist; the transform does not create it. For instance:


 Map<String, String> configProperties = new HashMap<>();
 configProperties.put("hive.metastore.uris","thrift://metastore-host:port");

 pipeline
   .apply(...)
   .apply(HCatalogIO.write()
       .withConfigProperties(configProperties)
       .withDatabase("default") //optional, assumes default if none specified
       .withTable("employee")
       .withPartition(partitionValues) //optional, may be specified if the table is partitioned
       .withBatchSize(1024L)); //optional, assumes a default batch size of 1024 if none specified
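
The write snippet uses partitionValues without defining it. Assuming withPartition(...) accepts a map from partition column name to partition value, the sketch below shows one way to build such a map with the Java standard library. The class name and the partition columns load_date and product_type are hypothetical and must match the partition columns of your own table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class; it is not part of HCatalogIO.
class HCatalogWritePartitionSketch {

  // Maps each partition column to the value that the written records belong to.
  // The column names here are illustrative assumptions.
  static Map<String, String> partitionValues(String loadDate, String productType) {
    Map<String, String> partitionValues = new LinkedHashMap<>();
    partitionValues.put("load_date", loadDate);
    partitionValues.put("product_type", productType);
    return partitionValues;
  }

  public static void main(String[] args) {
    Map<String, String> partitionValues = partitionValues("2019-05-14", "1");
    System.out.println(partitionValues);
  }
}
```

A LinkedHashMap is used only so the entries print in insertion order; any Map<String, String> implementation would carry the same information.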