
Sharding & Partitioning

Horizontal data distribution: split a large dataset across multiple independent database nodes so that no single machine becomes a bottleneck, trading added consistency complexity for near-linear write scaling.


Sharding Strategies — Comparison Table

| Strategy | Data Movement | Range Queries | Hot Spots | Complexity | Best For |
|---|---|---|---|---|---|
| Range-Based | High (move ranges) | ✅ Single shard | ❌ Sequential IDs | Low | Time-series, natural ranges |
| Hash-Based | Very high (most keys move) | ❌ All shards | ✅ Uniform | Low | Uniform workload, no ranges |
| Consistent Hash | Low (1/N of data) | ❌ All shards | ✅ Uniform | Medium | Dynamic clusters, growth |
| Directory-Based | Flexible (easy reassign) | ❌ All shards | ✅ Uniform | High | Complex policies, geo-aware |

How Real Systems Use This

DynamoDB — Consistent Hash + Adaptive Partitions

Amazon DynamoDB uses consistent hashing internally via its partition key distribution, but sits on top of a more sophisticated layer: adaptive partitioning. When a table is created, DynamoDB automatically assigns partition keys to partitions using a token-ring architecture similar to Cassandra’s. Each partition can hold up to 10GB of data and handle 3000 read capacity units (RCU) or 1000 write capacity units (WCU). When a partition exceeds these limits, DynamoDB automatically splits it (with zero downtime) and redistributes data. The partition key hash determines which partition receives a request. For example, a social media table sharded by user_id distributes 1B users across thousands of partitions, and DynamoDB transparently balances the load. This design handles hot keys (celebrity users with 100k requests/second) by splitting their partition further. The key insight: DynamoDB decouples the logical key space (the partition keys chosen by the user) from the physical partition count (managed by AWS), allowing the table to scale without any resharding in client code.
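That decoupling can be sketched in a few lines. This is an illustrative model only: DynamoDB's internal hash function and partition counts are not public, so `partition_for` and the partition counts below are hypothetical.

```python
import hashlib

def partition_for(partition_key: str, num_partitions: int) -> int:
    """Map a partition key to an internal partition via a stable hash.
    Illustrative only: DynamoDB's real hash and partition count are internal."""
    digest = hashlib.md5(partition_key.encode()).hexdigest()
    return int(digest, 16) % num_partitions

# The client only ever supplies "user_42"; the partition number is an
# internal detail that can change when AWS splits partitions (8 -> 16)
# with no change to client code.
assert partition_for("user_42", 8) == partition_for("user_42", 8)
assert 0 <= partition_for("user_42", 16) < 16
```

The client's view never changes; only the divisor (the physical partition count) does, and AWS owns that.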

Cassandra — Token Ring with Virtual Nodes

Cassandra uses a token ring: each node is assigned a range of hash tokens (e.g., node A owns hashes 0–1000000, node B owns 1000001–2000000). When a row is inserted with a partition key, Cassandra hashes the key and finds which token range it falls into, then routes to that node. By default, each physical node owns one token range, creating hot spots during rebalancing. To solve this, Cassandra introduced virtual nodes (vnodes): each physical machine hosts multiple token ranges (default 256 vnodes per node). When a new node joins, it takes ownership of token ranges from all existing nodes proportionally, spreading the resharding work across the entire cluster. For a 10-node cluster adding an 11th node, each of the 10 existing nodes only moves ~1/11 of its data (about 9%) to the new node, compared to 50% with single-token assignment. Cassandra’s replication factor (default 3) means each partition is stored on multiple nodes; the consistency level (ONE, QUORUM, ALL) determines how many nodes must acknowledge writes before returning to the client.
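The consistency-level arithmetic at the end of that paragraph can be made concrete. A small sketch (not Cassandra's code) of the standard quorum rule:

```python
def quorum(replication_factor: int) -> int:
    """Nodes that must acknowledge for QUORUM: floor(RF/2) + 1."""
    return replication_factor // 2 + 1

def overlaps(r: int, w: int, rf: int) -> bool:
    """R + W > RF guarantees the read and write replica sets intersect,
    so a quorum read observes the latest quorum-acknowledged write."""
    return r + w > rf

RF = 3
assert quorum(RF) == 2                       # QUORUM with RF=3 means 2 of 3 nodes
assert overlaps(quorum(RF), quorum(RF), RF)  # QUORUM writes + QUORUM reads: consistent
assert not overlaps(1, 1, RF)                # ONE + ONE: stale reads are possible
```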

MongoDB — Hash-Based Sharding with mongos Router

MongoDB’s sh.shardCollection() command assigns documents to shards based on a shard key hash. If you choose user_id as the shard key, MongoDB hashes each user_id into a 64-bit integer, then ranges of these hashes are assigned to shards in units called “chunks”. Each chunk covers a range of hash values; by default, chunks are 64MB of data. When a chunk exceeds 64MB, MongoDB’s balancer automatically splits it and moves half to another shard. The mongos router (a stateless proxy) sits in front of all shards and performs routing: for a write request with user_id=42, mongos hashes 42, finds which chunk owns that hash range, and routes to the appropriate shard. Range queries like find({user_id: {$gte: 1000, $lte: 2000}}) scatter to all shards because the hash function destroys ordering. This is why MongoDB recommends shard keys that match access patterns (e.g., if you always filter by user_id, use that as the shard key). During resharding, MongoDB moves entire chunks; each migration is transactional and retryable, and runs online, though writes to a chunk are briefly blocked during its final metadata handoff.
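The routing step can be sketched with a toy chunk table. The chunk bounds, shard names, and hash space below are hypothetical; real mongos loads chunk metadata from the config servers.

```python
import bisect
import hashlib

# Hypothetical chunk table: sorted lower bounds of hash ranges, one shard each.
CHUNK_BOUNDS = [0, 2**30, 2**31, 3 * 2**30]   # 4 chunks over a toy 32-bit space
CHUNK_SHARDS = ["shard_a", "shard_b", "shard_a", "shard_b"]

def hash_key(user_id: int) -> int:
    """Stand-in for MongoDB's hashed-index function."""
    return int(hashlib.md5(str(user_id).encode()).hexdigest(), 16) % (2**32)

def route(user_id: int) -> str:
    """mongos-style routing: find the chunk whose range contains the hash."""
    idx = bisect.bisect_right(CHUNK_BOUNDS, hash_key(user_id)) - 1
    return CHUNK_SHARDS[idx]

# Point lookups deterministically hit one shard; a range of consecutive
# user_ids hashes to scattered positions, which is why range queries fan out.
assert route(42) == route(42)
```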

Instagram — Sharding by User with Hierarchical Naming

Instagram handles ~100M active users with sharding by user_id. Early on, they used a database-per-user approach where user_id mod N determined which database server held that user’s data. However, as they scaled beyond one datacenter, they moved to a sharding key service: given a user_id, a lookup service returns the physical shard location (datacenter, cluster, database). This indirection allows them to move users without changing application code. For media (photos), Instagram shards by user_id as well — all photos from user_id=42 live on the same shard as the user’s profile. Cross-shard queries (e.g., “get top 100 photos from users my friends follow”) scatter-gather: the application queries 10+ shards in parallel, aggregates results, and returns to the client. Hot users (celebrities with millions of followers) are replicated: Instagram reads their photos from a read replica on a separate shard to avoid overloading their primary. Writes still go to the primary and are replicated asynchronously.

Vitess — MySQL Sharding Middleware

Vitess, a MySQL query routing and resharding middleware (open-sourced by YouTube), sits between applications and MySQL databases. It exposes multiple keyspaces (logical sharded tables) and handles the physical-to-logical mapping. A keyspace might have 4 shards initially (shard_0, shard_1, shard_2, shard_3), each on a MySQL replica set. When write throughput exceeds capacity, Vitess reshards from 4 to 8 shards: it creates 8 new MySQL replica sets, then uses a streaming copy to move data from the 4 old shards to the 8 new ones. During the copy, writes go to the old shards and are replayed on the new ones. After copy finishes, reads switch to new shards, and finally writes. The entire resharding process, called vReplication, can be monitored and paused/resumed, taking hours for terabyte-scale datasets. Vitess also implements read-after-write consistency: after a write to a shard, the client reads from the master (not a replica) to guarantee consistency, or specifies an acceptable staleness.

Facebook’s Sharding at Scale

Facebook’s earliest data platform (described in their TAO/Memcache papers) sharded all objects (users, posts, comments) by object_id using hash sharding. Each object has a 64-bit ID; the lower bits determine the shard. To handle billions of objects across datacenters, Facebook denormalized heavily: instead of storing references, they stored copies. For example, a post stores its poster’s name directly, avoiding a cross-shard lookup. This reduced cross-shard JOINs dramatically. They also use sharded caching: each cache node is responsible for caching objects in a specific shard range, so a request for an object always goes to the same cache node. This creates cache affinity and reduces cold misses. For global operations (e.g., “count total posts”), Facebook uses MapReduce-style offline jobs, not synchronous queries.

The Hot Spot Problem — Root Causes and Solutions

Root Cause 1: Sequential IDs

Range-based sharding with auto-incrementing IDs writes all new rows to the last shard (highest ID range). The last shard becomes CPU/disk-bound while others remain idle.

Solution: Use hash-based or consistent-hash sharding, or use UUIDs instead of sequential IDs. If you must use range-based sharding, pre-generate shard ranges that align with your actual usage (e.g., partition by user_country first, then by date).
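A sketch of the difference, with hypothetical shard and row counts:

```python
import hashlib
import uuid

NUM_SHARDS = 4

def range_shard(row_id: int, rows_per_shard: int = 1000) -> int:
    """Range-based: contiguous ID ranges map to shards; new IDs hit the last one."""
    return min(row_id // rows_per_shard, NUM_SHARDS - 1)

def hash_shard(key: str) -> int:
    """Hash-based: any key spreads roughly uniformly across shards."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_SHARDS

# Sequential inserts: the newest 1000 IDs all land on the last shard.
assert all(range_shard(i) == NUM_SHARDS - 1 for i in range(3000, 4000))

# The same inserts keyed by random UUIDs spread across every shard.
hit = {hash_shard(str(uuid.uuid4())) for _ in range(1000)}
assert hit == set(range(NUM_SHARDS))
```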

Root Cause 2: Celebrity Users or Viral Content

A single user (or content item) receives disproportionate traffic. With user_id as the shard key, all requests for that celebrity go to one shard.

Solution Options:

  • Replication: Store multiple copies of hot data on different shards; reads scatter randomly, writes go to primary.
  • Key mutation (salting): Instead of sharding by user_id, shard by (user_id, salt) where salt = hash(timestamp) % K. Each write lands on one of K sub-keys, splitting the celebrity’s traffic across K shards; reads must scatter-gather across all K sub-keys and merge the results.
  • Separate hot shard: Identify hot keys offline and place them on a dedicated high-capacity shard.
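The key-mutation option can be sketched as follows; K, the shard count, and the salting scheme are illustrative choices, not a prescribed design:

```python
import hashlib

K = 4             # salted sub-keys per hot user (illustrative)
NUM_SHARDS = 16

def shard_of(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_SHARDS

def write_shard(user_id: int, timestamp: int) -> int:
    """Each write picks one of K salted sub-keys, spreading the hot user's
    write traffic across up to K shards."""
    salt = timestamp % K
    return shard_of(f"{user_id}:{salt}")

def read_shards(user_id: int) -> list[int]:
    """Reads must scatter-gather across all K sub-keys and merge the results."""
    return [shard_of(f"{user_id}:{salt}") for salt in range(K)]

# Every write lands on a shard that the read path will also visit.
assert all(write_shard(42, t) in read_shards(42) for t in range(100))
assert len(set(read_shards(42))) <= K
```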

Root Cause 3: Time-Series Data with Recent Bias

All writes go to the current day’s shard (latest time range), while past shards see only reads.

Solution: Partition by time bucket (e.g., day) AND by shard key (e.g., user_id). A row’s key becomes (date, shard_hash(user_id)), distributing writes across multiple time shards.
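A sketch of that compound key, with a hypothetical bucket count:

```python
import hashlib
from datetime import date

NUM_HASH_BUCKETS = 8  # illustrative

def row_key(user_id: int, day: date) -> tuple[str, int]:
    """Compound key (time bucket, hash bucket): same-day writes spread over
    NUM_HASH_BUCKETS shards instead of piling onto one 'current day' shard."""
    bucket = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16) % NUM_HASH_BUCKETS
    return (day.isoformat(), bucket)

today = date(2024, 1, 15)
buckets_hit = {row_key(uid, today)[1] for uid in range(1000)}
assert buckets_hit == set(range(NUM_HASH_BUCKETS))  # writes hit every bucket
```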

Cross-Shard Challenges

Challenge: Cross-Shard JOINs

Example: SELECT u.name, COUNT(o.id) FROM users u LEFT JOIN orders o ON u.id = o.user_id GROUP BY u.id

Users and orders are sharded by user_id, so each shard has a user and their orders — the join CAN stay local. However, if orders are sharded by order_id instead, then joining users and orders requires fetching from multiple shards.

Solutions:

  1. Denormalization: Store user name in the orders table. Queries become single-shard.
  2. Scatter-gather: Query all shards for matching rows, join in application memory.
  3. Repartitioning: Ensure join keys align (both tables sharded by user_id).
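Option 2 in miniature, with hypothetical per-shard result sets standing in for real shard queries:

```python
from collections import Counter

# Orders sharded by order_id, so one user's orders span shards.
shard_results = [
    [{"order_id": 1, "user_id": 42}, {"order_id": 7, "user_id": 9}],   # shard 0
    [{"order_id": 3, "user_id": 42}],                                  # shard 1
]
users = {42: "alice", 9: "bob"}  # fetched separately from the users table

# Scatter: query every shard. Gather: join and aggregate in application memory.
order_counts = Counter(row["user_id"] for shard in shard_results for row in shard)
report = {users[uid]: n for uid, n in order_counts.items()}
assert report == {"alice": 2, "bob": 1}
```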

Challenge: Cross-Shard Transactions

Example: Transfer $100 from user A’s account to user B’s account. Both accounts are on different shards.

Solutions:

  1. Saga Pattern (eventual consistency): Debit A (shard 1), publish event “transfer started”, credit B (shard 2), publish “transfer done”. If the credit on shard 2 fails, a compensating transaction re-credits A on shard 1, undoing the debit.
  2. Two-Phase Commit (strong consistency): All shards must agree to commit or abort. High latency (one extra round trip per shard) and prone to failure if a shard is unreachable during commit phase.
  3. Avoid it: Design schemas to avoid cross-shard transactions (e.g., store both accounts on same shard, or use eventual consistency).
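The saga option in miniature. Plain dict updates stand in for the two single-shard transactions, and a missing destination account simulates the failure on the second shard:

```python
def transfer(accounts: dict, src: str, dst: str, amount: int) -> bool:
    """Saga sketch: debit, then credit; on failure, a compensating
    transaction undoes the debit. Each update is one single-shard txn."""
    accounts[src] -= amount            # step 1: debit on src's shard
    try:
        if dst not in accounts:        # simulated failure on dst's shard
            raise KeyError(dst)
        accounts[dst] += amount        # step 2: credit on dst's shard
        return True
    except KeyError:
        accounts[src] += amount        # compensation: refund the debit
        return False

accounts = {"A": 500, "B": 100}
assert transfer(accounts, "A", "B", 100) and accounts == {"A": 400, "B": 200}
assert not transfer(accounts, "A", "C", 50) and accounts["A"] == 400
```

Note the window between the debit and the compensation: other readers can briefly observe A's reduced balance, which is exactly the eventual-consistency trade-off.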

Challenge: Global Aggregations (ORDER BY, COUNT)

Example: SELECT * FROM orders ORDER BY created_at DESC LIMIT 100

Orders are sharded by user_id; the created_at column varies per shard, so the global top 100 requires querying all shards.

Solutions:

  1. Merge sort: Query all shards for their top 100 sorted by created_at, merge the results in application, return top 100. Cost scales with number of shards.
  2. Scatter-gather with filtering: Query all shards with a filter (LIMIT 200), merge and sort, return top 100. Heuristic; may need retries if result set is small.
  3. Denormalized view: Maintain a separate non-sharded table of recent orders (using Kafka log, async indexing) for analytics.
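Option 1 is a k-way merge of already-sorted per-shard pages; Python's heapq.merge does exactly this. The shard pages below are made-up data:

```python
import heapq

# Each shard returns its own top-N, sorted by created_at descending.
shard_pages = [
    [(95, "o1"), (80, "o4")],   # (created_at, order_id)
    [(99, "o2"), (70, "o5")],
    [(90, "o3"), (60, "o6")],
]

# K-way merge of the sorted pages, then take the global top 3.
top3 = list(heapq.merge(*shard_pages, key=lambda r: r[0], reverse=True))[:3]
assert top3 == [(99, "o2"), (95, "o1"), (90, "o3")]
```

The merge only ever holds one head element per shard in memory, so the cost is driven by the number of shards, not total data size.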

Resharding — Mechanics and Trade-offs

When adding shards (scaling out) or removing shards, data must move. The cost is proportional to dataset size and the degree of change.

Example: Resharding from 4 to 8 shards.

With hash-based sharding (hash(key) % N), changing N from 4 to 8 changes most rows’ shard assignments. hash(key) % 4 produces 0-3 and hash(key) % 8 produces 0-7; a key keeps its shard only when the two results agree, which happens for exactly half of all hash values. Rebalancing therefore means rehashing every key and moving about 50% of the dataset.
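The ~50% figure is easy to verify: a key keeps its shard going from % 4 to % 8 only when both remainders agree, which holds for exactly half of all hash values.

```python
N = 100_000  # sample of hash values
moved = sum(1 for h in range(N) if h % 4 != h % 8)
assert moved / N == 0.5  # exactly half the keys change shards
```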

With consistent hashing, adding a node requires moving only 1/(N+1) of the data. For 4 shards → 5 shards, only 20% of data moves.

Resharding strategies:

  1. Double-write during transition: Write to both old and new shard assignments, then copy remaining data.
  2. Read from old if not in new: Query new shards; if miss, query old shards and backfill into new.
  3. Online resharding (Vitess vReplication): Stream changes from old to new while accepting writes, pause writes to old during final switch.

Key Properties Table

| Property | Value |
|---|---|
| Data movement on new shard (consistent hash) | ~1/N of dataset |
| Data movement (hash-based rebalance) | ~(N’-N)/N’ of dataset (when N’ is a multiple of N) |
| DynamoDB partition limits | 10GB data, 3000 RCU, 1000 WCU per partition |
| MongoDB chunk size | 64MB (auto-split) |
| Cassandra default vnodes | 256 per node |
| Vitess resharding time (1TB) | 1-4 hours (bandwidth-limited) |
| Cross-shard join cost | O(shards × network latency) |
| Typical resharding frequency | Once per 1-2 years (after 5-10x growth) |

When to Shard / When to Avoid

Shard When:

  • ✅ Single-node write throughput exceeds capacity (>10k writes/sec)
  • ✅ Data size exceeds single-node storage (>1TB)
  • ✅ Read latency suffers due to contention, even with replicas
  • ✅ Multi-datacenter distribution needed (avoid cross-shard consistency issues)

Don’t Shard (or delay) When:

  • ❌ Data fits on modern hardware (<500GB)
  • ❌ Read throughput is bottleneck (add read replicas instead)
  • ❌ Access patterns are random without clear shard key
  • ❌ Team is small (operational complexity » benefit)
  • ❌ You haven’t exhausted vertical scaling, caching, indexing
  • ❌ Sharding will introduce cross-shard transactions (redesign schema instead)

Scaling Priority Order:

  1. Optimize queries (indexes, EXPLAIN ANALYZE)
  2. Add read replicas (read throughput)
  3. Vertical scale (bigger machine)
  4. Cache layer (Redis, memcache)
  5. Shard (write throughput + data size)

Implementation: Consistent Hash Ring with Virtual Nodes

import hashlib
import bisect

class ConsistentHashRing:
    """
    Consistent hash ring for distributed sharding.
    Maps keys to nodes with minimal data movement on node addition/removal.
    Virtual nodes reduce hot spots and balance load.
    """

    def __init__(self, nodes=None, virtual_nodes=160):
        """
        Initialize the hash ring.

        Args:
            nodes: List of shard identifiers (e.g., ['shard_0', 'shard_1', 'shard_2'])
            virtual_nodes: Number of virtual nodes per physical node (higher = better balance)
        """
        self.virtual_nodes = virtual_nodes
        self.ring = {}  # hash_value -> node_name
        self.sorted_keys = []
        self.nodes = set()

        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Generate hash position for a key (0 to 2^32-1)."""
        return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % (2 ** 32)

    def add_node(self, node):
        """
        Add a node to the ring.
        Creates virtual_nodes virtual copies distributed around the ring.
        """
        if node in self.nodes:
            return
        self.nodes.add(node)

        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            hash_pos = self._hash(virtual_key)
            self.ring[hash_pos] = node
            bisect.insort(self.sorted_keys, hash_pos)

    def remove_node(self, node):
        """Remove a node from the ring (and all its virtual nodes)."""
        if node not in self.nodes:
            return
        self.nodes.discard(node)

        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            hash_pos = self._hash(virtual_key)
            del self.ring[hash_pos]
            self.sorted_keys.remove(hash_pos)

    def get_node(self, key):
        """
        Find the node responsible for a key.
        Returns the first node in the ring whose hash >= key's hash (clockwise).
        """
        if not self.ring:
            return None

        hash_pos = self._hash(key)

        # Binary search for the first hash_pos >= this key's hash
        idx = bisect.bisect_left(self.sorted_keys, hash_pos)

        # If no node is >= hash_pos, wrap around to the first node
        if idx == len(self.sorted_keys):
            idx = 0

        return self.ring[self.sorted_keys[idx]]

    def get_nodes(self, key, num_replicas=3):
        """
        Get multiple nodes for a key (for replication).
        Returns num_replicas distinct nodes in ring order.
        """
        if not self.ring or num_replicas <= 0:
            return []

        nodes = []
        hash_pos = self._hash(key)
        idx = bisect.bisect_left(self.sorted_keys, hash_pos)

        if idx == len(self.sorted_keys):
            idx = 0

        seen_nodes = set()
        attempts = 0
        max_attempts = len(self.sorted_keys)

        while len(nodes) < num_replicas and attempts < max_attempts:
            node = self.ring[self.sorted_keys[idx]]
            if node not in seen_nodes:
                nodes.append(node)
                seen_nodes.add(node)
            idx = (idx + 1) % len(self.sorted_keys)
            attempts += 1

        return nodes


# Example usage demonstrating even distribution and minimal movement
if __name__ == "__main__":
    # Initialize ring with 4 shards
    ring = ConsistentHashRing(
        nodes=['shard_0', 'shard_1', 'shard_2', 'shard_3'],
        virtual_nodes=160
    )

    # Distribute 1000 keys across shards
    shard_counts = {node: 0 for node in ring.nodes}
    for i in range(1000):
        node = ring.get_node(f"user_{i}")
        shard_counts[node] += 1

    print("Distribution with 4 shards:")
    for shard, count in sorted(shard_counts.items()):
        print(f"  {shard}: {count} keys (~{count/10:.1f}%)")

    # Record assignments before scaling out so we can measure movement
    old_assignment = {f"user_{i}": ring.get_node(f"user_{i}") for i in range(1000)}

    # Add a 5th shard (scale out)
    ring.add_node('shard_4')

    # Recount keys and count how many changed shards
    shard_counts = {node: 0 for node in ring.nodes}
    moved_keys = 0
    for i in range(1000):
        key = f"user_{i}"
        node = ring.get_node(key)
        shard_counts[node] += 1
        if node != old_assignment[key]:
            moved_keys += 1

    print("\nDistribution after adding shard_4:")
    for shard, count in sorted(shard_counts.items()):
        print(f"  {shard}: {count} keys (~{count/10:.1f}%)")

    print(f"\nKeys moved: {moved_keys} (expected ~200, i.e., 1/5 of 1000)")
    print("Uniform distribution across 5 shards: ~200 per shard")

    # Demonstrate replication (get 3 replicas for a key)
    print(f"\nReplica placement for user_42:")
    replicas = ring.get_nodes("user_42", num_replicas=3)
    print(f"  Replicas: {replicas}")


This post is licensed under CC BY 4.0 by the author.