16. Projects: Operator Mode
Sales Forecasting, Customer Segmentation, and A/B Engine
Operator Mode means building complete, production-ready systems โ not notebook experiments. Each project in this module represents a real-world data science deliverable: a sales forecasting system used by a finance team, a customer segmentation engine powering a marketing platform, and an A/B testing infrastructure that organizations use to make statistically rigorous product decisions. Build at least two to have a portfolio that demonstrates genuine capability.
๐ Project 1: End-to-End Sales Forecasting System
Business Problem: Finance needs monthly revenue forecasts 3 months ahead with confidence intervals for budgeting. Operations needs weekly unit forecasts by product category for inventory planning.
Architecture:
- Data ingestion from ERP/CRM via SQL queries (daily incremental load)
- Feature engineering: lag features, rolling statistics, calendar features, promotion flags
- Ensemble model: LightGBM + Prophet, with weighted average based on recent performance
- Automated model retraining when MAPE degrades beyond threshold
- Plotly Dash dashboard with drill-down by category, region, and time horizon
- Automated weekly email report to finance team with forecast vs. actuals
import pandas as pd
import numpy as np
from prophet import Prophet
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_absolute_percentage_error
from sklearn.model_selection import TimeSeriesSplit
class EnsembleForecaster:
def __init__(self, n_splits: int = 3):
self.prophet = Prophet(yearly_seasonality=True, weekly_seasonality=True,
daily_seasonality=False, changepoint_prior_scale=0.05)
self.lgbm = LGBMRegressor(n_estimators=500, learning_rate=0.05,
num_leaves=31, random_state=42)
self.weights = {'prophet': 0.4, 'lgbm': 0.6} # tuned via cross-validation
self.n_splits = n_splits
def fit(self, df: pd.DataFrame, feature_cols: list, target_col: str = 'y'):
# Train Prophet
prophet_df = df[['ds', 'y']].copy()
self.prophet.fit(prophet_df)
# Train LightGBM with time series cross-validation
tscv = TimeSeriesSplit(n_splits=self.n_splits)
X, y = df[feature_cols], df[target_col]
self.lgbm.fit(X, y) # final fit on all data
return self
def predict(self, df: pd.DataFrame, feature_cols: list, periods: int) -> pd.DataFrame:
# Prophet forecast
future = self.prophet.make_future_dataframe(periods=periods, freq='D')
prophet_pred = self.prophet.predict(future).tail(periods)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
# LightGBM forecast
lgbm_pred = self.lgbm.predict(df.tail(periods)[feature_cols])
# Ensemble
result = prophet_pred.copy()
result['prophet_pred'] = prophet_pred['yhat'].values
result['lgbm_pred'] = lgbm_pred
result['ensemble_pred'] = (
self.weights['prophet'] * result['prophet_pred'] +
self.weights['lgbm'] * result['lgbm_pred']
)
return result๐ฏ Project 2: Customer Segmentation Engine
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
class CustomerSegmentationEngine:
def __init__(self, n_segments: int = 5):
self.n_segments = n_segments
self.scaler = StandardScaler()
self.pca = PCA(n_components=10, random_state=42)
self.kmeans = KMeans(n_clusters=n_segments, random_state=42, n_init=20)
self.segment_profiles = None
def build_rfm_features(self, transactions: pd.DataFrame) -> pd.DataFrame:
ref_date = transactions['date'].max()
rfm = transactions.groupby('customer_id').agg(
recency=('date', lambda x: (ref_date - x.max()).days),
frequency=('order_id', 'count'),
monetary=('revenue', 'sum'),
avg_order_value=('revenue', 'mean'),
std_order_value=('revenue', 'std'),
distinct_categories=('category', 'nunique'),
days_active=('date', lambda x: (x.max() - x.min()).days + 1)
).fillna(0)
return rfm
def fit(self, rfm: pd.DataFrame) -> 'CustomerSegmentationEngine':
X_scaled = self.scaler.fit_transform(rfm)
X_pca = self.pca.fit_transform(X_scaled)
self.kmeans.fit(X_pca)
score = silhouette_score(X_pca, self.kmeans.labels_)
print(f'Silhouette Score: {score:.4f} (higher = better separated clusters)')
rfm['segment'] = self.kmeans.labels_
self.segment_profiles = rfm.groupby('segment').mean().round(2)
self.segment_profiles['size'] = rfm.groupby('segment').size()
self.segment_profiles['size_pct'] = self.segment_profiles['size'] / len(rfm) * 100
print('\nSegment Profiles:')
print(self.segment_profiles.to_string())
return self
def label_segments(self, rfm: pd.DataFrame) -> pd.DataFrame:
'''Assign human-readable labels based on segment characteristics'''
profiles = self.segment_profiles
labels = {}
for seg_id, row in profiles.iterrows():
if row['recency'] < 30 and row['monetary'] > profiles['monetary'].quantile(0.8):
labels[seg_id] = 'Champions'
elif row['frequency'] > profiles['frequency'].median() and row['recency'] < 60:
labels[seg_id] = 'Loyal Customers'
elif row['recency'] > 180:
labels[seg_id] = 'At Risk / Lost'
elif row['monetary'] > profiles['monetary'].median() and row['frequency'] == 1:
labels[seg_id] = 'High-Value New'
else:
labels[seg_id] = 'Potential Loyalists'
rfm['segment_label'] = rfm['segment'].map(labels)
return rfm๐งช Project 3: A/B Testing Infrastructure
from dataclasses import dataclass, field
from typing import Dict, Optional
import pandas as pd
import numpy as np
from scipy import stats
from statsmodels.stats.power import TTestIndPower
from datetime import datetime
@dataclass
class Experiment:
name: str
hypothesis: str
primary_metric: str
minimum_detectable_effect: float # e.g., 0.05 = 5% improvement
statistical_power: float = 0.80
significance_level: float = 0.05
started_at: datetime = field(default_factory=datetime.now)
stopped_at: Optional[datetime] = None
def required_sample_size(self, baseline_conversion: float) -> int:
effect_size = self.minimum_detectable_effect
analysis = TTestIndPower()
n = analysis.solve_power(
effect_size=effect_size / np.sqrt(baseline_conversion * (1-baseline_conversion)),
alpha=self.significance_level,
power=self.statistical_power
)
return int(np.ceil(n))
class ABTestingEngine:
def __init__(self):
self.experiments: Dict[str, Experiment] = {}
def create_experiment(self, exp: Experiment, baseline_conversion: float) -> dict:
n_required = exp.required_sample_size(baseline_conversion)
self.experiments[exp.name] = exp
return {
'experiment': exp.name,
'required_n_per_group': n_required,
'estimated_runtime_days': None, # compute from daily traffic
'started_at': exp.started_at
}
def analyze(self, control_data: pd.Series, variant_data: pd.Series,
exp_name: str) -> dict:
exp = self.experiments.get(exp_name)
stat, p_value = stats.ttest_ind(control_data, variant_data,
equal_var=False, alternative='two-sided')
pooled_std = np.sqrt((control_data.std()**2 + variant_data.std()**2) / 2)
cohens_d = (variant_data.mean() - control_data.mean()) / pooled_std
relative_lift = (variant_data.mean() - control_data.mean()) / control_data.mean()
return {
'experiment': exp_name,
'control_mean': control_data.mean(),
'variant_mean': variant_data.mean(),
'relative_lift': relative_lift,
'p_value': p_value,
'significant': p_value < exp.significance_level,
'cohens_d': cohens_d,
'recommendation': 'Ship variant' if p_value < exp.significance_level and relative_lift > 0
else 'Do not ship' if p_value < exp.significance_level
else 'Gather more data'
}Knowledge Check
Ready to test your understanding of 16. Projects: Operator Mode?