Class IcebergIO

java.lang.Object
org.apache.beam.sdk.io.iceberg.IcebergIO

@Internal public class IcebergIO extends Object
A connector that reads and writes to Apache Iceberg tables.

IcebergIO is offered as a Managed transform. This class is subject to change and should not be used directly. Instead, use it like so:


 Map<String, Object> config = Map.of(
         "table", table,
         "catalog_name", name,
         "catalog_properties", Map.of(
                 "warehouse", warehouse_path,
                 "catalog-impl", "org.apache.iceberg.hive.HiveCatalog"),
         "config_properties", Map.of(
                 "hive.metastore.uris", metastore_uri));


 ====== WRITE ======
 pipeline
     .apply(Create.of(BEAM_ROWS))
     .apply(Managed.write(ICEBERG).withConfig(config));


 ====== READ ======
 pipeline
     .apply(Managed.read(ICEBERG).withConfig(config))
     .getSinglePCollection()
     .apply(ParDo.of(...));


 ====== READ CDC ======
 pipeline
     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
     .getSinglePCollection()
     .apply(ParDo.of(...));
 
Look for more detailed examples below.

Configuration Options

For the full list of configuration options, please check the Managed IO configuration page.

Beam Rows

Being a Managed transform, this IO exclusively writes and reads using Beam Rows. Conversion takes place between Beam Rows and Iceberg Records using helper methods in IcebergUtils. Below is the mapping between Beam and Iceberg types:

Beam Schema.FieldType   Iceberg Type
BYTES                   BINARY
BOOLEAN                 BOOLEAN
STRING                  STRING
INT32                   INTEGER
INT64                   LONG
DECIMAL                 STRING
FLOAT                   FLOAT
DOUBLE                  DOUBLE
SqlTypes.DATETIME       TIMESTAMP
DATETIME                TIMESTAMPTZ
SqlTypes.DATE           DATE
SqlTypes.TIME           TIME
ITERABLE                LIST
ARRAY                   LIST
MAP                     MAP
ROW                     STRUCT

Note: SqlTypes are Beam logical types.
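
For reference, below is a minimal sketch of the kind of conversion IcebergUtils performs. The method names beamSchemaToIcebergSchema and beamRowToIcebergRecord are assumptions here; consult the IcebergUtils documentation for the exact signatures.

 // Illustrative sketch only; IcebergUtils method names are assumed, not guaranteed.
 Schema beamSchema = Schema.builder()
     .addStringField("name")
     .addInt64Field("views")
     .build();

 // Beam Schema -> Iceberg Schema
 org.apache.iceberg.Schema icebergSchema =
     IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

 // Beam Row -> Iceberg Record
 Row row = Row.withSchema(beamSchema).addValues("Main_Page", 42L).build();
 org.apache.iceberg.data.Record record =
     IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);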

Note on timestamps

For an existing table, the following Beam types are supported for both timestamp and timestamptz:

  • SqlTypes.DATETIME --> Using a LocalDateTime object
  • DATETIME --> Using a DateTime object
  • INT64 --> Using a Long representing micros since EPOCH
  • STRING --> Using a timestamp String representation (e.g. "2024-10-08T13:18:20.053+03:27")

Note: If you expect Beam to create the Iceberg table at runtime, please provide SqlTypes.DATETIME for a timestamp column and DATETIME for a timestamptz column. If the table does not exist, Beam will take STRING and INT64 at face value and create equivalent column types.

When reading from Iceberg, the connector produces Beam SqlTypes.DATETIME values for Iceberg's timestamp type and Beam DATETIME values for timestamptz.
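
To make this concrete, here is a minimal sketch of a Beam schema and Row carrying both flavors (the field names are illustrative):

 // "event_ts" maps to an Iceberg timestamp column,
 // "ingested_at" maps to a timestamptz column.
 Schema schema = Schema.builder()
     .addLogicalTypeField("event_ts", SqlTypes.DATETIME)
     .addDateTimeField("ingested_at")
     .build();

 Row row = Row.withSchema(schema)
     .addValues(
         java.time.LocalDateTime.parse("2024-10-08T13:18:20.053"),
         org.joda.time.DateTime.now())
     .build();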

Writing to Tables

Creating Tables

If an Iceberg table does not exist at the time of writing, this connector will automatically create one with the data's schema.

Note that this is a best-effort operation that depends on the Catalog implementation. Some implementations may not support creating a table using the Iceberg API.

Dynamic Destinations

Managed Iceberg supports writing to dynamic destinations. To do so, please provide an identifier template for the table parameter. A template should have placeholders represented as curly braces containing a record field name, e.g.: "my_namespace.my_{foo}_table".

The sink uses simple String interpolation to determine a record's table destination. The placeholder is replaced with the record's field value. Nested fields can be specified using dot-notation (e.g. "{top.middle.nested}").
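
For instance, a template that routes on a nested field might look like the following sketch (the field names are hypothetical):

 Map<String, Object> config = Map.of(
         "table", "logs.{event.source.region}",
         "catalog_name", name,
         "catalog_properties", Map.of(...));

 // A record whose event.source.region field is "emea"
 // is written to table "logs.emea".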

Pre-filtering Options

Some use cases may benefit from filtering record fields right before the write operation. For example, you may want to include metadata that routes records to the right destination, but not actually write that metadata to your table. Some lightweight filtering options are provided to accommodate such cases, allowing you to control what actually gets written (see drop, keep, only).

Example write to dynamic destinations (pseudocode):


 Map<String, Object> config = Map.of(
         "table", "flights.{country}.{airport}",
         "catalog_properties", Map.of(...),
         "drop", ["country", "airport"]);

 List<String> JSON_ROWS = List.of(
       // first record is written to table "flights.usa.RDU"
         "{\"country\": \"usa\"," +
          "\"airport\": \"RDU\"," +
          "\"flight_id\": \"AA356\"," +
          "\"destination\": \"SFO\"," +
          "\"meal\": \"chicken alfredo\"}",
       // second record is written to table "flights.qatar.HIA"
         "{\"country\": \"qatar\"," +
          "\"airport\": \"HIA\"," +
          "\"flight_id\": \"QR 875\"," +
          "\"destination\": \"DEL\"," +
          "\"meal\": \"shawarma\"}",
          ...
          );

 // fields "country" and "airport" are dropped before
 // records are written to tables
 pipeline
     .apply(Create.of(JSON_ROWS))
     .apply(JsonToRow.withSchema(...))
     .apply(Managed.write(ICEBERG).withConfig(config));

 

Output Snapshots

When records are written and committed to a table, a snapshot is produced. A batch pipeline will perform a single commit and create a single snapshot per table. A streaming pipeline will produce a snapshot roughly according to the configured triggering_frequency_seconds.

You can access these snapshots and perform downstream processing by fetching the "snapshots" output PCollection:


 pipeline
     .apply(Create.of(BEAM_ROWS))
     .apply(Managed.write(ICEBERG).withConfig(config))
     .get("snapshots")
     .apply(ParDo.of(new DoFn<Row, T>() {...}));
 
Each Snapshot is represented as a Beam Row, with the following Schema:
Field                   Type           Description
table                   str            Table identifier.
manifest_list_location  str            Location of the snapshot's manifest list.
operation               str            Name of the operation that produced the snapshot.
parent_id               long           The snapshot's parent ID.
schema_id               int            The ID of the schema used when the snapshot was created.
summary                 map<str, str>  A string map of summary data.
timestamp_millis        long           The snapshot's timestamp in milliseconds.
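
For example, a downstream DoFn applied to the "snapshots" PCollection could read these fields directly off each Row; the sketch below simply prints them, which is an illustrative choice:

 ParDo.of(new DoFn<Row, Void>() {
   @ProcessElement
   public void process(@Element Row snapshot) {
     String table = snapshot.getString("table");
     String operation = snapshot.getString("operation");
     Long timestampMillis = snapshot.getInt64("timestamp_millis");
     Map<String, String> summary = snapshot.getMap("summary");
     // e.g. forward to an audit log or metrics system
     System.out.printf(
         "%s: %s at %d -> %s%n", table, operation, timestampMillis, summary);
   }
 })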


Reading from Tables

With the following configuration,

 Map<String, Object> config = Map.of(
         "table", table,
         "catalog_name", name,
         "catalog_properties", Map.of(...),
         "config_properties", Map.of(...));
 
Example of a simple batch read:

 PCollection<Row> rows = pipeline
     .apply(Managed.read(ICEBERG).withConfig(config))
     .getSinglePCollection();
 
Example of a simple CDC streaming read:

 PCollection<Row> rows = pipeline
     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
     .getSinglePCollection();
 

Note: This reads append-only snapshots. Full CDC is not supported yet.

The CDC streaming source (enabled with streaming=true) continuously polls the table for new snapshots, with a default interval of 60 seconds. This can be overridden with poll_interval_seconds:


 // Note: requires a mutable config map (e.g. new HashMap<>(config)),
 // since Map.of(...) produces an immutable map.
 config.put("streaming", true);
 config.put("poll_interval_seconds", 10);
 

Choosing a Starting Point (ICEBERG_CDC only)

By default, a batch read will start reading from the earliest (oldest) table snapshot. A streaming read will start reading from the latest (most recent) snapshot. This behavior can be overridden in a few mutually exclusive ways:
  • Manually setting a starting strategy with starting_strategy to be "earliest" or "latest".
  • Setting a starting snapshot id with from_snapshot.
  • Setting a starting timestamp (milliseconds) with from_timestamp.

For example:


 Map<String, Object> config = Map.of(
         "table", table,
         "catalog_name", name,
         "catalog_properties", Map.of(...),
         "config_properties", Map.of(...),
         "streaming", true,
         "from_snapshot", 123456789L);

 PCollection<Row> rows = pipeline
     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
     .getSinglePCollection();
 

Choosing an End Point (ICEBERG_CDC only)

By default, a batch read will go up until the most recent table snapshot. A streaming read will continue monitoring the table for new snapshots forever. This can be overridden with one of the following options:
  • Setting an ending snapshot id with to_snapshot.
  • Setting an ending timestamp (milliseconds) with to_timestamp.

For example:


 Map<String, Object> config = Map.of(
         "table", table,
         "catalog_name", name,
         "catalog_properties", Map.of(...),
         "config_properties", Map.of(...),
         "from_snapshot", 123456789L,
         "to_timestamp", 987654321L);

 PCollection<Row> rows = pipeline
     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
     .getSinglePCollection();
 
Note: If streaming=true and an end point is set, the pipeline will run in streaming mode and shut down automatically after processing the final snapshot.
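
For example, a sketch of such a bounded streaming read, combining streaming=true with an ending snapshot (the snapshot ID is a placeholder):

 Map<String, Object> config = Map.of(
         "table", table,
         "catalog_name", name,
         "catalog_properties", Map.of(...),
         "config_properties", Map.of(...),
         "streaming", true,
         "to_snapshot", 987654321L);

 // Runs in streaming mode and shuts down automatically
 // after the snapshot with ID 987654321 has been processed.
 PCollection<Row> rows = pipeline
     .apply(Managed.read(ICEBERG_CDC).withConfig(config))
     .getSinglePCollection();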