Data Processing
Garbage in, garbage out: Data quality is the binding constraint on model quality. 60–80% of ML project effort typically goes here, yet it’s unsexy and easy to skip.
The Data Pipeline in Production
Data processing is a 4-stage pipeline that repeats every retraining cycle:
```
Raw Data Sources
        ↓
[Extraction]           → Pull from logs, APIs, databases
        ↓
[Cleaning]             → Fix quality issues (nulls, dupes, type errors)
        ↓
[Feature Engineering]  → Compute features, handle transformations
        ↓
[Validation & Splits]  → Create train/val/test, verify no leakage
        ↓
Ready for Model Training
```
Stage 1: Data Collection & Assessment
Data Sources
Common sources for ML projects:
Event Logs (Most Common):
- User interactions (clicks, purchases, views)
- System behavior (latency, errors, resource usage)
- Format: Structured (BigQuery, Kafka), semi-structured (JSON), unstructured (text logs)
- Volume: 1B–1T events/day (large-scale companies)
- Freshness: Real-time (streaming) or batch (hourly/daily)
Databases:
- User profiles (age, location, account creation date)
- Item metadata (title, category, price, tags)
- Transactional records (orders, payments, returns)
- Update frequency: Static (rarely changes) or dynamic (daily updates)
Third-party APIs:
- Weather data (for demand forecasting)
- Demographic data (market research providers)
- Real estate data (property listings, prices)
- Cost/latency: API rate limits, payment per call
Sensors & IoT:
- Device telemetry (sensor readings, equipment metrics)
- User behavior (accelerometer, location, screen time)
- Freshness: Real-time streaming (high latency sensitivity)
Data Quality Assessment
Before investing in cleaning, assess the damage:
| Dimension | Good | Acceptable | Problem |
|---|---|---|---|
| Completeness | <1% missing | 1–5% missing | >5% missing (can’t train) |
| Duplicates | <0.1% | 0.1–1% | >1% (biases model) |
| Consistency | Single source | Multiple sources, validated | Conflicting values |
| Timeliness | Real-time | <24h delay | >1 week old (stale) |
| Accuracy | Programmatic labels | Expert verified | User-generated (noisy) |
| Coverage | All needed features | Some features sparse | Key features missing |
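The dimensions above can be measured programmatically before any cleaning work starts. A minimal sketch (column names and the toy DataFrame are illustrative):

```python
import pandas as pd

def assess_quality(df: pd.DataFrame) -> dict:
    """Quick data-quality report: missingness, duplicates, coverage."""
    return {
        "rows": len(df),
        "pct_missing_by_col": (df.isna().mean() * 100).round(2).to_dict(),
        "pct_duplicate_rows": round(df.duplicated().mean() * 100, 2),
        "empty_columns": [c for c in df.columns if df[c].isna().all()],
    }

df = pd.DataFrame({
    "user_id": [1, 2, 2, 4],
    "age": [25, None, None, 40],
    "notes": [None] * 4,
})
report = assess_quality(df)
print(report["pct_duplicate_rows"])  # one fully duplicated row out of four → 25.0
```

Running a report like this first tells you whether you are in the "delete rows" regime or the "invest in imputation" regime before any cleaning code is written.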
Compliance & Privacy
GDPR/CCPA Requirements:
- Do we have user consent to use their data?
- Data retention: Can users request deletion?
- Cross-border transfer: EU data can’t flow to US without Privacy Shield/SCCs
- Anonymization: Can we remove PII before training?
Bias & Fairness:
- Protected attributes: Race, gender, age (in US hiring, lending)
- Disparate impact: Does model treat groups differently?
- Measurement: Regular bias audits required for regulated industries
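One common disparate-impact measurement is the "four-fifths rule": the positive-outcome rate for the least-favored group should be at least 80% of the rate for the most-favored group. A minimal sketch (column names, toy data, and the 0.8 threshold are illustrative):

```python
import pandas as pd

def disparate_impact(df: pd.DataFrame, group_col: str, outcome_col: str) -> float:
    """Ratio of lowest to highest positive-outcome rate across groups.
    The common 'four-fifths rule' flags ratios below 0.8."""
    rates = df.groupby(group_col)[outcome_col].mean()
    return rates.min() / rates.max()

df = pd.DataFrame({
    "group": ["A"] * 10 + ["B"] * 10,
    "approved": [1] * 8 + [0] * 2 + [1] * 5 + [0] * 5,
})
ratio = disparate_impact(df, "group", "approved")
print(round(ratio, 3))  # 0.5 / 0.8 = 0.625 → below 0.8, flag for review
```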
Stage 2: Data Cleaning
Reality: 80% of the time is spent fixing data quality issues.
Missing Values
| Strategy | Cost | When to Use | Caveat |
|---|---|---|---|
| Delete rows | Low | <5% missing, missing at random | Lose data |
| Mean/median impute | Low | Numeric features, MCAR* | Biases estimates |
| Forward fill | Low | Time-series (use previous value) | Assumes no change |
| Model imputation | High | Important features, complex patterns | Adds bias; expensive |
| Domain knowledge | High | Business logic (e.g., “missing = never purchased”) | Manual; doesn’t scale |
*MCAR = Missing Completely At Random (safest assumption)
Implementation:
```python
# Bad: ignores why the value is missing
df.fillna(df.mean())  # all nulls become the column mean

# Better: mark missingness, then impute
df['age_missing'] = df['age'].isna().astype(int)   # track missingness as a feature
df['age'] = df['age'].fillna(df['age'].median())   # impute

# Best: use sklearn's IterativeImputer (learns cross-feature patterns)
from sklearn.experimental import enable_iterative_imputer  # noqa: F401 (required to unlock the API)
from sklearn.impute import IterativeImputer
from sklearn.linear_model import Ridge

imputer = IterativeImputer(estimator=Ridge(), max_iter=10)
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
```
Outliers
Outliers inflate loss and can dominate gradients:
Detection Methods:
- IQR method: outlier if x < Q1 − 1.5·IQR or x > Q3 + 1.5·IQR
- Z-score: |z| > 3 (assumes Gaussian)
- Domain knowledge: “More than 100 orders in one day is fraud”
Handling:
```python
import numpy as np

# IQR-based removal
Q1 = df['purchase_amount'].quantile(0.25)
Q3 = df['purchase_amount'].quantile(0.75)
IQR = Q3 - Q1
mask = ~((df['purchase_amount'] < Q1 - 1.5 * IQR) |
         (df['purchase_amount'] > Q3 + 1.5 * IQR))
df_clean = df[mask]

# Or capping (less aggressive)
upper = Q3 + 1.5 * IQR
df['purchase_amount'] = df['purchase_amount'].clip(upper=upper)
```
Duplicates
Exact duplicates: Same row appears twice (data collection error)
```python
df.drop_duplicates(inplace=True)  # Remove exact copies
```
Fuzzy duplicates: the same entity appears under variant spellings or typos (e.g., the same user’s name entered twice with different punctuation)

```python
# Requires approximate matching (fuzzywuzzy / its successor thefuzz, soundex)
# Note: this pairwise loop is O(n^2); fine for small tables, use blocking at scale
from fuzzywuzzy import fuzz

duplicates = []
for i, row1 in df.iterrows():
    for j, row2 in df.iterrows():
        if i < j:
            sim = fuzz.token_sort_ratio(row1['name'], row2['name'])
            if sim > 90:  # >90 token-sort similarity = likely duplicate
                duplicates.append((i, j))
```
Stage 3: Feature Engineering
This is where domain knowledge creates value.
Scaling & Normalization
Most algorithms require features on similar scales:
| Method | Range | Use Case | Formula |
|---|---|---|---|
| Min-Max | [0, 1] | Bounded, interpretable | (x - min) / (max - min) |
| Standardization | ~[-3, 3] | Gaussian features | (x - mean) / std |
| Log transform | [0, inf) | Right-skewed (power laws) | log(x + 1) |
| Robust scaling | Relative | Outliers present | (x - median) / IQR |
```python
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Standardization (recommended for neural networks and linear models)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)  # use train stats!

# Min-Max (keeps a bounded, interpretable [0, 1] range;
# tree models are insensitive to feature scale either way)
minmax = MinMaxScaler(feature_range=(0, 1))
X_scaled = minmax.fit_transform(X_train)
```
Critical: Fit scaler on TRAIN only, apply to TEST. Never fit on full dataset.
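One way to make that rule hard to violate is to bundle preprocessing and model into an sklearn Pipeline: cross-validation then refits the scaler on each fold’s training split automatically. A minimal sketch on synthetic data:

```python
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3)) * [1, 100, 0.01]        # wildly different scales
y = (X[:, 0] + X[:, 1] / 100 > 0).astype(int)          # deterministic linear label

# The pipeline fits the scaler on each CV fold's train split only: no leakage
pipe = make_pipeline(StandardScaler(), LogisticRegression())
scores = cross_val_score(pipe, X, y, cv=5)
print(scores.mean())
```

Because the scaler lives inside the pipeline, calling `fit` on the full dataset by accident becomes impossible during cross-validation.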
Categorical Encoding
One-Hot Encoding (most common):
```python
import pandas as pd

# Bad: integer codes impose an arbitrary, meaningless ordering
df['category_encoded'] = df['category'].astype('category').cat.codes

# Good: one-hot (but be careful with high-cardinality columns)
df_encoded = pd.get_dummies(df, columns=['category'], drop_first=True)
# Result: category_B (0/1), category_C (0/1); category_A is the dropped baseline
```
Ordinal Encoding (when categories have order):
```python
# Works, but manual: unseen values silently become NaN
mapping = {'low': 0, 'medium': 1, 'high': 2}
df['priority_encoded'] = df['priority'].map(mapping)

# Better: explicit ordering with sklearn (reusable on new data)
from sklearn.preprocessing import OrdinalEncoder

enc = OrdinalEncoder(categories=[['low', 'medium', 'high']])
df[['priority']] = enc.fit_transform(df[['priority']])
```
Target Encoding (risky, but powerful):
```python
# For a categorical feature, encode each category as its average target value
# Example: color_target_encoded = average CTR for that color
target_mean = df.groupby('color')['clicked'].mean()
df['color_target_encoded'] = df['color'].map(target_mean)

# Risk: severe overfitting/leakage if means are computed on evaluation data
# Mitigation: use cross-fold averages computed on training data only (never val/test)
```
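The cross-fold mitigation can be sketched as out-of-fold encoding: each row receives a mean computed from the other folds only, so no row’s own target leaks into its feature. Column names and the 5-fold choice are illustrative:

```python
import pandas as pd
from sklearn.model_selection import KFold

def oof_target_encode(df, cat_col, target_col, n_splits=5, seed=42):
    """Out-of-fold target encoding: each row's code comes from other folds only."""
    encoded = pd.Series(index=df.index, dtype=float)
    global_mean = df[target_col].mean()
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    for fit_idx, enc_idx in kf.split(df):
        fold_means = df.iloc[fit_idx].groupby(cat_col)[target_col].mean()
        encoded.iloc[enc_idx] = df.iloc[enc_idx][cat_col].map(fold_means).values
    return encoded.fillna(global_mean)  # unseen categories fall back to global mean

df = pd.DataFrame({
    "color": ["red", "blue"] * 50,
    "clicked": ([1, 0] * 40) + ([0, 1] * 10),  # red CTR 0.8, blue CTR 0.2
})
df["color_te"] = oof_target_encode(df, "color", "clicked")
```

Each red row’s encoding hovers near 0.8 but is never computed from the fold containing that row, which is what blunts the overfitting risk.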
Domain-Specific Features
Creating custom features leverages business knowledge:
Temporal Features:
```python
df['hour_of_day'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
df['days_since_signup'] = (pd.Timestamp.now() - df['signup_date']).dt.days
```
Aggregation Features:
```python
# User-level aggregations (from historical data)
user_stats = df.groupby('user_id').agg({
    'purchase_amount': ['mean', 'sum', 'count'],
    'days_since_purchase': 'min'
}).reset_index()
user_stats.columns = ['user_id', 'avg_purchase', 'total_spent',
                      'order_count', 'days_since_last_order']
df = df.merge(user_stats, on='user_id', how='left')
```
Interaction Features:
```python
# Combinations that matter for your problem
df['age_income_interaction'] = df['age'] * df['income']
df['high_value_user'] = ((df['income'] > 100000) & (df['age'] > 35)).astype(int)
```
Stage 4: Data Quality & Validation
Class Imbalance
Imbalanced data leads to biased models (predicts majority class):
Problem: 99% of transactions are legitimate, 1% fraud
- Model: “Always predict legitimate” → 99% accuracy (useless!)
- Recall: 0% (catches 0% of actual fraud)
Solutions:
```python
import numpy as np
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE

# Option 1: Class weights (tell the model minority-class errors cost more)
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train
)
weights = dict(zip(np.unique(y_train), class_weights))
# Then pass class_weight=weights when constructing the estimator (sklearn),
# or per-row sample weights via model.fit(X_train, y_train, sample_weight=...)

# Option 2: Resampling (synthesize minority examples)
smote = SMOTE(sampling_strategy=0.5)  # grow minority to 50% of majority size
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

# Option 3: Stratified splitting (both sets keep the original class ratio)
from sklearn.model_selection import StratifiedShuffleSplit

split = StratifiedShuffleSplit(n_splits=1, test_size=0.2)
for train_idx, test_idx in split.split(X, y):
    X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
    y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
# Both train and test now have the same class ratio as the original data
```
Train/Val/Test Splitting
Standard Split (80/10/10, random):
```python
from sklearn.model_selection import train_test_split

X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.1)
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.111)  # 0.111 of 90% ≈ 10%
# Result: 80% train, 10% val, 10% test
```
Stratified Split (preserves class distribution):
```python
from sklearn.model_selection import train_test_split

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, stratify=y_temp
)
```
Time-Series Split (prevents leakage for temporal data):
```python
# WRONG: a random split mixes past and future
X_train, X_test = train_test_split(X, test_size=0.2)

# RIGHT: temporal boundary (rows must be sorted by time)
split_point = int(0.8 * len(X))
X_train, X_test = X[:split_point], X[split_point:]
y_train, y_test = y[:split_point], y[split_point:]
# e.g., train on Jan–Oct 2024, test on Nov–Dec 2024
```
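For cross-validation on temporal data, scikit-learn’s `TimeSeriesSplit` generalizes the same boundary idea: every fold trains strictly on the past and validates on the next window.

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(100).reshape(-1, 1)  # rows ordered oldest → newest
tscv = TimeSeriesSplit(n_splits=4)
for train_idx, val_idx in tscv.split(X):
    # every training index precedes every validation index: no future leakage
    assert train_idx.max() < val_idx.min()
    print(f"train up to row {train_idx.max()}, validate rows {val_idx.min()}..{val_idx.max()}")
```

With 100 rows and 4 splits, the training window grows (20, 40, 60, 80 rows) while each validation window covers the next 20 rows.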
Exploratory Data Analysis (EDA)
```python
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from statsmodels.stats.outliers_influence import variance_inflation_factor

# 1. Univariate (feature distributions)
df.describe()                       # mean, std, min, max
df.hist(bins=50, figsize=(12, 8))   # visual inspection
plt.show()

# 2. Bivariate (feature-target relationship)
sns.boxplot(data=df, x='category', y='target')  # category vs target
plt.show()

# 3. Correlations
corr_matrix = df.corr(numeric_only=True)
sns.heatmap(corr_matrix, annot=True, fmt='.2f')  # heatmap
plt.show()

# 4. Check for multicollinearity
# High correlation between features → redundancy, unstable coefficients
vif = pd.DataFrame({
    'feature': X.columns,
    'VIF': [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
})
print(vif)  # VIF > 5 → problematic multicollinearity
```
Data Versioning & Reproducibility
Problem: “Which data version trained this model?” Often impossible to answer.
Solution: Data versioning tools
```shell
# DVC (Data Version Control): track data like you track code (git-like interface)
dvc add data/raw/train.csv
dvc add data/processed/features.pkl
# The .dvc pointer files are versioned in git; the commit message lives there
git add data/raw/train.csv.dvc data/processed/features.pkl.dvc
git commit -m "Updated training data: removed outliers, added new features"
dvc checkout   # restore the data versions matching the current git revision
```

```sql
-- Delta Lake (Databricks): time-travel queries; roll back to any version
SELECT * FROM training_data VERSION AS OF 12345
```
Key Properties by Stage
| Stage | Typical Duration | Effort | Tools | Common Issues |
|---|---|---|---|---|
| Collection | 1–2 weeks | 1 FTE | SQL, dbt, Python | Data unavailable, compliance issues |
| Cleaning | 2–4 weeks | 1–2 FTE | Pandas, SQL, custom scripts | 80% time spent here |
| Feature Engineering | 3–6 weeks | 1–2 FTE | scikit-learn, custom Python | Over-engineered features |
| Validation & Splits | 1 week | 0.5 FTE | scikit-learn, pandas | Data leakage, imbalance |
Implementation Example: Full Data Pipeline
```python
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE

# Load raw data
df = pd.read_csv('s3://data-lake/user_interactions_2026_q1.csv')
print(f"Initial shape: {df.shape}")

# ===== Stage 1: Assessment =====
print(f"\nMissing values:\n{df.isnull().sum()}")
print(f"\nData types:\n{df.dtypes}")

# ===== Stage 2: Cleaning =====
# Remove exact duplicates
df = df.drop_duplicates()

# Handle missing values
df['user_age'] = df['user_age'].fillna(df['user_age'].median())
df['item_category'] = df['item_category'].fillna('unknown')

# Remove outliers (purchase amount > 3 std dev above the mean)
mean = df['purchase_amount'].mean()
std = df['purchase_amount'].std()
df = df[df['purchase_amount'] <= mean + 3 * std]

# ===== Stage 3: Feature Engineering =====
# Temporal features
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek

# User aggregations
# Caveat: in production, compute these from history *before* each row's timestamp
user_agg = df.groupby('user_id').agg({
    'purchase_amount': ['mean', 'sum', 'count'],
    'timestamp': lambda x: (pd.Timestamp.now() - x.max()).days
}).reset_index()
user_agg.columns = ['user_id', 'user_avg_purchase', 'user_total_spent',
                    'user_order_count', 'user_days_since_purchase']
df = df.merge(user_agg, on='user_id', how='left')

# Item features (target-derived: in production, compute on training data only)
item_agg = df.groupby('item_id')['clicked'].mean().reset_index()
item_agg.columns = ['item_id', 'item_ctr']
df = df.merge(item_agg, on='item_id', how='left')

# Categorical encoding
df = pd.get_dummies(df, columns=['item_category'], drop_first=True)

# ===== Stage 4: Validation & Splits =====
# Drop target, identifiers, and raw timestamps; stratify to preserve class ratio
X = df.drop(columns=['clicked', 'user_id', 'item_id', 'timestamp'])
y = df['clicked']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.125, stratify=y_train  # 0.125 of 80% = 10%
)

# Scaling: fit on TRAIN only, then apply to val/test (no leakage)
numeric_cols = ['user_age', 'purchase_amount', 'user_avg_purchase']
scaler = StandardScaler()
X_train[numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
X_val[numeric_cols] = scaler.transform(X_val[numeric_cols])
X_test[numeric_cols] = scaler.transform(X_test[numeric_cols])

print(f"\nTrain: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
print(f"Class distribution (train): {y_train.value_counts().to_dict()}")

# Handle imbalance (SMOTE) — applied to training data only
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)
print(f"\nAfter SMOTE: {X_train_resampled.shape}")
print(f"Class distribution (resampled): {pd.Series(y_train_resampled).value_counts().to_dict()}")

# Ready for model training!
print("\nData pipeline complete. Ready for the next stage: model development")
```
How Real Companies Use This
Uber’s Data Processing for Michelangelo Platform: Uber processes 5TB of raw feature data daily across 100+ production ML models, each requiring distinct data pipelines. Data sources: Kafka streams (real-time events from millions of riders/drivers), PostgreSQL (user profiles, trip history), Redis (cached features for low-latency serving). Data cleaning pipeline: deduplication (same trip event fires multiple times), missing value imputation (driver location sometimes unavailable), outlier removal (unrealistic surge multipliers). Feature engineering: temporal aggregations (average driver rating past 7 days, 30 days), geospatial features (grid-based location bucketing for traffic patterns), behavioral signals (driver acceptance rate, cancellation rate). Data quality validation: 200+ automated checks per feature (null rate <5%, cardinality bounds, distribution stability). Train/test split: temporal split (train on past 60 days, validate on next 7 days, test on final 7 days) to prevent leakage in time-sensitive predictions. Pipeline time: 4–6 hours from raw event to training-ready dataset, run daily to capture fresh patterns.
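Per-feature validation checks like the ones Uber describes can be approximated with plain pandas assertions; the function, thresholds, and toy data below are illustrative, not Uber’s actual implementation:

```python
import pandas as pd

def check_feature(series: pd.Series, max_null_rate=0.05, max_cardinality=10_000):
    """Return a list of failed checks for one feature column."""
    failures = []
    null_rate = series.isna().mean()
    if null_rate > max_null_rate:
        failures.append(f"null rate {null_rate:.1%} exceeds {max_null_rate:.0%}")
    if series.nunique() > max_cardinality:
        failures.append(f"cardinality {series.nunique()} exceeds {max_cardinality}")
    return failures

# e.g., driver ratings with 30% missing values trips the null-rate check
ratings = pd.Series([4.8, 4.9, None, None, None, 4.7, 5.0, 4.6, 4.5, 4.9])
print(check_feature(ratings))
```

In practice these checks run per feature per pipeline execution, and a failure blocks the dataset from reaching training.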
Airbnb’s Golden Dataset for Pricing and Search: Airbnb processes 500TB/day of user activity data but recognized that raw user behavior includes biases (users only see items the ranking system shows). Solution: manually create “golden dataset” (10k properties labeled by internal experts) used as ground truth for offline evaluation and to detect data drift. Data processing stages: label extraction (which users interacted with which property), feature engineering (property embeddings from description text via sentence transformers, image quality scores via ResNet, neighborhood context from crime/transit data). Class imbalance handling: listing availability is imbalanced (popular listings overrepresented), solved via stratified sampling. Data quality: expert annotations validated with inter-annotator agreement >0.85 (Cohen’s Kappa). Feature store implementation: computed features cached in DynamoDB, updated nightly, enables sub-100ms serving. Retraining data pipeline: weekly incremental training on fresh booking data, with automated drift detection to catch distribution shifts.
Stripe’s Fraud Detection Data Processing: Stripe processes 5M+ transactions daily for fraud detection, where labeling is adversarial (fraudsters adapt). Data sources: transaction metadata (card, merchant, amount, location), behavioral signals (historical transaction velocity), external data (IP reputation, email verification). Labeling strategy: programmatic (transactions disputed/chargebacked = fraudulent), but ground truth lags 30–90 days. Class imbalance extreme: 99.9% legitimate, 0.1% fraud. Solution: oversampling fraud cases + undersampling legitimate, with class weights during training. Feature engineering: velocity features (transactions in last hour, last day, last month), temporal features (time of day, day of week), amount ratios (transaction amount vs historical average). Data quality: 99.99% completeness required; missing values cause automatic decline (safety-first). Train/test split: temporal split (train on 60 days, test on 7 days) to simulate production drift. Retraining cadence: daily, with immediate retraining triggered if false positive rate exceeds SLA.
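Velocity features of the kind Stripe describes reduce to time-windowed counts per entity, which pandas expresses as a grouped rolling window (the schema and column names here are hypothetical):

```python
import pandas as pd

# Toy transaction log, sorted by (card, time) so rolling windows line up row-for-row
tx = pd.DataFrame({
    "card_id": ["c1", "c1", "c1", "c2", "c1"],
    "ts": pd.to_datetime([
        "2024-01-01 10:00", "2024-01-01 10:20", "2024-01-01 10:40",
        "2024-01-01 11:00", "2024-01-02 09:00",
    ]),
}).sort_values(["card_id", "ts"]).reset_index(drop=True)

# Trailing-hour transaction count per card (window includes the current event)
tx["one"] = 1
counts = (
    tx.set_index("ts")
      .groupby("card_id")["one"]
      .rolling("1h")
      .sum()
)
tx["tx_last_hour"] = counts.values  # safe: tx is already sorted by (card_id, ts)
print(tx["tx_last_hour"].tolist())  # [1.0, 2.0, 3.0, 1.0, 1.0]
```

The same pattern extends to last-day and last-month windows by changing the rolling frequency string.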
Wayfair’s Demand Forecasting Data Pipeline: Wayfair forecasts demand for 10M+ SKUs daily to optimize inventory. Data sources: historical sales, website traffic, search trends, seasonality calendar. Cleaning challenges: returns and cancellations complicate sales labels; solution is to use “fulfilled demand” (quantity kept, not returned). Feature engineering: trend extraction (exponential moving average over 7/30/90 days), seasonality signals (day-of-week, holiday calendars, markdown events), contextual features (competitor prices scraped daily, inventory levels). Data quality: missing values (product not sold on some days) handled via forward-fill + holiday adjustment. Train/test split: temporal split essential for demand forecasting (train on past 2 years, validate on next 3 months, test on final 30 days). Feature pipeline: runs 6 hours daily, generating 200+ features per SKU. Validation: demand distribution checked for stationarity (if sales suddenly 10x, it’s likely data error, not trend).
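The trend-extraction step (exponential moving averages over 7/30/90-day horizons) is straightforward with pandas `ewm`; the `shift(1)` keeps each day’s own sales out of its feature. Synthetic data stands in for a real SKU’s history:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(7)
sales = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=120, freq="D"),
    "units": rng.poisson(20, 120),
}).set_index("date")

# One smoothed trend per horizon; shift(1) excludes today's label from its own feature
for span in (7, 30, 90):
    sales[f"ema_{span}"] = sales["units"].shift(1).ewm(span=span).mean()

print(sales[["ema_7", "ema_30", "ema_90"]].tail(1).round(1))
```

Comparing the short and long horizons is the usual trick: `ema_7` well above `ema_90` signals rising demand, the reverse signals decline.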
LinkedIn’s Profile Feature Pipeline: LinkedIn processes member profile features (skills, experience, connections) for ranking and search. Data sources: member-provided (resume, education), inferred (job transition patterns), derived (skill combinations from job titles). Labeling challenges: member-provided data is voluntary and noisy (typos in skills, outdated experience dates). Cleaning: deduplication of similar skills (Python, Python 3, Py unified), fuzzy matching on company names (Google, Alphabet unified), temporal validation (end-date after start-date). Feature engineering: skill frequency (how many members list this skill?), skill trends (hiring demand over time), skill combination graphs (skills commonly paired). Data quality: profile completeness tracked (missing experience = quality concern). Train/test split: user-level split (same member never in train and test) to prevent leakage, since collaborative filtering uses member similarity. Feature staleness: member profiles update asynchronously; features cached for 24 hours. Retraining cadence: weekly, capturing emerging skill trends and career transitions.