Data Processing

Garbage in, garbage out: Data quality is the binding constraint on model quality. 60–80% of ML project effort typically goes here, yet it’s unsexy and easy to skip.

The Data Pipeline in Production

Data processing is a 4-stage pipeline that repeats every retraining cycle:

Raw Data Sources
      ↓
[Extraction] → Pull from logs, APIs, databases
      ↓
[Cleaning] → Fix quality issues (nulls, dupes, type errors)
      ↓
[Feature Engineering] → Compute features, handle transformations
      ↓
[Validation & Splits] → Create train/val/test, verify no leakage
      ↓
Ready for Model Training

Stage 1: Data Collection & Assessment

Data Sources

Common sources for ML projects:

Event Logs (Most Common):

  • User interactions (clicks, purchases, views)
  • System behavior (latency, errors, resource usage)
  • Format: Structured (BigQuery, Kafka), semi-structured (JSON), unstructured (text logs)
  • Volume: 1B–1T events/day (large-scale companies)
  • Freshness: Real-time (streaming) or batch (hourly/daily)

Databases:

  • User profiles (age, location, account creation date)
  • Item metadata (title, category, price, tags)
  • Transactional records (orders, payments, returns)
  • Update frequency: Static (rarely changes) or dynamic (daily updates)

Third-party APIs:

  • Weather data (for demand forecasting)
  • Demographic data (market research providers)
  • Real estate data (property listings, prices)
  • Cost/latency: API rate limits, payment per call

Sensors & IoT:

  • Device telemetry (sensor readings, equipment metrics)
  • User behavior (accelerometer, location, screen time)
  • Freshness: Real-time streaming (high latency sensitivity)

Data Quality Assessment

Before investing in cleaning, assess the damage:

| Dimension | Good | Acceptable | Problem |
|---|---|---|---|
| Completeness | <1% missing | 1–5% missing | >5% missing (can’t train) |
| Duplicates | <0.1% | 0.1–1% | >1% (biases model) |
| Consistency | Single source | Multiple sources, validated | Conflicting values |
| Timeliness | Real-time | <24h delay | >1 week old (stale) |
| Accuracy | Programmatic labels | Expert verified | User-generated (noisy) |
| Coverage | All needed features | Some features sparse | Key features missing |
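
The completeness and duplicate dimensions of this table can be measured programmatically before any cleaning work begins. A minimal sketch with pandas (the example columns are hypothetical):

```python
import pandas as pd

def assess_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Per-column missingness report plus an overall duplicate rate."""
    report = pd.DataFrame({
        'missing_pct': df.isna().mean() * 100,
        'n_unique': df.nunique(),
        'dtype': df.dtypes.astype(str),
    })
    # Thresholds mirror the table above: >5% missing is a problem
    report['flag'] = report['missing_pct'].apply(
        lambda p: 'problem' if p > 5 else ('acceptable' if p > 1 else 'good')
    )
    print(f"Duplicate rows: {df.duplicated().mean():.2%}")
    return report

df = pd.DataFrame({'age': [25, None, 31, 25], 'city': ['NY', 'SF', None, 'NY']})
print(assess_quality(df))
```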

Compliance & Privacy

GDPR/CCPA Requirements:

  • Do we have user consent to use their data?
  • Data retention: Can users request deletion?
  • Cross-border transfer: EU data can’t flow to the US without a valid transfer mechanism (SCCs or the EU-US Data Privacy Framework; Privacy Shield was invalidated in 2020)
  • Anonymization: Can we remove PII before training?

Bias & Fairness:

  • Protected attributes: Race, gender, age (in US hiring, lending)
  • Disparate impact: Does model treat groups differently?
  • Measurement: Regular bias audits required for regulated industries
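
One common audit metric is the disparate impact ratio: the lowest group selection rate divided by the highest, with the "four-fifths rule" flagging ratios below 0.8. A minimal sketch, with hypothetical columns:

```python
import pandas as pd

def disparate_impact(df: pd.DataFrame, group_col: str, outcome_col: str) -> float:
    """Ratio of the lowest group selection rate to the highest."""
    rates = df.groupby(group_col)[outcome_col].mean()
    return rates.min() / rates.max()

# Hypothetical loan approvals: group A approved 80%, group B 50%
df = pd.DataFrame({
    'group': ['A'] * 10 + ['B'] * 10,
    'approved': [1] * 8 + [0] * 2 + [1] * 5 + [0] * 5,
})
ratio = disparate_impact(df, 'group', 'approved')
print(f"Disparate impact ratio: {ratio:.2f}")  # 0.5 / 0.8, below the 0.8 threshold
```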

Stage 2: Data Cleaning

Reality: 80% of the time is spent fixing data quality issues.

Missing Values

| Strategy | Cost | When to Use | Caveat |
|---|---|---|---|
| Delete rows | Low | <5% missing, missing at random | Lose data |
| Mean/median impute | Low | Numeric features, MCAR* | Biases estimates |
| Forward fill | Low | Time-series (use previous value) | Assumes no change |
| Model imputation | High | Important features, complex patterns | Adds bias; expensive |
| Domain knowledge | High | Business logic (e.g., “missing = never purchased”) | Manual; doesn’t scale |

*MCAR = Missing Completely At Random (safest assumption)

Implementation:

# Bad: ignores why it's missing
df.fillna(df.mean())  # All nulls become mean value

# Better: mark missingness, then impute
df['age_missing'] = df['age'].isna().astype(int)  # Track missing
df['age'] = df['age'].fillna(df['age'].median())  # Impute

# Best: use sklearn's IterativeImputer (learns cross-feature patterns)
from sklearn.experimental import enable_iterative_imputer  # noqa: F401 (required)
from sklearn.impute import IterativeImputer
from sklearn.linear_model import Ridge

imputer = IterativeImputer(estimator=Ridge(), max_iter=10)
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])

Outliers

Outliers inflate loss and can dominate gradients:

Detection Methods:

  • IQR method: outlier = x < Q1-1.5*IQR OR x > Q3+1.5*IQR
  • Z-score: |z| > 3 (assumes Gaussian)
  • Domain knowledge: “More than 100 orders in one day is fraud”

Handling:

import numpy as np

# IQR-based removal
Q1 = df['purchase_amount'].quantile(0.25)
Q3 = df['purchase_amount'].quantile(0.75)
IQR = Q3 - Q1
mask = ~((df['purchase_amount'] < Q1 - 1.5*IQR) |
         (df['purchase_amount'] > Q3 + 1.5*IQR))
df_clean = df[mask]

# Or capping (less aggressive)
upper = Q3 + 1.5*IQR
df['purchase_amount'] = df['purchase_amount'].clip(upper=upper)
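
The z-score method from the detection list works the same way; since it assumes roughly Gaussian data, it is best applied after a log transform for skewed features. A sketch on synthetic data:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
amounts = np.append(rng.normal(50, 5, 200), 500.0)  # 200 typical values + one extreme
df = pd.DataFrame({'purchase_amount': amounts})

# Flag rows more than 3 standard deviations from the mean
z = (df['purchase_amount'] - df['purchase_amount'].mean()) / df['purchase_amount'].std()
outliers = df[z.abs() > 3]
print(outliers)  # Only the 500.0 row is flagged
```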

Duplicates

Exact duplicates: Same row appears twice (data collection error)

df.drop_duplicates(inplace=True)  # Remove exact copies

Fuzzy duplicates: The same user appears with name typos or spelling variants

# Requires approximate matching (fuzzywuzzy/thefuzz, soundex); the pairwise
# loop below is O(n²), so use blocking or indexing for large tables
from fuzzywuzzy import fuzz

duplicates = []
for i, row1 in df.iterrows():
    for j, row2 in df.iterrows():
        if i < j:
            sim = fuzz.token_sort_ratio(row1['name'], row2['name'])
            if sim > 90:  # 90% match = likely duplicate
                duplicates.append((i, j))

Stage 3: Feature Engineering

This is where domain knowledge creates value.

Scaling & Normalization

Most algorithms require features on similar scales:

| Method | Range | Use Case | Formula |
|---|---|---|---|
| Min-Max | [0, 1] | Bounded, interpretable | (x - min) / (max - min) |
| Standardization | ~[-3, 3] | Gaussian features | (x - mean) / std |
| Log transform | [0, inf) | Right-skewed (power laws) | log(x + 1) |
| Robust scaling | Relative | Outliers present | (x - median) / IQR |
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Standardization (recommended for neural networks)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)  # Use train stats!

# Min-Max (bounded [0, 1]; note tree models are scale-invariant and don't need scaling)
minmax = MinMaxScaler(feature_range=(0, 1))
X_scaled = minmax.fit_transform(X_train)

Critical: Fit scaler on TRAIN only, apply to TEST. Never fit on full dataset.
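
One way to make that mistake structurally impossible is to wrap preprocessing in a sklearn Pipeline: the scaler is then refit on each training fold only, even inside cross-validation. A small sketch on synthetic data:

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=200, n_features=5, random_state=42)

# StandardScaler is fit inside each fold on that fold's training rows only
pipe = make_pipeline(StandardScaler(), LogisticRegression())
scores = cross_val_score(pipe, X, y, cv=5)
print(f"CV accuracy: {scores.mean():.2f}")
```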

Categorical Encoding

One-Hot Encoding (most common):

from sklearn.preprocessing import OneHotEncoder  # Pipeline-friendly alternative to get_dummies

# Bad: cat.codes imposes an arbitrary numeric order on unordered categories
df['category_encoded'] = df['category'].astype('category').cat.codes

# Good: one-hot (but be careful with high cardinality)
df_encoded = pd.get_dummies(df, columns=['category'], drop_first=True)
# With drop_first, one category is the implicit baseline:
# category_B (0/1), category_C (0/1); all zeros = category_A

Ordinal Encoding (when categories have order):

# Fragile: a hand-written dict works, but silently maps unseen values to NaN
mapping = {'low': 0, 'medium': 1, 'high': 2}
df['priority_encoded'] = df['priority'].map(mapping)

# Better: explicit ordering, with errors on unknown categories
from sklearn.preprocessing import OrdinalEncoder
enc = OrdinalEncoder(categories=[['low', 'medium', 'high']])
df[['priority']] = enc.fit_transform(df[['priority']])

Target Encoding (risky, but powerful):

# For categorical feature, encode as average target value
# Example: color_code = average CTR for that color
target_mean = df.groupby('color')['clicked'].mean()
df['color_target_encoded'] = df['color'].map(target_mean)

# Risk: severe overfitting if not careful
# Mitigation: Use cross-fold average (train only, not val/test)
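
The cross-fold mitigation can be sketched as out-of-fold encoding: each row is encoded with target means computed on the other folds, so no row ever sees its own label (the function and column names here are illustrative):

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

def oof_target_encode(df, cat_col, target_col, n_splits=5, seed=42):
    """Encode each row with target means from the *other* folds."""
    encoded = pd.Series(np.nan, index=df.index, dtype=float)
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    for fit_idx, transform_idx in kf.split(df):
        fold_means = df.iloc[fit_idx].groupby(cat_col)[target_col].mean()
        encoded.iloc[transform_idx] = (
            df.iloc[transform_idx][cat_col].map(fold_means).values
        )
    # Categories unseen in a fit fold fall back to the global mean
    return encoded.fillna(df[target_col].mean())

df = pd.DataFrame({
    'color': ['red', 'blue', 'red', 'blue', 'red', 'green'] * 10,
    'clicked': [1, 0, 1, 1, 0, 0] * 10,
})
df['color_te'] = oof_target_encode(df, 'color', 'clicked')
```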

Domain-Specific Features

Creating custom features leverages business knowledge:

Temporal Features:

df['hour_of_day'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
df['days_since_signup'] = (pd.Timestamp.now() - df['signup_date']).dt.days

Aggregation Features:

# User-level aggregations (from historical data)
user_stats = df.groupby('user_id').agg({
    'purchase_amount': ['mean', 'sum', 'count'],
    'days_since_purchase': 'min'
}).reset_index()
user_stats.columns = ['user_id', 'avg_purchase', 'total_spent',
                       'order_count', 'days_since_last_order']

df = df.merge(user_stats, on='user_id', how='left')

Interaction Features:

# Combinations that matter for your problem
df['age_income_interaction'] = df['age'] * df['income']
df['high_value_user'] = ((df['income'] > 100000) & (df['age'] > 35)).astype(int)

Stage 4: Data Quality & Validation

Class Imbalance

Imbalanced data leads to biased models (predicts majority class):

Problem: 99% of transactions are legitimate, 1% fraud

  • Model: “Always predict legitimate” → 99% accuracy (useless!)
  • Recall: 0% (catches 0% of actual fraud)

Solutions:

import numpy as np
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE

# Option 1: Class weights (tell model minority is more important)
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train
)
weights = dict(zip(np.unique(y_train), class_weights))
# Then pass class_weight=weights to the model, or convert to per-sample
# weights for model.fit(X_train, y_train, sample_weight=...)

# Option 2: Resampling (oversample minority, undersample majority)
smote = SMOTE(sampling_strategy=0.5)  # Make minority 50% of majority
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

# Option 3: Stratified splitting (ensure both sets have same ratio)
from sklearn.model_selection import StratifiedShuffleSplit
split = StratifiedShuffleSplit(n_splits=1, test_size=0.2)
for train_idx, test_idx in split.split(X, y):
    X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
    y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
    # Both train and test have same class ratio as original

Train/Val/Test Splitting

Standard Split (80/10/10, random):

from sklearn.model_selection import train_test_split

X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.1)
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.111)  # ≈ 1/9 of the remaining 90%
# Results: 80% train, 10% val, 10% test

Stratified Split (preserves class distribution):

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, stratify=y_temp
)

Time-Series Split (prevents leakage for temporal data):

# WRONG: random split mixes past and future
X_train, X_test = train_test_split(X, test_size=0.2)

# RIGHT: temporal boundary (rows must be sorted by time first)
split_point = int(0.8 * len(X))
X_train, X_test = X[:split_point], X[split_point:]
y_train, y_test = y[:split_point], y[split_point:]
# Train on Jan–Oct 2024, test on Nov–Dec 2024
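
For cross-validation on temporal data, sklearn's TimeSeriesSplit generalizes this single boundary into expanding training windows, each validated on the period that immediately follows:

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(20).reshape(-1, 1)  # Rows assumed ordered by time

tscv = TimeSeriesSplit(n_splits=4)
for train_idx, test_idx in tscv.split(X):
    # The training window always ends before the test window begins
    print(f"train=[0..{train_idx.max()}]  test=[{test_idx.min()}..{test_idx.max()}]")
```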

Exploratory Data Analysis (EDA)

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# 1. Univariate (feature distributions)
df.describe()  # Mean, std, min, max
df.hist(bins=50, figsize=(12, 8))  # Visual inspection
plt.show()

# 2. Bivariate (feature-target relationship)
sns.boxplot(data=df, x='category', y='target')  # Category vs target
plt.show()

# 3. Correlations (numeric features only)
corr_matrix = df.corr(numeric_only=True)
sns.heatmap(corr_matrix, annot=True, fmt='.2f')  # Heatmap
plt.show()

# 4. Check for multicollinearity (X = the feature DataFrame)
# High correlation between features → redundant, causes instability
from statsmodels.stats.outliers_influence import variance_inflation_factor
vif = pd.DataFrame({
    'feature': X.columns,
    'VIF': [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
})
print(vif)  # VIF > 5 → problematic multicollinearity

Data Versioning & Reproducibility

Problem: “Which data version trained this model?” Often impossible to answer.

Solution: Data versioning tools

# DVC (Data Version Control): track data like you track code
dvc add data/raw/train.csv
dvc add data/processed/features.pkl
# dvc writes small .dvc pointer files; version those with git
git add data/raw/train.csv.dvc data/processed/features.pkl.dvc
git commit -m "Updated training data: removed outliers, added new features"
dvc checkout  # Restore the data matching the current git revision

# Delta Lake (Databricks): time travel; query or roll back to any version
SELECT * FROM training_data VERSION AS OF 12345

Key Properties by Stage

| Stage | Typical Duration | Effort | Tools | Common Issues |
|---|---|---|---|---|
| Collection | 1–2 weeks | 1 FTE | SQL, dbt, Python | Data unavailable, compliance issues |
| Cleaning | 2–4 weeks | 1–2 FTE | Pandas, SQL, custom scripts | 80% of time spent here |
| Feature Engineering | 3–6 weeks | 1–2 FTE | scikit-learn, custom Python | Over-engineered features |
| Validation & Splits | 1 week | 0.5 FTE | scikit-learn, pandas | Data leakage, imbalance |

Implementation Example: Full Data Pipeline

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE

# Load raw data
df = pd.read_csv('s3://data-lake/user_interactions_2026_q1.csv')
print(f"Initial shape: {df.shape}")

# ===== Stage 1: Assessment =====
print(f"\nMissing values:\n{df.isnull().sum()}")
print(f"\nData types:\n{df.dtypes}")

# ===== Stage 2: Cleaning =====
# Remove exact duplicates
df = df.drop_duplicates()

# Handle missing values
df['user_age'] = df['user_age'].fillna(df['user_age'].median())
df['item_category'] = df['item_category'].fillna('unknown')

# Remove outliers (purchase amount > 3 std dev)
mean = df['purchase_amount'].mean()
std = df['purchase_amount'].std()
df = df[df['purchase_amount'] <= mean + 3*std]

# ===== Stage 3: Feature Engineering =====
# Temporal features
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek

# User aggregations
user_agg = df.groupby('user_id').agg({
    'purchase_amount': ['mean', 'sum', 'count'],
    'timestamp': lambda x: (pd.Timestamp.now() - x.max()).days
}).reset_index()
user_agg.columns = ['user_id', 'user_avg_purchase', 'user_total_spent',
                     'user_order_count', 'user_days_since_purchase']
df = df.merge(user_agg, on='user_id', how='left')

# Item features (note: CTR computed over the full dataset leaks the target;
# in production, compute it from a historical window that precedes the split)
item_agg = df.groupby('item_id')['clicked'].mean().reset_index()
item_agg.columns = ['item_id', 'item_ctr']
df = df.merge(item_agg, on='item_id', how='left')

# Categorical encoding
df = pd.get_dummies(df, columns=['item_category'], drop_first=True)

# ===== Stage 4: Validation & Splits =====
# Stratified split (preserve class distribution)
X = df.drop(['clicked', 'timestamp'], axis=1)  # Drop target and raw datetime
y = df['clicked']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.125, stratify=y_train, random_state=42
)

# Scaling: fit on TRAIN only, then apply the same stats to val/test (no leakage)
numeric_cols = ['user_age', 'purchase_amount', 'user_avg_purchase']
scaler = StandardScaler()
X_train.loc[:, numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
X_val.loc[:, numeric_cols] = scaler.transform(X_val[numeric_cols])
X_test.loc[:, numeric_cols] = scaler.transform(X_test[numeric_cols])

print(f"\nTrain: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
print(f"Class distribution (train): {y_train.value_counts().to_dict()}")

# Handle imbalance (SMOTE)
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

print(f"\nAfter SMOTE: {X_train_resampled.shape}")
print(f"Class distribution (resampled): {pd.Series(y_train_resampled).value_counts().to_dict()}")

# Ready for model training!
print("\nData pipeline complete. Ready for Stage 4: Model Development")

How Real Companies Use This

Uber’s Data Processing for the Michelangelo Platform:

  • Scale: 5TB of raw feature data processed daily across 100+ production ML models, each with its own pipeline
  • Data sources: Kafka streams (real-time events from millions of riders/drivers), PostgreSQL (user profiles, trip history), Redis (cached features for low-latency serving)
  • Cleaning: deduplication (the same trip event fires multiple times), missing-value imputation (driver location sometimes unavailable), outlier removal (unrealistic surge multipliers)
  • Feature engineering: temporal aggregations (average driver rating over past 7 and 30 days), geospatial features (grid-based location bucketing for traffic patterns), behavioral signals (driver acceptance and cancellation rates)
  • Quality validation: 200+ automated checks per feature (null rate <5%, cardinality bounds, distribution stability)
  • Splits: temporal (train on past 60 days, validate on next 7, test on final 7) to prevent leakage in time-sensitive predictions
  • Pipeline time: 4–6 hours from raw event to training-ready dataset, run daily to capture fresh patterns
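
Per-feature checks of the kind Uber describes can be approximated in a few lines of pandas; this is a generic sketch with illustrative thresholds and columns, not Uber's actual implementation:

```python
import pandas as pd

def validate_features(df: pd.DataFrame, max_null_rate=0.05, max_cardinality=10_000):
    """Run simple per-column quality gates; return the list of failures."""
    failures = []
    for col in df.columns:
        null_rate = df[col].isna().mean()
        if null_rate > max_null_rate:
            failures.append(f"{col}: null rate {null_rate:.1%} exceeds {max_null_rate:.0%}")
        if df[col].nunique() > max_cardinality:
            failures.append(f"{col}: cardinality {df[col].nunique()} exceeds bound")
    return failures

df = pd.DataFrame({'rating': [4.8, None, None, 4.2], 'city': ['NY', 'SF', 'LA', 'NY']})
print(validate_features(df))  # The rating column fails the null-rate gate
```

A production version would also compare each feature's distribution against a reference window to catch drift.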

Airbnb’s Golden Dataset for Pricing and Search:

  • Scale: 500TB/day of user activity data, but raw behavior is biased (users only see what the ranking system shows)
  • Solution: a manually curated “golden dataset” (10k properties labeled by internal experts) serves as ground truth for offline evaluation and drift detection
  • Processing stages: label extraction (which users interacted with which property), feature engineering (property embeddings from description text via sentence transformers, image quality scores via ResNet, neighborhood context from crime/transit data)
  • Imbalance handling: listing availability is imbalanced (popular listings overrepresented), addressed with stratified sampling
  • Quality: expert annotations validated with inter-annotator agreement >0.85 (Cohen’s kappa)
  • Feature store: computed features cached in DynamoDB, updated nightly, enabling sub-100ms serving
  • Retraining: weekly incremental training on fresh booking data, with automated drift detection for distribution shifts

Stripe’s Fraud Detection Data Processing:

  • Scale: 5M+ transactions processed daily, with adversarial labeling (fraudsters adapt)
  • Data sources: transaction metadata (card, merchant, amount, location), behavioral signals (historical transaction velocity), external data (IP reputation, email verification)
  • Labeling: programmatic (disputed/chargebacked transactions = fraudulent), but ground truth lags 30–90 days
  • Imbalance: extreme (99.9% legitimate, 0.1% fraud); handled by oversampling fraud and undersampling legitimate, with class weights during training
  • Feature engineering: velocity features (transactions in the last hour/day/month), temporal features (time of day, day of week), amount ratios (transaction amount vs historical average)
  • Quality: 99.99% completeness required; missing values trigger an automatic decline (safety first)
  • Splits: temporal (train on 60 days, test on 7) to simulate production drift
  • Retraining: daily, with immediate retraining triggered if the false positive rate exceeds SLA
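
Velocity features like the ones Stripe mentions (counts over trailing windows) can be sketched with a time-based rolling window in pandas; the column names here are illustrative:

```python
import pandas as pd

tx = pd.DataFrame({
    'card_id': ['c1', 'c1', 'c1', 'c2', 'c1'],
    'ts': pd.to_datetime([
        '2024-05-01 10:00', '2024-05-01 10:20', '2024-05-01 10:50',
        '2024-05-01 11:00', '2024-05-01 12:30',
    ]),
}).sort_values(['card_id', 'ts'])

# Count each card's transactions in the trailing hour (including the current one)
tx['one'] = 1
tx['tx_last_hour'] = (
    tx.set_index('ts')
      .groupby('card_id')['one']
      .rolling('1h')
      .sum()
      .values
)
# c1's 10:50 transaction sees 3 in its window (10:00, 10:20, 10:50)
```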

Wayfair’s Demand Forecasting Data Pipeline:

  • Scale: demand forecasts for 10M+ SKUs daily to optimize inventory
  • Data sources: historical sales, website traffic, search trends, seasonality calendar
  • Cleaning: returns and cancellations complicate sales labels; solved by modeling “fulfilled demand” (quantity kept, not returned)
  • Feature engineering: trend extraction (exponential moving averages over 7/30/90 days), seasonality signals (day of week, holiday calendars, markdown events), contextual features (competitor prices scraped daily, inventory levels)
  • Missing values: days with no sales handled via forward fill plus holiday adjustment
  • Splits: temporal, essential for forecasting (train on past 2 years, validate on next 3 months, test on final 30 days)
  • Pipeline: runs 6 hours daily, generating 200+ features per SKU
  • Validation: demand distribution checked for stationarity (if sales suddenly 10x, it’s likely a data error, not a trend)

LinkedIn’s Profile Feature Pipeline:

  • Data sources: member-provided (resume, education), inferred (job transition patterns), derived (skill combinations from job titles)
  • Labeling challenges: member-provided data is voluntary and noisy (typos in skills, outdated experience dates)
  • Cleaning: deduplication of similar skills (Python, Python 3, Py unified), fuzzy matching on company names (Google, Alphabet unified), temporal validation (end date after start date)
  • Feature engineering: skill frequency (how many members list a skill), skill trends (hiring demand over time), skill combination graphs (skills commonly paired)
  • Quality: profile completeness tracked (missing experience flags a quality concern)
  • Splits: user-level (the same member never appears in both train and test) to prevent leakage, since collaborative filtering relies on member similarity
  • Staleness: profiles update asynchronously; features are cached for 24 hours
  • Retraining: weekly, capturing emerging skill trends and career transitions


This post is licensed under CC BY 4.0 by the author.