100 MINS advanced
12. Scaling Automation
Module 12: Scaling
Queues, Workers, Redis, and Distributed Automation
A single-threaded Python script can scrape hundreds of pages per hour; a properly architected distributed system can scrape millions in the same time. Scaling automation requires moving from procedural scripts to distributed architectures: task queues, worker pools, centralized state, and orchestration. This module bridges the gap between a working script and a production-grade automation system.
🔴 Redis as Automation Infrastructure
import redis
import json
import time
from typing import Any, Optional, List
from datetime import datetime
# Shared Redis client for the examples below (local server, database 0).
# decode_responses=True makes redis-py return replies as str instead of bytes.
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True,
)
class DistributedURLQueue:
    '''Redis-backed URL queue with deduplication and priority support.

    Redis keys:
      * QUEUE_KEY      - sorted set of pending URLs, score = priority
                         (the highest score is popped first).
      * SEEN_KEY       - set of every URL ever pushed; a URL is enqueued
                         at most once.
      * PROCESSING_KEY - prefix of per-URL "in progress" markers
                         (``scraper:processing:<url>``) with a 5 minute TTL.
      * FAILED_KEY     - hash mapping url -> JSON ``{"error": ..., "time": ...}``.
    '''

    QUEUE_KEY = 'scraper:queue'
    SEEN_KEY = 'scraper:seen'
    PROCESSING_KEY = 'scraper:processing'
    FAILED_KEY = 'scraper:failed'

    # URLs sent per script invocation in push_bulk. A Lua script runs
    # atomically and blocks Redis while it executes, so large inputs are chunked.
    _PUSH_BATCH_SIZE = 1000

    # KEYS[1] = seen set, KEYS[2] = queue zset, ARGV[1] = priority,
    # ARGV[2..] = URLs. For each URL: SADD it to the seen set and, only if it
    # was newly added, ZADD it to the queue. Returns how many were enqueued.
    # Doing the dedup check and the enqueue server-side in one script closes the
    # race where two producers both see a URL as unseen and queue it twice.
    # NOTE: under Redis Cluster both keys must hash to the same slot.
    _PUSH_LUA = """
local enqueued = 0
for i = 2, #ARGV do
  if redis.call('SADD', KEYS[1], ARGV[i]) == 1 then
    redis.call('ZADD', KEYS[2], ARGV[1], ARGV[i])
    enqueued = enqueued + 1
  end
end
return enqueued
"""

    def __init__(self, redis_client: redis.Redis):
        self.r = redis_client
        # redis-py Script object: invoked via EVALSHA, reloaded if not cached.
        self._push_script = redis_client.register_script(self._PUSH_LUA)

    def push(self, url: str, priority: int = 0) -> bool:
        '''Add *url* to the queue. Returns False if it was already seen.'''
        enqueued = self._push_script(
            keys=[self.SEEN_KEY, self.QUEUE_KEY], args=[priority, url]
        )
        return enqueued == 1

    def push_bulk(self, urls: List[str], priority: int = 0) -> int:
        '''Add many URLs with deduplication; returns how many were new.

        Duplicates inside *urls* itself are counted once. Each chunk of
        ``_PUSH_BATCH_SIZE`` URLs costs a single round trip.
        '''
        urls = list(urls)
        enqueued = 0
        for start in range(0, len(urls), self._PUSH_BATCH_SIZE):
            batch = urls[start:start + self._PUSH_BATCH_SIZE]
            enqueued += self._push_script(
                keys=[self.SEEN_KEY, self.QUEUE_KEY], args=[priority, *batch]
            )
        return enqueued

    def pop(self) -> Optional[str]:
        '''Atomically pop the highest-priority URL, or return None if empty.

        ZPOPMAX is atomic, so concurrent workers never receive the same URL.
        The popped URL gets a processing marker that expires after 300 s.

        NOTE(review): nothing in this class re-queues a URL whose marker
        expired. A worker crash also loses the URL, including a crash between
        ZPOPMAX and SETEX. Add a reaper if at-least-once delivery is needed.
        '''
        result = self.r.zpopmax(self.QUEUE_KEY)
        if not result:
            return None
        url = result[0][0]  # result is [(member, score)]
        self.r.setex(f'{self.PROCESSING_KEY}:{url}', 300, '1')  # 5 min timeout
        return url

    def complete(self, url: str):
        '''Clear the processing marker for a successfully handled URL.'''
        self.r.delete(f'{self.PROCESSING_KEY}:{url}')

    def fail(self, url: str, error: str):
        '''Clear the processing marker and record *error* for *url*.

        Both writes go in one MULTI/EXEC pipeline, so the URL never appears
        in neither state.
        '''
        record = json.dumps({'error': error, 'time': datetime.now().isoformat()})
        pipe = self.r.pipeline()
        pipe.delete(f'{self.PROCESSING_KEY}:{url}')
        pipe.hset(self.FAILED_KEY, url, record)
        pipe.execute()

    def stats(self) -> dict:
        '''Return counts of queued, seen, and failed URLs.'''
        return {
            'queued': self.r.zcard(self.QUEUE_KEY),
            'seen': self.r.scard(self.SEEN_KEY),
            'failed': self.r.hlen(self.FAILED_KEY),
        }

