
Kafka Deep Dive

Kafka's power comes from its partitioned log structure, which enables both per-partition ordering and massive parallelism; managing consumer offsets is the key complexity.

Key Properties

| Property | Value / Behavior | Notes |
|---|---|---|
| Ordering guarantee | Per-partition FIFO | Not global; this is what enables parallelism |
| Retention | Configurable (default 7 days) | By time or size (log.retention.ms, log.retention.bytes) |
| Replication | min.insync.replicas (default 1) | Set to 2+ for durability (sacrifices latency) |
| Throughput | 1M+ msgs/sec per broker | LinkedIn: trillions/day |
| Latency (at-least-once) | 10-100 ms end-to-end | 50-200 ms with exactly-once |
| Consumer lag | Tracked per partition | Lag = log-end-offset - consumer offset |
| Rebalancing time | 10-60 seconds | Consumption pauses during a rebalance |
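
The lag formula in the table is worth making concrete. Here is a minimal pure-Python sketch of per-partition lag; in kafka-python the two input dicts would come from `KafkaConsumer.end_offsets()` and `KafkaConsumer.committed()`, and the partition names below are illustrative:

```python
from typing import Dict

def partition_lag(end_offsets: Dict[str, int],
                  committed: Dict[str, int]) -> Dict[str, int]:
    """Lag per partition = log-end-offset minus the committed consumer offset.
    A partition with no committed offset counts as fully lagged."""
    return {tp: end_offsets[tp] - committed.get(tp, 0) for tp in end_offsets}

# Example: partition 0 holds 1500 messages, consumer has committed 1400
lag = partition_lag({'payments-0': 1500, 'payments-1': 800},
                    {'payments-0': 1400, 'payments-1': 800})
print(lag)  # {'payments-0': 100, 'payments-1': 0}
```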

Producer Configuration Deep Dive

| Config | Default | Impact | Production Setting |
|---|---|---|---|
| acks | 1 (all since Kafka 3.0) | Durability vs. latency tradeoff | all (-1) for critical data; 1 for speed |
| batch.size | 16 KB | Batching (throughput) | 32 KB-1 MB depending on message size |
| linger.ms | 0 | Wait before sending a batch | 10-100 ms (more batching = higher throughput) |
| compression.type | none | CPU vs. network tradeoff | snappy (balanced) or lz4 (fast) |
| retries | 2147483647 (INT_MAX) | Retry transient failures | Leave at max; idempotence keeps retries safe and ordered |
| enable.idempotence | false (true since Kafka 3.0) | Dedupes producer retries | true (required for exactly-once) |
| buffer.memory | 32 MB | Total producer-side buffer | 64-256 MB for high throughput |
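
As a starting point, the table's suggestions can be grouped into two illustrative profiles (keys follow the Java client naming; the profile names and values are examples, not universal answers):

```python
# Durability-first profile: every write waits for all in-sync replicas,
# and the broker dedupes producer retries.
DURABLE = {
    'acks': 'all',               # wait for all in-sync replicas
    'enable.idempotence': True,  # broker dedupes retried batches
    'retries': 2147483647,       # retry transient failures indefinitely
    'linger.ms': 10,
    'compression.type': 'snappy',
}

# Throughput-first profile: leader-only acks and aggressive batching.
FAST = {
    'acks': '1',                 # leader ack only: lower latency, less durable
    'linger.ms': 100,            # wait longer so batches fill up
    'batch.size': 1048576,       # 1 MB batches
    'compression.type': 'lz4',   # cheapest CPU-wise of the common codecs
}
```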

Consumer Configuration Deep Dive

| Config | Default | Impact | Production Setting |
|---|---|---|---|
| group.id | (required) | Consumer group for offset tracking | Use a domain name, e.g. "recommendations-svc" |
| max.poll.records | 500 | Batch fetch size | 500-1000 (balance memory vs. latency) |
| session.timeout.ms | 10000 (45000 since Kafka 3.0) | Failure-detection window; heartbeats go out every heartbeat.interval.ms (default 3000) | Raise to tolerate brief hiccups with fewer false-positive rebalances |
| max.poll.interval.ms | 300000 (5 min) | Max time to process a poll batch | Increase if processing is slow (10+ min) |
| isolation.level | read_uncommitted | Consistency | read_committed (skip uncommitted txns) |
| auto.offset.reset | latest | Where to start with no committed offset | earliest (replay from start) or latest (skip old) |
| enable.auto.commit | true | Automatic offset commits | false (commit manually after processing) |
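
The reason for enable.auto.commit=false is the commit-after-process pattern: the offset advances only once the handler succeeds, so a failure causes redelivery rather than message loss. A toy sketch with no broker, where `records` stands in for a polled batch:

```python
def process_batch(records, handler, committed):
    """Commit-after-process: advance the offset only after the handler
    succeeds. If the handler raises, the failed record's offset is never
    committed, so it is redelivered on restart (at-least-once)."""
    for offset, event in records:
        handler(event)                    # may raise; offset then stays put
        committed['offset'] = offset + 1  # commit points at the NEXT record

state = {'offset': 0}
seen = []

def handler(event):
    if event == 'boom':
        raise ValueError(event)
    seen.append(event)

try:
    process_batch([(0, 'a'), (1, 'b'), (2, 'boom')], handler, state)
except ValueError:
    pass  # 'boom' failed before its offset was committed

print(state['offset'])  # 2: the failed record will be redelivered
```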

When to Use / Avoid

Kafka ✅

  • High-throughput event streaming (millions/sec)
  • Long-term retention & replay needed (audit, debugging)
  • Partitioning by key for ordering per entity (user, account, order)
  • Multiple independent consumer groups
  • Event-driven architecture (multiple services consume same events)
  • Temporal analysis (query events from specific time period)
  • Exactly-once semantics required

Kafka ❌

  • Simple point-to-point work queue (use SQS or RabbitMQ)
  • Operational simplicity critical (Kafka is complex to operate)
  • Sub-millisecond latency required (distributed log adds latency)
  • Small event volume (under 10K msg/sec; overkill)
  • Cost-sensitive (Kafka infrastructure is expensive)
  • All messages have same priority (no priority queue support)

Exactly-Once Semantics ✅

  • Financial transactions, payments, billing
  • Inventory changes (can’t double-count stock)
  • State that must remain consistent
  • Audit trails for compliance (SOX, HIPAA)

At-Least-Once Sufficient ❌ (skip exactly-once here)

  • Metrics and monitoring (occasional duplicates acceptable)
  • Recommendations (small bias from duplicates OK)
  • Analytics (0.1% accuracy loss acceptable)
  • Logging (duplicates don’t affect analysis)

Real Systems

| System | Scale | Pattern | Key Tuning |
|---|---|---|---|
| LinkedIn (origin) | Trillions/day, 1000s of partitions | Event backbone | min.insync.replicas=2, 7-day retention |
| Netflix | Multi-region, thousands of topics | Real-time analytics, recommendations | Exactly-once for billing, at-least-once for metrics |
| Uber | 1M+ events/sec per region | Ride state, surge pricing, ETA | Low latency (tune linger.ms, batch.size) |
| Stripe | Payment processing backbone | Charge, refund, dispute events | Exactly-once + idempotence, transactions |
| Shopify | Order processing, inventory | Distributed transactions | Event sourcing with Kafka, projections |

How Real Systems Use This

LinkedIn (Event Streaming Backbone)

LinkedIn built Kafka to handle the scale of user activity events (profile updates, messaging, feed interactions) and has since made it the backbone of all event streaming. They ingest 1 trillion messages per day across 1000+ clusters worldwide. Each user’s events are partitioned by user ID, ensuring all changes to a user are sequential (critical for recommendation freshness). LinkedIn’s strategy: at-least-once semantics at massive scale, with idempotent consumers to handle duplicates. They tune producers with linger.ms=10 and batch.size=1MB to maximize throughput (important because cost per message matters at this scale). Consumer lag is monitored via custom dashboards; when lag exceeds 10K messages, they auto-scale consumers.

Netflix (Real-Time Personalization)

Netflix uses Kafka for real-time event streaming of user interactions (play, pause, search, rating). Each user interaction is an event published to a topic partitioned by user ID. The recommendations engine consumes these events at low latency (< 100ms) to update personalization scores. Netflix configured isolation.level=read_committed to ensure they never process partially committed transactions. They use Kafka Streams for windowed aggregations (e.g., “what did this user watch in last hour?”) and maintain exactly-once semantics via idempotent writes. Critical config: max.poll.interval.ms=30000 (30s) because processing 1000 events can take 5-10 seconds. Lag exceeding 5 seconds triggers alerts.
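
Netflix's windowed aggregations run in Kafka Streams (a Java library); the same tumbling-window idea can be sketched in plain Python to show the mechanics. The window size and event shape below are illustrative:

```python
from collections import defaultdict

WINDOW_MS = 3_600_000  # 1-hour tumbling windows

def window_counts(events):
    """Count events per (user, window): the plain-Python analogue of a
    Kafka Streams windowed aggregation ("what did this user watch in the
    last hour?"). `events` are (timestamp_ms, user_id) pairs."""
    counts = defaultdict(int)
    for ts, user in events:
        window_start = ts - (ts % WINDOW_MS)  # floor to the window boundary
        counts[(user, window_start)] += 1
    return dict(counts)

print(window_counts([(100, 'u1'), (200, 'u1'), (3_600_500, 'u1')]))
# {('u1', 0): 2, ('u1', 3600000): 1}
```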

Uber (Distributed Transactions)

Uber uses Kafka for ride state transitions (ride requested, matched, driver arrived, trip ended, payment collected). These events must be processed in order per ride and with exactly-once semantics (can’t double-charge). Uber partitions by ride ID and enables enable.idempotence=true plus Kafka transactions (isolation.level=read_committed). The consumer logic: consume event, apply to database, commit offset atomically (via transactions). Producer settings: acks=-1 (all replicas) and min.insync.replicas=2 for durability (latency cost justified by financial stakes). Consumers configured with session.timeout.ms=6000 to tolerate brief network hiccups without rebalancing.
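
The atomicity Uber relies on (the output write and the offset advance succeed or fail together) can be sketched without a broker. In real Kafka this is a producer transaction plus `send_offsets_to_transaction()`; the toy in-memory transaction below only shows the observable behavior, not the protocol:

```python
class Txn:
    """Toy stand-in for a Kafka transaction: staged writes and staged
    offsets become visible together on commit, or not at all on abort."""
    def __init__(self, log, offsets):
        self.log, self.offsets = log, offsets
        self._staged_log, self._staged_offsets = [], {}

    def produce(self, event):
        self._staged_log.append(event)            # staged, not yet visible

    def mark_offset(self, partition, offset):
        self._staged_offsets[partition] = offset  # staged, not yet visible

    def commit(self):
        self.log.extend(self._staged_log)
        self.offsets.update(self._staged_offsets)
        self._staged_log, self._staged_offsets = [], {}

    def abort(self):
        self._staged_log, self._staged_offsets = [], {}

log, offsets = [], {}
txn = Txn(log, offsets)

txn.produce('ride_matched'); txn.mark_offset(0, 5)
txn.abort()                    # failure path: neither side is visible
assert log == [] and offsets == {}

txn.produce('ride_matched'); txn.mark_offset(0, 5)
txn.commit()                   # success path: both become visible together
print(log, offsets)  # ['ride_matched'] {0: 5}
```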

Stripe (Payment Processing)

Stripe models payments as immutable events in Kafka with exactly-once semantics. Each payment attempt, charge, refund, and dispute is an event. Stripe’s consumer is idempotent: a charge event with ID ch_123 always produces the same database state, even if processed twice. Configuration: enable.idempotence=true on producer, isolation.level=read_committed on consumer, min.insync.replicas=2, and explicit transaction handling. They replicate across regions for disaster recovery and retain events for 7 years (regulatory requirement). Consumer lag threshold: 10 messages triggers alerting (payment processing must be near real-time).
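
Stripe's idempotent-consumer trick is easy to demonstrate: put a unique key on the event ID so a redelivered event is a no-op. A runnable sketch with sqlite3 standing in for the real store (the table name and schema are illustrative; `ch_123` follows the example above):

```python
import sqlite3

db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE charges (id TEXT PRIMARY KEY, user_id TEXT, amount INTEGER)')

def apply_charge(event):
    # INSERT OR IGNORE: a duplicate delivery hits the primary key and
    # silently does nothing, so the database state is the same whether
    # the event is processed once or twice.
    db.execute('INSERT OR IGNORE INTO charges VALUES (?, ?, ?)',
               (event['message_id'], event['user_id'], event['amount']))

event = {'message_id': 'ch_123', 'user_id': 'user_789', 'amount': 9999}
apply_charge(event)
apply_charge(event)  # redelivered: no second row is created
count = db.execute('SELECT COUNT(*) FROM charges').fetchone()[0]
print(count)  # 1
```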

Shopify (Event Sourcing with Projections)

Shopify uses Kafka for event sourcing of orders: each state change (created, payment_authorized, shipped, delivered, refunded) is an immutable event in Kafka. Multiple consumer groups consume the same topic to build different projections: the order service rebuilds order state, the accounting system builds a revenue ledger, and analytics builds sales dashboards. Topics are partitioned by order ID for ordering. Shopify runs daily snapshot jobs (e.g., store each order's state as of 2024-01-01 EOD) to speed up state reconstruction. Consumer configuration: auto.offset.reset=earliest for new consumers (replay from the start), max.poll.records=100 (smaller batches, since a single order can carry 20+ events). Rebalancing is carefully managed because it pauses consumption and hurts order-processing latency.
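
A minimal sketch of such a projection (pure Python; the event shape and field names are illustrative): folding an order's events yields its current state, and a snapshot lets reconstruction start partway through the log instead of from offset 0.

```python
def project(events, snapshot=None):
    """Rebuild order state by replaying events, optionally starting from a
    snapshot instead of the beginning of the log."""
    state = dict(snapshot) if snapshot else {'status': 'new'}
    for event in events:
        state['status'] = event['type']  # the latest state-change event wins
    return state

events = [{'type': 'created'},
          {'type': 'payment_authorized'},
          {'type': 'shipped'}]
print(project(events))  # {'status': 'shipped'}

# With a daily snapshot, only events after the snapshot point are replayed:
print(project(events[2:], snapshot={'status': 'payment_authorized'}))
# {'status': 'shipped'}
```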

Implementation: Producer with Idempotence

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import uuid
from typing import Dict, Callable, Optional

class IdempotentProducer:
    """
    Kafka producer that guarantees exactly-once semantics through idempotence.

    Configuration:
    - enable.idempotence=true: broker dedupes retries by producer ID + sequence number
    - acks=all: wait for all in-sync replicas
    - retries=MAX_INT: retry transient failures indefinitely

    Note: idempotence and transactions require a client that supports them
    (recent kafka-python releases, or the confluent-kafka client).
    """

    def __init__(self, bootstrap_servers: list, topic: str):
        """Initialize Kafka producer with exactly-once config."""
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # Idempotence: producer ID + sequence number prevents duplicates
            enable_idempotence=True,
            # Durability: wait for all replicas
            acks='all',
            # Retry transient failures (e.g., rebalancing, brief outages)
            retries=2147483647,  # MAX_INT, retry forever
            # Batching for throughput
            batch_size=32768,  # 32KB
            linger_ms=10,  # Wait 10ms to batch messages
            compression_type='snappy',
            # Transactional writes (needed for exactly-once with a consumer).
            # Keep this id STABLE across producer restarts: Kafka uses it to
            # fence zombie producers, and a fresh UUID per run defeats that.
            transactional_id='payments-producer-1',
        )
        self.topic = topic

    def send_event(self, event: Dict, key: Optional[str] = None,
                   callback: Optional[Callable] = None) -> str:
        """
        Send event idempotently.

        Args:
            event: Dict to send (will be JSON serialized)
            key: Partition key (e.g., user_id for ordering by user)
            callback: Optional callback(metadata) on success/failure

        Returns:
            Message ID for tracking

        Raises:
            KafkaError: If unrecoverable failure
        """
        message_id = event.get('message_id', str(uuid.uuid4()))
        event['message_id'] = message_id

        future = self.producer.send(
            self.topic,
            value=event,
            key=key.encode('utf-8') if key else None
        )

        # Wait for send to complete with timeout
        try:
            metadata = future.get(timeout=10)
            print(f"Sent {message_id} to partition {metadata.partition} offset {metadata.offset}")
            if callback:
                callback(metadata)
            return message_id
        except KafkaError as e:
            print(f"Failed to send {message_id}: {e}")
            raise

    def begin_transaction(self):
        """Begin transactional writes (for atomicity with consumer)."""
        self.producer.begin_transaction()

    def commit_transaction(self):
        """Commit transactional writes."""
        self.producer.commit_transaction()

    def abort_transaction(self):
        """Abort transactional writes."""
        self.producer.abort_transaction()

    def flush(self):
        """Flush all pending messages."""
        self.producer.flush()

    def close(self):
        """Close producer and wait for pending sends."""
        self.producer.close()

# Example usage: send payment events
if __name__ == "__main__":
    producer = IdempotentProducer(
        bootstrap_servers=['localhost:9092'],
        topic='payments'
    )

    try:
        # Send charge event (partitioned by user_id for ordering)
        charge_event = {
            'event_type': 'charge_created',
            'user_id': 'user_789',
            'amount': 9999,  # $99.99 in cents
            'currency': 'USD'
        }
        message_id = producer.send_event(
            event=charge_event,
            key='user_789',  # Partition by user for ordering
            callback=lambda meta: print(f"Stored at offset {meta.offset}")
        )

        # NOTE: idempotence only dedupes the producer's internal retries of an
        # in-flight batch (same producer ID + sequence number). Calling send
        # again creates a new, distinct record; application-level duplicates
        # must be handled by an idempotent consumer keyed on message_id.
        producer.send_event(charge_event, key='user_789')

        producer.flush()
        print("All events sent")

    finally:
        producer.close()

Implementation: Consumer with Exactly-Once

from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
import json

class ExactlyOnceConsumer:
    """
    Kafka consumer that processes events exactly-once by:
    1. Reading committed offsets (skips uncommitted messages)
    2. Processing message
    3. Committing offset atomically

    Configuration:
    - isolation.level=read_committed: Skip uncommitted transactions
    - enable.auto.commit=false: Manual offset commits after processing
    """

    def __init__(self, bootstrap_servers: list, group_id: str,
                 topics: list, handler: callable):
        """
        Initialize consumer.

        Args:
            bootstrap_servers: Kafka brokers
            group_id: Consumer group for offset tracking
            topics: Topics to subscribe to
            handler: Function(event) to process each message
        """
        self.group_id = group_id
        self.handler = handler
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # Exactly-once: skip uncommitted messages from other producers
            isolation_level='read_committed',
            # Manual commits: control when offset is saved
            enable_auto_commit=False,
            # Fetch batch size
            max_poll_records=100,
            # Failure-detection window (heartbeats are sent separately,
            # every heartbeat.interval.ms)
            session_timeout_ms=6000,
            # Time limit for processing batch
            max_poll_interval_ms=300000,  # 5 minutes
            # Resume strategy
            auto_offset_reset='earliest',  # Replay from start for new group
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )

    def run(self):
        """Main consumer loop with exactly-once processing."""
        try:
            while True:
                # Fetch batch of messages (up to max_poll_records)
                messages = self.consumer.poll(timeout_ms=1000)

                if not messages:
                    continue

                for topic_partition, records in messages.items():
                    for record in records:
                        try:
                            # Process message
                            self.handler(record.value)

                            # Commit offset AFTER successful processing; if
                            # processing fails, the message is redelivered on
                            # restart. kafka-python expects an OffsetAndMetadata
                            # value pointing at the NEXT offset to consume.
                            self.consumer.commit(offsets={
                                topic_partition: OffsetAndMetadata(record.offset + 1, '')
                            })
                            print(f"Processed {record.key} offset {record.offset}")

                        except Exception as e:
                            print(f"Processing failed for offset {record.offset}: {e}")
                            # Don't commit; message will be redelivered
                            # Optionally send to DLQ after N retries
                            raise

        except KeyboardInterrupt:
            print("Shutting down consumer")
        finally:
            self.consumer.close()

# Example: payment processor
def process_payment(event: dict):
    """
    Process payment event idempotently.
    Guaranteed to be called exactly-once per event.
    """
    charge_id = event['message_id']
    user_id = event['user_id']
    amount = event['amount']

    # This is idempotent: if called twice, DB unique constraint prevents duplicate charge
    # INSERT INTO charges (id, user_id, amount) VALUES (charge_id, user_id, amount)
    # ON DUPLICATE KEY UPDATE ... (no-op)
    print(f"Processing charge {charge_id} for user {user_id}: ${amount/100:.2f}")

if __name__ == "__main__":
    consumer = ExactlyOnceConsumer(
        bootstrap_servers=['localhost:9092'],
        group_id='payment-processor',
        topics=['payments'],
        handler=process_payment
    )
    consumer.run()

This post is licensed under CC BY 4.0 by the author.