PySpark Performance: 10 Optimization Tips for Data Engineers
Master PySpark performance optimization with these essential tips covering partitioning, caching, joins, and avoiding common pitfalls.
Writing PySpark code that works is one thing. Writing PySpark code that runs fast at scale is another. This guide covers the optimization techniques that separate junior data engineers from senior ones.
Spark transformations are lazy: they only build up a query plan, and nothing executes until an action is called.
# All these transformations are lazy: Spark only builds a query plan
from pyspark.sql.functions import col

df = spark.read.parquet("/data/events")
df_filtered = df.filter(col("event_type") == "purchase")
df_selected = df_filtered.select("user_id", "amount")

# NOW everything executes, because show() is an action
df_selected.show()
| Action | Description |
|--------|-------------|
| show() | Display rows |
| collect() | Return all data to driver |
| count() | Count rows |
| write() | Write to storage |
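For example, a write is an action that triggers the whole pipeline built above (the output path here is just a placeholder):

# Writing is an action: it executes every lazy transformation that feeds it
df_selected.write.mode("overwrite").parquet("/data/output/purchases")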
Avoid collect() on Large DataFrames

collect() brings every row back to the driver node, which can easily cause out-of-memory (OOM) errors on large datasets.
# BAD: pulls the entire DataFrame onto the driver
all_data = df.collect()

# GOOD: limit how many rows you bring back
sample = df.limit(100).collect()
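If you only need a handful of rows, take() is an equally safe option; for full results, write to storage instead of collecting (the output path is a placeholder):

# Also safe: take() returns at most n rows to the driver
preview = df.take(100)

# For full results, write to distributed storage instead of collecting
df.write.mode("overwrite").parquet("/data/output/events_clean")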
# Broadcast join: ship the small table to every executor so the
# large table never has to be shuffled
from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "customer_id")

# Verify the strategy in the physical plan
result.explain()  # Look for BroadcastHashJoin
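Spark also broadcasts small tables automatically when they fall below spark.sql.autoBroadcastJoinThreshold (10 MB by default); a sketch of raising it, with 50 MB as an arbitrary example value:

# Raise the automatic broadcast threshold to ~50 MB (default is 10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))

# Setting it to -1 disables automatic broadcasting entirely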
# SLOW: Python UDF, rows are serialized to a Python worker one at a time
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

@udf(StringType())
def upper_udf(s):
    return s.upper()

df.select(upper_udf(col("name")))

# FAST: built-in function runs inside the JVM with no serialization overhead
from pyspark.sql.functions import upper

df.select(upper(col("name")))
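When no built-in covers your logic, a vectorized pandas UDF is usually a better fallback than a plain Python UDF because it processes whole batches through Arrow; a minimal sketch (assumes pyarrow is installed):

import pandas as pd
from pyspark.sql.functions import pandas_udf, col

@pandas_udf("string")
def upper_pandas(s: pd.Series) -> pd.Series:
    # Operates on a whole batch at once instead of row by row
    return s.str.upper()

df.select(upper_pandas(col("name")))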
Cache only when:

- the DataFrame is reused by more than one action
- recomputing it is expensive
- it fits comfortably in the available memory
df_processed.cache()
df_processed.count() # Force materialization
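Once downstream work is finished, release the cache so executors can reclaim that memory:

# Release the cached data when it is no longer needed
df_processed.unpersist()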
Rules of thumb:

- aim for partitions of roughly 100-200 MB
- use repartition() to increase the partition count or rebalance skewed data
- use coalesce() to reduce the partition count cheaply
# Repartition: full shuffle, use to increase partitions or rebalance
df = df.repartition(100)

# Coalesce: narrow operation, merges existing partitions without a full shuffle
df = df.coalesce(10)
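Two related calls worth knowing: repartitioning by a key column before a join or partitioned write, and checking the current partition count (the column name here is illustrative):

# Repartition by a join/write key so related rows land in the same partition
df = df.repartition(200, "customer_id")

# Inspect how many partitions a DataFrame currently has
print(df.rdd.getNumPartitions())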
# GOOD: filter first, so far less data is shuffled into the join
df_filtered = df.filter(col("date") >= "2024-01-01")
df_joined = df_filtered.join(other_df, "id")

# BAD: join the full tables, then throw most of the rows away
df_joined = df.join(other_df, "id")
df_filtered = df_joined.filter(col("date") >= "2024-01-01")
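You can confirm that the filter actually runs before the join by inspecting the physical plan:

# The filter should appear below the join in the plan; for Parquet sources,
# look for the predicate under PushedFilters in the scan node
df_joined.explain()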
Enable Adaptive Query Execution (on by default since Spark 3.2, but worth verifying):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
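A related AQE setting lets Spark merge small shuffle partitions at runtime:

# Let AQE coalesce small shuffle partitions after each stage
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")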
# Parquet is columnar: only the needed columns and partitions are read
df = spark.read.parquet("/data/events") \
    .filter(col("date") == "2024-01-15") \
    .select("user_id", "event_type")
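Partition pruning only works if the data was written partitioned by that column in the first place; a sketch of producing such a layout (the path is a placeholder):

# Write partitioned by date so later date filters skip whole directories
df.write.mode("overwrite").partitionBy("date").parquet("/data/events_partitioned")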
Key things to check when a job runs slowly:

- the physical plan from explain() (broadcast vs. shuffle joins)
- partition counts and sizes
- Python UDFs that could be replaced with built-ins or pandas UDFs
- filters and column selections that could be pushed earlier
Level up your PySpark skills! ⚔️