1 of 93

Multi-Table Multi-Statement Transaction for Apache Iceberg

Jack Ye - Apache Iceberg PMC Member

Drew Gallardo - Software Engineer, AWS

April 8th, 2025

2 of 93

Agenda

01 Motivations
02 Solution Overview
03 Architecture Design Deep Dive
04 Integration with Apache Spark Iceberg Connector
05 Integration with IRC via Apache Gravitino
06 Next Steps

3 of 93

Motivations

4 of 93

Motivation 1:

Go beyond Commit

5 of 93

What Iceberg Offers

  • All successful commits to a table form a serializable history. This is ensured by the table format spec.

  • When multiple processes commit to the same table, only one wins. This is guarded by the catalog implementation.

  • When a process fails to commit, it tries to rebase and recommit if possible. This is offered by the language-specific SDKs.

  • Commits to multiple tables can be made atomically. This is supported when using IRC solutions like Apache Polaris.

For more details, watch the talk “Transactions and Isolation in Apache Iceberg” by Russell Spitzer.

6 of 93

The Next Steps for Lakehouse Transaction Features

  • Multi-Table Transaction:
    • run BEGIN, COMMIT, ROLLBACK TRANSACTION
    • execute any number of DDLs & DMLs inside the transaction block

  • Distributed Transaction:
    • begin a transaction in one process
    • pass the transaction to another process to be committed
    • these processes could be different compute engines, different languages

7 of 93

Motivation 2:

Go beyond Iceberg & Table

8 of 93

Multi-Standard Multi-Object Transaction

Object Types

  • View
  • Materialized View
  • Policy
  • Index
  • Function
  • Model

Open Table Standards

  • Apache Hudi
  • Apache Paimon
  • Delta Lake
  • Lance

Open Object Standards

  • Apache Iceberg View
  • Substrait View
  • Apache Calcite View
  • Apache DataSketches Index
  • OPA Policy
  • Cedar Policy

9 of 93

Motivation 3:

The Mission towards a Fully Transactional Lakehouse

10 of 93

What exactly is the isolation level of Apache Iceberg?

11 of 93

Iceberg Isolation Level

12 of 93

Iceberg Isolation Level

13 of 93

Database Isolation Levels

14 of 93

SERIALIZABLE isolation guarantees that the result of executing multiple transactions concurrently is the same as if the transactions had been executed one after the other in some order (i.e., serially).

  • Definition of SERIALIZABLE isolation

ChatGPT 4o

15 of 93

SNAPSHOT isolation guarantees that all reads made in a transaction will see a consistent snapshot of the database—as of the time the transaction started—and that no other transactions’ uncommitted changes are visible.

  • Definition of SNAPSHOT isolation

ChatGPT 4o

16 of 93

The scope of a database isolation level is the entire database.

17 of 93

The scope of Iceberg isolation is a single or multiple Iceberg tables.

18 of 93

We are missing a rigorous transaction definition and implementation for an open lakehouse, with the scope of the whole lakehouse.

19 of 93

Going back to the original question: what exactly is the isolation level of Apache Iceberg?

20 of 93

READ COMMITTED SNAPSHOT isolation (RCSI)

  • Acts similarly to SNAPSHOT isolation for commits

  • Instead of a fixed snapshot at transaction start time, the snapshot of each table is decided at query time, whenever the table is first requested

  • It should be treated more like READ COMMITTED when considering anomaly phenomena.

21 of 93

E.g. Read Skew with RCSI

Even with concurrent single-statement transactions!
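As an illustration (a hypothetical scenario, not from the talk: the table names, schema, and transfer are made up), a minimal Spark sketch of the anomaly. Under RCSI the snapshot of each table is resolved when that table is first requested, not at a single transaction start time, so a commit landing between the two resolutions is visible on one side only:

import org.apache.spark.sql.SparkSession;

// Hypothetical schema: accounts_a and accounts_b hold balances, and a
// concurrent writer commits a transfer of 100 from A to B.
public class ReadSkewDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("read-skew").getOrCreate();

    // A single statement reading two tables. Under RCSI each table's
    // snapshot is pinned when it is first requested during planning
    // (exact timing is engine-dependent).
    long total = spark.sql(
        "SELECT (SELECT sum(balance) FROM cat.ns.accounts_a) + "
            + "(SELECT sum(balance) FROM cat.ns.accounts_b) AS total")
        .first().getLong(0);

    // If the transfer commits after accounts_a is resolved but before
    // accounts_b is, the moved 100 is counted twice: read skew.
    System.out.println(total);
  }
}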

22 of 93

Iceberg is only “sort of RCSI” because

  • It is only theoretically RCSI with single-statement transactions. When naively used in multi-statement transactions, it is READ UNCOMMITTED.

  • The actual isolation level depends on the engine implementation. It is usually even worse than RCSI because of optimizations like caching.

23 of 93

24 of 93

Solution Overview

25 of 93

What are we trying to build here?

26 of 93

Summary of all the requirements

  • Support BEGIN, COMMIT, ROLLBACK TRANSACTION SQL semantics

  • Support distributed transaction semantics

  • Support multi-standard multi-object transaction semantics

  • Support at least SNAPSHOT and SERIALIZABLE isolation levels

27 of 93

Some sort of transaction management framework for a catalog to plug in?

28 of 93

Summary of all the features needed

The framework needs to

  • track all the objects in a lakehouse

  • describe to callers the latest state of any object

  • manage all activities against all objects (unless the object is external)

  • decide which activities should succeed and which should fail

29 of 93

30 of 93

Summary of all the features needed

A catalog needs to

  • track all the objects in a lakehouse

  • describe to callers the latest state of any object

  • manage all activities against all objects (unless the object is external)

  • decide which activities should succeed and which should fail

31 of 93

How to build this catalog?

32 of 93

The Easy Way

33 of 93

34 of 93

Amazon S3 PutObject IF-NONE-MATCH

35 of 93

Mutual Exclusion of File Creation

Atomic Rename

  • Linux File System
  • BtrFS
  • Apache HDFS
  • JuiceFS
  • MinIO

PutObject IF-NONE-MATCH (see the sketch after this list)

  • Amazon S3
  • Google Cloud Storage
  • Azure Data Lake Storage

Any Key-Value Storage

  • Postgres
  • Redis
  • Apache Cassandra
  • Apache HBase
  • Amazon DynamoDB
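As a concrete sketch of the S3 primitive (the bucket and key are placeholders, and this assumes an AWS SDK v2 version recent enough to expose conditional writes): If-None-Match: * succeeds only if the key does not exist yet, so exactly one of the concurrent writers wins.

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class ConditionalPut {
  public static void main(String[] args) {
    try (S3Client s3 = S3Client.create()) {
      try {
        s3.putObject(
            PutObjectRequest.builder()
                .bucket("my-bucket")
                .key("prefix/vn/100")
                .ifNoneMatch("*")        // fail if the object already exists
                .build(),
            RequestBody.fromString("...root node bytes..."));
        System.out.println("commit won");
      } catch (S3Exception e) {
        if (e.statusCode() == 412) {     // PreconditionFailed: someone else won
          System.out.println("commit lost, rebase and retry");
        } else {
          throw e;
        }
      }
    }
  }
}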

36 of 93

Make the catalog storage-only!

Iceberg Devlist Discussions:

  • “Deprecate HadoopTableOperations, move to tests in 2.0”
  • “Storing catalog directly on object store”

And it should depend on as few storage primitives as possible, so users can move across different storage providers!

37 of 93

What we want to create is a format

  • A set of files in storage
  • A definition of how these files are related to each other
  • A definition of the contents in these files

38 of 93

We will build a Catalog Format

A set of files in storage that

  • Defines the types of objects in a catalog
  • Defines the types of actions for each object
  • Defines the interactions among these objects
  • Lives outside Iceberg, but integrates back with Iceberg

39 of 93

40 of 93

41 of 93

Architecture Design Deep Dive

42 of 93

Naive Implementation

  • A single file holding all catalog objects

  • Verison-based file naming

  • A commit writes to the next file location with mutual exclusion

  • Complete copy of the catalog on each commit

43 of 93

Improvement 1: Faster Lookup via B-Tree

The b-tree is stored as a file, where each node is stored as a page in the file

44 of 93

Improvement 2: Faster Update via 1 File per Tree Node

Only a subset of the files is rewritten, and these rewrites can mostly happen in parallel

45 of 93

Olympia Format Overview

46 of 93

Node Files

  • Apache Arrow IPC format
    • Columnar
    • Memory mapping
    • Zero-copy access
  • Each node is a key-value map
  • Each node contains:
    • System keys
    • A pivot table
    • Committed actions (if root)

47 of 93

System Keys

In all node files

  • Number of keys in pivot table
  • Node creation timestamp

Only in root node files

  • Catalog definition
  • Previous root node
  • Rollback from root node
  • Number of actions

48 of 93

Object Keys

  • Each object type has a type ID

  • Object keys are prefixed with the base64-encoded type ID to facilitate listing objects of the same type (e.g. SHOW TABLES, SHOW VIEWS)

  • Object names are padded with spaces in the key to ensure lexicographical ordering

  • E.g. the key for ns1.table1 looks like “C===ns1    table1”, as sketched below
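A minimal sketch of this key scheme (the type ID value and the padding width are illustrative assumptions, not part of a published spec):

import java.util.Base64;

public class ObjectKeys {
  static final int NAME_WIDTH = 32;             // assumed fixed padding width

  // Prefix with the base64-encoded type ID so that objects of one type are
  // contiguous, and space-pad each name part so keys sort lexicographically.
  static String key(byte typeId, String namespace, String name) {
    String prefix = Base64.getEncoder().encodeToString(new byte[] {typeId});
    return prefix + pad(namespace) + pad(name);
  }

  static String pad(String part) {
    return String.format("%-" + NAME_WIDTH + "s", part);  // right-pad with spaces
  }

  public static void main(String[] args) {
    // Listing all keys sharing one type prefix is effectively SHOW TABLES.
    System.out.println("[" + key((byte) 8, "ns1", "table1") + "]");
  }
}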

49 of 93

Node File Locations

  • Root node files: stored in the /vn folder, with the reversed 64-bit binary of the version number as the file name, for optimal performance on object storage (as sketched below).
    • E.g. /vn/00100110000000000000000000000000 for version 100
    • Learned from the Iceberg object storage layout
    • For simplicity, later content in this presentation uses /vn/<version-number> directly to represent root node file locations.

  • Non-root node files: stored in the /node folder with random UUID names
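A minimal sketch of the reversed-binary naming, assuming names zero-padded to the full 64 bits (the example above shows a shortened form). Reversing the bits spreads hot, recently written root node files across key prefixes instead of piling them on one prefix:

public class RootNodeNames {
  static String rootNodeFileName(long version) {
    // Long.reverse flips the bit order of the 64-bit version number.
    String bits = Long.toBinaryString(Long.reverse(version));
    return "0".repeat(Long.SIZE - bits.length()) + bits;   // left-pad to 64 chars
  }

  public static void main(String[] args) {
    // version 100 -> 0010011000...0, matching the /vn example above
    System.out.println("/vn/" + rootNodeFileName(100));
  }
}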

50 of 93

Object and Action Definition Files

  • Protobuf definitions

  • Describes the basic schema for each object in an Olympia catalog

  • If a definition is too large, the value stores a pointer to a larger protobuf binary payload file (called a definition file) on storage

  • Stored in /def folder

51 of 93

Version Files

  • /vn/latest: a best-effort hint to the latest version of the catalog. If it does not exist, treat version 0 as the hint.

  • /vn/oldest: the guaranteed oldest version of the catalog. If it does not exist, treat version 0 as the oldest version.

52 of 93

Begin a Transaction

  • Identify latest catalog version
    • Read /vn/latest if it exists
    • Starting from that version, keep checking whether a newer root node file exists
    • Check that the result is greater than or equal to the version in /vn/oldest

  • Load the latest root node file into memory

.
└── /prefix/vn/
    ├── latest (101)
    ├── oldest (50)
    ├── 101
    ├── 100
    ├── 98
    ├── 99
    └── ...
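A sketch of this version discovery (the Storage interface is hypothetical; the format only needs an existence check and a small read):

// Hypothetical storage abstraction for illustration only.
interface Storage {
  boolean exists(String path);
  long readVersionHint(String path);   // returns 0 if the file does not exist
}

public class BeginTransaction {
  // Find the latest committed version: start from the best-effort hint in
  // /vn/latest, then probe forward until no newer root node file exists.
  static long latestVersion(Storage storage) {
    long v = storage.readVersionHint("/vn/latest");      // hint may lag behind
    while (storage.exists("/vn/" + (v + 1))) {
      v++;
    }
    long oldest = storage.readVersionHint("/vn/oldest");
    if (v < oldest) {
      throw new IllegalStateException("version " + v + " already expired");
    }
    return v;                                            // root node to load into memory
  }
}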

53 of 93

Why not list the /vn directory to find the latest version, like Delta Lake or Lance?

  • Minimize the differences across storages
  • Lexicographical ordering for listing is not guaranteed
  • Might actually be faster (marginally)

54 of 93

Find an Object in the B-Tree

E.g. find ns5.table-p

  • Look into the root
  • Do binary search for the key
  • Go to node 3
  • Do binary search for the key
  • ns5.table-p found!
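A minimal sketch of this descent (the node layout is simplified to sorted keys plus child pointers; the real format stores these as pivot tables in Arrow pages):

import java.util.Collections;
import java.util.List;

// Simplified in-memory view of a node: sorted keys, the values stored with
// them, and child pointers between/around the keys.
record BTreeNode(List<String> keys, List<String> values, List<BTreeNode> children) {

  // Find a key (e.g. the encoded form of ns5.table-p) by binary-searching
  // each node and descending into the matching child.
  static String find(BTreeNode node, String key) {
    while (true) {
      int i = Collections.binarySearch(node.keys(), key);
      if (i >= 0) {
        return node.values().get(i);          // key found in this node
      }
      if (node.children().isEmpty()) {
        return null;                          // leaf reached: key absent
      }
      node = node.children().get(-i - 1);     // descend into the pivot's child
    }
  }
}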

55 of 93

Update an Object in the B-Tree In Memory

E.g. add new table ns5.table-t

  • Find the node to add table-t
  • Split the node (assuming each node can have max 2 keys, i.e. b-tree order is 3)
  • Form pivot table slices and pending changes in memory
  • Propagate parent pointers up to the root
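A copy-on-write sketch of the in-memory update (node splitting is elided for brevity): only the root-to-leaf path is rewritten, and untouched subtrees stay shared by reference.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BTreeUpdate {
  record Node(List<String> keys, List<Node> children) {}

  // Inserting a key creates new node objects only along the root-to-leaf
  // path; splits (when a node exceeds the b-tree order) are omitted here.
  static Node insert(Node node, String key) {
    if (node.children().isEmpty()) {           // leaf: copy keys, add in order
      List<String> keys = new ArrayList<>(node.keys());
      int i = Collections.binarySearch(keys, key);
      keys.add(i >= 0 ? i : -i - 1, key);
      return new Node(keys, List.of());
    }
    int i = Collections.binarySearch(node.keys(), key);
    int slot = i >= 0 ? i : -i - 1;            // pivot slot to descend into
    List<Node> children = new ArrayList<>(node.children());
    children.set(slot, insert(children.get(slot), key));
    return new Node(node.keys(), children);    // new parent propagates up to root
  }
}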

56 of 93

Commit a Transaction

  • Materialize all changes by writing all child nodes in parallel
  • Write the new root node at the new version location with mutual exclusion (sketched below)
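A sketch of the two commit steps, with hypothetical helpers (the conditional root write is the storage primitive from earlier). Child node files are immutable and randomly named, so they can be written in parallel without coordination:

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CommitTransaction {
  interface NodeWriter {
    CompletableFuture<Void> writeNode(byte[] nodeBytes);     // random-UUID name
    void writeRootIfAbsent(long version, byte[] rootBytes);  // If-None-Match
  }

  static void commit(NodeWriter writer, long baseVersion,
                     List<byte[]> dirtyChildNodes, byte[] rootNode) {
    // Step 1: materialize all rewritten child nodes in parallel.
    CompletableFuture
        .allOf(dirtyChildNodes.stream()
            .map(writer::writeNode)
            .toArray(CompletableFuture[]::new))
        .join();

    // Step 2: write the new root at the next version with mutual exclusion;
    // a conflict here means another transaction claimed this version first.
    writer.writeRootIfAbsent(baseVersion + 1, rootNode);
  }
}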

57 of 93

Commit Concurrent Transactions (Storage)

  • Two transactions are committing to the same new version

58 of 93

Commit Concurrent Transactions (Storage)

  • One of the transactions wins and the other sees a commit failure

59 of 93

Commit Concurrent Transactions (Lakehouse)

  • The failing transaction checks whether it can recommit
  • If possible (depending on the isolation level), it rebases the whole b-tree against the latest committed root, or the other way around.

60 of 93

Commit

Concurrent

Transactions

(Lakehouse)

  • The failing transaction tries to commit to the next version
  • The commit process repeats at storage layer
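The retry loop, sketched with hypothetical types (whether a rebase is allowed depends on the isolation level and the committed actions):

// Hypothetical types for illustration only.
interface Root {}
interface Txn {
  long baseVersion();
  boolean canRebaseOnto(Root committed);   // isolation-level dependent check
  Txn rebaseOnto(Root committed);
}
interface TxnCatalog {
  void commitAt(long version, Txn txn);    // conditional root write
  Root loadRoot(long version);
}
class CommitConflictException extends RuntimeException {}

public class RecommitLoop {
  static void commitWithRetry(TxnCatalog catalog, Txn txn) {
    long version = txn.baseVersion();
    while (true) {
      try {
        catalog.commitAt(version + 1, txn);
        return;
      } catch (CommitConflictException e) {
        Root committed = catalog.loadRoot(++version);  // the root that won
        if (!txn.canRebaseOnto(committed)) {           // e.g. serializable conflict
          throw e;
        }
        txn = txn.rebaseOnto(committed);               // replay against new base
      }
    }
  }
}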

61 of 93

Distributed Transaction

62 of 93

Additional Benefits - Full Catalog Time Travel

63 of 93

Additional Benefits - Portable Catalog

64 of 93

Additional Benefits - Catalog Snapshot Export

65 of 93

Integration with Apache Spark Iceberg Connector

66 of 93

Start a Spark-Iceberg Session

spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.demo.type=hadoop \
  --conf spark.sql.catalog.demo.warehouse=s3://my-bucket

67 of 93

Start a Spark-Iceberg Session with Olympia

spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar,olympia-spark-runtime-3.5_2.12.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.format.olympia.spark.extensions.OlympiaSparkExtensions \
  --conf spark.sql.catalog.drew=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.drew.catalog-impl=org.format.olympia.iceberg.OlympiaIcebergCatalog \
  --conf spark.sql.catalog.drew.warehouse=s3://my-bucket

68 of 93

Behaviors

  • Users can run multi-table transactions with
    • BEGIN [TRANSACTION]
    • COMMIT [TRANSACTION]
    • ROLLBACK [TRANSACTION]

  • Single table operations are treated as independent transactions (imagine that every operation is wrapped by BEGIN and COMMIT)

69 of 93

Another session by another user

spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar,olympia-spark-runtime-3.5_2.12.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.format.olympia.spark.extensions.OlympiaSparkExtensions \
  --conf spark.sql.catalog.jack=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.jack.catalog-impl=org.format.olympia.iceberg.OlympiaIcebergCatalog \
  --conf spark.sql.catalog.jack.warehouse=s3://my-bucket \
  --conf spark.sql.catalog.jack.txn.isolation-level=serializable   # defaults to 'snapshot'

70 of 93

Example

-- Drew's session
BEGIN;

INSERT INTO drew.ns1.t1
VALUES ('a', 'bcd');

COMMIT;

-- Jack's session
BEGIN;

CREATE TEMP VIEW temp_view AS
SELECT c1 FROM jack.ns1.t1;

UPDATE jack.ns1.t2 SET c2 = 'v2'
WHERE c1 IN (SELECT * FROM temp_view);

COMMIT;

71 of 93

Example Analysis

  • Consider the initial catalog version before any commit to be v99.
  • After Drew commits, catalog goes to v100 with 1 committed TableInsert(ns1.t1) action.
  • Jack’s commit cannot serially happen on top of v100, because ns1.t1 has changed, which would have produced a different result when updating ns1.t2.
  • However, Jack’s transaction can happen BEFORE Drew’s transaction, and that is a valid serial execution order.
  • Catalog goes to v101 with 3 committed actions: TableSelect(ns1.t1), TableUpdate(ns1.t2), TableInsert(ns1.t1); the previous root node is v99 (skipping v100).

72 of 93

Integration with IRC via Apache Gravitino

73 of 93

Using Olympia

to Build a Server

If you are building a server for any communication protocol (e.g. IRC) to integrate with engines, you can use Olympia to fulfill most of the technical catalog interactions, and focus on the business logic.
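For example (a sketch only: OlympiaIcebergCatalog is the class from the Spark configs above, assumed to expose the standard no-arg constructor that Iceberg's catalog-impl loading requires; the Catalog initialize/loadTable calls are the standard Iceberg API):

import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class EmbeddedCatalog {
  public static void main(String[] args) {
    // Embed the Olympia-backed Iceberg catalog directly in the server process.
    Catalog catalog = new org.format.olympia.iceberg.OlympiaIcebergCatalog();
    catalog.initialize("demo", Map.of("warehouse", "s3://my-bucket"));

    // From here, serve whatever protocol you like (IRC, HMS, MCP, ...) and
    // translate incoming requests into plain Iceberg catalog calls:
    catalog.loadTable(TableIdentifier.of("ns1", "t1"));
  }
}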

74 of 93

Example Integration of IRC via Apache Gravitino

  • Apache Gravitino is a federated metadata lake on top of technical data catalogs to perform data governance and enable open engine access for various data sources.
  • The Apache Gravitino IRC Server follows the IRC specification and acts as an Iceberg REST catalog server.

75 of 93

Olympia with Gravitino IRC

76 of 93

Configure Olympia with Apache Gravitino

gravitino.iceberg-rest.catalog-backend = custom
gravitino.iceberg-rest.catalog-backend-impl = org.format.olympia.iceberg.OlympiaIcebergCatalog
gravitino.iceberg-rest.uri = http://127.0.0.1:9001
gravitino.iceberg-rest.warehouse = s3://my-bucket

77 of 93

Run Olympia with Apache Gravitino

  • Download gravitino-iceberg-rest-server-bin.tar.gz
  • Add olympia-core.jar and olympia-s3.jar to libs
  • Edit configurations in conf/gravitino-iceberg-rest-server.conf
  • Start the server by running ./bin/gravitino-iceberg-rest-server.sh

* Available as Docker image olympiaformat/olympia-gravitino-irc:latest

78 of 93

79 of 93

Start an IRC Session

spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.demo.type=rest \
  --conf spark.sql.catalog.demo.warehouse=s3://my-bucket \
  --conf spark.sql.catalog.demo.uri=http://127.0.0.1:9001

All Olympia logic is already in the Gravitino IRC server, so there is no Olympia dependency on the Spark side.

80 of 93

Problem: this cannot work with the Olympia Spark SQL extensions because of missing IRC features

81 of 93

Solution

  • Use multi-level namespaces to represent transactions

  • Make all transactions distributed, so state is shared across steps within a transaction

  • We can simulate BEGIN, COMMIT, ROLLBACK using namespace operations

.
├── ns1/
│   └── table1
├── ns2/
│   └── table2
└── sys/
    └── dtxns/
        └── dtxn_1234/
            ├── ns1/
            │   └── table1
            └── ns2/
                └── table2

82 of 93

Leveraging Multi-Level Namespace

-- initialize a catalog at the location specified in the session
CREATE DATABASE sys;

-- show all current distributed transactions
SHOW DATABASES IN sys.dtxns;

83 of 93

Leveraging Multi-Level Namespace

-- create a transaction with ID 1234
CREATE DATABASE sys.dtxns.dtxn_1234
WITH DBPROPERTIES ('isolation-level'='serializable');

-- list tables under namespace ns1 in transaction 1234
SHOW TABLES IN sys.dtxns.dtxn_1234.ns1;

84 of 93

Leveraging Multi-Level Namespace

-- read data from ns1.t1 within transaction 1234
SELECT * FROM sys.dtxns.dtxn_1234.ns1.t1;

-- write data to ns1.t1 within transaction 1234
INSERT INTO sys.dtxns.dtxn_1234.ns1.t1 VALUES (3, 'ghi');

85 of 93

Leveraging Multi-Level Namespace

-- commit transaction with ID 1234
ALTER DATABASE sys.dtxns.dtxn_1234
SET DBPROPERTIES ('committed' = 'true');

-- rollback transaction with ID 1234
DROP DATABASE sys.dtxns.dtxn_1234;

86 of 93

Next Steps

87 of 93

Next Steps - Fundamental

  • Finish a few key features (conflict resolution, delete marker, rollback, etc.)

  • Refine and release the initial Olympia catalog format spec

  • Move from Java to a Rust core implementation with different language bindings

  • Integrate with Apache OpenDAL for generic storage access

  • Evaluate performance by tuning the b-tree order and improving the b-tree design

  • Improve the AuthZ experience and define how DCLs work in a transaction

88 of 93

Next Steps - Integration

  • Support more table formats (Apache Hudi, Apache Paimon, Delta Lake, Lance, etc.)

  • Support more object standards (Substrait view, Apache DataSketches index, Apache Calcite function, Cedar policy, etc.)

  • Support more engine integrations (Trino, Apache Flink, Ray, DuckDB, etc.)

  • Support more communication protocol integrations (HMS, LanceCatalog, MCP, etc.)

89 of 93

Next Steps - Iceberg REST Catalog

  • BeginTransaction, RollbackTransaction REST APIs

  • The ability to track a transaction (e.g. pass a transaction handle) across all API calls

90 of 93

To Learn More Details

  • Documentation: https://olympiaformat.org

  • Community (e.g. Slack invite link): https://olympiaformat.org/community

91 of 93

After all…

92 of 93

And…

93 of 93

Thank you!

Jack Ye

jackye@apache.org

linkedin.com/in/yezhaoqin

Drew Gallardo

dru@amazon.com

linkedin.com/in/drew-gallardo