@Internal public class IcebergIO extends java.lang.Object
IcebergIO is offered as a Managed transform. This class is subject to change and should not be used directly. Instead, use it like so:
Map<String, Object> config = Map.of(
    "table", table,
    "catalog_name", name,
    "catalog_properties", Map.of(
        "warehouse", warehouse_path,
        "catalog-impl", "org.apache.iceberg.hive.HiveCatalog"),
    "config_properties", Map.of(
        "hive.metastore.uris", metastore_uri));
====== WRITE ======
pipeline
.apply(Create.of(BEAM_ROWS))
.apply(Managed.write(ICEBERG).withConfig(config));
====== READ ======
pipeline
.apply(Managed.read(ICEBERG).withConfig(config))
.getSinglePCollection()
.apply(ParDo.of(...));
====== READ CDC ======
pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection()
.apply(ParDo.of(...));
Look for more detailed examples below.
Being a Managed transform, this IO exclusively writes and reads using Beam Rows. Conversion takes place between Beam Rows and Iceberg Records using helper methods in IcebergUtils. Below is the mapping between Beam and Iceberg types:
Beam Schema.FieldType | Iceberg Type
---|---
BYTES | BINARY
BOOLEAN | BOOLEAN
STRING | STRING
INT32 | INTEGER
INT64 | LONG
DECIMAL | STRING
FLOAT | FLOAT
DOUBLE | DOUBLE
SqlTypes.DATETIME | TIMESTAMP
DATETIME | TIMESTAMPTZ
SqlTypes.DATE | DATE
SqlTypes.TIME | TIME
ITERABLE | LIST
ARRAY | LIST
MAP | MAP
ROW | STRUCT
Note: SqlTypes are Beam logical types.
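As an illustrative sketch (field names are hypothetical), a Beam schema exercising several rows of this mapping could be built like so:
// Nested schema used for the ROW -> STRUCT mapping (hypothetical).
Schema addressSchema = Schema.builder()
    .addStringField("city")
    .addStringField("country")
    .build();

Schema schema = Schema.builder()
    .addStringField("name")                               // STRING  -> STRING
    .addInt64Field("views")                               // INT64   -> LONG
    .addBooleanField("active")                            // BOOLEAN -> BOOLEAN
    .addLogicalTypeField("created_at", SqlTypes.DATETIME) // SqlTypes.DATETIME -> TIMESTAMP
    .addArrayField("tags", Schema.FieldType.STRING)       // ARRAY   -> LIST
    .addRowField("address", addressSchema)                // ROW     -> STRUCT
    .build();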
For an existing table, the following Beam types are supported for both timestamp and timestamptz:
- SqlTypes.DATETIME --> Using a LocalDateTime object
- DATETIME --> Using a DateTime object
- INT64 --> Using a Long representing micros since EPOCH
- STRING --> Using a timestamp String representation (e.g. "2024-10-08T13:18:20.053+03:27")
Note: If you expect Beam to create the Iceberg table at runtime, please provide SqlTypes.DATETIME for a timestamp column and DATETIME for a timestamptz column. If the table does not exist, Beam will treat STRING and INT64 at face value and create equivalent column types.
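As a sketch (field names are hypothetical), a schema that lets Beam create a table with one timestamp and one timestamptz column, and a row carrying values for both, could look like this:
// "event_time" becomes an Iceberg timestamp column, "ingested_at" a
// timestamptz column (per the note above).
Schema schema = Schema.builder()
    .addLogicalTypeField("event_time", SqlTypes.DATETIME)
    .addDateTimeField("ingested_at")
    .build();

// Values use the Beam types listed above: a LocalDateTime for SqlTypes.DATETIME
// and a Joda-Time DateTime for DATETIME.
Row row = Row.withSchema(schema)
    .withFieldValue("event_time", java.time.LocalDateTime.parse("2024-10-08T13:18:20.053"))
    .withFieldValue("ingested_at", org.joda.time.DateTime.now())
    .build();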
For Iceberg reads, the connector will produce Beam SqlTypes.DATETIME types for Iceberg's timestamp and DATETIME types for timestamptz.
If an Iceberg table does not exist at the time of the write, this connector will automatically create one with the data's schema. Note that this is a best-effort operation that depends on the Catalog implementation. Some implementations may not support creating a table using the Iceberg API.
Managed Iceberg supports writing to dynamic destinations. To do so, please provide an identifier template for the table parameter. A template should have placeholders represented as curly braces containing a record field name, e.g. "my_namespace.my_{foo}_table". The sink uses simple String interpolation to determine a record's table destination. The placeholder is replaced with the record's field value. Nested fields can be specified using dot-notation (e.g. "{top.middle.nested}").
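For example, a template routing on a nested field could look like the following (field and table names are hypothetical):
Map<String, Object> config = Map.of(
    "table", "telemetry.{device.region}.events",
    "catalog_properties", Map.of(...));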
Some use cases may benefit from filtering record fields right before the write operation. For example, you may want to provide metadata to guide records to the right destination, but not necessarily write that metadata to your table. Some lightweight filtering options are provided to accommodate such cases, allowing you to control what actually gets written (see drop, keep, and only).
Example write to dynamic destinations (pseudocode):
Map<String, Object> config = Map.of(
"table", "flights.{country}.{airport}",
"catalog_properties", Map.of(...),
"drop", ["country", "airport"]);
JSON_ROWS = [
// first record is written to table "flights.usa.RDU"
"{\"country\": \"usa\"," +
"\"airport\": \"RDU\"," +
"\"flight_id\": \"AA356\"," +
"\"destination\": \"SFO\"," +
"\"meal\": \"chicken alfredo\"}",
// second record is written to table "flights.qatar.HIA"
"{\"country\": \"qatar\"," +
"\"airport\": \"HIA\"," +
"\"flight_id\": \"QR 875\"," +
"\"destination\": \"DEL\"," +
"\"meal\": \"shawarma\"}",
...
];
// fields "country" and "airport" are dropped before
// records are written to tables
pipeline
.apply(Create.of(JSON_ROWS))
.apply(JsonToRow.withSchema(...))
.apply(Managed.write(ICEBERG).withConfig(config));
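A variant of the same routing (pseudocode) could use keep instead, listing the fields to retain rather than the ones to drop:
Map<String, Object> config = Map.of(
    "table", "flights.{country}.{airport}",
    "catalog_properties", Map.of(...),
    "keep", ["flight_id", "destination", "meal"]);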
When records are written and committed to a table, a snapshot is produced. A batch pipeline
will perform a single commit and create a single snapshot per table. A streaming pipeline will
produce a snapshot roughly according to the configured triggering_frequency_seconds.
You can access these snapshots and perform downstream processing by fetching the "snapshots"
output PCollection:
pipeline
.apply(Create.of(BEAM_ROWS))
.apply(Managed.write(ICEBERG).withConfig(config))
.get("snapshots")
.apply(ParDo.of(new DoFn<Row, T>() {...}));
Each Snapshot is represented as a Beam Row, with the following Schema:
Field | Type | Description
---|---|---
table | str | Table identifier.
manifest_list_location | str | Location of the snapshot's manifest list.
operation | str | Name of the operation that produced the snapshot.
parent_id | long | The snapshot's parent ID.
schema_id | int | The ID of the schema used when the snapshot was created.
summary | map<str, str> | A string map of summary data.
timestamp_millis | long | The snapshot's timestamp in milliseconds.
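For instance, a downstream DoFn could inspect each snapshot Row using these field names; a minimal sketch:
pipeline
    .apply(Create.of(BEAM_ROWS))
    .apply(Managed.write(ICEBERG).withConfig(config))
    .get("snapshots")
    .apply(ParDo.of(new DoFn<Row, String>() {
      @ProcessElement
      public void process(@Element Row snapshot, OutputReceiver<String> out) {
        // Field names follow the snapshot schema above.
        out.output(snapshot.getString("table")
            + ", " + snapshot.getString("operation")
            + ", " + snapshot.getInt64("timestamp_millis"));
      }
    }));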
To read from a table, start with a configuration map similar to the one used for writing:
Map<String, Object> config = Map.of(
    "table", table,
    "catalog_name", name,
    "catalog_properties", Map.of(...),
    "config_properties", Map.of(...));
Example of a simple batch read:
PCollection<Row> rows = pipeline
.apply(Managed.read(ICEBERG).withConfig(config))
.getSinglePCollection();
Example of a simple CDC streaming read:
PCollection<Row> rows = pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection();
Note: This reads append-only snapshots. Full CDC is not supported yet.
The CDC streaming source (enabled with streaming=true) continuously polls the table for new snapshots, with a default interval of 60 seconds. This can be overridden with poll_interval_seconds:
config.put("streaming", true);
config.put("poll_interval_seconds", 10);
The read's starting point can be controlled in one of the following ways:
- Set starting_strategy to "earliest" or "latest".
- Set a starting snapshot ID with from_snapshot.
- Set a starting timestamp (in milliseconds) with from_timestamp.
For example:
Map<String, Object> config = Map.of(
"table", table,
"catalog_name", name,
"catalog_properties", Map.of(...),
"config_properties", Map.of(...),
"streaming", true,
"from_snapshot", 123456789L);
PCollection<Row> rows = pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection();
An end point for the read can be set in one of the following ways:
- Set an ending snapshot ID with to_snapshot.
- Set an ending timestamp (in milliseconds) with to_timestamp.
For example:
Map<String, Object> config = Map.of(
"table", table,
"catalog_name", name,
"catalog_properties", Map.of(...),
"config_properties", Map.of(...),
"from_snapshot", 123456789L,
"to_timestamp", 987654321L);
PCollection<Row> rows = pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection();
Note: If streaming=true and an end point is set, the pipeline will run in streaming mode and shut down automatically after processing the final snapshot.
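As a sketch, a streaming read bounded by a known end snapshot (the snapshot ID here is illustrative) could be configured like this:
Map<String, Object> config = Map.of(
    "table", table,
    "catalog_name", name,
    "catalog_properties", Map.of(...),
    "config_properties", Map.of(...),
    "streaming", true,
    "to_snapshot", 123456789L);

PCollection<Row> rows = pipeline
    .apply(Managed.read(ICEBERG_CDC).withConfig(config))
    .getSinglePCollection();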
Nested Class Summary

Modifier and Type | Class and Description
---|---
static class | IcebergIO.ReadRows
static class | IcebergIO.WriteRows

Constructor Summary

Constructor and Description
---
IcebergIO()

Method Summary

Modifier and Type | Method and Description
---|---
static IcebergIO.ReadRows | readRows(IcebergCatalogConfig catalogConfig)
static IcebergIO.WriteRows | writeRows(IcebergCatalogConfig catalog)

Method Detail

public static IcebergIO.WriteRows writeRows(IcebergCatalogConfig catalog)

public static IcebergIO.ReadRows readRows(IcebergCatalogConfig catalogConfig)