Connection Pooling & Query Optimization

Two fundamental performance levers -- pooling eliminates network/auth overhead per request, and optimization eliminates redundant database queries. Master both to build high-throughput applications.

Connection Pooling — Why It Matters

Every new database connection incurs hidden costs:

| Cost | Latency | Description |
| --- | --- | --- |
| TCP handshake | 1-5ms | Network round trips to establish the socket |
| Authentication | 2-10ms | Database server verifies credentials, assigns session ID |
| State setup | 1-5ms | Initialize session variables, charset, timezone, search_path |
| Total overhead | 5-50ms | Per new connection |

A typical web request that executes a 1ms database query incurs 5-50ms of overhead if it opens a new connection — at the high end, the query is about 2% of the elapsed time and connection overhead is about 98%.

Solution: Reuse connections. A pool maintains N persistent connections to the database. Applications borrow from the pool (microseconds), execute queries, return the connection. Overhead is amortized across many requests.
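
The borrow/execute/return cycle can be sketched with a thread-safe queue. This is a minimal illustration, not a production pool (no health checks, validation, or max-lifetime handling); `make_connection` is a stand-in for a real driver call such as `psycopg2.connect`:

```python
import queue
from contextlib import contextmanager

class SimplePool:
    """Minimal fixed-size connection pool: borrow, use, return."""

    def __init__(self, make_connection, size):
        self._q = queue.Queue()
        for _ in range(size):
            # Pay the 5-50ms setup cost once per slot, up front.
            self._q.put(make_connection())

    @contextmanager
    def connection(self, timeout=5):
        # Borrowing is a queue pop (microseconds); blocks if the pool is exhausted.
        conn = self._q.get(timeout=timeout)
        try:
            yield conn
        finally:
            # Return the connection for reuse instead of closing it.
            self._q.put(conn)

# Usage with a stand-in factory:
pool = SimplePool(make_connection=lambda: object(), size=2)
with pool.connection() as conn:
    pass  # execute queries on conn here
```

Because connections are created once and recycled, the per-request cost collapses from milliseconds of setup to a single queue operation.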

Connection Pool Sizing Formula

A widely cited sizing formula, popularized by HikariCP's pool-sizing guidance:

Pool size = Cores × (1 + Wait Time / Service Time)

Explanation:

  • Cores: Physical CPU cores on the application server (e.g., 8 cores)
  • Service Time: CPU time spent actually processing the request (e.g., 10ms)
  • Wait Time: Time the request spends blocked on I/O — network latency, database execution, lock contention (e.g., 90ms)
  • Ratio: Wait Time / Service Time = 90/10 = 9

Example:

  • 8 cores, 10ms service time, 90ms wait time
  • Pool size = 8 × (1 + 90/10) = 8 × 10 = 80 connections

The rule-of-thumb formula cited in HikariCP's pool-sizing guide:

pool_size = cpu_cores × 2 + disk_spindles
  • 8 cores + 1 SSD = 8 × 2 + 1 = 17 connections
  • Conservative; works well for most applications
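
Both rules of thumb are easy to encode; a small sketch reproducing the two worked examples above:

```python
def pool_size_by_wait_ratio(cores: int, service_ms: float, wait_ms: float) -> int:
    """Pool size = cores * (1 + wait_time / service_time)."""
    return round(cores * (1 + wait_ms / service_ms))

def pool_size_by_spindles(cores: int, spindles: int = 1) -> int:
    """pool_size = cpu_cores * 2 + disk_spindles (conservative default)."""
    return cores * 2 + spindles

print(pool_size_by_wait_ratio(8, service_ms=10, wait_ms=90))  # 80
print(pool_size_by_spindles(8, spindles=1))                   # 17
```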

Key principle: Too small a pool → requests queue, high latency. Too large → database server overwhelmed by connection management, memory usage explodes.

| Pool Size | Assessment |
| --- | --- |
| 5 | Too small for 8 cores; requests queue |
| 20 | Good for most workloads |
| 100 | Too large; database overwhelmed by connection management |
| 500+ | Exceeds the connection limit (PostgreSQL default max_connections = 100) |

Pooling Modes — Different Strategies

Transaction Pooling (PgBouncer default)

Connections are returned to the pool after each transaction (COMMIT/ROLLBACK), not after each query. Multiple clients can use the same physical connection sequentially.

Pros:

  • Multiplexes 10,000 clients onto 100 database connections
  • Minimal database resource usage
  • Perfect for stateless HTTP requests

Cons:

  • Cannot use connection-scoped state (prepared statements, temporary tables)
  • Connection variables reset between transactions (beware SET search_path)

Use case: Web applications, stateless microservices

Session Pooling

Connections are bound to a session (client). One client = one physical connection, held for the lifetime of the session.

Pros:

  • Connection state is preserved (temporary tables, session variables)
  • Prepared statements work correctly

Cons:

  • 1:1 mapping of clients to connections
  • Limited multiplexing
  • Fewer clients supported per connection pool

Use case: Long-lived connections, stateful applications

Statement Pooling

Connections are returned after each statement (SELECT, INSERT, UPDATE).

Pros:

  • Maximum multiplexing (every statement can use a different connection)

Cons:

  • Multi-statement transactions are impossible (consecutive statements may run on different connections)
  • High overhead if statements are frequent
  • Rarely used
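
The multiplexing difference between the modes can be modeled without a database. A toy calculation with assumed workload numbers (10,000 connected clients, roughly 100 transactions in flight at any instant):

```python
def connections_needed(connected_clients: int, txns_in_flight: int, mode: str) -> int:
    """Rough model of physical database connections each pooling mode requires."""
    if mode == "session":
        # One connection is pinned per client for the session's lifetime.
        return connected_clients
    if mode == "transaction":
        # A connection is held only while a transaction is open, so only
        # in-flight transactions need one; idle clients share the pool.
        return txns_in_flight
    raise ValueError(f"unknown mode: {mode}")

print(connections_needed(10_000, 100, "session"))      # 10000
print(connections_needed(10_000, 100, "transaction"))  # 100
```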

The N+1 Query Problem — Root Cause and All Fix Patterns

Root Cause

Fetching a collection of objects, then for each object, fetching related data.

# Pseudocode: N+1 queries
users = db.query("SELECT * FROM users LIMIT 100")  # Query 1
for user in users:
    orders = db.query(f"SELECT * FROM orders WHERE user_id = {user.id}")  # Queries 2-101
    user.orders = orders
# Total: 101 queries instead of 1

Impact: For 100 users, 100x database round trips. With 5ms latency per query, 100 queries = 500ms. One good query = 10ms.
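
Plugging those numbers into a quick model (5ms assumed per round trip):

```python
ROUND_TRIP_MS = 5                     # assumed network latency per query
n_users = 100

n_plus_one_queries = 1 + n_users      # 1 list query + 1 lazy query per user
n_plus_one_ms = n_plus_one_queries * ROUND_TRIP_MS

batched_queries = 2                   # users, then all orders via WHERE IN
batched_ms = batched_queries * ROUND_TRIP_MS

print(f"N+1: {n_plus_one_queries} queries, {n_plus_one_ms} ms")  # N+1: 101 queries, 505 ms
print(f"Batched: {batched_queries} queries, {batched_ms} ms")    # Batched: 2 queries, 10 ms
```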

Fix Pattern 1: JOIN (SQL-side)

Execute a single query with JOIN:

SELECT u.id, u.name, o.id, o.amount
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
LIMIT 100;

Pros:

  • Single database round trip
  • SQL optimizations apply
  • Minimal memory overhead

Cons:

  • Result set duplication (user data repeated for each order)
  • Complex queries can become hard to optimize
  • Not applicable if data comes from different databases (sharded)

When to use: Single database, same table set, predictable joins.

Fix Pattern 2: Batch Query (WHERE IN)

Fetch all users, then fetch all their orders in one batch query:

users = db.query("SELECT * FROM users LIMIT 100")  # Query 1
user_ids = [u.id for u in users]
placeholders = ", ".join(["%s"] * len(user_ids))  # one placeholder per id
orders = db.query(f"SELECT * FROM orders WHERE user_id IN ({placeholders})", user_ids)  # Query 2

# Map orders back to users: build a dictionary for O(1) lookup
orders_by_user_id = {}
for order in orders:
    if order.user_id not in orders_by_user_id:
        orders_by_user_id[order.user_id] = []
    orders_by_user_id[order.user_id].append(order)

# Now assign to each user in a single pass
for user in users:
    user.orders = orders_by_user_id.get(user.id, [])

# Total: 2 queries

Why the dictionary approach: mapping orders back by filtering the full order list once per user is O(users × orders). Building a dict first is O(orders), and each subsequent lookup is O(1).
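
The same grouping step written with `collections.defaultdict`, which removes the membership check:

```python
from collections import defaultdict

orders = [
    {"user_id": 1, "amount": 50},
    {"user_id": 1, "amount": 75},
    {"user_id": 2, "amount": 100},
]

orders_by_user_id = defaultdict(list)
for order in orders:
    # Missing keys are created as empty lists automatically: O(1) per order.
    orders_by_user_id[order["user_id"]].append(order)

print(len(orders_by_user_id[1]), len(orders_by_user_id[2]))  # 2 1
```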

Pros:

  • Works with sharded data (batch queries to each shard)
  • Scales well (2-3 queries regardless of collection size)
  • No JOIN complexity

Cons:

  • O(N) memory overhead for building IN clause
  • Application-side filtering (slower than SQL WHERE)

When to use: Sharded systems, large result sets, complex filtering logic.

Fix Pattern 3: DataLoader Pattern (with Caching)

Defer fetches and batch them using a real async/batching implementation:

from typing import List, Callable, Dict, Any

class DataLoader:
    """
    Batching loader that defers individual requests and executes them in bulk.
    Follows the DataLoader pattern popularized by GraphQL servers: collect keys
    as they are requested, then execute a single batch query for all of them.
    """

    def __init__(self, batch_fn: Callable[[List[Any]], List[Any]], batch_size: int = 100):
        """
        Args:
            batch_fn: Function that takes a list of keys and returns a list of results
                     in the same order. E.g., lambda ids: db.query(f"WHERE id IN ({ids})")
            batch_size: How many items to batch before forcing execution
        """
        self.batch_fn = batch_fn
        self.batch_size = batch_size
        self.queue: List[Any] = []  # keys awaiting batch load
        self.cache: Dict[Any, Any] = {}  # key -> result (memoization)

    def load(self, key: Any) -> Any:
        """
        Request loading a key.
        If already cached, return immediately.
        Otherwise, enqueue for batch loading and schedule flush if needed.

        Returns: The cached result if already loaded, else None; callers must
                 call flush_all() before reading results from the cache.
        """
        # Check cache first
        if key in self.cache:
            return self.cache[key]

        # Enqueue the key
        self.queue.append(key)

        # If queue reaches batch size, execute immediately
        if len(self.queue) >= self.batch_size:
            self._flush()
            return self.cache.get(key)

        return None  # Not yet executed; caller must call flush_all() before reading

    def _flush(self) -> None:
        """Execute all queued requests in one batch query."""
        if not self.queue:
            return

        keys = self.queue[:]
        self.queue = []

        # Execute batch query: batch_fn receives list of keys, returns list of results
        results = self.batch_fn(keys)

        # Cache results by key
        for key, result in zip(keys, results):
            self.cache[key] = result

    def flush_all(self) -> None:
        """Ensure all queued items are executed (idempotent)."""
        self._flush()

    def clear_cache(self) -> None:
        """Clear memoization cache (useful between request contexts)."""
        self.cache.clear()


# Usage example with a mocked batch-load function
def demo_dataloader_with_batch():
    """
    Demonstrates DataLoader pattern to fix N+1 in a real scenario.
    Batch-loads all orders for multiple users in a single query.
    """
    # Mock database batch query function
    def batch_load_orders(user_ids):
        """
        Given a list of user IDs, fetch all their orders in one query.
        Returns a list of order lists, one for each user ID (in same order).
        """
        # In production: db.query(Order).filter(Order.user_id.in_(user_ids))
        # For demo, simulate:
        orders_by_user = {
            1: [{"id": 101, "amount": 50}, {"id": 102, "amount": 75}],
            2: [{"id": 103, "amount": 100}],
            3: [],
        }
        # Return in the same order as input
        return [orders_by_user.get(uid, []) for uid in user_ids]

    loader = DataLoader(batch_load_orders, batch_size=10)

    # Simulate fetching users
    users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Charlie"}]

    # Enqueue loads (no queries executed yet)
    for user in users:
        loader.load(user["id"])

    # Execute all at once (1 batch query instead of 3 individual queries)
    loader.flush_all()

    # Now read results
    for user in users:
        orders = loader.cache.get(user["id"], [])
        print(f"{user['name']}: {len(orders)} orders")

Pros:

  • Automatic batching within a request context
  • Minimal code changes
  • Works with ORM lazy loading

Cons:

  • Requires promise/async handling
  • Adds complexity to codebase
  • Tuning batch size needed

When to use: GraphQL APIs, complex nested queries, ORM-heavy codebases.

Fix Pattern 4: ORM Eager Loading

Use the ORM's eager-loading options: in SQLAlchemy, selectinload (batch) or joinedload (JOIN); Django's analogues are prefetch_related and select_related:

# SQLAlchemy example
from sqlalchemy.orm import selectinload, joinedload

users = db.query(User).options(
    selectinload(User.orders)  # Batch load orders
).limit(100).all()
# Generates 2 queries: one for users, one for all orders

# OR

users = db.query(User).options(
    joinedload(User.orders)  # JOIN orders
).limit(100).all()
# Generates 1 query with JOIN

Pros:

  • Declarative, integrates with ORM
  • Automatic eager loading detection
  • Zero application code for batching

Cons:

  • ORM-specific syntax
  • Less flexibility than manual batching

When to use: ORM-based applications, predictable relationship loading.

Fix Pattern 5: Caching (Redis / Memcache)

Cache the related data:

users = db.query("SELECT * FROM users LIMIT 100")
for user in users:
    orders = cache.get(f"orders:{user.id}")
    if not orders:
        orders = db.query(f"SELECT * FROM orders WHERE user_id = {user.id}")
        cache.set(f"orders:{user.id}", orders, ttl=3600)
    user.orders = orders

Pros:

  • Reduces database load
  • Multiple requests share cached data
  • Handles hotspots (celebrity users)

Cons:

  • Cache invalidation complexity
  • Stale data possible
  • Memory overhead for cache

When to use: Frequently-accessed data, acceptable staleness, high read load.
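
The main con, invalidation, is usually handled by deleting the cached key on every write to the underlying rows. A sketch assuming a Redis-style client exposing delete and a db handle with execute (both objects here are hypothetical placeholders):

```python
def update_order(db, cache, order_id, user_id, new_amount):
    """Write path with cache invalidation: change the row, then drop the key."""
    db.execute(
        "UPDATE orders SET amount = %s WHERE id = %s",
        (new_amount, order_id),
    )
    # Delete rather than rewrite the cache entry: the next reader repopulates
    # it from the database, so readers never observe a half-updated list.
    cache.delete(f"orders:{user_id}")
```

Deleting on write keeps the cache simple at the cost of one extra cache miss per update; TTLs (as in the example above) bound staleness for writes that bypass this path.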

EXPLAIN ANALYZE — Reading Query Plans

Run EXPLAIN ANALYZE on a slow query:

EXPLAIN ANALYZE
SELECT u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2024-01-01'
GROUP BY u.id
LIMIT 100;

Output:

Limit  (cost=1500.00..1510.00 rows=100 width=40) (actual time=25.432..25.501 rows=100 loops=1)
  ->  GroupAggregate  (cost=1500.00..3000.00 rows=50000 width=40) (actual time=5.123..25.432 rows=100 loops=1)
        ->  Sort  (cost=1500.00..1600.00 rows=1000 width=40) (actual time=5.100..8.234 rows=50000 loops=1)
              Sort Key: u.id
              ->  Hash Join  (cost=100.00..1200.00 rows=50000 width=40) (actual time=2.234..5.123 rows=50000 loops=1)
                    Hash Cond: (o.user_id = u.id)
                    ->  Seq Scan on orders o  (cost=0.00..500.00 rows=1000000 width=4) (actual time=0.123..2.456 rows=1000000 loops=1)
                    ->  Hash  (cost=50.00..50.00 rows=5000 width=36) (actual time=1.567..1.567 rows=5000 loops=1)
                          ->  Seq Scan on users u  (cost=0.00..50.00 rows=5000 width=36) (actual time=0.012..0.234 rows=5000 loops=1)
                                Filter: (created_at > '2024-01-01'::date)

Key things to look for:

| Term | Good? | Notes |
| --- | --- | --- |
| Seq Scan | ❌ on large tables | Should use Index Scan; add an index on the WHERE column |
| Index Scan | ✅ | Efficient; only fetches matching rows |
| Hash Join | ✅ | Efficient for large result sets |
| Nested Loop | ❌ if outer relation is large | Slow; consider a hash join or an index on the join column |
| Sort | 🟡 | If not needed (e.g., no ORDER BY), remove it |
| Filter | 🟡 | Applied after the scan; push into WHERE or an index if possible |
| actual time | compare to cost | If actual » estimate, statistics are stale; run ANALYZE on the table |

Programmatic EXPLAIN Analysis

Most slow queries have one of a few patterns. A simple parser can flag them:

import re

def analyze_explain_output(explain_text):
    """
    Parse EXPLAIN ANALYZE output and flag performance issues.
    Returns a dict of findings: {"sequential_scans": [...], "expensive_sorts": [...], etc.}
    """
    findings = {
        "sequential_scans": [],
        "expensive_sorts": [],
        "nested_loops": [],
        "high_cost_estimates": [],
    }

    lines = explain_text.split('\n')
    for i, line in enumerate(lines):
        # Flag Seq Scan on large tables
        if "Seq Scan" in line:
            findings["sequential_scans"].append({
                "line": i,
                "text": line.strip(),
                "fix": "Add index on WHERE column or join column"
            })

        # Flag Sort operations (often unnecessary)
        if "Sort" in line and "cost=" in line:
            # Extract cost estimate
            cost_match = re.search(r'cost=([\d.]+)\.\.([\d.]+)', line)
            if cost_match:
                cost = float(cost_match.group(2))
                if cost > 1000:  # Expensive sort
                    findings["expensive_sorts"].append({
                        "line": i,
                        "cost": cost,
                        "text": line.strip(),
                        "fix": "Check if ORDER BY is necessary; consider index on sort key"
                    })

        # Flag Nested Loop joins (slow for large outer relation)
        if "Nested Loop" in line:
            findings["nested_loops"].append({
                "line": i,
                "text": line.strip(),
                "fix": "Consider Hash Join or index on join column"
            })

        # Flag high-cost operations
        if "cost=" in line:
            cost_match = re.search(r'cost=([\d.]+)\.\.([\d.]+)', line)
            if cost_match:
                cost = float(cost_match.group(2))
                if cost > 10000:
                    findings["high_cost_estimates"].append({
                        "line": i,
                        "cost": cost,
                        "text": line.strip(),
                    })

    return findings

# Example usage
explain_output = """
Seq Scan on orders o  (cost=0.00..500.00 rows=1000000 width=4)
Sort  (cost=1500.00..1600.00 rows=1000 width=40)
Nested Loop  (cost=100.00..1200.00 rows=50000 width=40)
"""

issues = analyze_explain_output(explain_output)
for issue_type, issues_list in issues.items():
    if issues_list:
        print(f"\n{issue_type}:")
        for issue in issues_list:
            print(f"  - {issue['text']}")
            if 'fix' in issue:
                print(f"    Fix: {issue['fix']}")

Query Optimization Patterns

Pattern 1: Missing Index

Symptom: Seq Scan on large_table in EXPLAIN ANALYZE

Fix: Add index on WHERE/JOIN column:

CREATE INDEX idx_orders_user_id ON orders(user_id);
EXPLAIN ANALYZE SELECT * FROM orders WHERE user_id = 42;
-- Now: Index Scan using idx_orders_user_id

Pattern 2: Covering Index

Symptom: Index Scan, then Heap Fetch (fetching full row after index scan)

Fix: Add SELECT columns to index:

CREATE INDEX idx_orders_user_id_amount ON orders(user_id) INCLUDE (amount);
EXPLAIN ANALYZE SELECT user_id, amount FROM orders WHERE user_id = 42;
-- Now: Index Only Scan (no heap fetch)

Pattern 3: Wrong Type in WHERE Clause

Symptom: Index not used despite having index on column

Cause: Type mismatch (user_id is INT, but the query does WHERE user_id = '42', a string)

Fix: Ensure types match:

-- Bad: user_id is INT, '42' is STRING
SELECT * FROM orders WHERE user_id = '42';  -- Seq Scan (tries implicit cast)

-- Good: both INT
SELECT * FROM orders WHERE user_id = 42;  -- Index Scan

Pattern 4: Subquery vs JOIN

Symptom: Correlated subquery in SELECT

Bad:

SELECT u.name, (SELECT COUNT(*) FROM orders WHERE user_id = u.id) as order_count
FROM users u;
-- Executes subquery once PER USER (N+1 pattern in SQL)

Good:

SELECT u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id;
-- Single scan + group

Pattern 5: SELECT * (unnecessary columns)

Symptom: Fetching all columns when you only need a few

Bad:

SELECT * FROM orders WHERE user_id = 42;
-- Fetches: id, user_id, amount, address, metadata, notes, ... (30 columns)

Good:

SELECT id, amount FROM orders WHERE user_id = 42;
-- Fetches only needed columns

Key Properties Table

| Property | Value |
| --- | --- |
| New connection overhead (total) | 5-50ms |
| Authentication overhead | 2-10ms |
| State setup overhead | 1-5ms |
| HikariCP rule-of-thumb pool size | cores × 2 + disk_spindles |
| PgBouncer throughput (transaction pooling) | 10,000+ clients on 100 connections |
| Connection limit (PostgreSQL) | default max_connections = 100; practical ceiling bounded by memory and file descriptors |
| Optimal pool size (wait-ratio formula) | cores × (1 + wait_time/service_time) |
| N+1 problem: 100 users | 101 queries vs 1 query, ~100x the round trips |
| Batch query cost | 2-3 queries regardless of collection size |
| EXPLAIN ANALYZE startup cost | Estimated cost to produce the first row |
| EXPLAIN ANALYZE actual time | Measured wall-clock time (includes I/O) |

When to Use Each Pooling Strategy

| Strategy | Best For |
| --- | --- |
| Transaction pooling | Stateless web apps, HTTP requests, microservices |
| Session pooling | Long-lived connections, stateful apps, batch jobs |
| Statement pooling | Rarely used; extreme multiplexing only |

Real-World Pool Configuration

PgBouncer (PostgreSQL Multiplexing Proxy)

PgBouncer accepts connections from 10,000+ clients and multiplexes them onto ~100 database connections. In transaction pooling mode, after each COMMIT, the database connection is returned to the pool.

Configuration:

[databases]
mydb = host=localhost port=5432 dbname=mydb

[pgbouncer]
pool_mode = transaction
max_client_conn = 10000
default_pool_size = 25
reserve_pool_size = 5

Concrete numbers: PgBouncer on a modern server can handle 100k connections (not all active simultaneously); throughput is 50k-100k req/sec depending on query size. Latency overhead: ~0.5ms per pooling layer.

HikariCP (Java Connection Pool)

HikariCP is the default pooling library for JVM applications (Spring Boot, Quarkus, etc.).

Configuration:

HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost/mydb");
config.setUsername("postgres");
config.setPassword("password");
config.setMaximumPoolSize(20);  // near cores × 2 + disk_spindles (17 for 8 cores + 1 SSD)
config.setMinimumIdle(5);
config.setIdleTimeout(600000);  // 10 min
config.setMaxLifetime(1800000); // 30 min
config.setConnectionTimeout(20000);  // 20 sec wait before error

Concrete numbers: HikariCP default max pool size = 10 (conservative). A server with 8 cores would use 8 × 2 + 1 = 17 connections. Throughput: 10k-50k queries/sec per application instance depending on query latency.

AWS RDS Proxy

RDS Proxy is a managed proxy layer between applications and RDS databases. It multiplexes connections and provides automatic failover.

Benefits:

  • Handles connection pooling (applications don’t need to)
  • Faster scaling (new app instances connect instantly)
  • Transparent failover (switches to read replica on primary failure)

Configuration via AWS Console:

  • Database: mydb on RDS
  • Max connections: 100
  • Min connections: 10
  • Connection borrow timeout: 120 seconds

Concrete numbers: RDS Proxy adds ~1-2ms latency per query (proxy processing). Throughput: 10k-50k queries/sec depending on backend RDS instance size.

Pgpool-II (PostgreSQL Load Balancing + Pooling)

Pgpool-II combines connection pooling with read-write splitting and replication monitoring.

Features:

  • Connection pooling (like PgBouncer)
  • Load balancing across read replicas
  • Automatic failover to standby
  • Query caching (optional)

Configuration:

port = 9999
num_init_children = 32  # number of preforked child processes
max_pool = 4  # Connections per child (32 × 4 = 128 total to DB)
backend_hostname0 = 'primary.db'
backend_hostname1 = 'replica.db'
backend_data_directory0 = '/var/lib/postgresql/data'

Concrete numbers: Pgpool-II can handle 1000+ concurrent client connections multiplexed to a smaller backend pool. Failover detection: 3-10 seconds. Read-write splitting adds ~1-2ms latency.

Implementation: N+1 Detection and Fix (SQLAlchemy)

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
import time

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    orders = relationship('Order', back_populates='user')

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    amount = Column(Integer)
    user = relationship('User', back_populates='orders')

# Setup
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)

def demo_n_plus_one():
    """Demonstrate N+1 problem."""
    session = Session()

    print("=" * 60)
    print("DEMO 1: N+1 Query Problem (SLOW)")
    print("=" * 60)

    start = time.time()
    users = session.query(User).limit(10).all()  # Query 1
    for user in users:
        # This triggers a lazy query for each user (Queries 2-11)
        print(f"{user.name}: {len(user.orders)} orders")
    elapsed = time.time() - start
    print(f"Total time: {elapsed:.3f}s (11 database queries)\n")

    print("=" * 60)
    print("DEMO 2: Using joinedload (FIX 1 — JOIN)")
    print("=" * 60)

    from sqlalchemy.orm import joinedload

    start = time.time()
    users = session.query(User).options(
        joinedload(User.orders)
    ).limit(10).all()  # Single query with JOIN
    for user in users:
        print(f"{user.name}: {len(user.orders)} orders")
    elapsed = time.time() - start
    print(f"Total time: {elapsed:.3f}s (1 database query)\n")

    print("=" * 60)
    print("DEMO 3: Using selectinload (FIX 2 — BATCH)")
    print("=" * 60)

    from sqlalchemy.orm import selectinload

    start = time.time()
    users = session.query(User).options(
        selectinload(User.orders)
    ).limit(10).all()  # Two queries: users, then orders in batch
    for user in users:
        print(f"{user.name}: {len(user.orders)} orders")
    elapsed = time.time() - start
    print(f"Total time: {elapsed:.3f}s (2 database queries)\n")

    session.close()

class DataLoader:
    """
    Batching loader — automatically batches requests and executes in bulk.
    """

    def __init__(self, batch_fn, batch_size=100):
        """
        Args:
            batch_fn: Function that takes a list of IDs and returns results
            batch_size: How many items to batch before executing
        """
        self.batch_fn = batch_fn
        self.batch_size = batch_size
        self.queue = []
        self.cache = {}

    def load(self, key):
        """
        Request loading a key.
        If cached, return immediately.
        Otherwise, enqueue for batch loading.
        """
        if key in self.cache:
            return self.cache[key]

        self.queue.append(key)
        if len(self.queue) >= self.batch_size:
            self._flush()

        return None  # Will be populated by _flush

    def _flush(self):
        """Execute all queued requests in one batch."""
        if not self.queue:
            return

        keys = self.queue
        results = self.batch_fn(keys)

        for key, result in zip(keys, results):
            self.cache[key] = result

        self.queue = []

    def flush_all(self):
        """Ensure all queued items are loaded."""
        self._flush()


def demo_dataloader():
    """Demonstrate DataLoader pattern."""
    session = Session()

    print("=" * 60)
    print("DEMO 4: DataLoader Pattern (FIX 3 — MANUAL BATCH)")
    print("=" * 60)

    def batch_load_orders(user_ids):
        """Load all orders for multiple users in one query."""
        orders_dict = {}
        orders = session.query(Order).filter(Order.user_id.in_(user_ids)).all()
        for order in orders:
            if order.user_id not in orders_dict:
                orders_dict[order.user_id] = []
            orders_dict[order.user_id].append(order)
        # Return in same order as input
        return [orders_dict.get(uid, []) for uid in user_ids]

    loader = DataLoader(batch_load_orders, batch_size=10)

    start = time.time()
    users = session.query(User).limit(10).all()  # Query 1

    # Enqueue loading but don't execute yet
    for user in users:
        loader.load(user.id)

    # Execute all at once
    loader.flush_all()  # Query 2 (batch loads all orders)

    for user in users:
        orders = loader.cache.get(user.id, [])
        print(f"{user.name}: {len(orders)} orders")

    elapsed = time.time() - start
    print(f"Total time: {elapsed:.3f}s (2 database queries, automatic batching)\n")

    session.close()

if __name__ == "__main__":
    # demo_n_plus_one()
    # demo_dataloader()
    print("(Uncomment demo functions to run against a real database)")

This post is licensed under CC BY 4.0 by the author.