Built-in I/O Transforms

Apache Iceberg I/O connector

The Beam SDKs include built-in transforms that can read data from and write data to Apache Iceberg tables.

To use IcebergIO, add the Maven artifact dependency to your pom.xml file:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.72.0</version>
</dependency>

To use IcebergIO, install the Beam SQL Shell and run the following command:

./beam-sql.sh --io iceberg

To use IcebergIO with Beam YAML, install the yaml extra:

pip install apache_beam[yaml]

If you’re new to Iceberg, check out the Iceberg basics section at the end of this page.

Quickstart with Public Datasets

We can jump straight into reading some high-quality public datasets served through Iceberg’s REST catalog. These datasets are hosted on Google Cloud’s BigLake and are readable by anyone, making them a good resource for experimentation.

There are some prerequisites to using the BigLake catalog; see the BigLake documentation for details.

When you’ve met those prerequisites, start by setting up your catalog:

  CREATE CATALOG my_catalog TYPE 'iceberg'
  PROPERTIES (
    'type' = 'rest',
    'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
    'warehouse' = 'gs://biglake-public-nyc-taxi-iceberg',
    'header.x-goog-user-project' = '$PROJECT_ID',
    'rest.auth.type' = 'google',
    'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
    'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
  );
  
  Map<String, String> catalogProps =
      ImmutableMap.of(
          "type", "rest",
          "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
          "warehouse", "gs://biglake-public-nyc-taxi-iceberg",
          "header.x-goog-user-project", PROJECT_ID,
          "rest.auth.type", "google",
          "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
          "header.X-Iceberg-Access-Delegation", "vended-credentials");
  
  biglake_catalog_props = {
      'type': 'rest',
      'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
      'warehouse': 'gs://biglake-public-nyc-taxi-iceberg',
      'header.x-goog-user-project': PROJECT_ID,
      'rest.auth.type': 'google',
      'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
      'header.X-Iceberg-Access-Delegation': 'vended-credentials'
  }
catalog_props: &catalog_props
  type: "rest"
  uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog"
  warehouse: "gs://biglake-public-nyc-taxi-iceberg"
  header.x-goog-user-project: "$PROJECT_ID"
  rest.auth.type: "google"
  io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
  header.X-Iceberg-Access-Delegation: "vended-credentials"

Now simply query the public dataset:

  SELECT
    passenger_count,
    COUNT(1) AS num_trips,
    ROUND(AVG(total_amount), 2) AS avg_fare,
    ROUND(AVG(trip_distance), 2) AS avg_distance
  FROM
    bqms.public_data.nyc_taxicab
  WHERE
    data_file_year = 2021
    AND tip_amount > 100
  GROUP BY
    passenger_count
  ORDER BY
    num_trips DESC;
  
  Pipeline p = Pipeline.create();
  
  // Set up query properties:
  Map<String, Object> config =
      ImmutableMap.of(
          "table",
          "public_data.nyc_taxicab",
          "catalog_properties",
          catalogProps,
          "filter",
          "data_file_year = 2021 AND tip_amount > 100",
          "keep",
          Arrays.asList("passenger_count", "total_amount", "trip_distance"));
  
  // Read Iceberg records
  PCollection<Row> icebergRows =
      p.apply(Managed.read("iceberg").withConfig(config)).getSinglePCollection();
  
  // Perform further analysis on records
  PCollection<Row> result =
      icebergRows
          .apply(AddFields.<Row>create().field("num_trips", Schema.FieldType.INT32, 1))
          .apply(
              Group.<Row>byFieldNames("passenger_count")
                  .aggregateField("num_trips", Sum.ofIntegers(), "num_trips")
                  .aggregateField("total_amount", Mean.of(), "avg_fare")
                  .aggregateField("trip_distance", Mean.of(), "avg_distance"));
  
  // Print to console
  result.apply(
      MapElements.into(TypeDescriptors.voids())
          .via(
              row -> {
                System.out.println(row);
                return null;
              }));
  
  // Execute
  p.run().waitUntilFinish();
  
  from statistics import mean
  
  config = {
      "table": "public_data.nyc_taxicab",
      "catalog_properties": biglake_catalog_props,
      "filter": "data_file_year = 2021 AND tip_amount > 100",
      "keep": ["passenger_count", "total_amount", "trip_distance"]
  }
  
  with beam.Pipeline() as p:
    rows = p | beam.managed.Read("iceberg", config=config)
  
    result = (
        rows | beam.Select(num_trips=lambda x: 1, *rows.element_type._fields)
        | beam.GroupBy('passenger_count').aggregate_field(
            'num_trips', sum, 'total_trips').aggregate_field(
                'total_amount', mean, 'avg_fare').aggregate_field(
                    'trip_distance', mean, 'avg_distance'))
  
    result | beam.Map(print)
  pipeline:
    type: chain
    transforms:
      - type: ReadFromIceberg
      config:
        table: "public_data.nyc_taxicab"
        catalog_properties: *catalog_props
        filter: "data_file_year = 2021 AND tip_amount > 100"
        keep: [ "passenger_count", "total_amount", "trip_distance" ]
      - type: Sql
        config:
          query: "SELECT
                    passenger_count,
                    COUNT(1) AS num_trips,
                    ROUND(AVG(total_amount), 2) AS avg_fare,
                    ROUND(AVG(trip_distance), 2) AS avg_distance
                  FROM
                    PCOLLECTION
                  GROUP BY
                    passenger_count"
  

User Guide

Choose Your Catalog

First, select a Catalog implementation to handle metadata management and storage interaction. Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two common paths: Hadoop for easy local development and BigLake for managing production data at cloud scale.

Use Hadoop Catalog for quick, local testing with zero setup and no external dependencies. The following examples use a temporary local directory.


    CREATE CATALOG my_catalog TYPE 'iceberg'
    PROPERTIES (
      'type' = 'hadoop',
      'warehouse' = 'file:///tmp/beam-iceberg-local-quickstart'
    );
  
    
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "hadoop",
            "warehouse", "file:///tmp/beam-iceberg-local-quickstart");

  
    
    hadoop_catalog_props = {
        'type': 'hadoop',
        'warehouse': 'file:///tmp/beam-iceberg-local-quickstart'
    }

  
    catalog_props: &catalog_props
      type: "hadoop"
      warehouse: "file:///tmp/beam-iceberg-local-quickstart"
  

Use BigLake Catalog for a fully managed REST-based experience. It simplifies access to cloud storage with built-in credential delegation and unified metadata management. It has a few prerequisites; see the BigLake documentation for details.

  CREATE CATALOG my_catalog TYPE 'iceberg'
  PROPERTIES (
    'type' = 'rest',
    'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
    'warehouse' = 'gs://$BUCKET_NAME',
    'header.x-goog-user-project' = '$PROJECT_ID',
    'rest.auth.type' = 'google',
    'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
    'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
  );
  
  
  Map<String, String> catalogProps =
      ImmutableMap.of(
          "type", "rest",
          "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
          "warehouse", "gs://" + BUCKET_NAME,
          "header.x-goog-user-project", PROJECT_ID,
          "rest.auth.type", "google",
          "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
          "header.X-Iceberg-Access-Delegation", "vended-credentials");

  
  
  biglake_catalog_props = {
      'type': 'rest',
      'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
      'warehouse': 'gs://' + BUCKET_NAME,
      'header.x-goog-user-project': PROJECT_ID,
      'rest.auth.type': 'google',
      'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
      'header.X-Iceberg-Access-Delegation': 'vended-credentials'
  }

  
  catalog_props: &catalog_props
    type: "rest"
    uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog"
    warehouse: "gs://$BUCKET_NAME"
    header.x-goog-user-project: "$PROJECT_ID"
    rest.auth.type: "google"
    io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
    header.X-Iceberg-Access-Delegation: "vended-credentials"
  

Create a Namespace

If you’re on Beam SQL, you can explicitly create a new namespace:

CREATE DATABASE my_catalog.my_db;

Alternatively, the IcebergIO sink will automatically create missing namespaces at runtime. This is ideal for dynamic pipelines where destinations are determined by the incoming data.
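As a sketch of that pattern, the Managed sink can be pointed at a templated table name so that each record’s fields determine its destination. The `{customer_id}` field and `my_db` namespace below are illustrative, and the `{...}` template syntax assumes Beam’s dynamic-destinations support in the Managed Iceberg sink:

```python
# Hedged sketch: a write config whose table name is derived from record fields.
# The field name `customer_id` is hypothetical; the "{...}" template syntax
# assumes Beam's dynamic-destinations support in the Managed Iceberg sink.
managed_config = {
    # A record with customer_id="acme" would land in table my_db.acme;
    # the namespace is created automatically if it doesn't exist yet.
    'table': 'my_db.{customer_id}',
    'catalog_properties': {
        'type': 'hadoop',
        'warehouse': 'file:///tmp/beam-iceberg-local-quickstart',
    },
}
```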

Create a Table

Tables are defined by a schema and an optional partition spec. You can create a table using SQL DDL or by configuring the Iceberg destination in your Beam pipeline.

CREATE EXTERNAL TABLE my_catalog.my_db.my_table (
    id BIGINT,
    name VARCHAR,
    age INTEGER
)
TYPE 'iceberg';
Map<String, Object> managedConfig =
    ImmutableMap.of("table", "my_db.my_table", "catalog_properties", catalogProps);

// Note: The table will get created when inserting data (see below)
managed_config = {
    'table': 'my_db.my_table', 'catalog_properties': hadoop_catalog_props
}

# Note: The table will get created when inserting data (see below)
- type: WriteToIceberg
  config:
    table: "my_db.my_table"
    catalog_properties: *catalog_props

# Note: The table will get created when inserting data (see below)

Insert Data

Once your table is defined, you can write data using standard SQL INSERT or by calling the IcebergIO sink in your SDK of choice.

INSERT INTO my_catalog.my_db.my_table VALUES
    (1, 'Mark', 32),
    (2, 'Omar', 24),
    (3, 'Rachel', 27);
Schema inputSchema =
    Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();

Pipeline p = Pipeline.create();
p.apply(
        Create.of(
            Row.withSchema(inputSchema).addValues(1, "Mark", 32).build(),
            Row.withSchema(inputSchema).addValues(2, "Omar", 24).build(),
            Row.withSchema(inputSchema).addValues(3, "Rachel", 27).build()))
    .apply(Managed.write("iceberg").withConfig(managedConfig));

p.run();
with beam.Pipeline() as p:
  (
      p
      | beam.Create([
          beam.Row(id=1, name="Mark", age=32),
          beam.Row(id=2, name="Omar", age=24),
          beam.Row(id=3, name="Rachel", age=27)
      ])
      | beam.managed.Write("iceberg", config=managed_config))
pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - id: 1
            name: "Mark"
            age: 32
          - id: 2
            name: "Omar"
            age: 24
          - id: 3
            name: "Rachel"
            age: 27
    - type: WriteToIceberg
      config:
        table: "my_db.my_table"
        catalog_properties: *catalog_props

View Namespaces and Tables

If you’re on Beam SQL, you can view the newly created resources:

SHOW DATABASES my_catalog;
SHOW TABLES my_catalog.my_db;

Query Data

SELECT * FROM my_catalog.my_db.my_table;
Pipeline q = Pipeline.create();
PCollection<Row> rows =
    q.apply(Managed.read("iceberg").withConfig(managedConfig)).getSinglePCollection();

rows.apply(
    MapElements.into(TypeDescriptors.voids())
        .via(
            row -> {
              System.out.println(row);
              return null;
            }));

q.run();
with beam.Pipeline() as p:
  (
      p
      | beam.managed.Read("iceberg", config=managed_config)
      | beam.LogElements())
pipeline:
  type: chain
  transforms:
    - type: ReadFromIceberg
      config:
        table: "my_db.my_table"
        catalog_properties: *catalog_props
    - type: LogForTesting

Further steps

Check out the full IcebergIO configuration to make use of other features like applying a partition spec, table properties, row filtering, column pruning, etc.

Data Types

Check this overview of Iceberg data types.

IcebergIO leverages Beam Schemas to bridge the gap between SDK-native types and the Iceberg specification. While the Java SDK provides full coverage for the Iceberg v2 spec (with v3 support currently in development), other SDKs may have specific constraints on complex or experimental types. The following examples demonstrate the standard mapping for core data types across SQL, Java, Python, and YAML:

  INSERT INTO catalog.namespace.table VALUES (
    9223372036854775807, -- BIGINT
    2147483647,          -- INTEGER
    1.0,                 -- FLOAT
    1.0,                 -- DOUBLE
    TRUE,                -- BOOLEAN
    TIMESTAMP '2018-05-28 20:17:40.123', -- TIMESTAMP
    'varchar',           -- VARCHAR
    'char',              -- CHAR
    ARRAY['abc', 'xyz'],  -- ARRAY
    ARRAY[CAST(ROW('abc', 123) AS ROW(nested_str VARCHAR, nested_int INTEGER))] -- ARRAY[STRUCT]
  )
  
  import java.math.BigDecimal;
  import java.time.Instant;
  import java.time.LocalDate;
  import java.time.LocalTime;
  import java.util.Arrays;
  import org.apache.beam.sdk.schemas.Schema;
  import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
  import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
  import org.apache.beam.sdk.values.Row;
  import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
  import org.joda.time.DateTime;
  
  public class IcebergBeamSchemaAndRow {
    public Row createRow() {
      Schema nestedSchema =
          Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build();
      Schema beamSchema =
          Schema.builder()
              .addBooleanField("boolean_field")
              .addInt32Field("int_field")
              .addInt64Field("long_field")
              .addFloatField("float_field")
              .addDoubleField("double_field")
              .addDecimalField("numeric_field")
              .addByteArrayField("bytes_field")
              .addStringField("string_field")
              .addLogicalTypeField("time_field", SqlTypes.TIME)
              .addLogicalTypeField("date_field", SqlTypes.DATE)
              .addLogicalTypeField("timestamp_field", Timestamp.MICROS)
              .addDateTimeField("timestamptz_field")
              .addArrayField("array_field", Schema.FieldType.INT32)
              .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32)
              .addRowField("struct_field", nestedSchema)
              .build();
  
      Row beamRow =
          Row.withSchema(beamSchema)
              .withFieldValues(
                  ImmutableMap.<String, Object>builder()
                      .put("boolean_field", true)
                      .put("int_field", 1)
                      .put("long_field", 2L)
                      .put("float_field", 3.4f)
                      .put("double_field", 4.5d)
                      .put("numeric_field", new BigDecimal(67))
                      .put("bytes_field", new byte[] {1, 2, 3})
                      .put("string_field", "value")
                      .put("time_field", LocalTime.now())
                      .put("date_field", LocalDate.now())
                      .put("timestamp_field", Instant.now())
                      .put("timestamptz_field", DateTime.now())
                      .put("array_field", Arrays.asList(1, 2, 3))
                      .put("map_field", ImmutableMap.of("a", 1, "b", 2))
                      .put(
                          "struct_field",
                          Row.withSchema(nestedSchema).addValues("nested_value", 123).build())
                      .build())
              .build();
  
      return beamRow;
    }
  }
  
  import apache_beam as beam
  from apache_beam.utils.timestamp import Timestamp
  
  row = beam.Row(
      boolean_field=True,
      int_field=1,
      long_field=2,
      float_field=3.45,
      bytes_field=b'value',
      string_field="value",
      timestamptz_field=Timestamp(4, 5),
      array_field=[1, 2, 3],
      map_field={
          "a": 1, "b": 2
      },
      struct_field=beam.Row(nested_field="nested_value", nested_field2=123))
  
  import numpy as np
  from apache_beam.typehints.row_type import RowTypeConstraint
  from typing import Sequence
  
  # Override data schema by adding `with_output_types` to the transform:
  beam.Create(row).with_output_types(
      RowTypeConstraint.from_fields([
          ('boolean_field', bool), ('int_field', int), ('long_field', np.int64),
          ('float_field', float), ('bytes_field', bytes), ('string_field', str),
          ('timestamptz_field', Timestamp), ('array_field', Sequence[int]),
          ('map_field', dict[str, int]),
          (
              'struct_field',
              RowTypeConstraint.from_fields([('nested_field', str),
                                             ('nested_field2', int)]))
      ]))
  pipeline:
    transforms:
      - type: Create
        config:
          elements:
            - boolean_field: false
              integer_field: 123
              number_field: 4.56
              string_field: "abc"
              struct_field:
                nested_1: a
                nested_2: 1
              array_field: [1, 2, 3]
          output_schema:
            type: object
            properties:
              boolean_field:
                type: boolean
              integer_field:
                type: integer
              number_field:
                type: number
              string_field:
                type: string
              struct_field:
                type: object
                properties:
                  nested_1:
                    type: string
                  nested_2:
                    type: integer
              array_field:
                type: array
                items:
                  type: integer

Iceberg basics

Catalogs

A catalog is a top-level entity used to manage and access Iceberg tables. There are many catalog implementations out there; this guide focuses on the Hadoop catalog for easy local testing and BigLake REST catalog for cloud-scale development.

Namespaces

A namespace lives inside a catalog and may contain a number of Iceberg tables. This is the equivalent of a “database”.

Tables

A table is the entity that actually contains data; it is described by a schema and an optional partition spec.
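The relationship between catalogs, namespaces, and tables can be pictured with a plain data structure. This is a conceptual sketch only (Iceberg stores metadata in files, not nested dictionaries), and the names are illustrative:

```python
# Conceptual sketch of the Iceberg object hierarchy; names are illustrative.
catalog = {
    "my_catalog": {                     # catalog: top-level entry point
        "my_db": {                      # namespace (the "database")
            "my_table": {               # table: schema + partition spec
                "schema": {"id": "long", "name": "string", "age": "int"},
                "partition_spec": ["age"],  # optional; here, partition by age
            }
        }
    }
}

# A fully qualified table reference like my_catalog.my_db.my_table
# mirrors this nesting:
table = catalog["my_catalog"]["my_db"]["my_table"]
```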

Snapshots

A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change and references its parent snapshot. An Iceberg table’s history is a chronological list of snapshots, enabling features like time travel and ACID-compliant concurrent writes.
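The snapshot chain can be sketched conceptually. This is illustrative only (real snapshots also carry manifest lists, operation details, and timestamps), but it shows how parent references form the history that time travel walks backwards:

```python
# Conceptual sketch: each snapshot records a summary and a pointer to its
# parent, forming the chronological history of the table.
snapshots = [
    {"id": 1, "parent_id": None, "summary": "append: initial load"},
    {"id": 2, "parent_id": 1,    "summary": "append: daily batch"},
    {"id": 3, "parent_id": 2,    "summary": "delete: purge old records"},
]

def lineage(snapshot_id, snapshots):
    """Walk parent pointers from a snapshot back to the table's first commit."""
    by_id = {s["id"]: s for s in snapshots}
    chain = []
    current = by_id.get(snapshot_id)
    while current is not None:
        chain.append(current["id"])
        current = by_id.get(current["parent_id"])
    return chain

# "Time travel" amounts to reading the table as of an earlier snapshot:
print(lineage(3, snapshots))  # [3, 2, 1]
```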