95 MINS advanced
10. Data Automation
Module 10: Data Pipelines
ETL Pipelines, Data Validation, and Storage
Raw scraped data is nearly always unusable in its initial form — it contains inconsistencies, missing values, wrong types, encoding issues, and duplicates. Data automation transforms raw extraction into clean, queryable, reliable datasets. This is the ETL (Extract, Transform, Load) paradigm applied to automation: every pipeline has three stages, and each must be engineered with the same rigor as production software.
🏭 The ETL Pipeline Architecture
from dataclasses import dataclass, field
from typing import List, Dict, Any, Iterator, Callable, Optional
from datetime import datetime
from pathlib import Path
import json
import sqlite3
from loguru import logger
@dataclass
class PipelineRecord:
    '''A single record carried through the extract, transform and load stages.

    Only ``raw`` is required; the remaining fields start out empty/valid and
    are filled in by whatever transformers process the record.
    '''

    # The row exactly as produced by an extractor.
    raw: Dict[str, Any]
    # Cleaned output fields; a fresh empty dict per record.
    clean: Dict[str, Any] = field(default_factory=dict)
    # Error codes recorded against this record; a fresh empty list per record.
    errors: List[str] = field(default_factory=list)
    # True until something marks the record as rejected.
    is_valid: bool = True
class Pipeline:
    '''Minimal ETL runner built from registered callables.

    Functions are registered with the ``extract``, ``transform`` and ``load``
    decorators and executed in registration order by ``run``:

    * extractors take no arguments and return an iterable of raw rows;
    * transformers receive one ``PipelineRecord`` and mutate it in place;
    * loaders receive the list of records that are still valid.
    '''

    def __init__(self, name: str):
        self.name = name
        self._extractors: List[Callable] = []
        self._transformers: List[Callable] = []
        self._loaders: List[Callable] = []
        self.stats = {'extracted': 0, 'valid': 0, 'errors': 0, 'loaded': 0}

    def extract(self, func: Callable) -> Callable:
        '''Register ``func`` as an extractor; returns ``func`` so it works as a decorator.'''
        self._extractors.append(func)
        return func

    def transform(self, func: Callable) -> Callable:
        '''Register ``func`` as a transformer; returns ``func`` so it works as a decorator.'''
        self._transformers.append(func)
        return func

    def load(self, func: Callable) -> Callable:
        '''Register ``func`` as a loader; returns ``func`` so it works as a decorator.'''
        self._loaders.append(func)
        return func

    def run(self) -> Dict[str, int]:
        '''Execute all stages once and return the counters for this run.

        Returns:
            ``self.stats`` with keys ``extracted``, ``valid``, ``errors`` and
            ``loaded``. The counters are reset at the start of every run, so
            the values always describe the most recent run only.

        Exceptions raised by extractors, transformers or loaders propagate
        to the caller unchanged.
        '''
        logger.info(f'Pipeline [{self.name}] starting')
        start = datetime.now()

        # Reset in place (same dict object) so a second run() does not report
        # the previous run's counts added to its own.
        self.stats.update(dict.fromkeys(self.stats, 0))

        # Extract
        records = [
            PipelineRecord(raw=row)
            for extractor in self._extractors
            for row in extractor()
        ]
        self.stats['extracted'] = len(records)

        # Transform
        for record in records:
            for transformer in self._transformers:
                # Once a record is rejected, later transformers would be
                # working on incomplete data, so stop processing it.
                if not record.is_valid:
                    break
                transformer(record)
            self.stats['valid' if record.is_valid else 'errors'] += 1

        # Load
        valid_records = [r for r in records if r.is_valid]
        for loader in self._loaders:
            loader(valid_records)
        # Count each record once, and only if something actually loaded it.
        if self._loaders:
            self.stats['loaded'] = len(valid_records)

        duration = (datetime.now() - start).total_seconds()
        logger.success(f'Pipeline done in {duration:.2f}s: {self.stats}')
        return self.stats


