180 MINS advanced
17. Capstone System
Module 17: Capstone
Full Pipeline from Ingestion to Deployment
The capstone is the culmination of every technique in this track — a fully integrated data science system that demonstrates you can build production-grade AI products from scratch. The VOIDX Analytics Platform ingests multi-source data, processes it through a validated ETL pipeline, serves ML predictions via a REST API, monitors model health, and delivers insights through an interactive dashboard. This is the system that becomes your portfolio flagship.
🏗️ System Architecture
The VOIDX Analytics Platform has five integrated layers:
- Ingestion Layer: REST API clients pulling from 3+ data sources, web scraping for market data, SQL queries from transactional DB, schema validation and data quality checks on every batch.
- Processing Layer: Prefect-orchestrated ETL pipeline, Delta Lake storage with ACID guarantees, feature engineering service producing 50+ model features, data versioning for reproducibility.
- Modeling Layer: Automated training with MLflow experiment tracking, ensemble model (XGBoost + LightGBM), hyperparameter optimization with Optuna, A/B testing between model versions.
- Serving Layer: FastAPI REST endpoint returning predictions + SHAP explanations, Redis caching for frequently requested predictions, model version management.
- Monitoring Layer: Feature drift detection with automated alerts, business metric dashboards, pipeline health monitoring, automated retraining trigger.
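To make the Ingestion Layer's "schema validation and data quality checks on every batch" concrete, here is a minimal sketch using only the standard library. The schema, the column names, and the `validate_batch` helper are hypothetical and chosen purely for illustration. A real pipeline would more likely use a dedicated validation library and would decide for itself what error rate should reject a batch.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

# Hypothetical schema for an orders batch: column name -> accepted Python types.
ORDER_SCHEMA: Dict[str, Tuple[type, ...]] = {
    'customer_id': (str,),
    'order_total': (int, float),
    'order_date': (str,),
}


@dataclass
class BatchReport:
    total_rows: int
    valid_rows: List[Dict[str, Any]]
    errors: List[str]  # one entry per rejected row

    @property
    def error_rate(self) -> float:
        """Share of rows in the batch that failed validation."""
        return len(self.errors) / self.total_rows if self.total_rows else 0.0


def validate_batch(rows: List[Dict[str, Any]],
                   schema: Dict[str, Tuple[type, ...]]) -> BatchReport:
    """Split a batch into valid rows and per-row error messages."""
    valid: List[Dict[str, Any]] = []
    errors: List[str] = []
    for i, row in enumerate(rows):
        problems = []
        for column, accepted in schema.items():
            value = row.get(column)
            if value is None:
                problems.append(f'missing {column}')
            elif not isinstance(value, accepted):
                problems.append(f'{column} has type {type(value).__name__}')
        if problems:
            errors.append(f'row {i}: ' + '; '.join(problems))
        else:
            valid.append(row)
    return BatchReport(total_rows=len(rows), valid_rows=valid, errors=errors)


if __name__ == '__main__':
    batch = [
        {'customer_id': 'c1', 'order_total': 42.5, 'order_date': '2024-01-05'},
        {'customer_id': 'c2', 'order_total': 'n/a', 'order_date': '2024-01-05'},
        {'customer_id': 'c3', 'order_date': '2024-01-06'},
    ]
    report = validate_batch(batch, ORDER_SCHEMA)
    for message in report.errors:
        print(message)
    print(f'error rate: {report.error_rate:.2f}')
```

Returning a report instead of raising lets the pipeline decide the policy: load only `valid_rows`, or fail the whole run when `error_rate` crosses a threshold you choose.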
🚀 The Serving API
import hashlib
import json
import os
from datetime import datetime
from typing import Dict, List, Optional

import mlflow.pyfunc
import pandas as pd
import redis
import shap
from fastapi import BackgroundTasks, FastAPI, HTTPException
from loguru import logger
from pydantic import BaseModel, validator  # Pydantic v1-style API

app = FastAPI(title='VOIDX Prediction API', version='2.0.0')

# Model version reported in responses and by /health
MODEL_VERSION = '2.1.0'

# Load model and artifacts at startup
model = mlflow.pyfunc.load_model('models:/revenue_predictor/Production')
# Read the Redis location from REDIS_URL (set in docker-compose.yml), with a local default
redis_client = redis.Redis.from_url(
    os.environ.get('REDIS_URL', 'redis://redis:6379/0'), decode_responses=True
)


class PredictionRequest(BaseModel):
    customer_id: str
    features: Dict[str, float]
    include_explanations: bool = False

    @validator('features')
    def validate_feature_count(cls, v):
        required = {'recency', 'frequency', 'monetary', 'avg_order_value'}
        missing = required - set(v.keys())
        if missing:
            raise ValueError(f'Missing required features: {missing}')
        return v


class PredictionResponse(BaseModel):
    customer_id: str
    predicted_revenue: float
    confidence_interval: List[float]
    explanations: Optional[Dict[str, float]]
    model_version: str
    latency_ms: float
    cached: bool


@app.post('/predict', response_model=PredictionResponse)
async def predict(request: PredictionRequest, background_tasks: BackgroundTasks):
    start = datetime.now()

    # Check cache. Python's built-in hash() of a string changes between processes,
    # so use a stable digest. The explanations flag is part of the key so that a
    # cached response without explanations is never returned to a caller who asked for them.
    features_blob = json.dumps(request.features, sort_keys=True)
    digest = hashlib.sha256(features_blob.encode()).hexdigest()
    cache_key = f'pred:{request.customer_id}:{int(request.include_explanations)}:{digest}'
    cached = redis_client.get(cache_key)
    if cached:
        response = PredictionResponse(**json.loads(cached))
        response.cached = True
        response.latency_ms = (datetime.now() - start).total_seconds() * 1000
        return response

    try:
        df = pd.DataFrame([request.features])
        prediction = float(model.predict(df)[0])

        # Placeholder interval: a fixed +/-15% band around the point estimate.
        # Replace with a model-derived 95% prediction interval (e.g. quantile regression).
        ci_lower = prediction * 0.85
        ci_upper = prediction * 1.15

        explanations = None
        if request.include_explanations:
            # _model_impl is a private MLflow attribute; depending on the model flavor it
            # may be a wrapper rather than the raw tree model, so verify this for your setup.
            explainer = shap.TreeExplainer(model._model_impl)
            shap_vals = explainer.shap_values(df)
            explanations = {k: float(v) for k, v in zip(df.columns, shap_vals[0])}
            # Keep the 10 features with the largest absolute contribution
            explanations = dict(sorted(explanations.items(), key=lambda x: abs(x[1]), reverse=True)[:10])

        latency = (datetime.now() - start).total_seconds() * 1000
        response = PredictionResponse(
            customer_id=request.customer_id,
            predicted_revenue=prediction,
            confidence_interval=[ci_lower, ci_upper],
            explanations=explanations,
            model_version=MODEL_VERSION,
            latency_ms=latency,
            cached=False
        )

        # Cache for 1 hour
        redis_client.setex(cache_key, 3600, response.json())

        # Log prediction for monitoring (non-blocking)
        background_tasks.add_task(log_prediction, request, response)
        return response
    except Exception as e:
        logger.exception(f'Prediction failed for {request.customer_id}: {e}')
        raise HTTPException(status_code=500, detail='Prediction service error') from e


@app.get('/health')
async def health():
    return {'status': 'healthy', 'model_version': MODEL_VERSION,
            'redis': redis_client.ping(), 'timestamp': datetime.now().isoformat()}


async def log_prediction(request, response):
    # Write to database for model monitoring
    logger.info(f'Prediction logged: customer={request.customer_id}, '
                f'pred={response.predicted_revenue:.2f}, latency={response.latency_ms:.1f}ms')

🐳 Deployment with Docker Compose
# docker-compose.yml
version: '3.8'

services:
  api:
    build: ./api
    ports: ['8000:8000']
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - REDIS_URL=redis://redis:6379
    depends_on: [mlflow, redis]
    healthcheck:
      # Requires curl to be installed in the API image
      test: ['CMD', 'curl', '-f', 'http://localhost:8000/health']
      interval: 30s
      timeout: 10s
      retries: 3

  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    ports: ['5000:5000']
    volumes: ['mlflow_data:/mlflow']
    command: mlflow server --host 0.0.0.0 --backend-store-uri /mlflow

  redis:
    image: redis:7-alpine
    volumes: ['redis_data:/data']

  dashboard:
    build: ./dashboard
    ports: ['8050:8050']
    depends_on: [api]

  prefect:
    build: ./pipelines
    command: prefect agent start -q production
    environment:
      # Assumes a Prefect server reachable as prefect-server:4200;
      # no such service is defined in this file
      - PREFECT_API_URL=http://prefect-server:4200/api

volumes:
  mlflow_data:
  redis_data:

📋 Capstone Evaluation Rubric
- Data Pipeline (20pts): Multi-source ingestion, validation, incremental loading, documented schema
- Model Quality (20pts): Cross-validated performance, appropriate metric selection, baseline comparison, SHAP interpretability
- API Design (20pts): Input validation, error handling, caching, health endpoint, OpenAPI documentation
- Monitoring (20pts): Drift detection, prediction logging, business metric tracking, alerting
- Documentation (20pts): Architecture diagram, README with setup instructions, model card, data dictionary
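For the drift-detection criterion under Monitoring, one common starting point is the Population Stability Index (PSI). It compares how a feature is distributed in recent traffic against a reference sample such as the training data. The sketch below uses only the standard library. The function names, the 10-bin default, and the 0.2 alert threshold are assumptions for illustration, not part of the VOIDX platform.

```python
import bisect
import math
from typing import Dict, List, Sequence


def quantile_edges(reference: Sequence[float], n_bins: int = 10) -> List[float]:
    """Interior bin edges taken at evenly spaced quantiles of the reference sample."""
    ordered = sorted(reference)
    return [ordered[min(len(ordered) * i // n_bins, len(ordered) - 1)]
            for i in range(1, n_bins)]


def bin_proportions(values: Sequence[float], edges: List[float],
                    eps: float = 1e-6) -> List[float]:
    """Share of values per bin, floored at eps so the log below stays finite."""
    counts = [0] * (len(edges) + 1)
    for v in values:
        counts[bisect.bisect_right(edges, v)] += 1
    return [max(c / len(values), eps) for c in counts]


def population_stability_index(reference: Sequence[float],
                               current: Sequence[float],
                               n_bins: int = 10) -> float:
    """PSI = sum over bins of (actual - expected) * ln(actual / expected)."""
    if not reference or not current:
        raise ValueError('both samples must be non-empty')
    edges = quantile_edges(reference, n_bins)
    expected = bin_proportions(reference, edges)
    actual = bin_proportions(current, edges)
    return sum((a - e) * math.log(a / e) for a, e in zip(actual, expected))


def drifted_features(reference: Dict[str, Sequence[float]],
                     current: Dict[str, Sequence[float]],
                     threshold: float = 0.2) -> Dict[str, float]:
    """Return the PSI of every feature whose score exceeds the (assumed) threshold."""
    scores = {name: population_stability_index(reference[name], current[name])
              for name in reference if name in current}
    return {name: psi for name, psi in scores.items() if psi > threshold}


if __name__ == '__main__':
    train = {'recency': [float(x) for x in range(1000)]}
    live = {'recency': [float(x) + 500 for x in range(1000)]}
    flagged = drifted_features(train, live)
    if flagged:
        print(f'Drift detected, consider retraining: {flagged}')
```

A non-empty result from `drifted_features` is a natural place to hook in an alert or a retraining trigger. Tune the threshold on your own data before wiring it to anything automated.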