80 MINS beginner
3. Data Collection & Ingestion
Module 03: Data Ingestion
APIs, Scraping, Databases, and Validation
Before any analysis can happen, data must be collected. In real data science work, data collection is rarely as simple as downloading a CSV. You query APIs that require authentication and pagination, scrape websites that weren't designed to be scraped, pull from multiple databases with different schemas, and validate everything that arrives, because external data sources are unreliable. This module covers the complete data ingestion stack used by professional data teams.
🌐 REST API Ingestion
import requests
import pandas as pd
from typing import List, Dict, Optional
import time
from loguru import logger
def _retry_after_seconds(header_value: Optional[str], default: int = 60) -> int:
    '''Convert a Retry-After header value into a non-negative number of seconds.

    Only the integer-seconds form is parsed. A missing header, or any other
    form (such as the HTTP-date variant), falls back to ``default``.
    '''
    if header_value is None:
        return default
    try:
        return max(0, int(header_value))
    except ValueError:
        return default


def fetch_paginated_api(
    base_url: str,
    endpoint: str,
    api_key: str,
    params: Optional[Dict] = None,
    max_pages: int = 100,
    delay: float = 0.5,
    per_page: int = 100,
    max_retries: int = 5,
) -> pd.DataFrame:
    '''Fetch every page of a paginated REST endpoint into one flat DataFrame.

    Requests ``{base_url}/{endpoint}`` with ``page``/``per_page`` query
    parameters, starting at page 1. It stops at the first of:

    * a page that yields no records;
    * an explicit "no more data" signal: ``has_more`` present and falsy, or
      ``next_page`` present and empty/null. Responses that carry neither key
      simply continue to the next page number;
    * ``max_pages`` pages fetched;
    * a non-429 HTTP error, or more than ``max_retries`` consecutive 429s
      for the same page. Records collected so far are still returned.

    Records are read from the first of the ``data`` / ``items`` / ``results``
    keys present in a JSON object. A bare JSON list is used as the records
    directly.

    Args:
        base_url: API root, without a trailing slash.
        endpoint: Path appended to ``base_url``.
        api_key: Sent as a ``Bearer`` token in the ``Authorization`` header.
        params: Extra query parameters sent with every request.
        max_pages: Hard cap on the number of pages requested.
        delay: Seconds to sleep between successful page requests.
        per_page: Page size sent as the ``per_page`` query parameter.
        max_retries: Consecutive 429 responses tolerated for one page before
            giving up.

    Returns:
        ``pd.json_normalize`` of all collected records, with nested objects
        flattened into dotted column names. Empty if nothing was fetched.

    Raises:
        requests.RequestException: connection errors and timeouts are not
            caught.
        ValueError: if a response body is not valid JSON.
    '''
    all_records: List[Dict] = []
    page = 1
    rate_limit_retries = 0
    # The context manager closes the session's pooled connections on exit.
    with requests.Session() as session:
        session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Accept': 'application/json',
            'User-Agent': 'DataPipeline/1.0'
        })
        while page <= max_pages:
            try:
                response = session.get(
                    f'{base_url}/{endpoint}',
                    params={**(params or {}), 'page': page, 'per_page': per_page},
                    timeout=30
                )
                response.raise_for_status()
            except requests.HTTPError as e:
                status = e.response.status_code if e.response is not None else None
                if status == 429 and rate_limit_retries < max_retries:  # rate limited
                    rate_limit_retries += 1
                    retry_after = _retry_after_seconds(e.response.headers.get('Retry-After'))
                    logger.warning(f'Rate limited. Waiting {retry_after}s')
                    time.sleep(retry_after)
                    continue  # retry the same page
                logger.error(f'HTTP error on page {page}: {e}')
                break
            rate_limit_retries = 0

            data = response.json()
            if isinstance(data, list):
                records = data
            else:
                records = data.get('data', data.get('items', data.get('results', [])))
            if not records:
                logger.info(f'No more records at page {page}')
                break
            all_records.extend(records)
            logger.info(f'Fetched page {page}: {len(records)} records (total: {len(all_records)})')

            # Honour an explicit end-of-data signal only when the API sends one.
            # A missing key means "no signal", not "last page".
            if isinstance(data, dict):
                if not data.get('has_more', True):
                    break
                if 'next_page' in data and not data['next_page']:
                    break
            page += 1
            time.sleep(delay)  # rate limiting
    return pd.json_normalize(all_records)  # flatten nested JSON
