Class HCatalogIO
WARNING: This package requires users to declare their own dependency on org.apache.hive:hive-exec and org.apache.hive.hcatalog. At the time of this Beam release, every released version of those packages had a transitive dependency on a version of log4j vulnerable to CVE-2021-44228. We strongly encourage users to pin a non-vulnerable version of log4j when using this package. See Issue #21426.
Reading using HCatalog
The HCatalog source supports reading HCatRecord elements from an HCatalog-managed source, for example Hive.
To configure an HCatalog source, you must specify a metastore URI and a table name. The database and a partition filter are optional. For instance:
Map<String, String> configProperties = new HashMap<>();
configProperties.put("hive.metastore.uris", "thrift://metastore-host:port");

pipeline
    .apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("default") // optional, assumes default if none specified
        .withTable("employee")
        .withFilter(filterString)); // optional, may be specified if the table is partitioned
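The read example leaves filterString undefined. As a rough sketch, and assuming the filter follows the Hive metastore partition-filter syntax (key="value" clauses joined by "and"), such a string could be assembled from a map of partition columns. The class PartitionFilterSketch, its method equalsFilter, and the column names load_date and region are hypothetical and not part of Beam or HCatalog.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper for illustration only; not part of Beam or HCatalog. */
final class PartitionFilterSketch {
  private PartitionFilterSketch() {}

  /**
   * Joins partition key/value pairs into clauses of the form key="value",
   * separated by " and ". Assumes the metastore partition-filter syntax.
   */
  static String equalsFilter(Map<String, String> partitionSpec) {
    StringJoiner filter = new StringJoiner(" and ");
    for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
      // Escaping rules are not modeled here, so quoted values are rejected.
      if (entry.getValue().contains("\"")) {
        throw new IllegalArgumentException("Value must not contain a double quote");
      }
      filter.add(entry.getKey() + "=\"" + entry.getValue() + "\"");
    }
    return filter.toString();
  }

  public static void main(String[] args) {
    // Hypothetical partition columns; LinkedHashMap keeps insertion order.
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("load_date", "2019-05-14");
    spec.put("region", "emea");
    // prints: load_date="2019-05-14" and region="emea"
    System.out.println(equalsFilter(spec));
  }
}
```

The resulting string would then be passed to withFilter(...). Range or "or" conditions would need to be written by hand.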
The HCatalog source also supports reading HCatRecord elements in unbounded mode. When run in unbounded mode, HCatalogIO continuously polls for new partitions and reads their data. If a termination condition is provided, it stops reading once the condition is met.
// Duration is org.joda.time.Duration; Watch is org.apache.beam.sdk.transforms.Watch.
pipeline
    .apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("default") // optional, assumes default if none specified
        .withTable("employee")
        .withPollingInterval(Duration.millis(15000)) // poll for new partitions every 15 seconds
        .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000)))); // optional
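To make the polling behavior more concrete, the following is a simplified simulation of "poll at a fixed interval, emit only partitions not seen before, stop after a total elapsed time". It is not Beam's implementation, and it does not model the exact boundary behavior of Watch.Growth.afterTotalOf. The class PartitionPollingSketch and the partition names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation for illustration only; not how Beam implements polling. */
final class PartitionPollingSketch {
  private PartitionPollingSketch() {}

  /**
   * snapshots.get(i) lists the partitions visible at poll number i.
   * Polls occur at elapsed times 0, interval, 2 * interval, and so on.
   * Polling stops once the elapsed time exceeds terminateAfterMillis.
   */
  static List<String> pollNewPartitions(
      List<List<String>> snapshots, long pollIntervalMillis, long terminateAfterMillis) {
    Set<String> seen = new HashSet<>();
    List<String> emitted = new ArrayList<>();
    long elapsedMillis = 0;
    for (List<String> snapshot : snapshots) {
      if (elapsedMillis > terminateAfterMillis) {
        break; // termination condition met, stop reading
      }
      for (String partition : snapshot) {
        // Set.add returns true only the first time a partition is seen.
        if (seen.add(partition)) {
          emitted.add(partition);
        }
      }
      elapsedMillis += pollIntervalMillis;
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<List<String>> snapshots = Arrays.asList(
        Arrays.asList("p1"),
        Arrays.asList("p1", "p2"),
        Arrays.asList("p1", "p2", "p3"));
    // Polls at 0 ms and 15000 ms run; the poll at 30000 ms is past the 20000 ms limit.
    // prints: [p1, p2]
    System.out.println(pollNewPartitions(snapshots, 15000, 20000));
  }
}
```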
Writing using HCatalog
The HCatalog sink supports writing HCatRecord elements to an HCatalog-managed source, for example Hive.
To configure an HCatalog sink, you must specify a metastore URI and a table name. The database, partition, and batch size are optional. The destination table must already exist; the transform does not create it. For instance:
Map<String, String> configProperties = new HashMap<>();
configProperties.put("hive.metastore.uris", "thrift://metastore-host:port");

pipeline
    .apply(...)
    .apply(HCatalogIO.write()
        .withConfigProperties(configProperties)
        .withDatabase("default") // optional, assumes default if none specified
        .withTable("employee")
        .withPartition(partitionValues) // optional, may be specified if the table is partitioned
        .withBatchSize(1024L)); // optional, assumes a default batch size of 1024 if none specified
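As an illustration of what a batch size means for a write, the sketch below splits a list of records into chunks of at most batchSize elements. It only shows the grouping arithmetic, under the assumption that records are written in groups of at most that size. How and when Beam actually flushes batches is not modeled, and the class BatchingSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Beam or HCatalog. */
final class BatchingSketch {
  private BatchingSketch() {}

  /** Splits records into consecutive batches of at most batchSize elements. */
  static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < records.size(); start += batchSize) {
      int end = Math.min(start + batchSize, records.size());
      // Copy the view so each batch is independent of the input list.
      batches.add(new ArrayList<>(records.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> records = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      records.add(i);
    }
    // With the default batch size of 1024, 2500 records form batches of 1024, 1024, and 452.
    for (List<Integer> batch : toBatches(records, 1024)) {
      System.out.println(batch.size());
    }
  }
}
```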
Nested Class Summary
Modifier and Type    Class               Description
static class         HCatalogIO.Read     A PTransform to read data using HCatalog.
static class         HCatalogIO.Write    A PTransform to write to an HCatalog-managed source.
Method Summary
Modifier and Type          Method     Description
static HCatalogIO.Read     read()     Read data from Hive.
static HCatalogIO.Write    write()    Write data to Hive.
Method Details
write
static HCatalogIO.Write write()
Write data to Hive.
read
static HCatalogIO.Read read()
Read data from Hive.