100 MINS advanced
12. Data Pipelines & Engineering
Module 12: Pipelines
ETL, Orchestration, and Production Data Systems
A data science model is only as reliable as the pipeline feeding it. Production data engineering transforms one-off scripts into monitored, scheduled, self-healing data systems. This module covers the design and implementation of production ETL pipelines — from extraction and transformation to orchestrated scheduling, data quality monitoring, and lineage tracking.
🏭 ETL Pipeline Design Patterns
from dataclasses import dataclass, field
from typing import List, Dict, Callable, Any, Optional
from datetime import datetime
import pandas as pd
from loguru import logger
import time
@dataclass
class PipelineRun:
    """Mutable record describing one execution of a named pipeline."""

    # Identity of the run.
    pipeline_name: str
    run_id: str

    # Timing: the start is stamped when the record is created; the end stays
    # None until something assigns it.
    started_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None

    # Lifecycle state; a new record starts out as 'running'.
    status: str = 'running'

    # Row counters, both starting at zero.
    rows_extracted: int = 0
    rows_loaded: int = 0

    # Per-instance containers (built by a factory so runs never share them).
    errors: List[str] = field(default_factory=list)
    metrics: Dict = field(default_factory=dict)
class ETLPipeline:
    """Decorator-driven extract -> transform -> load pipeline.

    Callables are registered with the ``extract`` / ``transform`` / ``load``
    decorators and executed in registration order by ``run``. Lifecycle
    hooks registered with ``on_success`` / ``on_failure`` / ``on_complete``
    are each called with the ``PipelineRun`` record.
    """

    def __init__(self, name: str):
        self.name = name
        self._extractors: List[Callable] = []
        self._transformers: List[Callable] = []
        self._loaders: List[Callable] = []
        self._hooks: Dict[str, List[Callable]] = {'on_success': [], 'on_failure': [], 'on_complete': []}

    def extract(self, func: Callable) -> Callable:
        """Register ``func`` as an extractor; returns ``func`` unchanged."""
        self._extractors.append(func)
        return func

    def transform(self, func: Callable) -> Callable:
        """Register ``func`` as a transformer; returns ``func`` unchanged."""
        self._transformers.append(func)
        return func

    def load(self, func: Callable) -> Callable:
        """Register ``func`` as a loader; returns ``func`` unchanged."""
        self._loaders.append(func)
        return func

    def on_success(self, func: Callable) -> Callable:
        """Register a hook called with the run after a successful load."""
        self._hooks['on_success'].append(func)
        return func

    def on_failure(self, func: Callable) -> Callable:
        """Register a hook called with the run after the pipeline fails."""
        self._hooks['on_failure'].append(func)
        return func

    def on_complete(self, func: Callable) -> Callable:
        """Register a hook called with the run whether it succeeded or not."""
        self._hooks['on_complete'].append(func)
        return func

    @staticmethod
    def _count_rows(rows: Any) -> int:
        """Return ``len(rows)``, or 0 for objects without a length (e.g. None)."""
        try:
            return len(rows)
        except TypeError:
            return 0

    def _fire_hooks(self, event: str, run: PipelineRun) -> None:
        """Call every hook registered for ``event``, logging hook errors.

        Hook exceptions are logged rather than raised so that a broken
        notification hook cannot mask the pipeline's own outcome.
        """
        for hook in self._hooks[event]:
            try:
                hook(run)
            except Exception:
                logger.exception(f'{event} hook failed for pipeline [{self.name}]')

    def run(self, **kwargs) -> PipelineRun:
        """Execute all registered stages once and return the run record.

        Each extractor is called with ``**kwargs``. A dict result is merged
        into the shared payload; any other result is stored under ``'data'``.
        Transformers receive and return the payload in turn; each loader
        receives the final payload and may return a loaded-row count.

        Raises:
            Exception: whatever a stage (or an ``on_success`` hook) raised,
                re-raised after the run has been marked ``'failed'`` and the
                ``on_failure`` hooks have been called.
        """
        import uuid
        run = PipelineRun(pipeline_name=self.name, run_id=str(uuid.uuid4())[:8])
        logger.info(f'Pipeline [{self.name}] starting | run_id={run.run_id}')
        try:
            # EXTRACT
            raw_data = {}
            for extractor in self._extractors:
                result = extractor(**kwargs)
                payload = result if isinstance(result, dict) else {'data': result}
                raw_data.update(payload)
                # Count only what THIS extractor produced. Reading the count
                # from the accumulated raw_data would re-add an earlier
                # extractor's rows whenever this one returns no 'data' key.
                run.rows_extracted += self._count_rows(payload.get('data', []))
            logger.info(f'Extracted {run.rows_extracted} records')
            # TRANSFORM
            for transformer in self._transformers:
                raw_data = transformer(raw_data)
            # LOAD
            for loader in self._loaders:
                loaded_count = loader(raw_data)
                run.rows_loaded += loaded_count or 0
            run.status = 'success'
            run.completed_at = datetime.now()
            logger.success(f'Pipeline complete: {run.rows_loaded} rows loaded')
            # Success hooks stay inside the try block: an exception raised
            # here still marks the run as failed and propagates.
            for hook in self._hooks['on_success']:
                hook(run)
        except Exception as e:
            run.status = 'failed'
            run.completed_at = datetime.now()
            run.errors.append(str(e))
            logger.exception(f'Pipeline failed: {e}')
            self._fire_hooks('on_failure', run)
            raise
        finally:
            self._fire_hooks('on_complete', run)
        return run


