Delta Lake Essentials: ACID Transactions for Data Lakes
Learn the fundamentals of Delta Lake — ACID transactions, time travel, schema enforcement, and more — for building reliable data lakehouses.
Delta Lake is the foundation of the modern lakehouse architecture. It brings to data lakes the reliability, quality, and governance guarantees that were previously available only in traditional data warehouses.
This guide covers the core concepts you need to master Delta Lake.
Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and data lakes. It extends Parquet files with a transaction log that tracks all changes to your data.
Traditional data lakes suffer from several problems:
| Problem | Delta Lake Solution |
|---------|---------------------|
| Failed writes leave partial data | Atomic commit – all or nothing |
| Concurrent writes cause corruption | MVCC isolation with optimistic concurrency |
| No way to fix mistakes | Time travel to recover previous versions |
| Schema drift breaks pipelines | Schema enforcement and evolution |
| Updates require full rewrites | Efficient MERGE and UPDATE operations |
Every Delta table consists of two parts: the Parquet data files that hold the rows, and the _delta_log/ directory that records every change to them:
my_table/
├── _delta_log/
│ ├── 00000000000000000000.json # Version 0
│ ├── 00000000000000000001.json # Version 1
│ ├── 00000000000000000002.json # Version 2
│ └── ...
├── part-00000-...snappy.parquet
├── part-00001-...snappy.parquet
└── ...
The transaction log records every operation as a numbered JSON commit file; each commit describes the data files that were added or removed and any changes to the table's metadata.
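For illustration, you can peek at the commit files with Spark's generic JSON reader. This is a minimal sketch, assuming a table already exists at /data/sales and has received at least one data write; the columns selected below are only a small subset of what Delta actually records.
# Each commit file holds actions such as commitInfo, add, remove, and metaData
log_df = spark.read.json("/data/sales/_delta_log/*.json")
log_df.select("commitInfo.operation", "add.path").show(truncate=False)
To create a Delta table in the first place, either declare the schema up front in SQL or write a DataFrame in Delta format: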
# Create empty table with schema
spark.sql("""
CREATE TABLE sales (
sale_id BIGINT,
product_id STRING,
quantity INT,
sale_date DATE,
amount DECIMAL(10, 2)
)
USING DELTA
""")
# Build a DataFrame whose column types match the table schema
from datetime import date
from decimal import Decimal

df = spark.createDataFrame([
    (1, "PROD-001", 5, date(2024, 1, 15), Decimal("125.50")),
    (2, "PROD-002", 3, date(2024, 1, 15), Decimal("89.99")),
], schema="sale_id BIGINT, product_id STRING, quantity INT, sale_date DATE, amount DECIMAL(10, 2)")
# Write as Delta table
df.write.format("delta").mode("overwrite").saveAsTable("sales")
# Or to a path
df.write.format("delta").mode("overwrite").save("/data/sales")
# Convert Parquet to Delta
spark.sql("CONVERT TO DELTA parquet.\`/path/to/parquet/\`")
# Or from a DataFrame
parquet_df = spark.read.parquet("/path/to/parquet/")
parquet_df.write.format("delta").mode("overwrite").save("/path/to/delta/")
Delta Lake validates every write against the table schema. Writes that don't match are rejected.
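As a quick illustration, here is a minimal sketch of enforcement in action against the sales table defined above: the amount column arrives as a plain string instead of DECIMAL(10, 2), so Delta refuses the append (typically surfacing an AnalysisException).
# This DataFrame does not match the table schema, so the write is rejected
bad_df = spark.createDataFrame([
    (5, "PROD-005", 1, date(2024, 1, 18), "not-a-number"),
], ["sale_id", "product_id", "quantity", "sale_date", "amount"])
try:
    bad_df.write.format("delta").mode("append").saveAsTable("sales")
except Exception as e:  # schema mismatch is reported as an analysis error
    print(f"Write rejected: {e}")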
When your schema needs to change, use mergeSchema:
# Add new column automatically
df_with_discount = spark.createDataFrame([
    (3, "PROD-003", 2, date(2024, 1, 16), Decimal("45.00"), Decimal("5.00")),
], schema="sale_id BIGINT, product_id STRING, quantity INT, sale_date DATE, amount DECIMAL(10, 2), discount DECIMAL(10, 2)")
df_with_discount.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.saveAsTable("sales")
Every write creates a new version. Query any previous version:
# Read version 5
df = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("/data/sales")
# SQL syntax
spark.sql("SELECT * FROM sales VERSION AS OF 5")
# Read data as of yesterday
df = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-14") \
.load("/data/sales")
Inspect a table's full commit history with the DeltaTable API:
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/data/sales")
deltaTable.history().show()
# Restore to version 2
deltaTable.restoreToVersion(2)
# Or restore to timestamp
deltaTable.restoreToTimestamp("2024-01-15")
MERGE allows you to update, insert, and delete in a single atomic operation:
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/data/sales")
# Source data with updates and new records
updates_df = spark.createDataFrame([
    (1, "PROD-001", 10, date(2024, 1, 15), Decimal("250.00")),  # Update to an existing sale
    (4, "PROD-004", 1, date(2024, 1, 17), Decimal("199.99")),   # Brand-new sale
], schema="sale_id BIGINT, product_id STRING, quantity INT, sale_date DATE, amount DECIMAL(10, 2)")
# Perform MERGE
deltaTable.alias("target").merge(
updates_df.alias("source"),
"target.sale_id = source.sale_id"
).whenMatchedUpdate(set={
"quantity": "source.quantity",
"amount": "source.amount"
}).whenNotMatchedInsert(values={
"sale_id": "source.sale_id",
"product_id": "source.product_id",
"quantity": "source.quantity",
"sale_date": "source.sale_date",
"amount": "source.amount"
}).execute()
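The same builder can also delete rows in the same atomic transaction. A hedged sketch: treating a source quantity of 0 as a deletion marker is purely an illustrative convention here, not something defined by the schema above.
# Delete flagged matches, update the rest, insert anything new
deltaTable.alias("target").merge(
    updates_df.alias("source"),
    "target.sale_id = source.sale_id"
).whenMatchedDelete(
    condition="source.quantity = 0"  # remove sales the source flags with quantity 0
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()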
OPTIMIZE compacts many small files into fewer, larger ones; an optional WHERE clause limits the rewrite to specific partitions:
OPTIMIZE sales
OPTIMIZE sales WHERE sale_date >= '2024-01-01'
ZORDER BY colocates related data in the same files, so queries that filter on those columns can skip more data:
OPTIMIZE sales ZORDER BY (product_id, sale_date)
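The same maintenance operations are available from Python in Delta Lake 2.0 and later; a sketch using the deltaTable handle created earlier:
# Compact small files across the whole table
deltaTable.optimize().executeCompaction()
# Cluster data by the columns you filter on most often
deltaTable.optimize().executeZOrderBy("product_id", "sale_date")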
VACUUM removes data files that are no longer referenced by the table and are older than the retention threshold (7 days by default):
VACUUM sales
VACUUM sales RETAIN 24 HOURS
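VACUUM can also be run from Python via the deltaTable handle. Retention windows shorter than the 7-day default (such as the 24-hour example above) are refused unless you explicitly disable Delta's safety check:
# Use with care: readers still relying on the old files will break
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
deltaTable.vacuum(24)  # retention in hours; deltaTable.vacuum() uses the default retention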
Try these missions:
Start your Delta Lake journey! ⚔️