15. AI + Data Science
Feature Pipelines for ML, Continuous Learning, and LLM Integration
The boundary between traditional data science and AI engineering is dissolving. Modern production ML systems require feature stores that serve consistent features across training and serving. Continuous learning systems update models as data distribution shifts. LLMs are now components in data pipelines — extracting structure from text, generating synthetic data, and powering intelligent automation. This module bridges data science with the AI engineering practices that make models stay useful in production.
🏪 Feature Stores: The Training-Serving Problem
The most insidious production ML failure is the training-serving skew: the features computed during training don't exactly match the features computed at serving time. This happens because:
- Training computes features in batch (daily aggregations, historical lookups)
- Serving computes features in real-time (must be millisecond-fast, different code path)
- Two separate codebases → two separate bugs → different results
from feast import FeatureStore, Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
from datetime import timedelta
import pandas as pd
# Feature store ensures identical features in training and serving
store = FeatureStore(repo_path='feature_repo/')

# Single source of truth for the feature references. Both the training and the
# serving request below read this list, so the model is never served a
# different feature set than the one it was trained on.
CUSTOMER_FEATURE_REFS = [
    'customer_features:avg_purchase_30d',
    'customer_features:purchase_count_90d',
    'customer_features:days_since_last_purchase',
    'customer_features:preferred_category',
]

# 1. TRAINING: Retrieve historical features (point-in-time correct)
entity_df = pd.DataFrame({
    'customer_id': ['cust_001', 'cust_002', 'cust_003'],
    'event_timestamp': pd.to_datetime(['2024-01-15', '2024-01-20', '2024-01-25']),
})

# Point-in-time join: for each row, get feature values as they were at event_timestamp
# Prevents future data leakage in historical training sets
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=CUSTOMER_FEATURE_REFS,
).to_df()
print('Training features shape:', training_df.shape)

# 2. SERVING: Same features, now in real-time from online store (Redis/DynamoDB)
feature_vector = store.get_online_features(
    features=CUSTOMER_FEATURE_REFS,
    entity_rows=[{'customer_id': 'cust_001'}],
).to_dict()
print('Online features:', feature_vector)
# SAME FEATURE NAMES, SAME TRANSFORMATIONS — guaranteed consistency
🔄 Continuous Learning and Model Drift
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp
from typing import Dict
import mlflow
class ModelDriftDetector:
    """Detect when numeric features have drifted from a reference distribution.

    Every numeric column of the reference data is compared with the column of
    the same name in the current data using a two-sample Kolmogorov-Smirnov
    test.
    """

    def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.05):
        """Store the reference sample and the significance level.

        Args:
            reference_data: Sample to compare against (e.g. the training data).
            threshold: p-value below which a feature is flagged as drifted.
        """
        self.reference = reference_data
        self.threshold = threshold  # p-value threshold for KS test

    def check_drift(self, current_data: pd.DataFrame) -> Dict:
        """Run a KS test per shared numeric column and report the outcome.

        Columns that are missing from ``current_data`` are ignored. Columns
        with no non-null values on either side are skipped with a message,
        because the test has nothing to compare.

        Args:
            current_data: Fresh sample to compare with the reference data.

        Returns:
            Mapping of column name to a dict with keys ``ks_statistic``
            (float), ``p_value`` (float), ``drifted`` (bool, p-value below
            the threshold) and ``severity`` (``'high'`` if the statistic
            exceeds 0.3, ``'medium'`` if it exceeds 0.1, else ``'low'``).
        """
        results = {}
        numeric_cols = self.reference.select_dtypes(include='number').columns
        for col in numeric_cols:
            if col not in current_data.columns:
                continue

            ref_values = self.reference[col].dropna()
            cur_values = current_data[col].dropna()
            if ref_values.empty or cur_values.empty:
                # ks_2samp cannot give a meaningful answer for an empty sample
                # (depending on the SciPy version it raises or returns NaN),
                # so report the gap instead of recording a bogus "no drift".
                print(f'Skipping {col}: no non-null values to compare')
                continue

            # Kolmogorov-Smirnov test: are two samples from the same distribution?
            ks_stat, p_value = ks_2samp(ref_values, cur_values)
            # Convert NumPy scalars to built-ins so the report is plain Python
            # data (e.g. directly JSON-serialisable).
            ks_stat = float(ks_stat)
            p_value = float(p_value)

            if ks_stat > 0.3:
                severity = 'high'
            elif ks_stat > 0.1:
                severity = 'medium'
            else:
                severity = 'low'

            results[col] = {
                'ks_statistic': ks_stat,
                'p_value': p_value,
                'drifted': bool(p_value < self.threshold),
                'severity': severity,
            }

        n_drifted = sum(1 for r in results.values() if r['drifted'])
        print(f'Drift detected in {n_drifted}/{len(results)} features')
        return results