# 📅 Orchestration with Prefect
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.results import LocalFileSystemResultStorage
from datetime import timedelta
import pandas as pd
@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    tags=['extraction', 'critical']
)
def extract_sales_data(start_date: str, end_date: str) -> pd.DataFrame:
    '''Extract sales rows whose date lies between start_date and end_date.

    The two dates are sent as bound parameters instead of being spliced into
    the SQL text, so a malformed or hostile value cannot change the query.

    Args:
        start_date: lower bound for the BETWEEN filter on sales.date.
        end_date: upper bound for the BETWEEN filter on sales.date.

    Returns:
        DataFrame holding every column of the matching sales rows.
    '''
    import sqlalchemy
    # NOTE(review): the connection URL embeds credentials; move it to a
    # secret store or environment variable before using this for real.
    engine = sqlalchemy.create_engine('postgresql://user:pass@host/db')
    query = sqlalchemy.text(
        'SELECT * FROM sales WHERE date BETWEEN :start_date AND :end_date'
    )
    try:
        df = pd.read_sql(
            query,
            engine,
            params={'start_date': start_date, 'end_date': end_date},
        )
    finally:
        # Release pooled connections even when the query fails, so retries
        # do not accumulate idle connections.
        engine.dispose()
    print(f'Extracted {len(df)} rows from {start_date} to {end_date}')
    return df
@task(retries=2, retry_delay_seconds=30)
def transform_sales(df: pd.DataFrame) -> pd.DataFrame:
    '''Derive revenue and ISO week, then drop unusable rows.

    Works on a copy, so the caller's DataFrame is left untouched; a retried
    run therefore sees the same input as the first attempt.

    Args:
        df: frame with quantity, unit_price, date and customer_id columns.

    Returns:
        New DataFrame with added revenue and week columns, date converted
        to datetime, and only rows that have a customer_id and revenue > 0.
    '''
    df = df.copy()  # never mutate the argument in place
    df['revenue'] = df['quantity'] * df['unit_price']
    df['date'] = pd.to_datetime(df['date'])
    df['week'] = df['date'].dt.isocalendar().week
    df = df.dropna(subset=['customer_id', 'revenue'])
    df = df[df['revenue'] > 0]
    return df
@task(retries=3)
def load_to_warehouse(df: pd.DataFrame, table_name: str) -> int:
    '''Append df to table_name in the analytics warehouse.

    Args:
        df: rows to append.
        table_name: destination table.

    Returns:
        Number of rows in df.
    '''
    # Imported here because the file only imports sqlalchemy inside
    # extract_sales_data; without this the name is undefined in this scope.
    import sqlalchemy
    # NOTE(review): the connection URL embeds credentials; move it to a
    # secret store or environment variable before using this for real.
    engine = sqlalchemy.create_engine('postgresql://user:pass@warehouse/analytics')
    try:
        # One transaction for the whole append: if a chunk fails, everything
        # is rolled back, so a task retry cannot duplicate earlier chunks.
        with engine.begin() as connection:
            df.to_sql(table_name, connection, if_exists='append', index=False, method='multi', chunksize=5000)
    finally:
        engine.dispose()
    print(f'Loaded {len(df)} rows to {table_name}')
    return len(df)
@task
def send_pipeline_alert(success: bool, rows_loaded: int, run_date: str):
    '''Build the Sales ETL status message and print it.

    Args:
        success: selects the SUCCESS or FAILURE banner.
        rows_loaded: row count, rendered with thousands separators.
        run_date: date string shown in the message.
    '''
    if success:
        status = '✅ SUCCESS'
    else:
        status = '❌ FAILURE'
    message = f'{status}\nPipeline: Sales ETL\nDate: {run_date}\nRows loaded: {rows_loaded:,}'
    # send to Slack/email
    print(f'Alert sent: {message}')
