Lab 11 — Message Dispatcher
Goal
Implement a message dispatcher that fans out messages to N consumers with fairness, per-consumer priority levels, and per-consumer backpressure. After this lab you should be able to design, in under 30 minutes, a multi-consumer dispatcher that doesn’t starve low-priority consumers and doesn’t get stalled by slow ones.
Background Concepts
A dispatcher accepts messages from one (or more) producers and routes them to N consumers. Three classical problems:
- Fairness: round-robin across consumers vs. weighting by priority. Strict round-robin is unfair if some consumers have higher priority. Weighted fair queueing (WFQ) gives each consumer a share proportional to its weight.
- Slow-consumer problem: a slow consumer’s queue fills up. If we share a single queue across all consumers, the slow one stalls everyone. Solution: per-consumer queues with bounded capacity.
- Backpressure: when a consumer’s queue is full, what do we do? Options: (a) block the producer (lossless, but the slowest consumer sets the pace), (b) drop oldest, (c) drop newest, (d) reject and signal upstream. Default to (b) for telemetry, (a) for orders/payments.
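A minimal sketch of policies (b) through (d) on a single bounded buffer, assuming a deque as the buffer. The function name offer and the exception QueueFull are hypothetical, chosen for illustration. Policy (a) needs a second thread to free space, so it is left out here.

```python
from collections import deque


class QueueFull(Exception):
    """Hypothetical error raised by the "reject" policy (option d)."""


def offer(buf: deque, capacity: int, msg, policy: str) -> bool:
    """Try to enqueue msg into a bounded buffer; return True if msg was enqueued.

    Illustrative policy names: "drop_oldest", "drop_newest", "reject".
    """
    if len(buf) < capacity:
        buf.append(msg)
        return True
    if policy == "drop_oldest":
        buf.popleft()          # evict the oldest message to make room
        buf.append(msg)
        return True
    if policy == "drop_newest":
        return False           # silently discard the incoming message
    if policy == "reject":
        raise QueueFull(f"buffer full (capacity={capacity})")  # signal upstream
    raise ValueError(f"unknown policy: {policy!r}")


for policy in ("drop_oldest", "drop_newest"):
    buf = deque()
    for i in range(5):
        offer(buf, 3, i, policy)
    print(policy, list(buf))
# drop_oldest [2, 3, 4]
# drop_newest [0, 1, 2]
```

Drop-oldest keeps the freshest data, which is why it suits telemetry. Drop-newest keeps whatever arrived first. Reject pushes the decision back to the caller.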
Interview Context
Dispatcher problems show up at message-bus companies (Confluent, Solace), real-time platforms (Twilio, PubNub, Pusher), and any backend with fan-out. The interviewer wants to hear: per-consumer queue, priority-aware scheduling, explicit backpressure policy. Common failure: a single shared queue with consumers competing — works for two consumers, breaks at scale.
Problem Statement
Design Dispatcher:
- register_consumer(consumer_id, priority=1, queue_capacity=1024, on_full="drop_oldest")
- dispatch(message): fan out to all registered consumers (broadcast).
- consume(consumer_id) -> Message | None: non-blocking dequeue for a consumer.
- consume_blocking(consumer_id, timeout): blocking dequeue.
- unregister(consumer_id)
- stats() -> dict: per-consumer queue size, drops, throughput.
Constraints
- 1 ≤ consumers ≤ 1000
- 10^5 dispatches / second
- Per-consumer latency: < 1 ms p99 from dispatch to availability
Clarifying Questions
- Broadcast (every consumer gets every message) or partition (each message goes to one)? (Pick one; we’ll do broadcast — the harder problem.)
- Strict ordering across consumers? (Each consumer sees messages in dispatch order; cross-consumer ordering not guaranteed.)
- Priority semantics: priority is a weight, not a strict precedence? (Weight is the standard.)
- Should consumers be threads, or should consume() be polled by external code? (External polling, which is simpler.)
Examples
d = Dispatcher()
d.register_consumer("fast", priority=2, queue_capacity=100)
d.register_consumer("slow", priority=1, queue_capacity=10, on_full="drop_oldest")
for i in range(20):
    d.dispatch({"i": i})
# slow consumer dropped 10 oldest; only sees last 10
slow_msgs = []
while True:
    m = d.consume("slow")
    if m is None: break
    slow_msgs.append(m)
# slow_msgs has the 10 most recent messages
Initial Brute Force
A single shared deque and a for c in consumers: c.recv(msg) loop on dispatch. If a consumer is slow, the dispatch blocks on it. The whole pipeline stalls.
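A minimal sketch of the brute force, assuming consumers are plain callables invoked synchronously (NaiveDispatcher and its methods are hypothetical names). It shows head-of-line blocking: the fast consumer only receives the message after the slow one returns.

```python
import time
from typing import Any, Callable


class NaiveDispatcher:
    """Brute-force fan-out: call every consumer synchronously on dispatch."""

    def __init__(self) -> None:
        self._consumers: list[Callable[[Any], None]] = []

    def register(self, recv: Callable[[Any], None]) -> None:
        self._consumers.append(recv)

    def dispatch(self, msg: Any) -> None:
        for recv in self._consumers:   # producer waits on each consumer in turn
            recv(msg)


fast_log: list = []
d = NaiveDispatcher()
d.register(lambda m: time.sleep(0.05))  # a slow consumer: about 50 ms per message
d.register(fast_log.append)             # a fast consumer, served only after the slow one
start = time.monotonic()
d.dispatch("x")
elapsed = time.monotonic() - start
print(f"dispatch took {elapsed * 1000:.0f} ms")  # at least ~50 ms, set by the slow consumer
```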
Brute Force Complexity
Per dispatch: O(N · consumer-receive-time). Worst-case latency is bounded by the slowest consumer.
Optimization Path
Per-consumer queue. Dispatch is now O(N · queue-push-time) ≈ O(N) — fast and bounded by the dispatcher’s own work, not by any consumer’s processing. Slow consumers fill their own queue and trigger their own backpressure policy without affecting others.
Final Expected Approach
A dict[consumer_id, ConsumerQueue]. Each ConsumerQueue has a deque, a Lock (or use queue.Queue), a capacity, an on-full policy, and counters. dispatch iterates a snapshot of the dict and pushes to each queue, applying the policy when the queue is full. consume dequeues from the named queue. A single shared lock would serialize all dispatches and consumes; per-queue locks let operations on different queues proceed in parallel. The cost is that dispatch works from a snapshot of the registrations rather than a live view, so a consumer registered while a dispatch is in flight misses that message. Acceptable trade.
Data Structures Used
| Structure | Purpose |
|---|---|
| dict[id, ConsumerQueue] | per-consumer state |
| deque per consumer | bounded FIFO |
| Lock per consumer | concurrent push/pop safety |
| Condition per consumer | blocking consume, and blocking dispatch under the block policy |
Correctness Argument
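No starvation: every consumer has its own queue, so under drop_oldest or drop_newest a slow consumer cannot block dispatch. Producer dispatch latency is then bounded by O(N) (the registration snapshot plus one push per consumer), independent of consumer speed. The exception is the block policy: a full block-policy queue stalls dispatch, and with it delivery to every consumer after it in the iteration.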
Priority weighting: at dispatch time, we don’t apply weight (every consumer gets every message — broadcast). Priority is used in the consume order if we have a single consumer thread that polls all queues in priority order; in that variant, weight determines how many messages we consume from each per round (e.g., 2 from priority=2, 1 from priority=1).
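A sketch of that single-poller variant as weighted round-robin, assuming plain deques and a single thread (the locks in the implementation below are ignored here). The function name and the weights dict are hypothetical. Each round takes up to weight messages from each queue, so a priority=1 queue is still served every round; weight is a share, not strict precedence.

```python
from collections import deque
from typing import Any, Iterator


def weighted_round_robin(queues: dict[str, deque],
                         weights: dict[str, int]) -> Iterator[tuple[str, Any]]:
    """Drain per-consumer queues, taking up to weights[cid] messages per round."""
    order = sorted(queues, key=lambda cid: -weights[cid])  # higher weight polled first
    while any(queues[cid] for cid in order):
        for cid in order:
            q = queues[cid]
            for _ in range(weights[cid]):
                if not q:
                    break              # this queue is empty for now; move on
                yield cid, q.popleft()


queues = {"hi": deque(["h1", "h2", "h3", "h4"]), "lo": deque(["l1", "l2", "l3"])}
served = [cid for cid, _ in weighted_round_robin(queues, {"hi": 2, "lo": 1})]
print(served)  # ['hi', 'hi', 'lo', 'hi', 'hi', 'lo', 'lo']
```

While both queues are backlogged, "hi" gets two messages for every one "lo" gets; once "hi" drains, "lo" uses the remaining capacity.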
Backpressure: when a queue is at capacity, that consumer’s policy fires. Each consumer’s drops are tracked separately. Under drop_oldest and drop_newest the dispatcher itself never blocks; under block it waits until that consumer frees a slot.
Complexity
- dispatch: O(N), where N = number of consumers
- consume: O(1)
- Space: O(N · capacity)
Implementation Requirements
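import threading, time
from collections import deque
from typing import Any, Optional

_POLICIES = ("drop_oldest", "drop_newest", "block")

class _ConsumerQueue:
    def __init__(self, capacity: int, on_full: str = "drop_oldest", priority: int = 1):
        if capacity < 1:
            raise ValueError("queue_capacity must be >= 1")
        if on_full not in _POLICIES:
            raise ValueError(f"unknown on_full policy: {on_full!r}")
        self.capacity = capacity
        self.on_full = on_full  # "drop_oldest" | "drop_newest" | "block"
        self.priority = priority
        self.q: deque = deque()
        self.lock = threading.Lock()
        # Two conditions on the same lock: consumers wait on not_empty,
        # "block"-policy producers wait on not_full. With one shared
        # condition, notify() can wake the wrong kind of waiter.
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)
        self.closed = False  # set by unregister() to release any waiters
        self.dropped = 0
        self.delivered = 0

class Dispatcher:
    def __init__(self):
        self._consumers: dict[str, _ConsumerQueue] = {}
        self._reg_lock = threading.RLock()

    def register_consumer(self, consumer_id: str, priority: int = 1,
                          queue_capacity: int = 1024,
                          on_full: str = "drop_oldest") -> None:
        cq = _ConsumerQueue(queue_capacity, on_full, priority)  # validates args
        with self._reg_lock:
            if consumer_id in self._consumers:
                raise ValueError(f"{consumer_id} already registered")
            self._consumers[consumer_id] = cq

    def unregister(self, consumer_id: str) -> None:
        with self._reg_lock:
            cq = self._consumers.pop(consumer_id, None)
        if cq is not None:
            with cq.lock:
                cq.closed = True
                cq.not_empty.notify_all()  # release blocked consume_blocking() calls
                cq.not_full.notify_all()   # release producers stuck on "block"

    def dispatch(self, message: Any) -> None:
        # Snapshot under the registration lock; iterating the live dict
        # would race with concurrent register/unregister.
        with self._reg_lock:
            consumers = list(self._consumers.values())
        for cq in consumers:
            with cq.lock:
                if cq.closed:
                    continue  # unregistered after the snapshot
                if len(cq.q) >= cq.capacity:
                    if cq.on_full == "drop_oldest":
                        cq.q.popleft()  # evict the oldest; append below
                        cq.dropped += 1
                    elif cq.on_full == "drop_newest":
                        cq.dropped += 1  # discard the incoming message
                        continue
                    else:  # "block": wait for space (wait() releases cq.lock)
                        while len(cq.q) >= cq.capacity and not cq.closed:
                            cq.not_full.wait()
                        if cq.closed:
                            continue
                cq.q.append(message)
                cq.not_empty.notify()  # wake one consume_blocking() waiter

    def consume(self, consumer_id: str) -> Optional[Any]:
        cq = self._consumers.get(consumer_id)
        if cq is None: return None
        with cq.lock:
            if not cq.q: return None
            m = cq.q.popleft()
            cq.delivered += 1
            cq.not_full.notify()  # wake a producer waiting under "block"
            return m

    def consume_blocking(self, consumer_id: str, timeout: Optional[float] = None) -> Optional[Any]:
        cq = self._consumers.get(consumer_id)
        if cq is None: return None
        deadline = None if timeout is None else time.monotonic() + timeout
        with cq.lock:
            while not cq.q:
                if cq.closed:
                    return None  # unregistered while we were waiting
                if deadline is None:
                    cq.not_empty.wait()
                else:
                    rem = deadline - time.monotonic()
                    if rem <= 0: return None
                    cq.not_empty.wait(timeout=rem)
            m = cq.q.popleft()
            cq.delivered += 1
            cq.not_full.notify()
            return m

    def stats(self) -> dict:
        with self._reg_lock:
            items = list(self._consumers.items())
        out = {}
        for cid, cq in items:
            with cq.lock:  # read each consumer's counters consistently
                out[cid] = {
                    "queue_size": len(cq.q),
                    "dropped": cq.dropped,
                    "delivered": cq.delivered,
                    "capacity": cq.capacity,
                    "priority": cq.priority,
                }
        return out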
Tests
import unittest, threading, time

class TestDispatcher(unittest.TestCase):
    def test_broadcast(self):
        d = Dispatcher()
        d.register_consumer("a"); d.register_consumer("b")
        for i in range(5): d.dispatch(i)
        a = [d.consume("a") for _ in range(5)]
        b = [d.consume("b") for _ in range(5)]
        self.assertEqual(a, [0, 1, 2, 3, 4])
        self.assertEqual(b, [0, 1, 2, 3, 4])

    def test_drop_oldest_on_full(self):
        d = Dispatcher()
        d.register_consumer("a", queue_capacity=3, on_full="drop_oldest")
        for i in range(5): d.dispatch(i)
        out = []
        while True:
            m = d.consume("a")
            if m is None: break
            out.append(m)
        self.assertEqual(out, [2, 3, 4])
        self.assertEqual(d.stats()["a"]["dropped"], 2)

    def test_slow_consumer_does_not_block_fast(self):
        d = Dispatcher()
        d.register_consumer("slow", queue_capacity=2, on_full="drop_oldest")
        d.register_consumer("fast", queue_capacity=100)
        for i in range(50):
            d.dispatch(i)
        # fast got everything
        fast = []
        while True:
            m = d.consume("fast")
            if m is None: break
            fast.append(m)
        self.assertEqual(len(fast), 50)
        # slow has 2 (capacity), the most recent
        slow = [d.consume("slow") for _ in range(2)]
        self.assertEqual(slow, [48, 49])

    def test_blocking_consume_wakes_on_dispatch(self):
        d = Dispatcher()
        d.register_consumer("c", queue_capacity=10)
        results = []
        def consumer():
            results.append(d.consume_blocking("c", timeout=2.0))
        t = threading.Thread(target=consumer); t.start()
        time.sleep(0.05)
        d.dispatch("hello")
        t.join(timeout=2.0)
        self.assertEqual(results, ["hello"])
Follow-up Questions
(7) Backpressure? This is the core problem. Three policies built in: drop-oldest (best for telemetry), drop-newest (best for “first message matters”), block (best for ordered streams where loss is unacceptable). The right pick depends on the data semantics; expose it per-consumer.
(3) Scale to N nodes? Distributed dispatch: each consumer “subscribes” via a network connection; the dispatcher fans out over those connections. Bottleneck shifts from queue-push to per-consumer network round-trip. For very high consumer counts, a hierarchical dispatcher (dispatch to N regional dispatchers, each of which dispatches to M local consumers) reduces the per-message broadcast cost.
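An in-process sketch of the two-tier idea, assuming consumers are plain callables and using root_sends to stand in for cross-region network sends. RootDispatcher, RegionalDispatcher, and subscribe are hypothetical names. The root pays one send per region per message; each region pays the local fan-out.

```python
from collections import defaultdict
from typing import Any, Callable


class RegionalDispatcher:
    """Leaf tier: fans a message out to its local consumers."""

    def __init__(self) -> None:
        self.local: list[Callable[[Any], None]] = []

    def dispatch(self, msg: Any) -> None:
        for recv in self.local:
            recv(msg)


class RootDispatcher:
    """Root tier: sends each message once per region, not once per consumer."""

    def __init__(self) -> None:
        self.regions: dict[str, RegionalDispatcher] = defaultdict(RegionalDispatcher)
        self.root_sends = 0  # stands in for cross-region network sends

    def subscribe(self, region: str, recv: Callable[[Any], None]) -> None:
        self.regions[region].local.append(recv)

    def dispatch(self, msg: Any) -> None:
        for regional in self.regions.values():
            self.root_sends += 1
            regional.dispatch(msg)


root = RootDispatcher()
received: list = []
for region in ("us", "eu", "ap"):
    for _ in range(100):
        root.subscribe(region, received.append)
root.dispatch("m")
print(root.root_sends, len(received))  # 3 300
```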
(4) Observe / monitor? Per-consumer queue depth, drop rate, throughput. The drop-rate heatmap by consumer is the first dashboard you draw. Alert when any consumer has > 1% drop rate.
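A sketch of the drop-rate computation from two stats() snapshots, assuming the field names that stats() returns above; drop_rates and alerts are hypothetical helpers. Every message offered to a consumer ends up delivered, still queued, or dropped, so offered = delivered + queue_size + dropped under both drop_oldest and drop_newest, and the rate over an interval is the change in dropped divided by the change in offered.

```python
def drop_rates(prev: dict, curr: dict) -> dict[str, float]:
    """Per-consumer drop rate between two stats() snapshots."""
    zero = {"delivered": 0, "queue_size": 0, "dropped": 0}
    rates = {}
    for cid, now in curr.items():
        before = prev.get(cid, zero)  # consumer registered after prev was taken
        offered = ((now["delivered"] + now["queue_size"] + now["dropped"])
                   - (before["delivered"] + before["queue_size"] + before["dropped"]))
        dropped = now["dropped"] - before["dropped"]
        rates[cid] = dropped / offered if offered > 0 else 0.0
    return rates


def alerts(rates: dict[str, float], threshold: float = 0.01) -> list[str]:
    """Consumer IDs whose drop rate exceeds the threshold (1% by default)."""
    return sorted(cid for cid, r in rates.items() if r > threshold)


prev = {"a": {"delivered": 100, "queue_size": 0, "dropped": 0},
        "b": {"delivered": 100, "queue_size": 0, "dropped": 0}}
curr = {"a": {"delivered": 200, "queue_size": 0, "dropped": 0},
        "b": {"delivered": 150, "queue_size": 40, "dropped": 10}}
r = drop_rates(prev, curr)
print(r, alerts(r))  # {'a': 0.0, 'b': 0.1} ['b']
```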
(8) Partial failure? A consumer that connects, then disappears: the dispatcher must detect (via missed heartbeat or socket close) and unregister; otherwise its queue grows unbounded. Heartbeat / TTL on consumer registration.
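A sketch of TTL-based liveness tracking, assuming consumers send periodic heartbeats and a single reaper thread calls expired(). HeartbeatTracker is a hypothetical helper, not part of the Dispatcher above, and it is not thread-safe as written; add a lock if heartbeats arrive on other threads. The clock is injectable so the usage below can run without sleeping.

```python
import time
from typing import Callable


class HeartbeatTracker:
    """Tracks last-seen times and reports consumers whose TTL has expired."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.clock = clock  # injectable for tests
        self.last_seen: dict[str, float] = {}

    def heartbeat(self, consumer_id: str) -> None:
        self.last_seen[consumer_id] = self.clock()

    def expired(self) -> list[str]:
        """Remove and return IDs not heard from within ttl seconds."""
        now = self.clock()
        dead = [cid for cid, t in self.last_seen.items() if now - t > self.ttl]
        for cid in dead:
            del self.last_seen[cid]
        return dead


now = [0.0]
hb = HeartbeatTracker(ttl=5.0, clock=lambda: now[0])
hb.heartbeat("a"); hb.heartbeat("b")
now[0] = 4.0; hb.heartbeat("a")
now[0] = 6.0
print(hb.expired())  # ['b']
# In a reaper loop: for cid in hb.expired(): dispatcher.unregister(cid)
```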
(11) Configuration knobs? Per-consumer: priority, queue_capacity, on_full policy. Global: max consumers. Knobs not to expose: the lock granularity (per-consumer is correct).
Product Extension
This is the in-process version of Kafka consumer groups, RabbitMQ exchange-to-queue fan-out, and Redis Pub/Sub. Real systems add: durable per-consumer logs (Kafka’s offset model), dynamic rebalancing as consumers join/leave (Kafka group coordinator), and message acknowledgment (AMQP). The core problem of “don’t let slow consumers stall fast ones” is solved everywhere by per-consumer storage.
Language/Runtime Follow-ups
- Python: queue.Queue per consumer is also fine and simpler, with built-in blocking and capacity. The custom _ConsumerQueue is shown for control over the on-full policy.
- Java: LinkedBlockingQueue per consumer; RingBuffer (Disruptor pattern) for high throughput.
- Go: idiomatic Go is a chan Message per consumer of size capacity, with select plus default for non-blocking operations. Simple and fast.
- C++: boost::lockfree::spsc_queue per consumer for single-producer/single-consumer; otherwise mutex + deque.
- JS/TS: single event loop, so no real “consumer threads”; use EventEmitter with bounded buffers per listener, or an RxJS Subject with bufferCount operators.
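For the queue.Queue route, blocking and capacity come built in, but drop-oldest does not. A minimal sketch, assuming a bounded queue.Queue(maxsize=capacity) per consumer; put_drop_oldest is a hypothetical helper. The evict-then-put pair is not atomic across producers, so the helper retries until its put succeeds.

```python
import queue
from typing import Any


def put_drop_oldest(q: queue.Queue, msg: Any) -> int:
    """Enqueue msg on a bounded queue.Queue, evicting the oldest item if full.

    Returns how many messages were evicted to make room.
    """
    evicted = 0
    while True:
        try:
            q.put_nowait(msg)
            return evicted
        except queue.Full:
            try:
                q.get_nowait()  # discard the oldest item
                evicted += 1
            except queue.Empty:
                pass            # a consumer drained it meanwhile; retry the put


q = queue.Queue(maxsize=3)
drops = sum(put_drop_oldest(q, i) for i in range(5))
items = [q.get_nowait() for _ in range(q.qsize())]
print(items, drops)  # [2, 3, 4] 2
```

Blocking consume maps onto q.get(timeout=...), which raises queue.Empty on timeout rather than returning None.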
Common Bugs
- Single shared lock for the dispatcher: serializes everything; dispatch latency = O(N · consumer-time).
blockpolicy without notifying onconsume: producer waits forever.- Forgetting to copy the consumer dict before iterating in
dispatch— concurrent unregister mutates the dict mid-iteration. - Drop-oldest implementation:
popleftthen append succeeds, but if the lock is dropped between, ordering breaks. Both ops under the same lock. - Counting a “dropped” message as both dropped and delivered (double count) when on
drop_oldestyou replace, not skip.
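The block policy is only safe if every consume wakes a waiting producer, and the wakeup reaches a producer. With a single shared Condition and notify(), a wakeup meant for a blocked producer can land on another waiting consumer, leaving the producer asleep. A minimal sketch of the two-condition pattern, assuming one bounded FIFO (BoundedBlockingQueue is a hypothetical name):

```python
import threading
from collections import deque
from typing import Any


class BoundedBlockingQueue:
    """Bounded FIFO whose put() blocks while full (the "block" policy)."""

    def __init__(self, capacity: int) -> None:
        if capacity < 1:
            raise ValueError("capacity must be >= 1")
        self.capacity = capacity
        self.items: deque = deque()
        self.lock = threading.Lock()
        # Getters wait on not_empty, putters on not_full; each side
        # notifies only the other, so no wakeup hits the wrong waiter.
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)

    def put(self, item: Any) -> None:
        with self.lock:
            while len(self.items) >= self.capacity:
                self.not_full.wait()  # releases the lock while waiting
            self.items.append(item)
            self.not_empty.notify()   # wake one waiting getter

    def get(self) -> Any:
        with self.lock:
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()    # the notify the block policy depends on
            return item


q = BoundedBlockingQueue(capacity=2)
out: list = []
consumer = threading.Thread(target=lambda: out.extend(q.get() for _ in range(10)))
consumer.start()
for i in range(10):
    q.put(i)  # blocks whenever the consumer falls 2 messages behind
consumer.join(timeout=5)
print(out)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Nothing is dropped and order is preserved; the price is that the producer runs at the consumer's pace.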
Debugging Strategy
Print stats() periodically. Slow consumer = high queue_size and rising dropped. Stuck consumer = a consume_blocking call that never returns: check that the producer’s dispatch isn’t stuck on a different consumer’s block policy, since one full block-policy queue stalls fan-out to every consumer after it. For “missed messages”: log each dispatch with the list of consumers it was fanned out to, and reconcile that against per-consumer delivered and dropped counts.
Mastery Criteria
- Implemented per-consumer queue + dispatcher in <30 minutes.
- All four tests pass.
- Stated “per-consumer queue isolates slow consumers” before coding.
- Listed three backpressure policies (drop-oldest/newest/block) and their use cases.
- Answered follow-ups #3, #4, #7, #8, #11 crisply.
- Identified that broadcast is the harder variant; partition is simpler.