class ContinuousLearningPipeline:
    """Check new data for drift and trigger retraining when it is severe."""

    def __init__(self, model_name: str, drift_threshold: float = 0.05):
        """Configure the pipeline.

        Args:
            model_name: Name logged with each retraining run.
            drift_threshold: p-value threshold handed to ModelDriftDetector.
        """
        self.model_name = model_name
        self.drift_threshold = drift_threshold

    def evaluate_and_retrain(self, new_data: pd.DataFrame, reference_data: pd.DataFrame):
        """Run drift detection on ``new_data`` and retrain if warranted.

        Args:
            new_data: Recent data to test, and to retrain on if drift is found.
            reference_data: Baseline data the drift tests compare against.
        """
        detector = ModelDriftDetector(reference_data, self.drift_threshold)
        drift_report = detector.check_drift(new_data)

        # A feature counts only when the drift is both statistically
        # significant (p-value below drift_threshold) and large (severity
        # 'high'). Checking severity alone would make drift_threshold
        # irrelevant and let small, noisy samples trigger retraining.
        high_severity_drift = sum(
            1 for r in drift_report.values()
            if r.get('drifted') and r.get('severity') == 'high'
        )

        if high_severity_drift > 0:
            print(f'{high_severity_drift} features have high-severity drift — triggering retraining')
            self._retrain(new_data)
        else:
            print('Drift within acceptable bounds — model remains in production')

    def _retrain(self, new_data: pd.DataFrame):
        """Open an MLflow run and log the retraining parameters.

        The training step itself is a placeholder in this example.
        """
        with mlflow.start_run(run_name=f'retrain_{pd.Timestamp.now().date()}'):
            mlflow.log_param('model_name', self.model_name)
            mlflow.log_param('training_rows', len(new_data))
            # Training code here...
            print('Model retrained and logged to MLflow')


# 🤖 LLMs as Data Pipeline Components
from anthropic import Anthropic
import pandas as pd
from typing import List, Dict, Optional
import json
# Module-level Anthropic client; extract_structured_data_with_llm sends its requests through it.
client = Anthropic()
def _build_extraction_prompt(batch: List[str], schema: Dict) -> str:
    """Render the extraction prompt for one batch of texts."""
    numbered_texts = '\n'.join(
        f'{position}. {text}' for position, text in enumerate(batch, start=1)
    )
    return (
        'Extract structured information from each text below.\n'
        f'Return a JSON array with {len(batch)} objects, each with these fields:\n'
        f'{json.dumps(schema, indent=2)}\n'
        'Texts:\n'
        f'{numbered_texts}\n'
        'Return ONLY the JSON array, nothing else.'
    )


def _parse_extraction_response(raw_text: str, expected_count: int) -> Optional[List[Dict]]:
    """Return the parsed list of objects, or None if the reply is unusable."""
    cleaned = raw_text.strip()
    if cleaned.startswith('```'):
        # Tolerate a Markdown code fence around the JSON: drop the opening
        # fence line (``` or ```json) and everything from the closing fence on.
        cleaned = cleaned.split('\n', 1)[1] if '\n' in cleaned else ''
        cleaned = cleaned.rsplit('```', 1)[0]
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        return None
    # Anything other than exactly one object per input text would misalign
    # the output rows with the inputs, so treat it as a failed batch.
    if not isinstance(parsed, list) or len(parsed) != expected_count:
        return None
    if not all(isinstance(item, dict) for item in parsed):
        return None
    return parsed


def extract_structured_data_with_llm(
    texts: List[str],
    schema: Dict,
    batch_size: int = 10
) -> pd.DataFrame:
    """Use Claude to extract structured data from unstructured text at scale.

    Useful for: parsing customer reviews, classifying support tickets,
    extracting entities from documents, categorizing feedback.

    Args:
        texts: Texts to process; they are sent to the model in batches.
        schema: Mapping of output field name to a description of the field.
            It is embedded in the prompt as JSON.
        batch_size: Number of texts per request. Must be at least 1.

    Returns:
        DataFrame with one row per input text, in input order. Rows of a
        batch whose reply could not be used contain None for every schema
        field. For empty ``texts`` the frame is empty but still carries the
        schema fields as columns.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f'batch_size must be >= 1, got {batch_size}')
    if not texts:
        return pd.DataFrame(columns=list(schema))

    all_results = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = client.messages.create(
            model='claude-opus-4-5',
            max_tokens=2000,
            messages=[{'role': 'user', 'content': _build_extraction_prompt(batch, schema)}],
        )
        try:
            raw_text = response.content[0].text
        except (IndexError, AttributeError):
            # No content blocks, or a first block without `.text`:
            # handle it like any other unusable reply.
            raw_text = ''

        results = _parse_extraction_response(raw_text, len(batch))
        if results is None:
            print(f'Failed to parse batch {start // batch_size + 1}')
            # Placeholder rows keep the output aligned with the input texts.
            results = [dict.fromkeys(schema) for _ in batch]
        all_results.extend(results)

    return pd.DataFrame(all_results)
# Example: Extract sentiment and entities from customer reviews
reviews = [
    'Great product but the delivery was 3 days late. Customer service was unhelpful.',
    'Absolutely love it! Fast shipping to Lagos, perfect quality.',
]

# Field name -> plain-language description; embedded in the prompt as JSON.
extraction_schema = {
    'sentiment': 'positive, negative, or neutral',
    'sentiment_score': 'float between 0 and 1',
    'issues': 'list of specific problems mentioned',
    'praises': 'list of specific positives mentioned',
    'would_recommend': 'boolean',
}

df_extracted = extract_structured_data_with_llm(reviews, extraction_schema)
print(df_extracted.to_string())

# Knowledge Check
Ready to test your understanding of 15. AI + Data Science?