@Internal public class IcebergIO extends java.lang.Object
IcebergIO is offered as a Managed transform. This class is subject to change and should not be used directly. Instead, use it via Managed.ICEBERG like so:
Map<String, Object> config = Map.of(
    "table", table,
    "triggering_frequency_seconds", 5,
    "catalog_name", name,
    "catalog_properties", Map.of(
        "warehouse", warehouse_path,
        "catalog-impl", "org.apache.iceberg.hive.HiveCatalog"),
    "config_properties", Map.of(
        "hive.metastore.uris", metastore_uri));
// ====== WRITE ======
pipeline
    .apply(Create.of(BEAM_ROWS))
    .apply(Managed.write(ICEBERG).withConfig(config));

// ====== READ ======
pipeline
    .apply(Managed.read(ICEBERG).withConfig(config))
    .getSinglePCollection()
    .apply(ParDo.of(...));
Parameter | Type | Description |
---|---|---|
table | str | Required. A fully-qualified table identifier. You may also provide a template to use dynamic destinations (see the `Dynamic Destinations` section below for details). |
triggering_frequency_seconds | int | Required for streaming writes. Roughly every triggering_frequency_seconds duration, the sink will write records to data files and produce a table snapshot. Generally, a higher value will produce fewer, larger data files. |
catalog_name | str | The name of the catalog. Defaults to apache-beam-<VERSION>. |
catalog_properties | map<str, str> | A map of properties to be used when constructing the Iceberg catalog. Required properties will depend on what catalog you are using, but this list is a good starting point (see the example below). |
config_properties | map<str, str> | A map of properties used to instantiate the catalog's Hadoop Configuration. Required properties will depend on your catalog implementation, but this list is a good starting point. |
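For illustration only, a write config pointing at a REST-based catalog might look like the sketch below. The property keys follow Iceberg's catalog-property conventions ("type", "uri", "warehouse"); the table name, URI, and bucket values are placeholders, so verify them against your own catalog setup.
// A minimal sketch (not from the original docs): configuring a REST catalog.
// Keys follow Iceberg's catalog-property conventions; values are placeholders.
Map<String, Object> restConfig = Map.of(
    "table", "db.events",
    "catalog_properties", Map.of(
        "type", "rest",
        "uri", "https://my-rest-catalog:8181",
        "warehouse", "gs://my-warehouse-bucket"));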
For Iceberg writes, additional configuration options are provided in the `Pre-filtering Options` section below.
Being a Managed transform, this IO exclusively writes and reads using Beam Rows. Conversion between Beam Rows and Iceberg Records takes place via helper methods in IcebergUtils. Below is a type conversion table mapping Beam and Iceberg types; a short conversion sketch follows the table.
Beam Schema.FieldType | Iceberg Type |
---|---|
BYTES | BINARY |
BOOLEAN | BOOLEAN |
STRING | STRING |
INT32 | INTEGER |
INT64 | LONG |
DECIMAL | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATETIME | STRING |
ITERABLE | LIST |
ARRAY | LIST |
MAP | MAP |
ROW | STRUCT |
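For reference, a minimal conversion sketch using IcebergUtils is shown below. The helper method names (beamSchemaToIcebergSchema, beamRowToIcebergRecord, icebergRecordToBeamRow) are assumptions based on this class's description, so check them against your Beam version.
// A minimal sketch of converting between Beam Rows and Iceberg Records with
// IcebergUtils; the helper names are assumptions, verify against your Beam version.
org.apache.iceberg.Schema icebergSchema =
    IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

// Beam Row -> Iceberg Record (e.g. in a test or custom pipeline step)
Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(icebergSchema, beamRow);

// Iceberg Record -> Beam Row
Row roundTripped = IcebergUtils.icebergRecordToBeamRow(beamSchema, icebergRecord);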
Dynamic Destinations

Managed Iceberg supports writing to dynamic destinations. To do so, provide an identifier template for the table parameter. A template should have placeholders represented as curly braces containing a record field name, e.g. "my_namespace.my_{foo}_table".

The sink uses simple String interpolation to determine a record's table destination: each placeholder is replaced with the record's field value. Nested fields can be specified using dot-notation (e.g. "{top.middle.nested}").
Pre-filtering Options

Some use cases may benefit from filtering record fields right before the write operation. For example, you may want to include metadata that routes records to the right destination without actually writing that metadata to your table. Some lightweight filtering options are provided to accommodate such cases, allowing you to control what actually gets written (see the sketch after the table below):
Parameter | Type | Description |
---|---|---|
drop | list<str> | Drops the specified fields. |
keep | list<str> | Keeps the specified fields and drops the rest. |
only | str | Use this to specify a nested record you intend to write. That record will be written and the rest will be dropped. |
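For example, a minimal sketch of the keep and only options (the table and field names here are made up for illustration):
// A minimal sketch (field and table names are hypothetical).
// "keep" retains only the listed fields and drops everything else:
Map<String, Object> keepConfig = Map.of(
    "table", "db.events",
    "catalog_properties", Map.of(...),
    "keep", List.of("user_id", "event_type", "event_millis"));

// "only" writes just the named nested record, dropping the surrounding fields:
Map<String, Object> onlyConfig = Map.of(
    "table", "db.events",
    "catalog_properties", Map.of(...),
    "only", "payload");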
Example write to dynamic destinations (pseudocode):
Map<String, Object> config = Map.of(
    "table", "flights.{country}.{airport}",
    "catalog_properties", Map.of(...),
    "drop", List.of("country", "airport"));

List<String> JSON_ROWS = List.of(
    // first record is written to table "flights.usa.RDU"
    "{\"country\": \"usa\"," +
        "\"airport\": \"RDU\"," +
        "\"flight_id\": \"AA356\"," +
        "\"destination\": \"SFO\"," +
        "\"meal\": \"chicken alfredo\"}",
    // second record is written to table "flights.qatar.HIA"
    "{\"country\": \"qatar\"," +
        "\"airport\": \"HIA\"," +
        "\"flight_id\": \"QR 875\"," +
        "\"destination\": \"DEL\"," +
        "\"meal\": \"shawarma\"}",
    ...);

// fields "country" and "airport" are dropped before
// records are written to tables
pipeline
    .apply(Create.of(JSON_ROWS))
    .apply(JsonToRow.withSchema(...))
    .apply(Managed.write(ICEBERG).withConfig(config));
When records are written and committed to a table, a snapshot is produced. A batch pipeline will perform a single commit and create a single snapshot per table. A streaming pipeline will produce a snapshot roughly according to the configured triggering_frequency_seconds.
You can access these snapshots and perform downstream processing by fetching the "snapshots"
output PCollection:
pipeline
    .apply(Create.of(BEAM_ROWS))
    .apply(Managed.write(ICEBERG).withConfig(config))
    .get("snapshots")
    .apply(ParDo.of(new DoFn<Row, T>() {...}));
Each Snapshot is represented as a Beam Row with the following Schema (a sketch of a consuming DoFn follows the table):
Field | Type | Description |
---|---|---|
table | str | Table identifier. |
manifest_list_location | str | Location of the snapshot's manifest list. |
operation | str | Name of the operation that produced the snapshot. |
parent_id | long | The snapshot's parent ID. |
schema_id | int | The id of the schema used when the snapshot was created. |
summary | map<str, str> | A string map of summary data. |
timestamp_millis | long | The snapshot's timestamp in milliseconds. |
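As a rough sketch (not from the original docs), a DoFn consuming the "snapshots" output could read these fields off each Row by name; the logging below is purely illustrative.
// A minimal sketch of a DoFn for the "snapshots" output. Field names match the
// schema above; the logging is illustrative.
static class LogSnapshots extends DoFn<Row, Void> {
  @ProcessElement
  public void process(@Element Row snapshot) {
    String table = snapshot.getString("table");
    String operation = snapshot.getString("operation");
    Long timestampMillis = snapshot.getInt64("timestamp_millis");
    System.out.printf("snapshot: table=%s, operation=%s, timestamp=%d%n",
        table, operation, timestampMillis);
  }
}
Such a DoFn would slot into the earlier example as .apply(ParDo.of(new LogSnapshots())).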
For internal use only; no backwards compatibility guarantees
Modifier and Type | Class and Description |
---|---|
static class | IcebergIO.ReadRows |
static class | IcebergIO.WriteRows |
Constructor and Description |
---|
IcebergIO() |
Modifier and Type | Method and Description |
---|---|
static IcebergIO.ReadRows | readRows(IcebergCatalogConfig catalogConfig) |
static IcebergIO.WriteRows | writeRows(IcebergCatalogConfig catalog) |
public static IcebergIO.WriteRows writeRows(IcebergCatalogConfig catalog)
public static IcebergIO.ReadRows readRows(IcebergCatalogConfig catalogConfig)