# 🧹 Data Validation and Transformation
import re
from decimal import Decimal, InvalidOperation
from dateutil import parser as date_parser
class DataValidator:
    '''Stateless helpers that turn scraped field values into clean ones.

    Every method is a ``staticmethod``; each returns ``None`` (or ``''`` for
    ``normalize_text``) when the input is missing or cannot be cleaned.
    '''

    @staticmethod
    def clean_price(raw: str) -> Optional[float]:
        '''Parse a price such as ``'$1,234.56'`` or ``'1.234,56 €'`` into a float.

        Args:
            raw: Price text. Plain ``int``/``float`` values are also accepted
                and passed through (so a numeric ``0`` is a valid price).

        Returns:
            The price as a float, negative when the text carries a leading
            minus sign, or ``None`` when no finite number can be extracted.
        '''
        import math

        if raw is None or isinstance(raw, bool):
            return None
        if isinstance(raw, (int, float)):
            try:
                number = float(raw)
            except OverflowError:
                return None
            return number if math.isfinite(number) else None

        # Drop '.'/',' that is not followed by a digit: it is punctuation
        # ("Rs. 500", "12.50.", "5,-"), not part of the number.
        text = re.sub(r'[.,](?!\d)', '', str(raw).strip())
        first_digit = re.search(r'\d', text)
        if first_digit is None:
            return None

        # Keep the sign: a minus at the very start or directly before the
        # first digit ("-5", "-$5", "$-5"). Without this a negative price
        # would silently become positive.
        minus_signs = ('-', '\u2212')
        lead = text[:first_digit.start()]
        negative = text.startswith(minus_signs) or lead.endswith(minus_signs)

        digits = DataValidator._to_decimal_point(re.sub(r'[^\d.,]', '', text))
        try:
            value = float(digits)
        except ValueError:
            return None
        if not math.isfinite(value):
            return None
        return -value if negative else value

    @staticmethod
    def _to_decimal_point(digits: str) -> str:
        '''Rewrite a digits-and-separators string so '.' is the only separator.'''
        last_comma, last_dot = digits.rfind(','), digits.rfind('.')
        if last_comma != -1 and last_dot != -1:
            # Both present: whichever comes last is the decimal separator.
            if last_comma > last_dot:
                return digits.replace('.', '').replace(',', '.')
            return digits.replace(',', '')
        if last_comma != -1:
            # A lone comma is a decimal comma unless it looks like "1,234".
            if digits.count(',') == 1 and not re.fullmatch(r'\d{1,3},\d{3}', digits):
                return digits.replace(',', '.')
            return digits.replace(',', '')
        # Several dots can only be thousands grouping ("1.234.567").
        if re.fullmatch(r'\d{1,3}(\.\d{3}){2,}', digits):
            return digits.replace('.', '')
        return digits

    @staticmethod
    def clean_date(raw: str) -> Optional[str]:
        '''Parse a date/time string and return it in ISO 8601 form.

        Parsing is delegated to ``dateutil.parser.parse`` with its default
        settings. Returns ``None`` for empty input or when parsing raises
        ``ValueError`` or ``OverflowError``.
        '''
        if not raw:
            return None
        try:
            return date_parser.parse(str(raw)).isoformat()
        except (ValueError, OverflowError):
            return None

    @staticmethod
    def clean_url(raw: str, base_url: str = '') -> Optional[str]:
        '''Normalize a scraped link into a usable URL.

        Args:
            raw: Link text as scraped.
            base_url: Prefix for root-relative paths (``/path``). When it is
                empty such paths are returned unchanged.

        Returns:
            ``None`` for empty/blank input and for non-navigable links
            (``mailto:``, ``javascript:``, ``tel:``, ``data:``). Otherwise:
            protocol-relative links get ``https:``, root-relative links get
            ``base_url``, links that already carry a ``scheme://`` are
            returned as-is, and anything else is treated as a bare host/path
            and prefixed with ``https://``.
        '''
        if not raw:
            return None
        url = str(raw).strip()
        if not url:
            # Whitespace-only input would otherwise become 'https://'.
            return None
        lowered = url.lower()
        if lowered.startswith(('mailto:', 'javascript:', 'tel:', 'data:')):
            return None
        if url.startswith('//'):
            return f'https:{url}'
        if url.startswith('/'):
            return f'{base_url.rstrip("/")}{url}'
        # Scheme check is case-insensitive and covers any explicit scheme, so
        # 'HTTP://x' and 'ftp://x' are not mangled into 'https://ftp://x'.
        if re.match(r'[a-z][a-z0-9+.-]*://', lowered):
            return url
        return f'https://{url}'

    @staticmethod
    def normalize_text(raw: str) -> str:
        '''Return ``raw`` Unicode-normalized with whitespace collapsed.

        Uses NFKC so compatibility characters are folded and accented letters
        come out composed (one code point); a decomposed form would inflate
        ``len()`` and break equality with ordinary composed text. Falsy
        input yields ``''``.
        '''
        if not raw:
            return ''
        import unicodedata
        normalized = unicodedata.normalize('NFKC', str(raw))
        return ' '.join(normalized.split())  # collapse whitespace
