85 MINS intermediate
9. API Automation
Module 09: API Automation
REST APIs, Authentication, and Workflow Chaining
API automation is the highest-quality form of data collection and interaction — you're working with data structured exactly for consumption, without parsing HTML. APIs are also the backbone of integrations: connecting services that don't natively talk to each other, automating workflows across platforms, and building data pipelines that span multiple providers.
Authentication Patterns
import requests
import time
from typing import Dict, Optional
from functools import wraps
class APIClient:
    '''Base API client with automatic OAuth2 authentication and token refresh.

    A Bearer token is obtained with the client-credentials grant from
    ``{base_url}/oauth/token``, cached on the instance, and refreshed shortly
    before it expires. Every request goes through one ``requests.Session``.

    Args:
        base_url: Root URL of the API; a trailing slash is stripped.
        client_id: OAuth2 client identifier sent to the token endpoint.
        client_secret: OAuth2 client secret sent to the token endpoint.
        timeout: Seconds to wait for any single HTTP call before giving up.
            Without a timeout a stalled server would block the caller forever.
    '''

    # Refresh this many seconds before the stated expiry so a token cannot
    # expire while a request that carries it is still in flight.
    _EXPIRY_BUFFER_SECONDS = 60
    # Lifetime assumed when the token response omits the optional
    # ``expires_in`` field. NOTE(review): confirm against the provider's
    # documented default token lifetime.
    _DEFAULT_TOKEN_TTL_SECONDS = 3600

    def __init__(self, base_url: str, client_id: str, client_secret: str,
                 timeout: float = 30.0):
        self.base_url = base_url.rstrip('/')
        self.client_id = client_id
        self.client_secret = client_secret
        self.timeout = timeout
        self._token: Optional[str] = None
        self._token_expires_at: float = 0
        self.session = requests.Session()

    def _url(self, endpoint: str) -> str:
        '''Join ``base_url`` and ``endpoint`` with exactly one slash between them.'''
        return f'{self.base_url}/{endpoint.lstrip("/")}'

    def _get_token(self) -> str:
        '''Return a valid OAuth2 Bearer token, fetching a new one when needed.

        Raises:
            requests.HTTPError: If the token endpoint answers with an error status.
            KeyError: If the token response has no ``access_token`` field.
        '''
        if self._token and time.time() < self._token_expires_at - self._EXPIRY_BUFFER_SECONDS:
            return self._token  # still valid (with safety buffer)

        response = self.session.post(
            self._url('oauth/token'),
            data={
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret,
            },
            timeout=self.timeout,
        )
        response.raise_for_status()
        payload = response.json()

        self._token = payload['access_token']
        # ``expires_in`` is optional in a token response and some servers send
        # it as a string, so fall back to a default and coerce to a number.
        ttl = payload.get('expires_in')
        ttl = self._DEFAULT_TOKEN_TTL_SECONDS if ttl is None else float(ttl)
        self._token_expires_at = time.time() + ttl
        return self._token

    def _headers(self) -> Dict[str, str]:
        '''Build the JSON request headers, including the current Bearer token.'''
        return {
            'Authorization': f'Bearer {self._get_token()}',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        '''Send an authenticated GET and return the decoded JSON body.

        Raises:
            requests.HTTPError: If the response status is 4xx or 5xx.
        '''
        response = self.session.get(
            self._url(endpoint),
            headers=self._headers(),
            params=params,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def post(self, endpoint: str, data: Dict) -> Dict:
        '''Send an authenticated POST with ``data`` as JSON and return the decoded JSON body.

        Raises:
            requests.HTTPError: If the response status is 4xx or 5xx.
        '''
        response = self.session.post(
            self._url(endpoint),
            headers=self._headers(),
            json=data,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()
# API Key authentication
class ApiKeyClient:
    '''Minimal client for APIs that authenticate with a static key header.

    Args:
        base_url: Root URL of the API; a trailing slash is stripped.
        api_key: Secret key sent with every request.
        key_header: Name of the header that carries the key.
    '''

    def __init__(self, base_url: str, api_key: str, key_header: str = 'X-API-Key'):
        self.session = requests.Session()
        self.session.headers.update({key_header: api_key, 'Content-Type': 'application/json'})
        self.base_url = base_url.rstrip('/')

    def _url(self, path: str) -> str:
        '''Join ``base_url`` and ``path`` with exactly one slash between them.'''
        # Strip both sides so 'https://host/' + '/users' and
        # 'https://host' + 'users' both yield 'https://host/users'.
        return f"{self.base_url.rstrip('/')}/{path.lstrip('/')}"

    def get(self, path: str, **kwargs):
        '''GET ``path`` and return the decoded JSON body.

        Extra keyword arguments are forwarded to ``Session.get``.

        Raises:
            requests.HTTPError: If the response status is 4xx or 5xx, so that
                error bodies are never mistaken for data.
        '''
        response = self.session.get(self._url(path), **kwargs)
        response.raise_for_status()
        return response.json()


# Pagination and Rate Limit Handling
import time
from typing import Iterator, Dict, List
from loguru import logger
def paginate_api(client: 'APIClient', endpoint: str, params: Dict = None,
                 per_page: int = 100) -> Iterator[Dict]:
    '''Yield every item from a paginated endpoint (page-based or cursor-based).

    Items are read from the first of the ``data``, ``items`` or ``results``
    keys present in each response. Pagination metadata is read from
    ``pagination`` or ``meta``: a non-empty ``next_cursor`` is sent back as
    ``cursor`` on the next request; otherwise the page number is incremented
    while ``has_next`` is true.

    Args:
        client: Object whose ``get(endpoint, params=...)`` returns a dict.
        endpoint: Endpoint passed through to ``client.get``.
        params: Extra query parameters; the caller's dict is never modified.
        per_page: Page size requested from the API.

    Yields:
        Each item of each page, in the order the API returned them.
    '''
    # Work on a copy: the loop writes page/per_page/cursor into the dict and
    # must not leak those keys into the caller's object.
    query = dict(params or {})
    page = 1
    while True:
        query['page'] = page
        query['per_page'] = per_page
        response = client.get(endpoint, params=query)

        items = response.get('data', response.get('items', response.get('results', [])))
        if not items:
            break
        yield from items

        # ``or {}`` guards against an explicit ``"pagination": null``.
        pagination = response.get('pagination', response.get('meta', {})) or {}
        next_cursor = pagination.get('next_cursor')
        if not pagination.get('has_next', False) and not next_cursor:
            break

        # Only follow the cursor when it has a value. A present-but-null
        # cursor must fall through to page increment, otherwise the same
        # page would be requested again.
        if next_cursor:
            query['cursor'] = next_cursor
        else:
            page += 1
class RateLimitHandler:
    '''Retries a callable on HTTP 429 and 5xx responses with backoff.'''

    @staticmethod
    def _parse_retry_after(value, default: int = 60) -> int:
        '''Convert a ``Retry-After`` header value into whole seconds to wait.

        The header may carry either a number of seconds or an HTTP-date.
        A missing or unparseable value yields ``default``; the result is
        never negative.
        '''
        import math
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        if value is None:
            return default
        text = str(value).strip()
        try:
            return max(0, int(text))
        except ValueError:
            pass  # not a plain number; try the HTTP-date form below
        try:
            when = parsedate_to_datetime(text)
        except (TypeError, ValueError):
            return default
        if when.tzinfo is None:
            # A date parsed without an offset is treated as UTC.
            when = when.replace(tzinfo=timezone.utc)
        remaining = (when - datetime.now(timezone.utc)).total_seconds()
        return max(0, math.ceil(remaining))

    @staticmethod
    def make_request_with_backoff(func, *args, max_retries=5, **kwargs):
        '''Call ``func(*args, **kwargs)``, retrying on 429 and 5xx errors.

        On 429 the wait comes from the ``Retry-After`` header (60s if absent
        or unparseable); on 5xx it doubles each attempt (1s, 2s, 4s, ...).
        Any other ``requests.HTTPError`` is re-raised immediately.

        Args:
            func: Callable to invoke; expected to raise ``requests.HTTPError``
                on error statuses.
            max_retries: Maximum number of calls to ``func``.

        Returns:
            Whatever ``func`` returns on its first successful call.

        Raises:
            requests.HTTPError: For non-retryable statuses.
            RuntimeError: When every attempt failed; chained to the last error.
        '''
        last_error = None
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except requests.HTTPError as e:
                status = e.response.status_code if e.response is not None else None
                if status == 429:
                    wait = RateLimitHandler._parse_retry_after(
                        e.response.headers.get('Retry-After')
                    )
                    message = f'Rate limited. Waiting {wait}s (attempt {attempt+1}/{max_retries})'
                elif status is not None and status >= 500:
                    wait = 2 ** attempt
                    message = f'Server error {status}. Waiting {wait}s'
                else:
                    raise
                last_error = e
                if attempt == max_retries - 1:
                    break  # no retry follows, so waiting would only delay the failure
                logger.warning(message)
                time.sleep(wait)
        raise RuntimeError(f'Max retries exceeded for {func}') from last_error


# Chaining API Workflows
import asyncio
import httpx
from typing import List, Dict
async def enrich_leads_pipeline(raw_leads: List[Dict]) -> List[Dict]:
    '''
    Multi-API enrichment pipeline:
    1. Validate emails via email verification API (all requests in parallel)
    2. Enrich company data via Clearbit API (sequentially, 0.5s apart)
    3. Keep leads whose company has more than 50 employees
    4. Push qualified leads to CRM

    Args:
        raw_leads: Lead dicts; each is expected to carry ``email`` and ``domain``.

    Returns:
        The qualified leads, each extended with a ``company_size`` key.

    Raises:
        KeyError: If a lead has no ``email`` key.
        httpx.HTTPStatusError: If the CRM rejects the bulk push.
    '''
    from urllib.parse import quote

    async with httpx.AsyncClient() as client:
        # Step 1: Validate emails in parallel
        email_tasks = []
        for lead in raw_leads:
            # Percent-encode the address so characters such as '/', '?' or '#'
            # cannot change the path or add a query string.
            safe_email = quote(str(lead['email']), safe='@')
            email_tasks.append(
                client.get(
                    f'https://api.emailvalidator.com/verify/{safe_email}',
                    headers={'Authorization': 'Bearer ...'},
                )
            )
        email_results = await asyncio.gather(*email_tasks, return_exceptions=True)
        valid_leads = [
            lead for lead, result in zip(raw_leads, email_results)
            if _email_is_valid(result)
        ]
        print(f'{len(valid_leads)}/{len(raw_leads)} emails valid')

        # Step 2: Enrich company data sequentially (rate limited API)
        enriched = []
        for lead in valid_leads:
            company_data = {}
            try:
                resp = await client.get(
                    'https://company.clearbit.com/v1/companies/find',
                    # Let httpx encode the query string instead of interpolating
                    # the raw domain into the URL.
                    params={'domain': lead['domain']},
                    headers={'Authorization': 'Bearer ...'},
                )
                if resp.status_code == 200:
                    company_data = resp.json()
            except Exception as exc:
                # Best effort: a failed lookup keeps the lead with size 0,
                # but the failure is reported rather than hidden.
                print(f'Company enrichment failed for {lead.get("domain")!r}: {exc!r}')
            enriched.append({**lead, 'company_size': _employee_count(company_data)})
            await asyncio.sleep(0.5)  # respect rate limits

        # Step 3: Filter by company size and push to CRM
        qualified = [lead for lead in enriched if lead['company_size'] > 50]
        crm_response = await client.post(
            'https://api.crm.com/v1/leads/bulk',
            json={'leads': qualified},
            headers={'Authorization': 'Bearer ...'},
        )
        # Do not report success unless the CRM actually accepted the push.
        crm_response.raise_for_status()
        print(f'Pushed {len(qualified)} qualified leads to CRM')
        return qualified


def _email_is_valid(result) -> bool:
    '''Return True only when a verification result decodes to JSON with a truthy ``valid``.'''
    # gather(return_exceptions=True) can hand back any BaseException,
    # including CancelledError, in place of a response.
    if isinstance(result, BaseException):
        return False
    try:
        return bool(result.json().get('valid'))
    except (ValueError, AttributeError):
        # Body was not JSON, or was JSON but not an object.
        return False


def _employee_count(company_data) -> int:
    '''Extract ``metrics.employees`` from a company payload, treating null or missing as 0.'''
    metrics = company_data.get('metrics') if isinstance(company_data, dict) else None
    employees = metrics.get('employees') if isinstance(metrics, dict) else None
    return employees if isinstance(employees, (int, float)) else 0


# Automation Arena: REST API Gateway
Active Endpoints
GET https://api.voidxhq.com/v1/users
Requires Auth: Bearer Token
Traffic Monitor
Listening for incoming requests...
fetch_users.py
Python 3
1
2
3
4
5
6
7
8
9
10
11
12
CONSOLE OUTPUT
Waiting for script execution...
Knowledge Check
Ready to test your understanding of 9. API Automation?