# 🔗 Database Ingestion with SQLAlchemy
from sqlalchemy import create_engine, text
import pandas as pd
from typing import Iterator
def stream_large_table(
    connection_string: str,
    table: str,
    chunk_size: int = 50_000,
    where_clause: str = ''
) -> Iterator[pd.DataFrame]:
    '''Stream a large database table without loading it all into RAM.

    Runs ``SELECT * FROM {table} {where_clause}`` and yields the result as
    DataFrames of at most ``chunk_size`` rows.

    Args:
        connection_string: SQLAlchemy database URL. A fresh engine is created
            for this call and disposed when the generator is exhausted or
            closed.
        table: Table name. It is interpolated verbatim into the SQL, so it
            must come from trusted configuration.
        chunk_size: Maximum number of rows per yielded DataFrame.
        where_clause: Raw SQL appended after the table name, e.g.
            ``"WHERE region = 'EU'"``. Also interpolated verbatim; never build
            it from untrusted input.

    Yields:
        One DataFrame per chunk, in the order the database returns rows.
    '''
    engine = create_engine(connection_string)
    query = text(f'SELECT * FROM {table} {where_clause}')
    try:
        with engine.connect() as conn:
            # ``chunksize`` only slices what the driver hands back. Without
            # stream_results, drivers such as psycopg2 fetch the entire result
            # set into client memory first. stream_results asks for a
            # server-side cursor where the dialect supports one.
            streaming_conn = conn.execution_options(stream_results=True)
            yield from pd.read_sql(query, streaming_conn, chunksize=chunk_size)
    finally:
        # Each call creates its own engine; release its connection pool.
        engine.dispose()
# Incremental loading — only fetch new records
def incremental_load(
    engine,
    source_table: str,
    target_df: pd.DataFrame,
    watermark_col: str = 'updated_at',
    default_watermark='1970-01-01',
    key_col: Optional[str] = None,
) -> pd.DataFrame:
    '''Append source rows whose watermark is newer than anything in ``target_df``.

    The high-water mark is ``target_df[watermark_col].max()``. It falls back
    to ``default_watermark`` when ``target_df`` is empty or when every
    existing watermark is null. Rows with ``watermark_col`` strictly greater
    than that value are read from ``source_table`` and appended. Because the
    comparison is strict, rows that arrive later with a watermark equal to
    the current maximum are not picked up.

    Args:
        engine: SQLAlchemy engine/connection accepted by ``pd.read_sql``.
        source_table: Table to read from. Interpolated into the SQL, so it
            must be trusted.
        target_df: Previously loaded data. It is not modified.
        watermark_col: Monotonic column (timestamp or id) used as the
            watermark. Interpolated into the SQL, so it must be trusted.
        default_watermark: Starting value when no watermark is available.
            Use e.g. ``0`` for integer id columns.
        key_col: Optional primary-key column. When given, only the last
            (newest) row per key is kept, so updated records replace their
            earlier versions instead of being duplicated.

    Returns:
        A new DataFrame with ``target_df`` followed by the new rows and a
        fresh ``RangeIndex``.
    '''
    last_value = target_df[watermark_col].max() if not target_df.empty else default_watermark
    if pd.isna(last_value):
        # All existing watermarks are null: start from the default rather
        # than sending NaN/NaT to the database.
        last_value = default_watermark
    elif isinstance(last_value, pd.Timestamp):
        last_value = last_value.to_pydatetime()
    elif hasattr(last_value, 'item'):
        # numpy scalar (e.g. int64) -> builtin; some DB drivers can't adapt numpy types.
        last_value = last_value.item()

    # Identifiers cannot be bound parameters, but the watermark value can.
    # Binding leaves quoting and type conversion to the driver.
    query = text(
        f'SELECT * FROM {source_table} '
        f'WHERE {watermark_col} > :last_value '
        f'ORDER BY {watermark_col}'
    )
    new_data = pd.read_sql(query, engine, params={'last_value': last_value})
    logger.info(f'Incremental load: {len(new_data)} new records since {last_value}')
    combined = pd.concat([target_df, new_data], ignore_index=True)
    if key_col is not None:
        # new_data is appended after target_df and ordered by watermark, so
        # keep='last' retains the most recent version of each key.
        combined = combined.drop_duplicates(subset=[key_col], keep='last', ignore_index=True)
    return combined
# ✅ Data Validation: Trust Nothing
import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from typing import List, Callable, Any
@dataclass
class ValidationResult:
    '''Outcome of one rule evaluated by ``DataValidator``.'''
    passed: bool            # True when no row violated the rule
    rule_name: str          # '<rule>:<column>', e.g. 'no_nulls:customer_id'
    failed_count: int = 0   # number of offending rows
    message: str = ''       # human-readable summary printed by report()


