Apache Iceberg I/O connector
The Beam SDKs include built-in transforms that can read data from and write data to Apache Iceberg tables.
- SQL Shell
- Java SDK
- Python SDK
- YAML API
To use IcebergIO with the Java SDK, add the Maven artifact dependency to your pom.xml file.
To use IcebergIO from the Beam SQL Shell, install the shell and run the following command:
./beam-sql.sh --io iceberg
To use IcebergIO with Beam YAML, install Beam with the yaml extra, for example: pip install 'apache-beam[yaml]'
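For the Java SDK, the Managed API and the Iceberg integration ship as separate Maven artifacts. A minimal dependency block might look like the following sketch (the version is a placeholder; use the Beam release you are targeting):

```xml
<!-- Illustrative only: replace 2.XX.0 with your Beam version. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>2.XX.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.XX.0</version>
</dependency>
```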
If you’re new to Iceberg, check out the Iceberg basics section at the end of this guide.
Quickstart with Public Datasets
We can jump straight into reading some high-quality public datasets served via Iceberg’s REST Catalog. These datasets are hosted on Google Cloud’s BigLake and are available to read by anyone, making it a good resource to experiment with.
There are some prerequisites to using the BigLake Catalog:
- A Google Cloud project (for authentication). Create one if you don’t have one.
- Standard Google Application Default Credentials (ADC) set up in your environment.
- A Google Cloud Storage bucket
When you’ve met those prerequisites, start by setting up your catalog:
CREATE CATALOG my_catalog TYPE 'iceberg'
PROPERTIES (
  'type' = 'rest',
  'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
  'warehouse' = 'gs://biglake-public-nyc-taxi-iceberg',
  'header.x-goog-user-project' = '$PROJECT_ID',
  'rest.auth.type' = 'google',
  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
  'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
);
Map<String, String> catalogProps =
    ImmutableMap.of(
        "type", "rest",
        "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
        "warehouse", "gs://biglake-public-nyc-taxi-iceberg",
        "header.x-goog-user-project", PROJECT_ID,
        "rest.auth.type", "google",
        "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
        "header.X-Iceberg-Access-Delegation", "vended-credentials");
biglake_catalog_props = {
    'type': 'rest',
    'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
    'warehouse': 'gs://biglake-public-nyc-taxi-iceberg',
    'header.x-goog-user-project': PROJECT_ID,
    'rest.auth.type': 'google',
    'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
    'header.X-Iceberg-Access-Delegation': 'vended-credentials'
}
catalog_props: &catalog_props
  type: "rest"
  uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog"
  warehouse: "gs://biglake-public-nyc-taxi-iceberg"
  header.x-goog-user-project: $PROJECT_ID
  rest.auth.type: "google"
  io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
  header.X-Iceberg-Access-Delegation: "vended-credentials"
Now simply query the public dataset:
Pipeline p = Pipeline.create();

// Set up query properties:
Map<String, Object> config =
    ImmutableMap.of(
        "table", "public_data.nyc_taxicab",
        "catalog_properties", catalogProps,
        "filter", "data_file_year = 2021 AND tip_amount > 100",
        "keep", Arrays.asList("passenger_count", "total_amount", "trip_distance"));

// Read Iceberg records
PCollection<Row> icebergRows =
    p.apply(Managed.read("iceberg").withConfig(config)).getSinglePCollection();

// Perform further analysis on records
PCollection<Row> result =
    icebergRows
        .apply(AddFields.<Row>create().field("num_trips", Schema.FieldType.INT32, 1))
        .apply(
            Group.<Row>byFieldNames("passenger_count")
                .aggregateField("num_trips", Sum.ofIntegers(), "num_trips")
                .aggregateField("total_amount", Mean.of(), "avg_fare")
                .aggregateField("trip_distance", Mean.of(), "avg_distance"));

// Print to console
result.apply(
    MapElements.into(TypeDescriptors.voids())
        .via(
            row -> {
              System.out.println(row);
              return null;
            }));

// Execute
p.run().waitUntilFinish();
from statistics import mean
config = {
    "table": "public_data.nyc_taxicab",
    "catalog_properties": biglake_catalog_props,
    "filter": "data_file_year = 2021 AND tip_amount > 100",
    "keep": ["passenger_count", "total_amount", "trip_distance"]
}
with beam.Pipeline() as p:
  rows = p | beam.managed.Read("iceberg", config=config)
  result = (
      rows
      | beam.Select(num_trips=lambda x: 1, *rows.element_type._fields)
      | beam.GroupBy('passenger_count')
          .aggregate_field('num_trips', sum, 'total_trips')
          .aggregate_field('total_amount', mean, 'avg_fare')
          .aggregate_field('trip_distance', mean, 'avg_distance'))
  result | beam.Map(print)
pipeline:
  type: chain
  transforms:
    - type: ReadFromIceberg
      config:
        table: "public_data.nyc_taxicab"
        catalog_properties: *catalog_props
        filter: "data_file_year = 2021 AND tip_amount > 100"
        keep: [ "passenger_count", "total_amount", "trip_distance" ]
    - type: Sql
      config:
        query: "SELECT
          passenger_count,
          COUNT(1) AS num_trips,
          ROUND(AVG(total_amount), 2) AS avg_fare,
          ROUND(AVG(trip_distance), 2) AS avg_distance
          FROM PCOLLECTION
          GROUP BY passenger_count"
User Guide
Choose Your Catalog
First, select a Catalog implementation to handle metadata management and storage interaction. Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two common paths: Hadoop for easy local development and BigLake for managing production data at cloud scale.
Use Hadoop Catalog for quick, local testing with zero setup and no external dependencies. The following examples use a temporary local directory.
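To give a sense of how little configuration the Hadoop catalog needs, the catalog properties can be as small as a type and a warehouse path. The following is a sketch (property names follow the standard Iceberg catalog options; the warehouse is a throwaway temporary directory):

```python
import tempfile

# Minimal Hadoop catalog configuration for local testing.
# The warehouse is just a local directory; no external services involved.
warehouse_dir = tempfile.mkdtemp()

hadoop_catalog_props = {
    'type': 'hadoop',
    'warehouse': f'file://{warehouse_dir}',
}
```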
Use BigLake Catalog for a fully managed REST-based experience. It simplifies access to cloud storage with built-in credential delegation and unified metadata management. It requires a few prerequisites:
- A Google Cloud project (for authentication). Create one if you don’t have one.
- Standard Google Application Default Credentials (ADC) set up in your environment.
- A Google Cloud Storage bucket
CREATE CATALOG my_catalog TYPE 'iceberg'
PROPERTIES (
  'type' = 'rest',
  'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
  'warehouse' = 'gs://$BUCKET_NAME',
  'header.x-goog-user-project' = '$PROJECT_ID',
  'rest.auth.type' = 'google',
  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
  'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
);
Map<String, String> catalogProps =
    ImmutableMap.of(
        "type", "rest",
        "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
        "warehouse", "gs://" + BUCKET_NAME,
        "header.x-goog-user-project", PROJECT_ID,
        "rest.auth.type", "google",
        "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
        "header.X-Iceberg-Access-Delegation", "vended-credentials");
biglake_catalog_config = {
    'type': 'rest',
    'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
    'warehouse': 'gs://' + BUCKET_NAME,
    'header.x-goog-user-project': PROJECT_ID,
    'rest.auth.type': 'google',
    'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
    'header.X-Iceberg-Access-Delegation': 'vended-credentials'
}
catalog_props: &catalog_props
  type: "rest"
  uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog"
  warehouse: "gs://$BUCKET_NAME"
  header.x-goog-user-project: $PROJECT_ID
  rest.auth.type: "google"
  io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
  header.X-Iceberg-Access-Delegation: "vended-credentials"
Create a Namespace
If you’re on Beam SQL, you can explicitly create a new namespace:
CREATE DATABASE my_catalog.my_db;
Alternatively, the IcebergIO sink will automatically create missing namespaces at runtime. This is ideal for dynamic pipelines where destinations are determined by the incoming data.
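For example, a dynamic destination can be expressed with a templated table name. The following YAML sketch (the `orders` namespace and `customer_id` field are hypothetical) routes each record to a table derived from one of its field values, creating namespaces and tables as needed:

```yaml
# Illustrative sketch: "orders.{customer_id}" is a template; the
# customer_id field of each incoming record selects the destination table.
- type: WriteToIceberg
  config:
    table: "orders.{customer_id}"
    catalog_name: "my_catalog"
    catalog_properties: *catalog_props
```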
Create a Table
Tables are defined by a schema and an optional partition spec. You can create a table using SQL DDL or by configuring the Iceberg destination in your Beam pipeline.
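In the SQL shell, a sketch of such DDL might look like the following (the table, column names, and partition column are illustrative, and PARTITIONED BY is optional):

```sql
-- Illustrative table matching the insert example below.
CREATE TABLE my_catalog.my_db.my_table (
  id BIGINT,
  name VARCHAR,
  age INTEGER
) PARTITIONED BY (age);
```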
Insert Data
Once your table is defined, you can write data using standard SQL INSERT or by calling the IcebergIO sink in your SDK of choice.
Schema inputSchema =
    Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();

// Sink configuration (illustrative values; point these at your own
// destination table and the catalog properties defined earlier):
Map<String, Object> managedConfig =
    ImmutableMap.of(
        "table", "my_db.my_table",
        "catalog_name", "my_catalog",
        "catalog_properties", catalogProps);

Pipeline p = Pipeline.create();
p.apply(
        Create.of(
            Row.withSchema(inputSchema).addValues(1L, "Mark", 34).build(),
            Row.withSchema(inputSchema).addValues(2L, "Omar", 24).build(),
            Row.withSchema(inputSchema).addValues(3L, "Rachel", 27).build()))
    .apply(Managed.write("iceberg").withConfig(managedConfig));
p.run();
View Namespaces and Tables
If you’re on Beam SQL, you can view the newly created resources:
SHOW DATABASES my_catalog;
SHOW TABLES my_catalog.my_db;
Query Data
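A minimal sketch in the SQL shell, reusing the illustrative table from above:

```sql
SELECT name, age
FROM my_catalog.my_db.my_table
WHERE age > 25;
```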
Further steps
Check out the full IcebergIO configuration to make use of other features like applying a partition spec, table properties, row filtering, column pruning, etc.
Data Types
Check this overview of Iceberg data types.
IcebergIO leverages Beam Schemas to bridge the gap between SDK-native types and the Iceberg specification. While the Java SDK provides full coverage for the Iceberg v2 spec (with v3 support currently in development), other SDKs may have specific constraints on complex or experimental types. The following examples demonstrate the standard mapping for core data types across SQL, Java, Python, and YAML:
INSERT INTO catalog.namespace.table VALUES (
9223372036854775807, -- BIGINT
2147483647, -- INTEGER
1.0, -- FLOAT
1.0, -- DOUBLE
TRUE, -- BOOLEAN
TIMESTAMP '2018-05-28 20:17:40.123', -- TIMESTAMP
'varchar', -- VARCHAR
'char', -- CHAR
ARRAY['abc', 'xyz'], -- ARRAY
ARRAY[CAST(ROW('abc', 123) AS ROW(nested_str VARCHAR, nested_int INTEGER))] -- ARRAY[STRUCT]
)
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
public class IcebergBeamSchemaAndRow {
  public Row createRow() {
    Schema nestedSchema =
        Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build();

    Schema beamSchema =
        Schema.builder()
            .addBooleanField("boolean_field")
            .addInt32Field("int_field")
            .addInt64Field("long_field")
            .addFloatField("float_field")
            .addDoubleField("double_field")
            .addDecimalField("numeric_field")
            .addByteArrayField("bytes_field")
            .addStringField("string_field")
            .addLogicalTypeField("time_field", SqlTypes.TIME)
            .addLogicalTypeField("date_field", SqlTypes.DATE)
            .addLogicalTypeField("timestamp_field", Timestamp.MICROS)
            .addDateTimeField("timestamptz_field")
            .addArrayField("array_field", Schema.FieldType.INT32)
            .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32)
            .addRowField("struct_field", nestedSchema)
            .build();

    Row beamRow =
        Row.withSchema(beamSchema)
            .withFieldValues(
                ImmutableMap.<String, Object>builder()
                    .put("boolean_field", true)
                    .put("int_field", 1)
                    .put("long_field", 2L)
                    .put("float_field", 3.4f)
                    .put("double_field", 4.5d)
                    .put("numeric_field", new BigDecimal(67))
                    .put("bytes_field", new byte[] {1, 2, 3})
                    .put("string_field", "value")
                    .put("time_field", LocalTime.now())
                    .put("date_field", LocalDate.now())
                    .put("timestamp_field", Instant.now())
                    .put("timestamptz_field", DateTime.now())
                    .put("array_field", Arrays.asList(1, 2, 3))
                    .put("map_field", ImmutableMap.of("a", 1, "b", 2))
                    .put(
                        "struct_field",
                        Row.withSchema(nestedSchema).addValues("nested_value", 123).build())
                    .build())
            .build();
    return beamRow;
  }
}
import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp
row = beam.Row(
    boolean_field=True,
    int_field=1,
    long_field=2,
    float_field=3.45,
    bytes_field=b'value',
    string_field="value",
    timestamptz_field=Timestamp(4, 5),
    array_field=[1, 2, 3],
    map_field={"a": 1, "b": 2},
    struct_field=beam.Row(nested_field="nested_value", nested_field2=123))
import numpy as np
from apache_beam.typehints.row_type import RowTypeConstraint
from typing import Sequence
# Override data schema by adding `with_output_types` to the transform:
beam.Create([row]).with_output_types(
RowTypeConstraint.from_fields([
('boolean_field', bool), ('int_field', int), ('long_field', np.int64),
('float_field', float), ('bytes_field', bytes), ('string_field', str),
('timestamptz_field', Timestamp), ('array_field', Sequence[int]),
('map_field', dict[str, int]),
(
'struct_field',
RowTypeConstraint.from_fields([('nested_field', str),
('nested_field2', int)]))
]))
pipeline:
  transforms:
    - type: Create
      config:
        elements:
          - boolean_field: false
            integer_field: 123
            number_field: 4.56
            string_field: "abc"
            struct_field:
              nested_1: a
              nested_2: 1
            array_field: [1, 2, 3]
        output_schema:
          type: object
          properties:
            boolean_field:
              type: boolean
            integer_field:
              type: integer
            number_field:
              type: number
            string_field:
              type: string
            struct_field:
              type: object
              properties:
                nested_1:
                  type: string
                nested_2:
                  type: integer
            array_field:
              type: array
              items:
                type: integer
Iceberg basics
Catalogs
A catalog is a top-level entity used to manage and access Iceberg tables. There are many catalog implementations out there; this guide focuses on the Hadoop catalog for easy local testing and BigLake REST catalog for cloud-scale development.
Namespaces
A namespace lives inside a catalog and may contain a number of Iceberg tables. This is the equivalent of a “database”.
Tables
The entity that actually contains the data; a table is described by a schema and an optional partition spec.
Snapshots
A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change and references its parent snapshot. An Iceberg table’s history is a chronological list of snapshots, enabling features like time travel and ACID-compliant concurrent writes.
Last updated on 2026/03/30