2. Python for Automation
Python as Your Automation Engine
Python dominates automation because its standard library covers almost every operating system interaction you'll ever need, its syntax reads like pseudocode, and its ecosystem has a specialized library for every automation task imaginable. This module covers the Python patterns that appear in virtually every automation script you will ever write.
We focus on practical, automation-specific patterns — not general Python tutorials. If you know Python basics, this module upgrades your knowledge specifically for automation work.
File System Mastery
Automation constantly reads, writes, moves, and organizes files. The pathlib module is the modern, object-oriented way to handle all of this:
from pathlib import Path
import shutil
import json
import csv
from datetime import datetime
# pathlib makes paths cross-platform and readable
base_dir = Path('/data/automation')
output_root = base_dir / 'output'
output_dir = output_root / datetime.now().strftime('%Y-%m-%d')
output_dir.mkdir(parents=True, exist_ok=True)  # create all directories

# Find all CSV files recursively. Anything already under output/ is skipped:
# output_dir lives inside base_dir, so an unfiltered '**/*.csv' would also
# match the copies produced by earlier runs and process them again.
# sorted() gives a stable order, which glob() itself does not guarantee.
all_csvs = sorted(
    csv_path
    for csv_path in base_dir.glob('**/*.csv')
    if output_root not in csv_path.parents
)
print(f'Found {len(all_csvs)} CSV files')

# Read and write JSON config files. The encoding is explicit because
# read_text()/write_text() otherwise fall back to the locale encoding.
config_file = base_dir / 'config.json'
config = json.loads(config_file.read_text(encoding='utf-8'))
config['last_run'] = datetime.now().isoformat()
config_file.write_text(json.dumps(config, indent=2), encoding='utf-8')
# Copy every CSV into output_dir under a numbered name
# (processed_0001_<original name>, processed_0002_..., counting from 1).
# Note: this copies rather than renames, so the source files stay in place.
for i, csv_file in enumerate(all_csvs, start=1):
new_name = output_dir / f'processed_{i:04d}_{csv_file.name}'
shutil.copy2(csv_file, new_name) # copy2 preserves metadata
print(f'Copied: {csv_file.name} โ {new_name.name}')
Context Managers and Resource Safety
Automation scripts run unattended. If a file handle leaks or a connection isn't closed, your script will silently corrupt data or run out of resources after hours of operation. Always use context managers:
import csv
from contextlib import contextmanager
import sqlite3
# Reading CSVs safely: the with-block closes the file even if parsing fails,
# and newline='' leaves line-ending handling to the csv module.
with open('data.csv', 'r', encoding='utf-8', newline='') as f:
    reader = csv.DictReader(f)
    rows = list(reader)  # one dict per data row, keyed by the header line

# Writing CSVs safely
# NOTE(review): rows come straight from data.csv; this assumes its columns
# are exactly the ones in fieldnames -- confirm against the real input.
fieldnames = ['url', 'title', 'price', 'scraped_at']
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
# Custom context manager for database operations
@contextmanager
def get_db(db_path='automation.db'):
    """Open a SQLite connection for the duration of a ``with`` block.

    Yields the connection with ``sqlite3.Row`` as its row factory, so result
    columns can be read by name. The transaction is committed when the block
    finishes normally; if the block (or the commit) raises an ``Exception``,
    the transaction is rolled back and the exception is re-raised. The
    connection is closed in every case.
    """
    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row
    try:
        try:
            yield connection
            connection.commit()
        except Exception:
            # Undo partial work before letting the caller see the failure.
            connection.rollback()
            raise
    finally:
        connection.close()
# Usage: leaving the block normally commits; an exception rolls back instead.
with get_db() as db:
db.execute('INSERT INTO results (url, data) VALUES (?, ?)', (url, data))
⏰ Scheduling and Process Management
Automation scripts need to run on schedules or in response to system events. The schedule library and subprocess module handle these needs:
import schedule
import time
import subprocess
import os
from loguru import logger
# Send log records to automation.log, one "timestamp | level | message" line
# each; file rotation and retention are delegated to loguru.
logger.add(
    'automation.log',
    rotation='10 MB',
    retention='30 days',
    format='{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}',
)
def run_scraper():
    """Run scraper.py as a child process and log the outcome.

    The child is started with the interpreter that is running this script
    (``sys.executable``) instead of whatever ``python`` resolves to on PATH,
    so it uses the same virtualenv and still works on systems that only
    provide a ``python3`` binary.

    A non-zero exit status, a timeout (300 seconds) and any other
    ``Exception`` are logged rather than raised, so a failed run does not
    stop the scheduler loop that calls this function.
    """
    import sys  # local import: only needed to locate the current interpreter

    logger.info('Starting scheduled scrape')
    try:
        result = subprocess.run(
            [sys.executable, 'scraper.py', '--output', 'data/daily/'],
            capture_output=True, text=True, timeout=300
        )
        if result.returncode == 0:
            # Only the first 200 characters of stdout are logged.
            logger.success(f'Scrape complete. Output: {result.stdout[:200]}')
        else:
            logger.error(f'Scrape failed: {result.stderr}')
    except subprocess.TimeoutExpired:
        logger.critical('Scraper timed out after 5 minutes')
    except Exception as e:
        logger.exception(f'Unexpected error: {e}')
