Kafka Deep Dive
Kafka's power comes from its partitioned log structure, which enables both per-partition ordering and massive parallelism; managing consumer offsets is the key complexity.
Key Properties
| Property | Value / Behavior | Notes |
|---|---|---|
| Ordering guarantee | Per partition FIFO | Not global; enables parallelism |
| Retention | Configurable (default 7 days) | By time or size (log.retention.ms, log.retention.bytes) |
| Replication | replication.factor (default 1) | Use 3 replicas with min.insync.replicas=2 for durability (costs latency) |
| Throughput | 1M+ msgs/sec per broker | LinkedIn: trillions/day |
| Latency (at-least-once) | 10-100ms end-to-end | 50-200ms with exactly-once |
| Consumer lag | Per partition offset tracking | Lag = log-end-offset - consumer-offset |
| Rebalancing time | 10-60 seconds | Consumption pauses during rebalance |
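The lag formula in the table is worth making concrete. A minimal pure-Python sketch, using hypothetical offset snapshots for a 3-partition topic:

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag = log-end-offset - committed consumer offset."""
    return {p: log_end_offsets[p] - committed_offsets.get(p, 0)
            for p in log_end_offsets}

# Hypothetical snapshot of a 3-partition topic:
lag = consumer_lag(
    log_end_offsets={0: 1050, 1: 990, 2: 2000},
    committed_offsets={0: 1000, 1: 990, 2: 1500},
)
# Partition 2 is 500 messages behind; partition 1 is fully caught up.
```

Monitoring typically alerts on the per-partition maximum rather than the sum, since one hot partition can hide behind an otherwise healthy total.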
Producer Configuration Deep Dive
| Config | Default | Impact | Production Setting |
|---|---|---|---|
| acks | 1 | Durability vs latency tradeoff | -1 (all replicas) for critical; 1 for speed |
| batch.size | 16KB | Batching (throughput) | 32KB-1MB depending on message size |
| linger.ms | 0 | Wait before sending batch | 10-100ms (more batching = higher throughput) |
| compression.type | none | CPU vs network | snappy (balanced) or lz4 (fast) |
| retries | INT_MAX | Retry transient failures | Leave at max; Kafka handles intelligently |
| enable.idempotence | false (true since Kafka 3.0) | Exactly-once by producer | true (essential for idempotent writes) |
| buffer.memory | 32MB | Total buffer for producer | 64-256MB for high-throughput |
Consumer Configuration Deep Dive
| Config | Default | Impact | Production Setting |
|---|---|---|---|
| group.id | required | Consumer group for offset tracking | Use domain name: “recommendations-svc” |
| max.poll.records | 500 | Batch fetch size | 500-1000 (balance memory vs latency) |
| session.timeout.ms | 45000 (10000 before Kafka 3.0) | Failure-detection window | 6000-45000ms (shorter = faster failover, longer = fewer false rebalances) |
| max.poll.interval.ms | 300000 (5 min) | Time to process batch | Increase if processing is slow (10+ min) |
| isolation.level | read_uncommitted | Consistency | read_committed (skip uncommitted txns) |
| auto.offset.reset | latest | Missing offset strategy | earliest (replay from start); latest (skip old) |
| enable.auto.commit | true | Auto-commit offset | false (manual commit after processing) |
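As a quick reference, the production column above maps onto kafka-python constructor kwargs (config dots become underscores). The values below are one plausible production profile drawn from the table, not official defaults:

```python
# One plausible production profile from the table above
# (kafka-python spelling: config dots become underscores)
consumer_config = {
    "group_id": "recommendations-svc",    # domain-named consumer group
    "max_poll_records": 500,              # batch size vs memory
    "session_timeout_ms": 6000,           # failure-detection window
    "max_poll_interval_ms": 600000,       # 10 min for slow processing
    "isolation_level": "read_committed",  # skip uncommitted transactions
    "auto_offset_reset": "earliest",      # replay from start for new groups
    "enable_auto_commit": False,          # commit manually after processing
}
```

These would be passed as `KafkaConsumer(*topics, bootstrap_servers=..., **consumer_config)` alongside a value deserializer.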
When to Use / Avoid
Kafka ✅
- High-throughput event streaming (millions/sec)
- Long-term retention & replay needed (audit, debugging)
- Partitioning by key for ordering per entity (user, account, order)
- Multiple independent consumer groups
- Event-driven architecture (multiple services consume same events)
- Temporal analysis (query events from specific time period)
- Exactly-once semantics required
Kafka ❌
- Simple point-to-point work queue (use SQS or RabbitMQ)
- Operational simplicity critical (Kafka is complex to operate)
- Sub-millisecond latency required (distributed log adds latency)
- Small event volume (under 10K msg/sec; overkill)
- Cost-sensitive (Kafka infrastructure is expensive)
- All messages have same priority (no priority queue support)
Exactly-Once Semantics ✅
- Financial transactions, payments, billing
- Inventory changes (can’t double-count stock)
- State that must remain consistent
- Audit trails for compliance (SOX, HIPAA)
At-Least-Once Sufficient ❌ (skip exactly-once overhead)
- Metrics and monitoring (occasional duplicates acceptable)
- Recommendations (small bias from duplicates OK)
- Analytics (0.1% accuracy loss acceptable)
- Logging (duplicates don’t affect analysis)
Real Systems
| System | Scale | Pattern | Key Tuning |
|---|---|---|---|
| LinkedIn (origin) | Trillions/day, 1000s partitions | Event backbone | min.insync.replicas=2, 7-day retention |
| Netflix | Multi-region, thousands of topics | Real-time analytics, recommendations | Exactly-once for billing, at-least-once for metrics |
| Uber | 1M+ events/sec per region | Ride state, surge pricing, ETA | Low latency (optimize linger.ms, batch.size) |
| Stripe | Payment processing backbone | Charge, refund, dispute events | Exactly-once + idempotence, transactions |
| Shopify | Order processing, inventory | Distributed transactions | Event sourcing with Kafka, projections |
How Real Systems Use This
LinkedIn (Event Streaming Backbone)
LinkedIn built Kafka to handle the scale of user activity events (profile updates, messaging, feed interactions) and has since made it the backbone of all event streaming. They ingest 1 trillion messages per day across 1000+ clusters worldwide. Each user’s events are partitioned by user ID, ensuring all changes to a user are sequential (critical for recommendation freshness). LinkedIn’s strategy: at-least-once semantics at massive scale, with idempotent consumers to handle duplicates. They tune producers with linger.ms=10 and batch.size=1MB to maximize throughput (important because cost per message matters at this scale). Consumer lag is monitored via custom dashboards; when lag exceeds 10K messages, they auto-scale consumers.
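LinkedIn's "at-least-once plus idempotent consumers" strategy can be sketched without any Kafka machinery. The in-memory `seen` set below is a stand-in for whatever durable store (DB unique key, Redis SETNX with TTL) a real consumer would use; all names are hypothetical:

```python
class IdempotentHandler:
    """Wraps a processing function with dedup by message_id.
    At-least-once delivery + this dedup ~= effectively-once processing."""

    def __init__(self, process):
        self.process = process
        self.seen = set()  # stand-in for a durable store (DB key, Redis)

    def handle(self, event):
        message_id = event["message_id"]
        if message_id in self.seen:
            return False  # duplicate delivery: skip
        self.process(event)
        self.seen.add(message_id)  # mark AFTER successful processing
        return True

processed = []
handler = IdempotentHandler(processed.append)
event = {"message_id": "evt-1", "user_id": "u42"}
handler.handle(event)  # processed
handler.handle(event)  # redelivered duplicate: skipped
```

Marking the ID only after processing succeeds is deliberate: a crash mid-processing leaves the ID unmarked, so the redelivered event is retried rather than silently dropped.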
Netflix (Real-Time Personalization)
Netflix uses Kafka for real-time event streaming of user interactions (play, pause, search, rating). Each user interaction is an event published to a topic partitioned by user ID. The recommendations engine consumes these events at low latency (< 100ms) to update personalization scores. Netflix configured isolation.level=read_committed to ensure they never process partially committed transactions. They use Kafka Streams for windowed aggregations (e.g., “what did this user watch in last hour?”) and maintain exactly-once semantics via idempotent writes. Critical config: max.poll.interval.ms=30000 (30s) because processing 1000 events can take 5-10 seconds. Lag exceeding 5 seconds triggers alerts.
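The windowed aggregation Netflix runs in Kafka Streams ("what did this user watch in the last hour?") reduces to bucketing events by a tumbling window. A toy pure-Python version, using hypothetical `(timestamp_ms, user_id)` events rather than the Streams API:

```python
from collections import defaultdict

def tumbling_window_counts(events, window_ms):
    """Count events per (user, window-start) bucket -- a toy stand-in
    for a Kafka Streams windowed aggregation."""
    counts = defaultdict(int)
    for ts_ms, user in events:
        window_start = ts_ms - (ts_ms % window_ms)  # align to window
        counts[(user, window_start)] += 1
    return dict(counts)

# Hypothetical (timestamp_ms, user_id) play events
events = [(1000, "u1"), (1500, "u1"), (4200, "u1"), (1200, "u2")]
hourly = tumbling_window_counts(events, window_ms=3600)
```

Real stream processors add what this sketch omits: incremental state stores, late-arrival handling, and window retention.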
Uber (Distributed Transactions)
Uber uses Kafka for ride state transitions (ride requested, matched, driver arrived, trip ended, payment collected). These events must be processed in order per ride and with exactly-once semantics (can’t double-charge). Uber partitions by ride ID and enables enable.idempotence=true plus Kafka transactions (isolation.level=read_committed). The consumer logic: consume event, apply to database, commit offset atomically (via transactions). Producer settings: acks=-1 (all replicas) and min.insync.replicas=2 for durability (latency cost justified by financial stakes). Consumers configured with session.timeout.ms=6000 to tolerate brief network hiccups without rebalancing.
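Uber's "apply event and commit offset atomically" pattern is commonly implemented by storing offsets in the same database transaction as the state change, so a crash between the two cannot double-apply an event. A sketch with SQLite standing in for the real store (all table and column names hypothetical):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE ride_state (ride_id TEXT PRIMARY KEY, state TEXT)")
db.execute("CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_offset INTEGER)")

def apply_event(part: int, offset: int, ride_id: str, state: str) -> bool:
    row = db.execute("SELECT next_offset FROM offsets WHERE part = ?",
                     (part,)).fetchone()
    if row and offset < row[0]:
        return False  # already applied (replay after a crash): skip
    with db:  # one transaction: state change + offset advance commit together
        db.execute("INSERT INTO ride_state VALUES (?, ?) "
                   "ON CONFLICT(ride_id) DO UPDATE SET state = excluded.state",
                   (ride_id, state))
        db.execute("INSERT INTO offsets VALUES (?, ?) "
                   "ON CONFLICT(part) DO UPDATE SET next_offset = excluded.next_offset",
                   (part, offset + 1))
    return True

apply_event(0, 0, "ride_1", "requested")
apply_event(0, 0, "ride_1", "requested")  # redelivery of offset 0: skipped
apply_event(0, 1, "ride_1", "matched")
```

On restart, the consumer seeks to the `next_offset` stored in the database rather than trusting Kafka's committed offset, which is what makes the pair atomic.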
Stripe (Payment Processing)
Stripe models payments as immutable events in Kafka with exactly-once semantics. Each payment attempt, charge, refund, and dispute is an event. Stripe’s consumer is idempotent: a charge event with ID ch_123 always produces the same database state, even if processed twice. Configuration: enable.idempotence=true on producer, isolation.level=read_committed on consumer, min.insync.replicas=2, and explicit transaction handling. They replicate across regions for disaster recovery and retain events for 7 years (regulatory requirement). Consumer lag threshold: 10 messages triggers alerting (payment processing must be near real-time).
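Stripe's idempotent consumer relies on a database unique constraint: replaying a charge event with the same ID is a no-op. A minimal SQLite sketch with a hypothetical schema:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE charges (id TEXT PRIMARY KEY, user_id TEXT, amount INTEGER)")

def record_charge(charge_id: str, user_id: str, amount: int) -> bool:
    """Apply a charge event; the PRIMARY KEY turns a replay into a no-op."""
    with db:
        cur = db.execute(
            "INSERT OR IGNORE INTO charges VALUES (?, ?, ?)",
            (charge_id, user_id, amount),
        )
    return cur.rowcount == 1  # True only the first time

first = record_charge("ch_123", "user_789", 9999)   # applied
replay = record_charge("ch_123", "user_789", 9999)  # duplicate: no-op
```

The same idea in PostgreSQL would be `INSERT ... ON CONFLICT (id) DO NOTHING`; either way, the constraint, not the consumer's memory, is the source of truth for "already processed".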
Shopify (Event Sourcing with Projections)
Shopify uses Kafka for event sourcing of orders: each state change (created, payment_authorized, shipped, delivered, refunded) is an immutable event in Kafka. Multiple consumer groups consume the same topic to build different projections: the order service rebuilds order state, the accounting system builds a revenue ledger, and analytics builds sales dashboards. Topics are partitioned by order ID for per-order ordering. Shopify runs daily snapshot jobs (store order state as of 2024-01-01 EOD) to speed up state reconstruction. Consumer configuration: auto.offset.reset=earliest for new consumers (replay from the start), max.poll.records=100 (smaller batches, since each order can have 20+ events). Rebalancing is carefully managed because it pauses consumption and adds order-processing latency.
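The projection idea, several consumers deriving different read models from one event stream, can be shown with plain functions folding over the same (hypothetical) order events:

```python
# Hypothetical order event stream (one Kafka topic, partitioned by order ID)
events = [
    {"order_id": "o1", "type": "created", "amount": 5000},
    {"order_id": "o1", "type": "payment_authorized", "amount": 5000},
    {"order_id": "o1", "type": "refunded", "amount": 5000},
]

def order_state(events):
    """Projection 1 (order service): latest status per order."""
    state = {}
    for ev in events:
        state[ev["order_id"]] = ev["type"]
    return state

def revenue_ledger(events):
    """Projection 2 (accounting): net revenue from the SAME events."""
    total = 0
    for ev in events:
        if ev["type"] == "payment_authorized":
            total += ev["amount"]
        elif ev["type"] == "refunded":
            total -= ev["amount"]
    return total
```

Each projection corresponds to an independent consumer group with its own offsets, which is why a new projection can be added later and simply replay the topic from the beginning.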
Implementation: Producer with Idempotence
```python
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import uuid
from typing import Dict, Callable, Optional


class IdempotentProducer:
    """
    Kafka producer configured for exactly-once delivery into the log.

    Configuration:
    - enable_idempotence=True: broker deduplicates the producer's internal
      retries by producer ID + sequence number
    - acks='all': wait for all in-sync replicas
    - retries=MAX_INT: retry transient failures indefinitely

    Note: idempotence and transactions require a client that supports them
    (e.g., kafka-python >= 2.1 or confluent-kafka).
    """

    def __init__(self, bootstrap_servers: list, topic: str,
                 transactional_id: Optional[str] = None):
        """Initialize the producer. Pass a STABLE transactional_id (it is
        what fences zombie instances) only if you need transactions."""
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # Idempotence: producer ID + sequence number lets the broker
            # drop duplicates caused by internal retries
            enable_idempotence=True,
            # Durability: wait for all in-sync replicas
            acks='all',
            # Retry transient failures (e.g., leader election, brief outages)
            retries=2147483647,  # MAX_INT
            # Batching for throughput
            batch_size=32768,  # 32KB
            linger_ms=10,      # wait up to 10ms to fill a batch
            compression_type='snappy',
            # Transactions (needed for end-to-end exactly-once with consumers)
            transactional_id=transactional_id,
        )
        self.topic = topic

    def send_event(self, event: Dict, key: Optional[str] = None,
                   callback: Optional[Callable] = None) -> str:
        """
        Send an event, tagging it with a message_id for downstream dedup.

        Args:
            event: dict to send (JSON serialized)
            key: partition key (e.g., user_id for per-user ordering)
            callback: optional callback(metadata) invoked on success

        Returns:
            Message ID for tracking

        Raises:
            KafkaError: on unrecoverable failure
        """
        message_id = event.get('message_id', str(uuid.uuid4()))
        event['message_id'] = message_id
        future = self.producer.send(
            self.topic,
            value=event,
            key=key.encode('utf-8') if key else None,
        )
        try:
            # Block for the ack (simple; high-volume producers should rely
            # on callbacks instead of waiting per message)
            metadata = future.get(timeout=10)
            print(f"Sent {message_id} to partition {metadata.partition} "
                  f"offset {metadata.offset}")
            if callback:
                callback(metadata)
            return message_id
        except KafkaError as e:
            print(f"Failed to send {message_id}: {e}")
            raise

    def begin_transaction(self):
        """Begin a transaction (requires transactional_id and a one-time
        init_transactions() call after construction)."""
        self.producer.begin_transaction()

    def commit_transaction(self):
        """Commit the open transaction."""
        self.producer.commit_transaction()

    def abort_transaction(self):
        """Abort the open transaction."""
        self.producer.abort_transaction()

    def flush(self):
        """Flush all pending messages."""
        self.producer.flush()

    def close(self):
        """Close the producer after draining pending sends."""
        self.producer.close()


# Example usage: send payment events
if __name__ == "__main__":
    producer = IdempotentProducer(
        bootstrap_servers=['localhost:9092'],
        topic='payments',
    )
    try:
        # Charge event, partitioned by user_id so each user's events
        # stay ordered
        charge_event = {
            'event_type': 'charge_created',
            'user_id': 'user_789',
            'amount': 9999,  # $99.99 in cents
            'currency': 'USD',
        }
        message_id = producer.send_event(
            event=charge_event,
            key='user_789',
            callback=lambda meta: print(f"Stored at offset {meta.offset}"),
        )
        # Note: re-sending the same event is NOT deduplicated by the broker.
        # Idempotence only covers the producer's internal retries;
        # application-level duplicates are handled downstream by idempotent
        # consumers keyed on message_id.
        producer.flush()
        print("All events sent with idempotence guarantee")
    finally:
        producer.close()
```
Implementation: Consumer with Exactly-Once
```python
from kafka import KafkaConsumer
import json


class ExactlyOnceConsumer:
    """
    Consumer that achieves effectively-once processing by:
    1. Reading only committed transactions (isolation_level='read_committed')
    2. Processing each message with an IDEMPOTENT handler
    3. Committing offsets manually, only after processing succeeds

    Note: commit-after-processing alone is at-least-once (a crash between
    processing and commit causes redelivery); the idempotent handler is
    what makes redelivery harmless.
    """

    def __init__(self, bootstrap_servers: list, group_id: str,
                 topics: list, handler: callable):
        """
        Args:
            bootstrap_servers: Kafka brokers
            group_id: Consumer group for offset tracking
            topics: Topics to subscribe to
            handler: function(event) called per message; must be idempotent
        """
        self.group_id = group_id
        self.handler = handler
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # Skip messages from uncommitted/aborted producer transactions
            isolation_level='read_committed',
            # Manual commits: control exactly when offsets are saved
            enable_auto_commit=False,
            # Fetch batch size
            max_poll_records=100,
            # Failure-detection window (group rebalances if no heartbeat)
            session_timeout_ms=6000,
            # Max time to process one batch before the group evicts us
            max_poll_interval_ms=300000,  # 5 minutes
            # New consumer groups replay from the start of the log
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )

    def run(self):
        """Main loop: process a batch, then commit its offsets."""
        try:
            while True:
                # Fetch a batch of messages (up to max_poll_records)
                messages = self.consumer.poll(timeout_ms=1000)
                if not messages:
                    continue
                for topic_partition, records in messages.items():
                    for record in records:
                        try:
                            self.handler(record.value)
                            print(f"Processed key={record.key} "
                                  f"offset={record.offset}")
                        except Exception as e:
                            print(f"Processing failed at offset "
                                  f"{record.offset}: {e}")
                            # Don't commit: the batch is redelivered on
                            # restart. Optionally route to a DLQ after
                            # N retries.
                            raise
                # Commit offsets for everything polled, AFTER processing.
                # A crash before this line redelivers the batch; handler
                # idempotence makes that safe.
                self.consumer.commit()
        except KeyboardInterrupt:
            print("Shutting down consumer")
        finally:
            self.consumer.close()


# Example: payment processor
def process_payment(event: dict):
    """
    Process a payment event. It may be invoked more than once for the
    same event on redelivery, so it must be idempotent.
    """
    charge_id = event['message_id']
    user_id = event['user_id']
    amount = event['amount']
    # Idempotent: a DB unique constraint makes a replayed insert a no-op
    #   INSERT INTO charges (id, user_id, amount) VALUES (%s, %s, %s)
    #   ON CONFLICT (id) DO NOTHING
    print(f"Processing charge {charge_id} for user {user_id}: "
          f"${amount / 100:.2f}")


if __name__ == "__main__":
    consumer = ExactlyOnceConsumer(
        bootstrap_servers=['localhost:9092'],
        group_id='payment-processor',
        topics=['payments'],
        handler=process_payment,
    )
    consumer.run()
```
References
- Kafka: A Distributed Messaging System for Log Processing – Kreps, Narkhede, Rao (2011) – Original LinkedIn paper introducing Kafka architecture.
- ByteByteGo – Kafka Architecture & Internals – Visual walkthrough of brokers, partitions, and consumer groups.
- Apache Kafka Documentation – Exactly-Once Semantics – Official guide to idempotence and transactions.
- Designing Data-Intensive Applications – Chapter 11 “Stream Processing” – Kleppmann – Treatment of log structure, ordering, and fault tolerance.
- Confluent Blog – Exactly-Once Semantics in Apache Kafka – Production patterns and trade-offs.
- Kafka in Action – Chapter 3 “Brokers” – Manning – Deep dive on replication, ISR, and compaction.