@Experimental(value=SOURCE_SINK) public class HCatalogIO extends java.lang.Object
WARNING: This package requires users to declare their own dependency on org.apache.hive:hive-exec and org.apache.hive.hcatalog. At the time of this Beam release, every released version of those packages had a transitive dependency on a version of log4j vulnerable to CVE-2021-44228. We strongly encourage users to pin a non-vulnerable version of log4j when using this package. See Issue #21426.
The HCatalog source supports reading HCatRecord from an HCatalog-managed source, e.g. Hive.
To configure an HCatalog source, you must specify a metastore URI and a table name. Other optional parameters are database and filter. For instance:
Map<String, String> configProperties = new HashMap<>();
configProperties.put("hive.metastore.uris", "thrift://metastore-host:port");

pipeline
    .apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("default") // optional, assumes "default" if none specified
        .withTable("employee")
        .withFilter(filterString)); // optional, may be specified if the table is partitioned
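The string passed to withFilter uses Hive's partition filter syntax: a partition column compared to a quoted value, with multiple clauses combined via and/or. A minimal sketch of assembling such a filter from an ordered map of partition key/value pairs; the PartitionFilters helper and the column names here are illustrative, not part of HCatalogIO:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionFilters {
  // Builds a Hive-style partition filter such as year="2023" and month="01"
  // from an ordered map of partition column names to values.
  static String filterFor(Map<String, String> partitionSpec) {
    return partitionSpec.entrySet().stream()
        .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
        .collect(Collectors.joining(" and "));
  }

  public static void main(String[] args) {
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("year", "2023");
    spec.put("month", "01");
    System.out.println(filterFor(spec)); // year="2023" and month="01"
  }
}
```

The resulting string can be passed directly as filterString above; columns referenced in the filter must be partition columns of the table.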
The HCatalog source also supports reading HCatRecord in unbounded mode. When run in unbounded mode, HCatalogIO continuously polls for new partitions and reads their data. If a termination condition is provided, it stops reading once the condition is met.
pipeline
    .apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("default") // optional, assumes "default" if none specified
        .withTable("employee")
        .withPollingInterval(Duration.millis(15000)) // poll for new partitions every 15 seconds
        .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000)))); // optional
The HCatalog sink supports writing HCatRecord to an HCatalog-managed destination, e.g. Hive.
To configure an HCatalog sink, you must specify a metastore URI and a table name. Other optional parameters are database, partition, and batch size. The destination table should exist beforehand; the transform does not create a new table if it does not exist. For instance:
Map<String, String> configProperties = new HashMap<>();
configProperties.put("hive.metastore.uris", "thrift://metastore-host:port");

pipeline
    .apply(...)
    .apply(HCatalogIO.write()
        .withConfigProperties(configProperties)
        .withDatabase("default") // optional, assumes "default" if none specified
        .withTable("employee")
        .withPartition(partitionValues) // optional, may be specified if the table is partitioned
        .withBatchSize(1024L)); // optional, assumes a default batch size of 1024 if none specified
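The partitionValues argument passed to withPartition maps partition column names to the values of the single partition being written. A brief sketch of constructing it; the column names and values here are hypothetical and must match the partition columns of the destination table:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionSpecExample {
  // Returns a partition spec: one entry per partition column of the
  // destination table, naming the exact partition to write into.
  static Map<String, String> examplePartition() {
    Map<String, String> partitionValues = new LinkedHashMap<>();
    partitionValues.put("load_date", "2023-01-01"); // hypothetical partition column
    partitionValues.put("product_type", "small");   // hypothetical partition column
    return partitionValues;
  }

  public static void main(String[] args) {
    System.out.println(PartitionSpecExample.examplePartition());
  }
}
```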
Modifier and Type | Class and Description |
---|---|
`static class` | `HCatalogIO.Read`: A PTransform to read data using HCatalog. |
`static class` | `HCatalogIO.Write`: A PTransform to write to an HCatalog-managed source. |
Modifier and Type | Method and Description |
---|---|
`static HCatalogIO.Read` | `read()`: Read data from Hive. |
`static HCatalogIO.Write` | `write()`: Write data to Hive. |
public static HCatalogIO.Write write()
public static HCatalogIO.Read read()