pipeline = Pipeline('ecommerce_daily')


@pipeline.extract
def extract_from_db() -> Iterator[Dict]:
    '''Yield each unprocessed row of ``raw_products`` as a plain dict.

    Reads ``raw_data.db`` and selects rows where ``processed = 0``. The
    connection stays open while rows are being consumed and is closed when
    the generator is exhausted or closed.
    '''
    from contextlib import closing

    # sqlite3's own context manager only commits/rolls back; it does not
    # close the connection, so closing() is needed to release the handle.
    with closing(sqlite3.connect('raw_data.db')) as conn:
        conn.row_factory = sqlite3.Row
        for row in conn.execute('SELECT * FROM raw_products WHERE processed = 0'):
            yield dict(row)
@pipeline.transform
def validate_and_clean(record: PipelineRecord):
    '''Validate one raw record and fill in ``record.clean``.

    A record is rejected (error code appended, ``is_valid`` set to False,
    ``clean`` left untouched) when its normalized name is shorter than three
    characters (``name_missing``) or its price cannot be parsed or is
    negative (``price_invalid``). Checking stops at the first failure.
    '''
    source = record.raw

    def reject(code: str) -> None:
        # Record the reason and flag the record as unusable.
        record.errors.append(code)
        record.is_valid = False

    # normalize_text always returns a str, so an empty name is caught by
    # the length check as well.
    name = DataValidator.normalize_text(source.get('name', ''))
    if len(name) < 3:
        return reject('name_missing')

    price = DataValidator.clean_price(source.get('price', ''))
    if price is None or price < 0:
        return reject('price_invalid')

    cleaned_url = DataValidator.clean_url(source.get('url', ''))
    scraped_date = DataValidator.clean_date(source.get('scraped_at', ''))
    record.clean = {
        'name': name,
        'price': price,
        'url': cleaned_url,
        'scraped_date': scraped_date,
        'pipeline_run': datetime.now().isoformat(),
    }
@pipeline.load
def load_to_database(records: List[PipelineRecord]):
    '''Insert or replace the cleaned fields of ``records`` in ``clean_data.db``.

    Writes the ``name``, ``price``, ``url`` and ``scraped_date`` entries of
    each record's ``clean`` dict into the ``products`` table. The whole batch
    is one transaction: committed on success, rolled back if an insert fails.
    '''
    from contextlib import closing

    # The inner `conn` context commits/rolls back; closing() then releases
    # the connection, which sqlite3's context manager does not do by itself.
    with closing(sqlite3.connect('clean_data.db')) as conn, conn:
        conn.executemany(
            'INSERT OR REPLACE INTO products (name, price, url, scraped_date) VALUES (:name, :price, :url, :scraped_date)',
            [r.clean for r in records]
        )