@flow(name='Sales ETL Pipeline', retries=1, retry_delay_seconds=300)
def sales_etl_flow(start_date: str, end_date: str):
    '''Run extract -> transform -> load for sales and alert on the outcome.

    Args:
        start_date: first date of the extraction window.
        end_date: last date of the extraction window; also used as the
            date shown in the alert.

    Returns:
        Number of rows loaded into fact_sales.

    Raises:
        Exception: whatever a stage raised, re-raised after the failure
            alert has been sent.
    '''
    # Prefect handles retry, logging, and monitoring automatically
    try:
        raw_df = extract_sales_data(start_date, end_date)
        clean_df = transform_sales(raw_df)
        rows_loaded = load_to_warehouse(clean_df, 'fact_sales')
    except Exception:
        # Without this branch the success=False alert is never sent.
        # Note: with flow retries enabled, it is sent per failed attempt.
        send_pipeline_alert(False, 0, end_date)
        raise
    send_pipeline_alert(True, rows_loaded, end_date)
    return rows_loaded
# Deploy and schedule
if __name__ == '__main__':
    from prefect.deployments import Deployment
    from prefect.server.schemas.schedules import CronSchedule

    # Cron expression fires at 06:00 every day in the Africa/Accra timezone.
    schedule = CronSchedule(cron='0 6 * * *', timezone='Africa/Accra')  # 6am daily

    # Build the deployment from the flow, then register it.
    deployment = Deployment.build_from_flow(
        flow=sales_etl_flow,
        name='daily-sales-etl',
        schedule=schedule,
        tags=['production', 'sales'],
    )
    deployment.apply()


# 🔍 Data Quality Monitoring
import pandas as pd
import numpy as np
from great_expectations.dataset import PandasDataset
def build_quality_suite(
    df: pd.DataFrame,
    *,
    valid_regions=('North', 'South', 'East', 'West', 'Central'),
    revenue_mean_range=(800, 2500),
    row_count_range=(1000, 50000),
) -> dict:
    '''Build Great Expectations data quality checks programmatically

    Args:
        df: frame to validate; the checks read its customer_id, revenue
            and region columns.
        valid_regions: allowed values for the region column.
        revenue_mean_range: (min, max) bounds for the mean of revenue.
        row_count_range: (min, max) bounds for the number of rows.

    Returns:
        Mapping of check name to its expectation result when all pass.

    Raises:
        ValueError: if at least one check did not succeed.
    '''
    gx_df = PandasDataset(df)
    mean_min, mean_max = revenue_mean_range
    rows_min, rows_max = row_count_range
    results = {}
    # Schema checks
    results['no_null_ids'] = gx_df.expect_column_values_to_not_be_null('customer_id')
    results['positive_revenue'] = gx_df.expect_column_values_to_be_between('revenue', min_value=0)
    results['valid_regions'] = gx_df.expect_column_values_to_be_in_set(
        'region', list(valid_regions)
    )
    # Statistical checks — alert on distribution drift
    results['revenue_mean_range'] = gx_df.expect_column_mean_to_be_between(
        'revenue', min_value=mean_min, max_value=mean_max
    )
    results['row_count_range'] = gx_df.expect_table_row_count_to_be_between(
        min_value=rows_min, max_value=rows_max
    )
    failed_checks = [name for name, outcome in results.items() if not outcome.success]
    total = len(results)
    passed = total - len(failed_checks)
    print(f'Quality checks: {passed}/{total} passed')
    if failed_checks:
        print('FAILED checks:', failed_checks)
        raise ValueError(f'{len(failed_checks)} quality checks failed — halting load')
    return results


# Data Science: Big Data Pipeline
S3 Data Lake
Raw Unstructured JSON
0.0 GB
Read Volume
Spark Cluster
4 Executor Nodes Active
0.0 GB
Processing Backlog
Filtered Out (Bad Rows):0.0 GB
Data Warehouse
Structured Parquet
0.0 GB
Clean Data Written
pyspark_job.py
PySpark 3.5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
SPARK CONSOLE
[14:24:17]SparkSession v3.5.0 initialized.
[14:24:17]Awaiting PySpark script execution...
Knowledge Check
Ready to test your understanding of 12. Data Pipelines & Engineering?