Multi-Table Multi-Statement Transaction for Apache Iceberg
Jack Ye - Apache Iceberg PMC Member
Drew Gallardo - Software Engineer, AWS
April 8th, 2025
Agenda
01  Motivations
02  Solution Overview
03  Architecture Design Deep Dive
04  Integration with Apache Spark Iceberg Connector
05  Integration with IRC via Apache Gravitino
06  Next Steps
Motivations
Motivation 1:
Go beyond Commit
What Iceberg Offers
For more details, watch the talk Transactions and Isolation in Apache Iceberg by Russell Spitzer
The Next Steps for Lakehouse Transaction Features
Motivation 2:
Go beyond Iceberg & Table
Multi-Standard Multi-Object Transaction
Object Types
Open Table Standards
Open Object Standards
Motivation 3:
The Mission towards a
Fully Transactional Lakehouse
What exactly is the isolation level of Apache Iceberg?
Iceberg Isolation Level
Source: https://iceberg.apache.org
Iceberg Isolation Level
Database Isolation Levels
SERIALIZABLE isolation guarantees that the result of executing multiple transactions concurrently is the same as if the transactions had been executed one after the other in some order (i.e., serially).
ChatGPT 4o
SNAPSHOT isolation guarantees that all reads made in a transaction will see a consistent snapshot of the database—as of the time the transaction started—and that no other transactions’ uncommitted changes are visible
ChatGPT 4o
The scope of a database isolation level is the entire database.
The scope of Iceberg isolation is a single Iceberg table or multiple Iceberg tables.
We are missing a rigorous transaction definition and implementation for an open lakehouse, with the scope of the whole lakehouse.
Going back to the original question:
what exactly is the isolation level of Apache Iceberg?
READ COMMITTED SNAPSHOT isolation (RCSI)
E.g. Read Skew with RCSI
Even with concurrent single-statement transactions!
Iceberg is only “sort of RCSI” because
Solution Overview
What are we trying to build here?
Summary of all the requirements
Some sort of
transaction management framework
for a catalog to plug in?
Summary of all the features needed
The framework needs to
Summary of all the features needed
A catalog needs to
How to build this catalog?
The Easy Way
Amazon S3 PutObject IF-NONE-MATCH
Mutual Exclusion of File Creation
Atomic Rename
PutObject IF-NONE-MATCH
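S3 has no atomic rename, but a conditional PUT gives the same mutual exclusion on file creation. Below is a minimal sketch with the AWS SDK for Java 2.x; the bucket and key names are placeholders, not tied to any particular catalog layout.

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class ConditionalPut {
  public static void main(String[] args) {
    try (S3Client s3 = S3Client.create()) {
      PutObjectRequest request = PutObjectRequest.builder()
          .bucket("my-bucket")     // placeholder bucket
          .key("prefix/new-file")  // the file that only one writer may create
          .ifNoneMatch("*")        // reject the PUT if any object already exists at this key
          .build();
      try {
        s3.putObject(request, RequestBody.fromString("file content"));
        System.out.println("won the race: file created");
      } catch (S3Exception e) {
        if (e.statusCode() == 412) {  // PreconditionFailed: another writer created it first
          System.out.println("lost the race: file already exists");
        } else {
          throw e;
        }
      }
    }
  }
}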
Any Key-Value Storage
Make the catalog storage only!
Iceberg Devlist Discussions:
And it should depend on as few storage primitives as possible
so users can move across different storage providers!
What we want to create is a format
We will build a Catalog Format
A set of files in storage that
Architecture Design Deep Dive
Naive Implementation
Improvement 1: Faster Lookup via B-Tree
The b-tree is stored as a file, where each node is stored as a page in the file
Improvement 2: Faster Update via 1 File per Tree Node
Only a subset of the files are rewritten, and these rewrites can mostly happen in parallel
Olympia Format Overview
Node Files
System Keys
In all node files
Only in root node files
Object Keys
Node File Locations
Object and Action Definition Files
Version Files
Begin a Transaction
.
└── /prefix/vn/
├── latest (101)
├── oldest (50)
├── 101
├── 100
├── 98
├── 99
└── ...
Why not list the /vn directory to find the latest version, like Delta Lake or Lance?
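Presumably the latest file in the tree above is a pointer whose content is the newest version number (101). Under that assumption, a reader can resolve the current version with two GET calls and no listing; this is only a hedged sketch with placeholder bucket and prefix names.

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

public class ReadLatestVersion {
  public static void main(String[] args) {
    try (S3Client s3 = S3Client.create()) {
      String bucket = "my-bucket";  // placeholder
      // Read the 'latest' pointer file (assumed to contain just the version number, e.g. "101").
      String latest = s3.getObjectAsBytes(
              GetObjectRequest.builder().bucket(bucket).key("prefix/vn/latest").build())
          .asUtf8String().trim();
      // Read the version file it points to; no directory listing is needed.
      String versionFile = s3.getObjectAsBytes(
              GetObjectRequest.builder().bucket(bucket).key("prefix/vn/" + latest).build())
          .asUtf8String();
      System.out.println("latest version = " + latest + " (" + versionFile.length() + " chars)");
    }
  }
}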
Find an Object in the B-Tree
E.g. find ns5.table-p
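The node-file encoding is not shown here, so the following is only a hypothetical Java sketch of the descent: each node file is assumed to hold object entries plus sorted pointers to child node files, and the lookup walks down from the root until it finds the object key (for example ns5.table-p).

import java.util.*;

// Hypothetical in-memory stand-in for one node file of the b-tree.
class Node {
  TreeMap<String, String> children = new TreeMap<>(); // separator key -> child node file location
  Map<String, String> objects = new HashMap<>();      // object key -> object definition file location
}

public class BTreeLookup {
  // Stub: a real implementation would fetch the node file from storage and decode it.
  static Node readNodeFile(String location) {
    return new Node();
  }

  // Descend from the root node file until the object key is found or the tree bottoms out.
  static String find(String rootNodeLocation, String key) {
    String location = rootNodeLocation;
    while (location != null) {
      Node node = readNodeFile(location);
      String definitionFile = node.objects.get(key);
      if (definitionFile != null) {
        return definitionFile;  // location of the object definition file
      }
      // Follow the child whose separator key is the greatest one <= the search key.
      Map.Entry<String, String> child = node.children.floorEntry(key);
      location = (child == null) ? null : child.getValue();
    }
    return null;  // the object does not exist in this catalog version
  }

  public static void main(String[] args) {
    // Placeholder root location; in reality it would come from the current version file.
    System.out.println(find("prefix/root-node-file", "ns5.table-p"));
  }
}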
Update an Object in the B-Tree In Memory
E.g. add new table ns5.table-t
Commit a Transaction
Commit Concurrent Transactions (Storage)
One of the transactions wins and the other one sees commit failure
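Putting the two ideas together, a commit can be pictured as: the rewritten node files are written first, then the transaction tries to create version file N+1 with a conditional PUT, so exactly one of the concurrent transactions succeeds and the other sees a commit failure. This is only a sketch with placeholder paths, file contents, and helper names, not the actual Olympia commit protocol.

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class CommitSketch {

  // Thrown when another transaction created the next version file first.
  static class CommitConflictException extends RuntimeException {
    CommitConflictException(String msg) { super(msg); }
  }

  static void commit(S3Client s3, String prefix, long baseVersion, String newRootNodeLocation) {
    // 1. The rewritten node files (the subset of the tree touched by this transaction)
    //    are assumed to be written already; they stay invisible until a version file points at them.

    // 2. Try to create the next version file. If-None-Match guarantees that only one
    //    concurrent transaction can create it; the loser gets a 412, must re-read the
    //    new latest version, re-validate against its isolation level, and retry.
    long newVersion = baseVersion + 1;
    PutObjectRequest request = PutObjectRequest.builder()
        .bucket("my-bucket")                  // placeholder
        .key(prefix + "/vn/" + newVersion)
        .ifNoneMatch("*")
        .build();
    try {
      // Placeholder content: the real version file layout is defined by the Olympia format.
      s3.putObject(request, RequestBody.fromString("root=" + newRootNodeLocation));
    } catch (S3Exception e) {
      if (e.statusCode() == 412) {
        throw new CommitConflictException("version " + newVersion + " already exists");
      }
      throw e;
    }
    // 3. Optionally advance the 'latest' pointer; readers holding a stale pointer
    //    can still probe forward for newer version files.
  }
}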
Commit Concurrent Transactions (Lakehouse)
Distributed Transaction
Additional Benefits - Full Catalog Time Travel
Additional Benefits - Portable Catalog
Additional Benefits - Catalog Snapshot Export
Integration with Apache Spark Iceberg Connector
Start a Spark-Iceberg Session
spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.demo.type=hadoop \
--conf spark.sql.catalog.demo.warehouse=s3://my-bucket
Start a Spark-Iceberg Session with Olympia
spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar,olympia-spark-runtime-3.5_2.12.jar \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.format.olympia.spark.extensions.OlympiaSparkExtensions \
--conf spark.sql.catalog.drew=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.drew.catalog-impl=org.format.olympia.iceberg.OlympiaIcebergCatalog \
--conf spark.sql.catalog.drew.warehouse=s3://my-bucket
Behaviors
Another session by another user
spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar,olympia-spark-runtime-3.5_2.12.jar \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.format.olympia.spark.extensions.OlympiaSparkExtensions \
--conf spark.sql.catalog.jack=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.jack.catalog-impl=org.format.olympia.iceberg.OlympiaIcebergCatalog \
--conf spark.sql.catalog.jack.warehouse=s3://my-bucket \
--conf spark.sql.catalog.jack.txn.isolation-level=serializable  # defaults to 'snapshot'
Example
BEGIN;
INSERT INTO drew.ns1.t1
VALUES ('a', 'bcd');
COMMIT;
BEGIN;
CREATE TEMP VIEW temp_view AS
SELECT c1 FROM jack.ns1.t1;
UPDATE jack.ns1.t2 SET c2 = 'v2'
WHERE c1 IN (SELECT * FROM temp_view);
COMMIT;
Example Analysis
Integration with IRC via Apache Gravitino
Using Olympia
to Build a Server
If you are building a server for any communication protocol (e.g. IRC) to integrate with engines, you can use Olympia to fulfill most of the technical catalog interactions, and focus on the business logic.
Example Integration of IRC
via Apache Gravitino
Olympia
With
Gravitino
IRC
Configure Olympia with Apache Gravitino
gravitino.iceberg-rest.catalog-backend = custom
gravitino.iceberg-rest.catalog-backend-impl = org.format.olympia.iceberg.OlympiaIcebergCatalog
gravitino.iceberg-rest.uri = http://127.0.0.1:9001
gravitino.iceberg-rest.warehouse = s3://my-bucket
Run Olympia with Apache Gravitino
* Available as Docker image olympiaformat/olympia-gravitino-irc:latest
Start an IRC Session
spark-sql --jars iceberg-spark-runtime-3.5_2.12.jar \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.demo.type=rest \
--conf spark.sql.catalog.demo.warehouse=s3://my-bucket \
--conf spark.sql.catalog.demo.uri=http://127.0.0.1:9001
All Olympia logic is already in the Gravitino IRC server, so there is no Olympia dependency on the Spark side.
Problem: this cannot work with the Olympia Spark SQL extensions because of missing IRC features
Solution
.
├── ns1/
│ └── table1
├── ns2/
│ └── table2
└── sys/
└── dtxns/
└── dtxn_1234/
├── ns1/
│ └── table1
└── ns2/
└── table2
Leveraging Multi-Level Namespace
-- initialize a catalog at the location specified in session
CREATE DATABASE sys
-- shows all current distributed transactions
SHOW DATABASES IN sys.dtxns
Leveraging Multi-Level Namespace
-- create a transaction with ID 1234
CREATE DATABASE sys.dtxns.dtxn_1234
WITH DBPROPERTIES ('isolation-level'='serializable')
-- list tables in transaction of ID 1234 under namespace ns1
SHOW TABLES IN sys.dtxns.dtxn_1234.ns1;
Leveraging Multi-Level Namespace
-- read data from ns1.t1 within transaction 1234
SELECT * FROM sys.dtxns.dtxn_1234.ns1.t1;
-- write data to ns1.t1 within transaction 1234
INSERT INTO sys.dtxns.dtxn_1234.ns1.t1 VALUES (3, 'ghi');
Leveraging Multi-Level Namespace
-- commit transaction with ID 1234
ALTER DATABASE sys.dtxns.dtxn_1234
SET DBPROPERTIES ('committed' = 'true');
-- rollback transaction with ID 1234
DROP DATABASE sys.dtxns.dtxn_1234
Next Steps
Next Steps - Fundamental
Next Steps - Integration
Next Steps - Iceberg REST Catalog
To Learn More Details
After all…
And…
Thank you!
Jack Ye
jackye@apache.org
linkedin.com/in/yezhaoqin
Drew Gallardo
dru@amazon.com
linkedin.com/in/drew-gallardo