# ⚙️ Celery Task Queue
from celery import Celery
from celery.utils.log import get_task_logger
from typing import Dict
# Broker on Redis DB 0 carries task messages; backend on DB 1 stores results.
app = Celery(
    'automation',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

app.conf.update(
    # Serialize task messages as JSON and accept only JSON content.
    task_serializer='json',
    accept_content=['json'],
    # Stored task results are kept for one hour.
    result_expires=3600,
    # Acknowledge a task after it completes rather than on receipt, and
    # re-queue it if the worker running it dies.
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    # No extra prefetching: each worker process reserves one message at a
    # time, giving fair distribution across workers.
    worker_prefetch_multiplier=1,
)

logger = get_task_logger(__name__)
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def scrape_product(self, url: str) -> Dict:
    '''Scrape one product page; retried up to 3 times, 60 s apart.

    Returns ``{'url', 'status': 'success', 'data'}`` on success. Once retries
    are exhausted it returns ``{'url', 'status': 'failed', 'error'}`` instead of
    raising. A raising header task would fail the whole chord, so the
    ``process_scrape_results`` callback (which filters on ``status``) would
    never run.
    '''
    try:
        result = {'url': url, 'status': 'success', 'data': {}}
        # actual scraping logic
        return result
    except Exception as exc:
        attempt = self.request.retries
        if self.max_retries is not None and attempt >= self.max_retries:
            logger.error('Scrape failed for %s after %d retries: %s', url, attempt, exc)
            return {'url': url, 'status': 'failed', 'error': str(exc)}
        logger.warning('Scrape failed for %s: %s. Retrying...', url, exc)
        raise self.retry(exc=exc)
@app.task
def process_scrape_results(results: list) -> Dict:
    '''Reduce step of the chord: count how many scrape results succeeded.

    *results* is the list of values returned by the header tasks. Falsy
    entries, and entries whose ``status`` is not ``'success'``, are not counted.
    '''
    succeeded = sum(
        1 for res in results
        if res and res.get('status') == 'success'
    )
    return {'total': len(results), 'successful': succeeded}
from celery import chord, group

# Dispatch jobs.
# Guarded by __main__ so that a Celery worker importing this module (to
# register the tasks above) does not enqueue jobs and block on result.get()
# at import time.
if __name__ == '__main__':
    urls = ['https://example.com/product/{}'.format(i) for i in range(1000)]

    # Option 1: Fire and forget - enqueue every URL without waiting for results.
    for url in urls:
        scrape_product.delay(url)

    # Option 2: Chord (parallel map → reduce) - scrape the first 100 URLs in
    # parallel, then call process_scrape_results once with their results.
    # NOTE: running both options scrapes those 100 URLs twice; pick one.
    callback = process_scrape_results.s()
    header = group(scrape_product.s(url) for url in urls[:100])
    result = chord(header)(callback)
    print(result.get(timeout=300))

# 📊 Monitoring and Observability
import time
from functools import wraps
from collections import defaultdict
class AutomationMetrics:
    '''Simple Redis-backed metrics for automation monitoring.

    Storage layout:
      * ``metrics:<metric>:<YYYY-MM-DD>`` - integer counter per UTC day; its
        7 day TTL is refreshed on every increment.
      * ``timing:<metric>`` - list of the most recent 1000 successful call
        durations in milliseconds, newest first.
    '''

    COUNTER_TTL_SECONDS = 86400 * 7  # 7 day retention
    MAX_TIMING_SAMPLES = 1000        # keep last 1000 durations per metric

    def __init__(self, redis_client):
        self.r = redis_client

    def increment(self, metric: str, value: int = 1, tags: dict = None):
        '''Add *value* to today's counter for *metric*.

        Args:
            metric: Counter name, e.g. ``'scrape.success'``.
            value: Amount to add.
            tags: Accepted for API compatibility but currently ignored; tagged
                increments land in the same counter as untagged ones.
        '''
        key = f'metrics:{metric}:{self._today()}'
        # INCRBY and EXPIRE are sent together in one round trip.
        pipe = self.r.pipeline()
        pipe.incrby(key, value)
        pipe.expire(key, self.COUNTER_TTL_SECONDS)
        pipe.execute()

    def timing(self, metric: str):
        '''Return a decorator that times the wrapped function.

        On success, the wrapper records the duration under ``timing:<metric>``
        and increments ``<metric>.success``. If the function raises, it
        increments ``<metric>.error`` and re-raises.
        '''
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # perf_counter is monotonic; wall-clock time.time() can jump.
                start = time.perf_counter()
                try:
                    result = func(*args, **kwargs)
                except Exception:
                    self.increment(f'{metric}.error')
                    raise
                # Recording happens outside the try. A Redis failure here
                # propagates as-is and is not miscounted as a function error.
                duration_ms = (time.perf_counter() - start) * 1000
                key = f'timing:{metric}'
                pipe = self.r.pipeline()
                pipe.lpush(key, duration_ms)
                pipe.ltrim(key, 0, self.MAX_TIMING_SAMPLES - 1)
                pipe.execute()
                self.increment(f'{metric}.success')
                return result
            return wrapper
        return decorator

    def get_stats(self, metric: str) -> dict:
        '''Summarize the stored durations (ms) for *metric*.

        Returns {} when there are no samples. Otherwise it returns count, mean
        and p50/p95/p99. Each percentile is the sorted sample at index
        ``int(n * q)``, which is always < n for q < 1.
        '''
        timings = sorted(float(t) for t in self.r.lrange(f'timing:{metric}', 0, -1))
        if not timings:
            return {}
        n = len(timings)
        return {
            'count': n,
            'mean': sum(timings) / n,
            'p50': timings[int(n * 0.5)],
            'p95': timings[int(n * 0.95)],
            'p99': timings[int(n * 0.99)],
        }

    def _today(self) -> str:
        # UTC so that workers in different time zones share one daily bucket.
        return time.strftime('%Y-%m-%d', time.gmtime())

# Automation Arena: Data Pipeline
Extract (Scraper)
Source: Target Website
0
Items Pulled
Memory Queue (Redis)
Buffer Zone
0
In Queue
Workers & Database
Transform & Load
0
Saved to DB
pipeline_config.py
Max Workers: 10
Python 3
1
2
3
4
5
6
7
8
9
10
11
CLUSTER LOGS
[14:48:58]System ready. Awaiting pipeline execution...
Knowledge Check
Ready to test your understanding of 12. Scaling Automation?