Sliding Window & Two Pointer

Process sequential data in O(n) with a moving window: a core technique behind streaming analytics, rate limiting, and network protocols.

Key Properties

| Property | Value | Notes |
|---|---|---|
| Time complexity (sliding window) | O(n) | Single pass, amortized |
| Space complexity | O(k) or O(1) | Window size or tracking variables |
| Fixed-window accuracy | Low (~80%) | Edge-case bursts at window boundaries |
| Variable-window accuracy | High (>99%) | Real-time adjustment |
| TCP window size (typical) | 65535 bytes | 16-bit header field |
| Modern TCP RWND | Up to 1 GiB | With window scaling (RFC 7323) |

When to Use / Avoid

Use Sliding Window When:

  • ✅ Processing unbounded streams (Kafka, time-series data)
  • ✅ Need to detect patterns over fixed time windows
  • ✅ Rate limiting that must keep false positives low
  • ✅ Finding the shortest or longest contiguous subarray that meets a condition
  • ✅ Removing duplicates in a stream (with hash set)
  • ✅ Network congestion control or flow control

Avoid When:

  • ❌ Working with unordered data (sliding window assumes sequence)
  • ❌ Need to query arbitrary historical windows (use append-only log instead)
  • ❌ Memory is severely constrained and can’t store window elements
  • ❌ Elements arrive out-of-order by timestamp (use event-time windows with watermarks or a reordering buffer)

Real Systems Using Sliding Window

| System | Pattern | Configuration | Benefit |
|---|---|---|---|
| TCP/IP (RFC 5681) | Sliding window + congestion control | CWND starts at a few segments (RFC 5681 allows 2-4; RFC 6928 allows 10), grows exponentially during slow start, linearly during congestion avoidance | Prevents network congestion collapse; adapts to path RTT and loss |
| Cloudflare Rate Limiting | Sliding window counter (hybrid) | Two counters: current minute + previous minute, weighted blend | Scales to millions of domains; 0.003% error rate across 400M requests |
| Kafka Consumer Groups | Variable window for lag tracking | Burrow uses 10-minute sliding window to detect trend (increasing/stable/decreasing) | Eliminates false positives from traffic spikes; alerts on genuine lag growth |
| Prometheus Metrics | Fixed sliding window with avg_over_time | Rule of thumb: range of at least 4x the scrape interval (e.g., 1 minute for a 15-second scrape) | Detects anomalies without noisy instantaneous values |
| Apache Flink Streaming | Tumbling + hopping (sliding) windows | Window size 5 sec, slide 1 sec → 5 overlapping windows per element | Real-time aggregation of millions of events/sec |
| Redis Streams + XRANGE | Variable time-window range queries | Stream ID = timestamp_ms-sequence; query [start-time, end-time] | Sub-millisecond latency for windowed stream processing |

How Real Systems Use This

TCP/IP Sliding Window Protocol (RFC 5681)

TCP’s sliding window is bounded by the receiver’s advertised window (RWND), which tells the sender how much data it may have in flight without waiting for acknowledgments. The sender also maintains a congestion window (CWND) that estimates network capacity, and it may have at most min(CWND, RWND) unacknowledged bytes outstanding. During slow start, CWND grows exponentially (roughly doubling each RTT) until it reaches the slow start threshold (ssthresh) or a loss occurs; it then enters congestion avoidance and grows linearly (about +1 MSS per RTT). On loss detection, ssthresh is set to roughly half the data in flight (multiplicative decrease), and CWND falls to about ssthresh after a fast retransmit or to one segment after a retransmission timeout. Modern TCP with window scaling (RFC 7323) supports windows up to about 1 GiB, which matters on links with a high bandwidth-delay product (e.g., 10 Gbps × 100 ms RTT = 125 MB in flight). The sliding window removes the need to wait for each individual ACK, raising throughput from about one segment per RTT (stop-and-wait) to full pipeline capacity.
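The growth pattern described above can be sketched with a toy model. This is a simplified illustration rather than a TCP implementation: it counts whole segments, checks for loss once per RTT, and uses hypothetical helper names (`simulate_cwnd`, `bandwidth_delay_product_bytes`) that do not come from any RFC or library.

```python
def simulate_cwnd(rtts, ssthresh_segments, loss_at_rtt=None, initial_cwnd=1):
    """Return the congestion window (in segments) at the start of each RTT.

    Toy model: double per RTT below ssthresh (slow start), add one segment
    per RTT at or above it (congestion avoidance). On a loss, ssthresh
    becomes half the current window (minimum 2 segments) and the window
    resumes from ssthresh, loosely mirroring fast recovery.
    """
    cwnd = initial_cwnd
    ssthresh = ssthresh_segments
    history = []
    for rtt in range(rtts):
        history.append(cwnd)
        if loss_at_rtt is not None and rtt == loss_at_rtt:
            ssthresh = max(cwnd // 2, 2)   # multiplicative decrease
            cwnd = ssthresh
        elif cwnd < ssthresh:
            cwnd = min(cwnd * 2, ssthresh)  # slow start
        else:
            cwnd += 1                       # congestion avoidance
    return history


def bandwidth_delay_product_bytes(bits_per_second, rtt_ms):
    """Bytes that must be in flight to keep the link full (integer math)."""
    return bits_per_second * rtt_ms // 8000


history = simulate_cwnd(rtts=10, ssthresh_segments=16, loss_at_rtt=6)
print(history)  # [1, 2, 4, 8, 16, 17, 18, 9, 10, 11]

# 10 Gbps link with 100 ms RTT, as in the example above
print(bandwidth_delay_product_bytes(10_000_000_000, 100))  # 125000000

# Largest window with the maximum RFC 7323 shift of 14 bits
print(65535 << 14)  # 1073725440
```

The printed history shows the three phases in order: doubling up to ssthresh, linear growth, then a halving at the simulated loss followed by linear growth again.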

Cloudflare Rate Limiting (Sliding Window Counter)

Cloudflare rate limits at scale using a hybrid approach: instead of storing every request timestamp (prohibitive at millions of domains × millions of IPs), it keeps two counters per key: count_current_minute and count_previous_minute. To estimate requests in the last 60 seconds, it calculates: estimate = count_previous_minute × ((60 - elapsed_seconds) / 60) + count_current_minute. Example: if 15 seconds have passed in the current minute with 18 new requests, and the previous minute had 42 requests, the estimate is 42 × 0.75 + 18 = 49.5 requests. The estimate assumes the previous minute's requests were spread evenly across that minute. In practice this achieves a 0.003% error rate (on 400 million requests) with O(1) memory per key, and because updates are atomic increments it avoids read-modify-write contention. The algorithm is deterministic and maps naturally onto an atomic counter such as Redis INCR.
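The weighted estimate is small enough to check by hand. The sketch below reproduces the worked example; the function name `sliding_window_estimate` and its parameters are illustrative and are not taken from Cloudflare's code.

```python
def sliding_window_estimate(previous_count, current_count,
                            elapsed_seconds, window_seconds=60):
    """Estimate requests in the trailing window from two fixed-window counters.

    The previous window is weighted by the fraction of it that still
    overlaps the trailing window; the current window counts in full.
    """
    weight = (window_seconds - elapsed_seconds) / window_seconds
    return previous_count * weight + current_count


# 15 s into the current minute: 42 requests last minute, 18 so far this minute
estimate = sliding_window_estimate(previous_count=42, current_count=18,
                                   elapsed_seconds=15)
print(estimate)  # 49.5
```

At the start of a new window the weight is 1 and the previous counter counts in full; it fades linearly to 0 as the current window fills.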

Kafka Consumer Group Lag Monitoring (Burrow)

LinkedIn’s Burrow monitors consumer lag using sliding-window trend analysis. Rather than alerting when lag > 10,000 (which triggers false positives during traffic spikes), it keeps a 10-minute sliding window of lag measurements and classifies each consumer as OK/WARNING/ERR based on whether lag is decreasing (catching up), stable, or increasing (falling behind). If lag increases over two consecutive 1-minute windows, it is treated as a real problem; if lag spikes and then drops, it is treated as a temporary burst. This reduces false positives by 90%+ compared to threshold-based alerting, which is critical when monitoring thousands of consumer groups.
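The idea of alerting on the trend rather than the absolute value can be sketched as follows. This is a hypothetical classifier written for illustration only: the class `LagTrendWindow`, the helper `classify`, and the three-sample rule are assumptions and do not reproduce Burrow's actual evaluation rules.

```python
from collections import deque


class LagTrendWindow:
    """Hypothetical trend classifier over a sliding window of lag samples."""

    def __init__(self, max_samples=10):
        # deque(maxlen=...) drops the oldest sample automatically,
        # so the window slides as new measurements arrive.
        self.samples = deque(maxlen=max_samples)

    def add(self, lag):
        self.samples.append(lag)

    def status(self):
        if len(self.samples) < 3:
            return "OK"  # not enough history to call a trend
        a, b, c = list(self.samples)[-3:]
        if c > b > a:
            return "ERR"      # lag rose over two consecutive intervals
        if c > b:
            return "WARNING"  # lag rose once; may be a burst
        return "OK"           # stable or catching up


def classify(lags):
    window = LagTrendWindow()
    for lag in lags:
        window.add(lag)
    return window.status()


print(classify([100, 5000, 300]))  # OK: spike followed by recovery
print(classify([100, 200, 300]))   # ERR: sustained growth
print(classify([300, 200, 250]))   # WARNING: a single increase
```

A fixed threshold of 10,000 would ignore all three sequences, including the one where the consumer is steadily falling behind, and would fire on any brief spike above it.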

Prometheus Moving Averages (avg_over_time)

Prometheus queries use avg_over_time(metric[5m]) to compute a sliding-window average over the last 5 minutes, with new samples arriving every scrape interval (15 seconds in many configurations; the built-in default is 1 minute). As a rule of thumb, the range should be at least 4x the scrape interval so that a missed scrape does not leave the window nearly empty; a 5-minute range at a 15-second interval holds about 20 samples. Prometheus stores compressed time-series blocks, so a query over the most recent 5 minutes is normally served from the in-memory head block, and its cost is O(n) where n is the number of samples in the range (~20 per series). This smooths out instantaneous spikes and surfaces genuine trends. Prometheus defaults to 15-day retention, so historical sliding-window queries are limited to that period.
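The windowed average can be sketched in plain Python to make the sample count concrete. This is not how Prometheus evaluates PromQL internally; the functions below are illustrative, and the left-open range (t - range, t] is an assumption made for this sketch.

```python
def samples_in_range(samples, now, range_seconds):
    """Values whose timestamp falls in the left-open window (now - range, now]."""
    return [value for ts, value in samples if now - range_seconds < ts <= now]


def avg_over_time(samples, now, range_seconds):
    """Mean of the samples in the trailing window, or None if it is empty."""
    window = samples_in_range(samples, now, range_seconds)
    return sum(window) / len(window) if window else None


# One sample every 15 s for 10 minutes; the value is just the sample index.
scrape_interval = 15
samples = [(i * scrape_interval, float(i)) for i in range(41)]  # t = 0..600 s

window = samples_in_range(samples, now=600, range_seconds=300)
print(len(window))                                        # 20
print(avg_over_time(samples, now=600, range_seconds=300)) # 30.5
```

With a 15-second interval, a 1-minute range (4x) would hold only 4 samples, which is why short ranges are sensitive to a single missed scrape.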

Apache Flink Streaming (Tumbling and Sliding Windows)

Flink splits unbounded streams into finite windows using a window assigner (tumbling vs. sliding), a window trigger (when to emit), and a window function (what aggregation to compute). A sliding window example in Flink SQL: HOP(event_time, INTERVAL '1' SECOND, INTERVAL '5' SECOND) creates overlapping windows of 5-second width that slide by 1 second, so each event falls into 5 windows. For a stream of 1 million events/second, Flink manages millions of concurrent windows in state, purging that state when windows close. The sliding window allows detecting patterns that span multiple 1-second tumbling windows (e.g., “alert if rate > 100k events/sec over any 5-second window”).
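To see why each event lands in size / slide windows, the assignment can be sketched in a few lines. This is a plain-Python illustration, not Flink API code; the function name `sliding_windows_for` is hypothetical, and it assumes windows aligned to multiples of the slide with no offset.

```python
def sliding_windows_for(event_time_ms, size_ms, slide_ms):
    """Return the [start, end) windows that contain the event timestamp.

    Windows start at multiples of slide_ms. The latest window containing
    the event starts at the slide boundary at or before the event time;
    earlier windows are found by stepping back one slide at a time.
    """
    start = event_time_ms - (event_time_ms % slide_ms)
    windows = []
    while start > event_time_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


windows = sliding_windows_for(event_time_ms=7300, size_ms=5000, slide_ms=1000)
for start, end in windows:
    print(start, end)
# 7000 12000
# 6000 11000
# 5000 10000
# 4000 9000
# 3000 8000
```

Setting the slide equal to the size yields exactly one window per event, which is the tumbling case.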

Redis Streams with XRANGE (Time Windows)

Redis Streams store entries with IDs in the format timestamp_ms-sequence_number (e.g., 1680000000000-0). XRANGE retrieves entries within a time window: XRANGE mystream 1680000000000 1680000060000 returns all entries whose IDs fall in that 60-second range, inclusive. Consumer groups track lag: since Redis 7.0 the stream maintains entries-added (total entries ever added), and each consumer group maintains entries-read alongside its last-delivered-id. Lag = entries-added - entries-read, which is a constant-time computation. This enables sub-millisecond windowed stream processing without scanning the entire stream: XRANGE seeks to the start ID through the stream's index and then reads only the entries in range.
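The range lookup can be illustrated without a Redis server. The sketch below is an in-memory stand-in, not the Redis implementation or client API: the class `TinyStream` and its methods are hypothetical, IDs are modeled as (timestamp_ms, sequence) tuples, and a sorted list with binary search stands in for the stream index.

```python
import bisect


class TinyStream:
    """Toy append-only stream with time-range queries (illustration only)."""

    def __init__(self):
        self.ids = []     # sorted list of (timestamp_ms, sequence)
        self.values = []  # values[i] belongs to ids[i]

    def add(self, timestamp_ms, value):
        if self.ids and timestamp_ms < self.ids[-1][0]:
            raise ValueError("IDs must not go backwards")
        # Entries in the same millisecond get increasing sequence numbers.
        same_ms = bool(self.ids) and self.ids[-1][0] == timestamp_ms
        sequence = self.ids[-1][1] + 1 if same_ms else 0
        self.ids.append((timestamp_ms, sequence))
        self.values.append(value)
        return (timestamp_ms, sequence)

    def range(self, start_ms, end_ms):
        """Entries with start_ms <= timestamp <= end_ms (inclusive)."""
        # A bare timestamp means sequence 0 for the start and the
        # largest possible sequence for the end.
        lo = bisect.bisect_left(self.ids, (start_ms, 0))
        hi = bisect.bisect_right(self.ids, (end_ms, float("inf")))
        return list(zip(self.ids[lo:hi], self.values[lo:hi]))


stream = TinyStream()
stream.add(1680000000000, "a")
stream.add(1680000000000, "b")   # same millisecond, sequence 1
stream.add(1680000030000, "c")
stream.add(1680000060000, "d")
stream.add(1680000060001, "e")   # just outside the queried window

result = stream.range(1680000000000, 1680000060000)
print([value for _, value in result])  # ['a', 'b', 'c', 'd']
```

Both bounds are found by binary search, so the cost of a query depends on the number of entries returned rather than on the total stream length.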

Implementation

Fixed-Size Sliding Window

def max_in_sliding_window(arr, k):
    """
    Find maximum value in each k-sized window of array.
    Uses deque to maintain indices of useful elements.

    Args:
        arr: list of integers
        k: window size

    Returns:
        list of maximum values for each window

    Time: O(n), Space: O(k)
    """
    from collections import deque

    if not arr or k <= 0:
        return []

    deq = deque()  # Stores indices of useful elements
    result = []

    for i in range(len(arr)):
        # Remove indices outside current window
        while deq and deq[0] < i - k + 1:
            deq.popleft()

        # Remove indices of elements smaller than or equal to current
        # (they can never be the max while current is in the window)
        while deq and arr[deq[-1]] <= arr[i]:
            deq.pop()

        # Add current element index
        deq.append(i)

        # Once window is full, the front of deque is the max
        if i >= k - 1:
            result.append(arr[deq[0]])

    return result


# Example usage
arr = [1, 3, 1, 2, 0, 5]
k = 3
print(max_in_sliding_window(arr, k))  # Output: [3, 3, 2, 5]

Variable-Size Sliding Window

def min_window_substring(s, t):
    """
    Find minimum window in s that contains all characters of t.
    Uses two-pointer expansion/contraction pattern.

    Args:
        s: source string
        t: target string

    Returns:
        minimum window substring

    Time: O(|s| + |t|), Space: O(|s| + |t|) worst case for the two
    character-count maps (bounded by the alphabet size)
    """
    if not s or not t:
        return ""

    dict_t = {}
    for char in t:
        dict_t[char] = dict_t.get(char, 0) + 1

    required = len(dict_t)  # Unique chars in t that must be present
    formed = 0  # Unique chars in current window with desired frequency

    window_counts = {}
    left, right = 0, 0
    min_window = (float("inf"), None, None)  # (length, left, right)

    while right < len(s):
        # Expand: add character from right
        char = s[right]
        window_counts[char] = window_counts.get(char, 0) + 1

        # Check if frequency of current character matches desired count
        if char in dict_t and window_counts[char] == dict_t[char]:
            formed += 1

        # Contract: shrink from left while window is valid
        while left <= right and formed == required:
            char = s[left]

            # Update result if current window is smaller
            if right - left + 1 < min_window[0]:
                min_window = (right - left + 1, left, right)

            # Remove character from left of window
            window_counts[char] -= 1
            if char in dict_t and window_counts[char] < dict_t[char]:
                formed -= 1

            left += 1

        right += 1

    return "" if min_window[0] == float("inf") else s[min_window[1] : min_window[2] + 1]


# Example usage
s = "ADOBECODEBANC"
t = "ABC"
print(min_window_substring(s, t))  # Output: "BANC"

Two-Pointer Patterns

def two_sum_sorted(arr, target):
    """
    Find two numbers in sorted array that sum to target.
    Uses opposite-ends two-pointer pattern.

    Args:
        arr: sorted list of integers
        target: sum target

    Returns:
        tuple of indices (i, j) or None

    Time: O(n), Space: O(1)
    """
    left, right = 0, len(arr) - 1

    while left < right:
        current_sum = arr[left] + arr[right]

        if current_sum == target:
            return (left, right)
        elif current_sum < target:
            left += 1  # Need larger sum
        else:
            right -= 1  # Need smaller sum

    return None


def remove_duplicates(arr):
    """
    Remove duplicates from sorted array in-place using two pointers.
    Fast pointer (i) scans; slow pointer (j) marks write position.

    Args:
        arr: sorted list with duplicates

    Returns:
        length of array after removing duplicates

    Time: O(n), Space: O(1)
    """
    if not arr:
        return 0

    j = 0  # Pointer for position to write unique element
    for i in range(1, len(arr)):
        if arr[i] != arr[j]:
            j += 1
            arr[j] = arr[i]

    return j + 1


# Example usage
print(two_sum_sorted([2, 7, 11, 15], 9))  # Output: (0, 1)

arr = [0, 0, 1, 1, 1, 2, 2, 3, 3, 4]
length = remove_duplicates(arr)
print(f"Unique elements: {arr[:length]}")  # Output: [0, 1, 2, 3, 4]

Real-World Rate Limiting Demo

import time
from collections import defaultdict

class SlidingWindowRateLimiter:
    """
    Production-style rate limiter using sliding window counter.
    Mimics Cloudflare's approach: two counters per time window.
    """

    def __init__(self, capacity, window_seconds):
        """
        Args:
            capacity: max requests allowed per window
            window_seconds: duration of sliding window
        """
        self.capacity = capacity
        self.window_seconds = window_seconds
        self.requests = defaultdict(lambda: {"current": 0, "previous": 0})
        self.last_reset = {}

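    def _get_window_info(self, key):
        """Rotate counters if a window boundary passed; return (elapsed, now)."""
        # Monotonic clock: immune to wall-clock adjustments
        current_time = time.monotonic()
        if key not in self.last_reset:
            self.last_reset[key] = current_time
            return 0, current_time

        elapsed = current_time - self.last_reset[key]
        if elapsed >= self.window_seconds:
            counters = self.requests[key]
            windows_passed = int(elapsed // self.window_seconds)
            # Only the immediately preceding window may contribute; after a
            # longer idle gap the old count is stale and must be dropped.
            counters["previous"] = counters["current"] if windows_passed == 1 else 0
            counters["current"] = 0
            # Advance by whole windows so boundaries do not drift
            self.last_reset[key] += windows_passed * self.window_seconds
            elapsed = current_time - self.last_reset[key]

        return elapsed, current_time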

    def allow_request(self, key):
        """
        Check if request is allowed under rate limit.

        Returns:
            (allowed: bool, estimated_rate: float)
        """
        elapsed, _ = self._get_window_info(key)

        # Sliding window counter: weighted average of two windows
        previous = self.requests[key]["previous"]
        current = self.requests[key]["current"]
        weight = (self.window_seconds - elapsed) / self.window_seconds

        estimated_rate = previous * weight + current

        allowed = estimated_rate < self.capacity

        if allowed:
            self.requests[key]["current"] += 1

        return allowed, estimated_rate


# Example: Rate limit to 5 requests per 10 seconds
limiter = SlidingWindowRateLimiter(capacity=5, window_seconds=10)

for i in range(12):
    allowed, rate = limiter.allow_request("client_123")
    print(f"Request {i+1}: {'✓ Allowed' if allowed else '✗ Denied'} (rate={rate:.1f})")
    time.sleep(0.5)
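For comparison, the exact alternative mentioned earlier (storing every request timestamp) can be sketched as a sliding-window log. This is a minimal illustration, not a production design: the class name `SlidingWindowLogLimiter` is hypothetical, and the caller passes `now` explicitly so the behavior can be checked without sleeping.

```python
from collections import deque


class SlidingWindowLogLimiter:
    """Exact limiter that keeps one timestamp per accepted request."""

    def __init__(self, capacity, window_seconds):
        self.capacity = capacity
        self.window_seconds = window_seconds
        self.log = {}  # key -> deque of accepted request timestamps

    def allow_request(self, key, now):
        timestamps = self.log.setdefault(key, deque())
        # Slide the window: drop timestamps that are a full window old
        while timestamps and timestamps[0] <= now - self.window_seconds:
            timestamps.popleft()
        if len(timestamps) < self.capacity:
            timestamps.append(now)
            return True
        return False


# Same scenario as above: 5 requests per 10 seconds, one request every 0.5 s
limiter = SlidingWindowLogLimiter(capacity=5, window_seconds=10)
results = [limiter.allow_request("client_123", now=i * 0.5) for i in range(12)]
print(results)  # first five True, remaining seven False

# Ten seconds after the first request, one slot frees up
print(limiter.allow_request("client_123", now=10.0))  # True
```

The log is exact but costs O(capacity) memory per key, whereas the two-counter approach uses O(1) memory per key at the price of a small estimation error.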

This post is licensed under CC BY 4.0 by the author.