pipeline.run()
Delta Loading and Change Detection
import hashlib
import json
def compute_record_hash(record: Dict) -> str:
    '''Compute a stable hash for change detection.

    The ``scraped_at``, ``updated_at`` and ``pipeline_run`` keys are ignored
    so that a record whose content is unchanged hashes the same on every
    run. Key order does not matter.

    Args:
        record: Mapping of field name to value. Values may be anything JSON
            can encode, plus dates/times, ``Decimal``, bytes and sets.

    Returns:
        Hex MD5 digest of the canonical JSON form. MD5 is used purely as a
        change fingerprint here, not for security.

    Raises:
        TypeError: If a value has no deterministic serialization.
    '''
    # Only hash the content fields, not timestamps
    volatile = ('scraped_at', 'updated_at', 'pipeline_run')
    content = {k: v for k, v in record.items() if k not in volatile}
    # sort_keys already makes the output independent of key order.
    payload = json.dumps(content, sort_keys=True, default=_hash_default)
    return hashlib.md5(payload.encode()).hexdigest()


def _hash_default(value):
    '''Give non-JSON values a deterministic serializable form for hashing.'''
    from decimal import Decimal

    if hasattr(value, 'isoformat'):  # date, datetime, time
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).hex()
    if isinstance(value, (set, frozenset)):
        # Set iteration order is not stable between runs; sort to fix it.
        return sorted(value, key=repr)
    # Refuse anything else rather than hashing an unstable repr().
    raise TypeError(f'Cannot hash value of type {type(value).__name__}')
class DeltaLoader:
    '''Only loads records that are new or have changed since last run.

    Keeps one row per record id in a ``record_hashes`` table (id, content
    hash, last-seen timestamp) in the SQLite database at ``db_path``. Can be
    used as a context manager so the connection is closed on exit.
    '''

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute('CREATE TABLE IF NOT EXISTS record_hashes (id TEXT PRIMARY KEY, hash TEXT, last_seen TEXT)')

    def close(self) -> None:
        '''Close the underlying database connection.'''
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def filter_changed(self, records: List[Dict], id_field: str) -> Dict[str, List[Dict]]:
        '''Split ``records`` into new, updated and unchanged.

        Each record's hash is compared with the one stored under its id; the
        store is updated as part of the call, so calling this twice with the
        same records reports them as unchanged the second time.

        Args:
            records: Records to classify.
            id_field: Key whose value (converted to ``str``) identifies a record.

        Returns:
            Dict with keys ``'new'``, ``'updated'`` and ``'unchanged'``, each
            holding the matching records in input order.

        Raises:
            KeyError: If a record lacks ``id_field``. No hash changes from
                the failed batch are kept.
        '''
        new, updated, unchanged = [], [], []
        # One transaction per batch: commit on success, roll back on error,
        # so a failure midway cannot leave half a batch pending for a later
        # call to commit.
        with self.conn:
            for record in records:
                record_id = str(record[id_field])
                current_hash = compute_record_hash(record)
                seen_at = datetime.now().isoformat()
                stored = self.conn.execute(
                    'SELECT hash FROM record_hashes WHERE id = ?', (record_id,)
                ).fetchone()
                if stored is None:
                    new.append(record)
                    self.conn.execute('INSERT INTO record_hashes VALUES (?, ?, ?)',
                                      (record_id, current_hash, seen_at))
                elif stored[0] != current_hash:
                    updated.append(record)
                    self.conn.execute('UPDATE record_hashes SET hash=?, last_seen=? WHERE id=?',
                                      (current_hash, seen_at, record_id))
                else:
                    unchanged.append(record)
                    # The record was seen again, so keep last_seen current
                    # even though its content did not change.
                    self.conn.execute('UPDATE record_hashes SET last_seen=? WHERE id=?',
                                      (seen_at, record_id))
        logger.info(f'Delta: {len(new)} new | {len(updated)} updated | {len(unchanged)} unchanged')
        return {'new': new, 'updated': updated, 'unchanged': unchanged}


# Automation Arena: Data Pipeline
Extract (Scraper)
Source: Target Website
0
Items Pulled
Memory Queue (Redis)
Buffer Zone
0
In Queue
Workers & Database
Transform & Load
0
Saved to DB
pipeline_config.py
Max Workers: 10
Python 3
1
2
3
4
5
6
7
8
9
10
11
CLUSTER LOGS
[11:52:25]System ready. Awaiting pipeline execution...
Knowledge Check
Ready to test your understanding of 10. Data Automation?