# Schedule the job. All three rules are registered, so run_scraper is
# scheduled daily at 09:00, every 4 hours, and on Mondays at 06:00.
schedule.every().day.at('09:00').do(run_scraper)
schedule.every(4).hours.do(run_scraper)
schedule.every().monday.at('06:00').do(run_scraper)
logger.info('Scheduler started — running indefinitely')
# Main loop: never exits; each pass asks the scheduler to run pending jobs.
while True:
schedule.run_pending()
time.sleep(60) # check every minute
⚡ Async Python for High-Performance Automation
When your script needs to make hundreds of HTTP requests or process thousands of files simultaneously, asyncio is essential. Async lets you do I/O-bound work concurrently without the overhead of threads:
import asyncio
import aiohttp
import aiofiles
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch *url* and summarise the response.

    Returns ``{'url', 'status', 'length'}`` on success, where ``length`` is
    the length of the decoded body text. Returns ``{'url', 'error'}`` when
    the request fails with an ``aiohttp.ClientError`` or exceeds the
    10-second total timeout.
    """
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            text = await resp.text()
            return {'url': url, 'status': resp.status, 'length': len(text)}
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # A timeout is raised as asyncio.TimeoutError, which is not a
        # ClientError. Its str() is empty, so fall back to the class name.
        return {'url': url, 'error': str(e) or type(e).__name__}
async def fetch_all(urls: List[str], concurrency: int = 20) -> List[Dict]:
    """Fetch every URL concurrently, at most *concurrency* at a time.

    Returns one dict per input URL, in input order. A fetch that raised is
    reported as ``{'url': ..., 'error': ...}`` instead of being dropped, so
    the caller can see which URLs failed.

    Raises:
        ValueError: if *concurrency* is less than 1 (a zero-slot semaphore
            would block every fetch forever).
    """
    if concurrency < 1:
        raise ValueError('concurrency must be at least 1')

    urls = list(urls)  # iterated twice below: to build tasks and to pair results
    if not urls:
        return []

    semaphore = asyncio.Semaphore(concurrency)  # limit concurrent requests

    async def bounded_fetch(session, url):
        async with semaphore:
            return await fetch_url(session, url)

    connector = aiohttp.TCPConnector(limit=100)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [bounded_fetch(session, url) for url in urls]
        # return_exceptions=True: one failing fetch must not cancel the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # gather() preserves task order, so results line up with urls.
    return [
        outcome if isinstance(outcome, dict) else {'url': url, 'error': repr(outcome)}
        for url, outcome in zip(urls, results)
    ]
# Run it: build 500 page URLs and fetch them with up to 30 in flight at once.
urls = [f'https://example.com/page/{i}' for i in range(500)]
results = asyncio.run(fetch_all(urls, concurrency=30))
print(f'Fetched {len(results)} URLs')
🔧 Environment Management and Configuration
import os
from dotenv import load_dotenv
from dataclasses import dataclass, field
from typing import Optional
# Must run before AutomationConfig reads os.environ, so that values from
# .env are visible to it.
load_dotenv() # loads .env file into environment
@dataclass
class AutomationConfig:
    """Settings for an automation script, read from environment variables.

    Every field is resolved when an instance is created, not when the class
    is defined, so changes made to the environment after import are picked
    up and a malformed value cannot break the import itself. Any field can
    still be overridden by passing it to the constructor.

    Raises:
        KeyError: if TELEGRAM_BOT_TOKEN is not set.
        ValueError: if REQUEST_DELAY or MAX_RETRIES is not a valid number.
    """

    # Required — will raise if missing
    telegram_token: str = field(default_factory=lambda: os.environ['TELEGRAM_BOT_TOKEN'])
    # Optional with defaults
    request_delay: float = field(
        default_factory=lambda: float(os.getenv('REQUEST_DELAY', '1.5')))
    max_retries: int = field(
        default_factory=lambda: int(os.getenv('MAX_RETRIES', '3')))
    output_dir: str = field(
        default_factory=lambda: os.getenv('OUTPUT_DIR', './output'))
    log_level: str = field(
        default_factory=lambda: os.getenv('LOG_LEVEL', 'INFO'))
    # None if not set; an empty PROXY_URL= line is treated as "not set" too.
    proxy_url: Optional[str] = field(
        default_factory=lambda: os.getenv('PROXY_URL') or None)
config = AutomationConfig()
print(f'Delay: {config.request_delay}s | Retries: {config.max_retries}')
Knowledge Check
Ready to test your understanding of 2. Python for Automation?