@Internal public class IcebergIO extends java.lang.Object
IcebergIO
is offered as a Managed transform. This class is subject to change and
should not be used directly. Instead, use it like so:
Map<String, Object> config = Map.of(
"table", table,
"catalog_name", name,
"catalog_properties", Map.of(
"warehouse", warehouse_path,
"catalog-impl", "org.apache.iceberg.hive.HiveCatalog"),
"config_properties", Map.of(
"hive.metastore.uris", metastore_uri));
====== WRITE ======
pipeline
.apply(Create.of(BEAM_ROWS))
.apply(Managed.write(ICEBERG).withConfig(config));
====== READ ======
pipeline
.apply(Managed.read(ICEBERG).withConfig(config))
.getSinglePCollection()
.apply(ParDo.of(...));
====== READ CDC ======
pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection()
.apply(ParDo.of(...));
Look for more detailed examples below.
Parameter | Type | Description |
table | str | Required. A fully-qualified table identifier. You may also provide a template to use dynamic destinations (see the `Dynamic Destinations` section below for details). |
catalog_name | str | The name of the catalog. Defaults to apache-beam-<VERSION> . |
catalog_properties | map<str, str> | A map of properties to be used when constructing the Iceberg catalog. Required properties will depend on what catalog you are using, but this list is a good starting point. |
config_properties | map<str, str> | A map of properties
to instantiate the catalog's Hadoop Configuration . Required properties will depend on your catalog
implementation, but this list
is a good starting point.
|
Parameter | Type | Description |
triggering_frequency_seconds |
int |
Required for streaming writes. Roughly every
triggering_frequency_seconds duration, the sink will write records to data files and produce a table snapshot.
Generally, a higher value will produce fewer, larger data files.
|
drop | list<str> | A list of fields to drop before writing to table(s). |
keep | list<str> | A list of fields to keep, dropping the rest before writing to table(s). |
only | str | A nested record field that should be the only thing written to table(s). |
Parameter | Type | Description |
streaming |
boolean |
Enables streaming reads. The source will continuously poll for snapshots forever. |
poll_interval_seconds |
int |
The interval at which to scan the table for new snapshots. Defaults to 60 seconds. Only applicable for streaming reads. |
from_snapshot |
long |
Starts reading from this snapshot ID (inclusive). |
to_snapshot |
long |
Reads up to this snapshot ID (inclusive). By default, batch reads will read up to the latest snapshot (inclusive), while streaming reads will continue polling for new snapshots forever. |
from_timestamp |
long |
Starts reading from the earliest snapshot (inclusive) created after this timestamp (in milliseconds). |
to_timestamp |
long |
Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds). By default, batch reads will read up to the latest snapshot (inclusive), while streaming reads will continue polling for new snapshots forever. |
starting_strategy |
str |
The source's starting strategy. Valid options are:
Defaults to |
Being a Managed transform, this IO exclusively writes and reads using Beam Row
s.
Conversion takes place between Beam Row
s and Iceberg Record
s using helper methods
in IcebergUtils
. Below is the mapping between Beam and Iceberg types:
Beam Schema.FieldType | Iceberg Type
|
BYTES | BINARY |
BOOLEAN | BOOLEAN |
STRING | STRING |
INT32 | INTEGER |
INT64 | LONG |
DECIMAL | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
SqlTypes.DATETIME | TIMESTAMP |
DATETIME | TIMESTAMPTZ |
SqlTypes.DATE | DATE |
SqlTypes.TIME | TIME |
ITERABLE | LIST |
ARRAY | LIST |
MAP | MAP |
ROW | STRUCT |
Note: SqlTypes
are Beam logical types.
For an existing table, the following Beam types are supported for both timestamp
and
timestamptz
:
SqlTypes.DATETIME
--> Using a LocalDateTime
object
DATETIME
--> Using a DateTime
object
INT64
--> Using a Long
representing micros since EPOCH
STRING
--> Using a timestamp String
representation (e.g. "2024-10-08T13:18:20.053+03:27"
)
Note: If you expect Beam to create the Iceberg table at runtime, please provide SqlTypes.DATETIME
for a timestamp
column and DATETIME
for a timestamptz
column. If the table does not exist, Beam will treat STRING
and INT64
at
face-value and create equivalent column types.
For Iceberg reads, the connector will produce Beam SqlTypes.DATETIME
types for
Iceberg's timestamp
and DATETIME
types for timestamptz
.
If an Iceberg table does not exist at the time of writing, this connector will automatically create one with the data's schema.
Note that this is a best-effort operation that depends on the Catalog
implementation.
Some implementations may not support creating a table using the Iceberg API.
Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
identifier template for the table
parameter. A template should have placeholders
represented as curly braces containing a record field name, e.g.: "my_namespace.my_{foo}_table"
.
The sink uses simple String interpolation to determine a record's table destination. The
placeholder is replaced with the record's field value. Nested fields can be specified using
dot-notation (e.g. "{top.middle.nested}"
).
Some use cases may benefit from filtering record fields right before the write operation. For
example, you may want to provide meta-data to guide records to the right destination, but not
necessarily write that meta-data to your table. Some light-weight filtering options are provided
to accommodate such cases, allowing you to control what actually gets written (see drop
, keep
, only
}).
Example write to dynamic destinations (pseudocode):
Map<String, Object> config = Map.of(
"table", "flights.{country}.{airport}",
"catalog_properties", Map.of(...),
"drop", ["country", "airport"]);
JSON_ROWS = [
// first record is written to table "flights.usa.RDU"
"{\"country\": \"usa\"," +
"\"airport\": \"RDU\"," +
"\"flight_id\": \"AA356\"," +
"\"destination\": \"SFO\"," +
"\"meal\": \"chicken alfredo\"}",
// second record is written to table "flights.qatar.HIA"
"{\"country\": \"qatar\"," +
"\"airport\": \"HIA\"," +
"\"flight_id\": \"QR 875\"," +
"\"destination\": \"DEL\"," +
"\"meal\": \"shawarma\"}",
...
];
// fields "country" and "airport" are dropped before
// records are written to tables
pipeline
.apply(Create.of(JSON_ROWS))
.apply(JsonToRow.withSchema(...))
.apply(Managed.write(ICEBERG).withConfig(config));
When records are written and committed to a table, a snapshot is produced. A batch pipeline
will perform a single commit and create a single snapshot per table. A streaming pipeline will
produce a snapshot roughly according to the configured triggering_frequency_seconds
.
You can access these snapshots and perform downstream processing by fetching the "snapshots"
output PCollection:
pipeline
.apply(Create.of(BEAM_ROWS))
.apply(Managed.write(ICEBERG).withConfig(config))
.get("snapshots")
.apply(ParDo.of(new DoFn<Row, T> {...});
Each Snapshot is represented as a Beam Row, with the following Schema:
Field | Type | Description |
table | str | Table identifier. |
manifest_list_location | str | Location of the snapshot's manifest list. |
operation | str | Name of the operation that produced the snapshot. |
parent_id | long | The snapshot's parent ID. |
schema_id | int | The id of the schema used when the snapshot was created. |
summary | map<str, str> | A string map of summary data. |
timestamp_millis | long | The snapshot's timestamp in milliseconds. |
Map<String, Object> config = Map.of(
"table", table,
"catalog_name", name,
"catalog_properties", Map.of(...),
"config_properties", Map.of(...));
Example of a simple batch read:
PCollection<Row> rows = pipeline
.apply(Managed.read(ICEBERG).withConfig(config))
.getSinglePCollection();
Example of a simple CDC streaming read:
PCollection<Row> rows = pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection();
Note: This reads append-only snapshots. Full CDC is not supported yet.
The CDC streaming source (enabled with streaming=true
) continuously polls the
table for new snapshots, with a default interval of 60 seconds. This can be overridden with
poll_interval_seconds
:
config.put("streaming", true);
config.put("poll_interval_seconds", 10);
starting_strategy
to be "earliest"
or "latest"
.
from_snapshot
.
from_timestamp
.
For example:
Map<String, Object> config = Map.of(
"table", table,
"catalog_name", name,
"catalog_properties", Map.of(...),
"config_properties", Map.of(...),
"streaming", true,
"from_snapshot", 123456789L);
PCollection<Row> = pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection();
to_snapshot
.
to_timestamp
.
For example:
Map<String, Object> config = Map.of(
"table", table,
"catalog_name", name,
"catalog_properties", Map.of(...),
"config_properties", Map.of(...),
"from_snapshot", 123456789L,
"to_timestamp", 987654321L);
PCollection<Row> = pipeline
.apply(Managed.read(ICEBERG_CDC).withConfig(config))
.getSinglePCollection();
Note: If streaming=true
and an end point is set, the pipeline will run in
streaming mode and shut down automatically after processing the final snapshot.Modifier and Type | Class and Description |
---|---|
static class |
IcebergIO.ReadRows |
static class |
IcebergIO.WriteRows |
Constructor and Description |
---|
IcebergIO() |
Modifier and Type | Method and Description |
---|---|
static IcebergIO.ReadRows |
readRows(IcebergCatalogConfig catalogConfig) |
static IcebergIO.WriteRows |
writeRows(IcebergCatalogConfig catalog) |
public static IcebergIO.WriteRows writeRows(IcebergCatalogConfig catalog)
public static IcebergIO.ReadRows readRows(IcebergCatalogConfig catalogConfig)