class DataValidator:
    '''Chainable data-quality checks over a single DataFrame.

    Each ``expect_*`` method evaluates its rule immediately, appends a
    ``ValidationResult`` to ``self.results``, and returns ``self`` so calls
    can be chained. ``report()`` prints every result and returns whether all
    rules passed.

    A rule that names a column absent from the frame is recorded as a failed
    result rather than raising ``KeyError``. This keeps one schema change
    from hiding the outcome of every other rule.
    '''

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.results: List[ValidationResult] = []

    def _missing_column(self, rule_name: str, column: str) -> bool:
        '''Record a failure for ``rule_name`` and return True if ``column`` is absent.'''
        if column in self.df.columns:
            return False
        self.results.append(ValidationResult(
            passed=False,
            rule_name=rule_name,
            failed_count=len(self.df),  # no row can satisfy a rule on a missing column
            message=f'column {column!r} not found'
        ))
        return True

    def expect_no_nulls(self, column: str) -> 'DataValidator':
        '''Fail if ``column`` contains any null values (as detected by ``isnull``).'''
        rule_name = f'no_nulls:{column}'
        if self._missing_column(rule_name, column):
            return self
        null_count = int(self.df[column].isnull().sum())
        self.results.append(ValidationResult(
            passed=null_count == 0,  # plain bool, not numpy.bool_
            rule_name=rule_name,
            failed_count=null_count,
            message=f'{null_count} null values found in {column}'
        ))
        return self  # enable chaining

    def expect_unique(self, column: str) -> 'DataValidator':
        '''Fail if any value in ``column`` repeats.

        ``failed_count`` counts every occurrence after the first. Like
        ``Series.duplicated``, nulls compare equal to each other.
        '''
        rule_name = f'unique:{column}'
        if self._missing_column(rule_name, column):
            return self
        dup_count = int(self.df[column].duplicated().sum())
        self.results.append(ValidationResult(
            passed=dup_count == 0,
            rule_name=rule_name,
            failed_count=dup_count,
            message=f'{dup_count} duplicate values in {column}'
        ))
        return self

    def expect_in_range(self, column: str, min_val, max_val) -> 'DataValidator':
        '''Fail if any value in ``column`` is below ``min_val`` or above ``max_val``.

        Bounds are inclusive. Pass ``None`` for a bound to leave that side
        open. Nulls never count as out of range, because comparisons with NaN
        are False; pair with ``expect_no_nulls`` to catch them.
        '''
        rule_name = f'range:{column}'
        if self._missing_column(rule_name, column):
            return self
        values = self.df[column]
        out_of_range_mask = pd.Series(False, index=values.index)
        if min_val is not None:
            out_of_range_mask |= values < min_val
        if max_val is not None:
            out_of_range_mask |= values > max_val
        out_of_range = int(out_of_range_mask.sum())
        low = '-inf' if min_val is None else min_val
        high = 'inf' if max_val is None else max_val
        self.results.append(ValidationResult(
            passed=out_of_range == 0,
            rule_name=rule_name,
            failed_count=out_of_range,
            message=f'{out_of_range} values outside [{low}, {high}]'
        ))
        return self

    def expect_values_in_set(self, column: str, valid_values: set) -> 'DataValidator':
        '''Fail if ``column`` holds any value not in ``valid_values``.

        Nulls count as invalid unless a null is itself in ``valid_values``
        (``Series.isin`` semantics). The message lists up to five distinct
        offending values.
        '''
        rule_name = f'value_set:{column}'
        if self._missing_column(rule_name, column):
            return self
        invalid = ~self.df[column].isin(valid_values)
        self.results.append(ValidationResult(
            passed=not invalid.any(),
            rule_name=rule_name,
            failed_count=int(invalid.sum()),
            message=f'Invalid values: {self.df.loc[invalid, column].unique()[:5].tolist()}'
        ))
        return self

    def report(self) -> bool:
        '''Print one line per rule plus a summary; return True if every rule passed.'''
        all_passed = all(r.passed for r in self.results)
        for result in self.results:
            status = '✅' if result.passed else '❌'
            print(f'{status} {result.rule_name}: {result.message}')
        print(f'\nValidation: {"PASSED" if all_passed else "FAILED"} '
              f'({sum(r.passed for r in self.results)}/{len(self.results)} rules passed)')
        return all_passed
# Usage: validate a raw extract before it is allowed into the pipeline.
df = pd.read_csv('sales_data.csv')
validator = DataValidator(df)
# Each expect_* call records one rule result on the validator.
validator.expect_no_nulls('customer_id')
validator.expect_unique('transaction_id')
validator.expect_in_range('amount', 0, 1_000_000)
validator.expect_values_in_set('status', {'pending', 'completed', 'refunded', 'cancelled'})
is_valid = validator.report()
if not is_valid:
    raise ValueError('Data validation failed. Halting pipeline.')
# Knowledge Check
Ready to test your understanding of 3. Data Collection & Ingestion?