90 MINS advanced
13. Big Data Fundamentals
Module 13: Big Data
Spark, Distributed Computing, and Data Lakes
When data exceeds what a single machine can process, you need distributed computing. Apache Spark is the dominant framework for big data processing: it distributes computation across a cluster (up to hundreds of machines), keeps intermediate data in memory where possible instead of writing it to disk between steps, and supports both batch and streaming workloads. This module teaches Spark from a data science perspective: when you need it, how to use it efficiently, and how modern cloud data lakehouses work.
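To make "distributes computation" concrete, here is a minimal plain-Python sketch of the pattern behind a distributed aggregation: each worker aggregates its own partition, and the small partial results are then merged. The rows, the three-way split, and the function names are hypothetical and only illustrate the idea; this is not how Spark is implemented internally.

```python
from collections import defaultdict

def partial_sums(partition):
    """Aggregate one partition locally, as a single worker would."""
    totals = defaultdict(float)
    for region, amount in partition:
        totals[region] += amount
    return dict(totals)

def merge_sums(partials):
    """Combine the per-partition results into the final answer."""
    merged = defaultdict(float)
    for partial in partials:
        for region, total in partial.items():
            merged[region] += total
    return dict(merged)

# Hypothetical (region, amount) rows, split round-robin into three partitions.
rows = [('North', 10.0), ('South', 5.0), ('North', 2.5),
        ('East', 7.0), ('South', 1.0), ('East', 3.0)]
partitions = [rows[i::3] for i in range(3)]

# Each partition could be processed on a different machine;
# only the small per-region totals need to be moved and merged.
result = merge_sums(partial_sums(p) for p in partitions)
print(result)
```

The point of the design is that the merge step handles one small dictionary per partition rather than the raw rows, which is why this kind of aggregation scales with the number of workers.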
⚡ PySpark Fundamentals
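Spark transformations are lazy: a call such as filter only records a step in a plan, and nothing runs until an action such as show or count is called. The toy class in this sketch mimics that behaviour in plain Python. `LazyPipeline` and its methods are hypothetical stand-ins, not Spark classes, and the sketch skips everything Spark does to optimize the plan.

```python
class LazyPipeline:
    """Toy stand-in for a DataFrame: records steps, runs them only on an action."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)

    def filter(self, predicate):
        # Transformation: returns a new pipeline and touches no data.
        return LazyPipeline(self._rows, self._steps + (('filter', predicate),))

    def map(self, fn):
        # Transformation: also only recorded.
        return LazyPipeline(self._rows, self._steps + (('map', fn),))

    def collect(self):
        # Action: only now is the recorded plan executed.
        out = []
        for row in self._rows:
            keep = True
            for kind, fn in self._steps:
                if kind == 'filter':
                    if not fn(row):
                        keep = False
                        break
                else:
                    row = fn(row)
            if keep:
                out.append(row)
        return out

calls = []

def positive(amount):
    calls.append(amount)  # record when the predicate actually runs
    return amount > 0

plan = LazyPipeline([5.0, -1.0, 2.0]).filter(positive).map(lambda a: a * 2)
calls_before_action = len(calls)  # still 0: building the plan ran nothing
result = plan.collect()           # the action triggers execution
print(calls_before_action, result)
```

Because the whole plan is known before anything runs, an engine has the chance to reorder or combine steps first, which is the motivation for laziness in Spark.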
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
# Initialize Spark and size the executors for your cluster
spark = (
    SparkSession.builder
    .appName('DataSciencePipeline')
    .config('spark.sql.adaptive.enabled', 'true')
    .config('spark.sql.adaptive.coalescePartitions.enabled', 'true')
    .config('spark.executor.memory', '8g')
    .config('spark.executor.cores', '4')
    # DataFrame shuffles are sized by spark.sql.shuffle.partitions;
    # spark.default.parallelism only affects RDD operations
    .config('spark.sql.shuffle.partitions', '200')
    .getOrCreate()
)
# Define schema explicitly — avoid schema inference on large datasets
schema = StructType([
    StructField('transaction_id', StringType(), nullable=False),
    StructField('customer_id', StringType(), nullable=True),
    StructField('amount', DoubleType(), nullable=True),
    StructField('transaction_date', DateType(), nullable=True),
    StructField('region', StringType(), nullable=True),
])
# Read partitioned data from cloud storage
# (Parquet files have no header row, so no 'header' option is needed)
df = spark.read \
    .schema(schema) \
    .parquet('s3://data-lake/transactions/year=2024/')
print(f'Rows: {df.count():,}') # triggers distributed count
df.printSchema()
df.show(5, truncate=False)
# Spark transformations are LAZY — no computation until an action is triggered
# This allows Spark to optimize the execution plan before running
revenue_by_region = (
    df
    .filter(F.col('amount') > 0)  # drops null, zero, and negative amounts
    .withColumn('year', F.year('transaction_date'))
    .withColumn('month', F.month('transaction_date'))
    .groupBy('region', 'year', 'month')
    .agg(
        F.sum('amount').alias('total_revenue'),
        F.count('transaction_id').alias('transaction_count'),
        F.mean('amount').alias('avg_amount'),
        F.countDistinct('customer_id').alias('unique_customers')
    )
    .orderBy('year', 'month', F.desc('total_revenue'))
)
# Action: triggers actual computation across the cluster
revenue_by_region.show(20)
revenue_by_region.count() # another action
🪟 Window Functions and Advanced Analytics
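A window frame can be bounded by a number of rows (`rowsBetween` in PySpark) or by a range of values in the ordering column (`rangeBetween`). The two give different answers whenever transactions are not evenly spaced in time. This is a plain-Python sketch of the difference, using hypothetical transactions for a single customer; the helper names are illustrative only.

```python
from datetime import date, timedelta

# Hypothetical transactions for one customer: (date, amount), sorted by date.
txns = [
    (date(2024, 1, 1), 100.0),
    (date(2024, 1, 20), 50.0),
    (date(2024, 3, 1), 25.0),
    (date(2024, 3, 5), 10.0),
]

def rolling_by_rows(rows, n_preceding):
    """Sum the current row plus up to n_preceding earlier rows."""
    return [
        sum(amount for _, amount in rows[max(0, i - n_preceding): i + 1])
        for i in range(len(rows))
    ]

def rolling_by_days(rows, days):
    """Sum rows dated within `days` days before the current row (inclusive)."""
    out = []
    for current_date, _ in rows:
        start = current_date - timedelta(days=days)
        out.append(sum(a for d, a in rows if start <= d <= current_date))
    return out

by_rows = rolling_by_rows(txns, 30)  # frame of 30 preceding rows
by_days = rolling_by_days(txns, 30)  # frame of 30 preceding days
print(by_rows)
print(by_days)
```

With only four transactions, the row-based frame never drops anything and behaves like a running total, while the day-based frame forgets the January purchases by March.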
from pyspark.sql import functions as F
from pyspark.sql.window import Window
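# Window functions for rolling metrics and rankings
# rowsBetween counts rows, not days, so a 30-day window uses rangeBetween over a numeric day index
day_index = F.datediff(F.col('transaction_date'), F.lit('1970-01-01').cast('date'))
window_30d = Window.partitionBy('customer_id').orderBy(day_index).rangeBetween(-30, 0)
window_customer = Window.partitionBy('customer_id').orderBy('transaction_date')
# window_rank is meant for an aggregated frame with 'month' and 'total_revenue' columns, such as revenue_by_region
window_rank = Window.partitionBy('region', 'month').orderBy(F.desc('total_revenue'))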
df_enriched = df \
    .withColumn('rolling_30d_revenue', F.sum('amount').over(window_30d)) \
    .withColumn('rolling_30d_transactions', F.count('transaction_id').over(window_30d)) \
    .withColumn('cumulative_revenue', F.sum('amount').over(window_customer)) \
    .withColumn('transaction_number', F.row_number().over(window_customer)) \
    .withColumn('days_since_first',
        F.datediff('transaction_date', F.first('transaction_date').over(window_customer))
    ) \
    .withColumn('lag_1_amount', F.lag('amount', 1).over(window_customer)) \
    .withColumn('amount_vs_previous', (F.col('amount') - F.col('lag_1_amount')) / F.col('lag_1_amount'))
# Efficient joins — broadcast small tables to avoid shuffle
small_lookup = spark.createDataFrame([
    ('North', 'AMER-N'), ('South', 'AMER-S'), ('East', 'APAC-E'), ('West', 'AMER-W')
], ['region', 'geo_code'])
df_joined = df.join(
    F.broadcast(small_lookup),  # broadcast hint: send the small table to all executors
    on='region',
    how='left'
)
🏞️ Data Lake Architecture
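Data lakes commonly lay files out in `key=value` directories, as in the `year=2024/` path used in this module. A query that filters on those keys can then skip whole directories without opening any file. This is a rough plain-Python sketch of that pruning step; the file names and helper functions are hypothetical, and real engines do this from table metadata rather than from string parsing.

```python
# Hypothetical object keys in a lake that uses key=value directory names.
files = [
    'transactions/year=2023/month=12/part-0000.parquet',
    'transactions/year=2024/month=1/part-0000.parquet',
    'transactions/year=2024/month=1/part-0001.parquet',
    'transactions/year=2024/month=2/part-0000.parquet',
]

def partition_values(path):
    """Extract partition values such as {'year': '2024', 'month': '1'} from a path."""
    values = {}
    for segment in path.split('/'):
        if '=' in segment:
            key, value = segment.split('=', 1)
            values[key] = value
    return values

def prune(paths, **wanted):
    """Keep only files whose partition values match every requested filter."""
    return [
        p for p in paths
        if all(partition_values(p).get(k) == str(v) for k, v in wanted.items())
    ]

jan_2024 = prune(files, year=2024, month=1)
print(jan_2024)
```

Only two of the four files survive the filter, so a reader would fetch half the data before looking at a single row. This is why the choice of partition columns should follow the most common query filters.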
from delta import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Delta Lake: ACID transactions on data lakes
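# Initialize Spark with Delta support
# delta-core 2.4.0 targets Spark 3.4.x; on Spark 3.5 use the delta-spark 3.x artifact instead
spark = SparkSession.builder \
    .config('spark.jars.packages', 'io.delta:delta-core_2.12:2.4.0') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .getOrCreate()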
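# Write with Delta format
# partitionBy requires 'year' and 'month' columns in new_data;
# derive them from transaction_date first if the staging files lack them
new_data = spark.read.parquet('s3://staging/daily_transactions/')
new_data.write \
    .format('delta') \
    .mode('append') \
    .partitionBy('year', 'month') \
    .save('s3://data-lake/transactions-delta/')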
# MERGE operation (Upsert) — handle updates without full rewrite
delta_table = DeltaTable.forPath(spark, 's3://data-lake/transactions-delta/')
delta_table.alias('target').merge(
    new_data.alias('source'),
    'target.transaction_id = source.transaction_id'
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
# Time travel: query historical versions
# versionAsOf takes a commit version number, not a date
df_v1 = spark.read.format('delta') \
    .option('versionAsOf', 1) \
    .load('s3://data-lake/transactions-delta/')
df_last_week = spark.read.format('delta') \
    .option('timestampAsOf', '2024-01-08') \
    .load('s3://data-lake/transactions-delta/')
print('Current rows:', delta_table.toDF().count())
print('Version 1 rows:', df_v1.count())
# Schema evolution — add new columns without breaking existing pipelines
spark.sql('ALTER TABLE delta.`s3://data-lake/transactions-delta/` ADD COLUMNS (channel STRING)')
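The MERGE and time-travel operations above rest on two ideas: an upsert keyed on `transaction_id`, and a history of committed versions that stays readable. This toy sketch models both in plain Python. `VersionedTable` is hypothetical and keeps a full copy of the table per version for simplicity, whereas Delta Lake records file-level changes in a transaction log.

```python
import copy

class VersionedTable:
    """Toy table keyed by transaction_id that snapshots every commit."""

    def __init__(self):
        self._versions = [{}]  # version 0 is the empty table

    def merge(self, rows, key='transaction_id'):
        """Upsert: update rows whose key exists, insert the rest, commit a new version."""
        current = copy.deepcopy(self._versions[-1])
        for row in rows:
            current[row[key]] = {**current.get(row[key], {}), **row}
        self._versions.append(current)
        return len(self._versions) - 1  # the new version number

    def read(self, version=None):
        """Return the rows at a given version (latest when omitted)."""
        snapshot = self._versions[-1 if version is None else version]
        return list(snapshot.values())

table = VersionedTable()
v1 = table.merge([{'transaction_id': 't1', 'amount': 10.0},
                  {'transaction_id': 't2', 'amount': 20.0}])
v2 = table.merge([{'transaction_id': 't2', 'amount': 25.0},   # matched: update
                  {'transaction_id': 't3', 'amount': 5.0}])   # not matched: insert

latest = {r['transaction_id']: r['amount'] for r in table.read()}
at_v1 = {r['transaction_id']: r['amount'] for r in table.read(version=v1)}
print(latest)
print(at_v1)
```

Reading at `v1` still returns the old amount for `t2`, because earlier versions are never modified in place. That immutability is what makes reproducible reads and rollbacks possible.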