Lab 12 — In-Memory Pub/Sub
Goal
Implement an in-process publish-subscribe system with topic-based routing, wildcard subscriptions (a.b.*, a.#), and per-subscriber backpressure. After this lab you should be able to write a clean pub/sub broker in under 30 minutes and articulate the topic-matching design tradeoffs.
Background Concepts
Pub/sub differs from a job queue: subscribers don’t compete for messages; each subscriber receives every matching message. Two routing models:
- Topic-based (channel name as a string): "orders.created", "users.signup". Wildcards (* = one segment, # = many segments) come from MQTT/AMQP.
- Content-based: subscribers register a predicate over message content. More flexible, much harder to scale (every message must be evaluated against every subscriber’s predicate).
Topic-based with wildcards is the standard. The implementation challenge is the wildcard matcher: a subscription on "orders.*" should match "orders.created" but not "orders.created.fraud". We can solve this with a topic trie (segment-by-segment) for O(L) per match where L is segment count, or a regex per subscription for O(N) per dispatch where N is subscription count. The trie is the production answer for systems with many subscriptions.
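For contrast with the topic-based model, the content-based routing described above can be sketched as a list of predicates scanned on every publish. This is a minimal illustration, not part of the lab's API: the names ContentRouter, Predicate, and Handler are hypothetical.

```python
from typing import Callable

Predicate = Callable[[dict], bool]
Handler = Callable[[dict], None]


class ContentRouter:
    """Hypothetical content-based router: every publish scans every predicate."""

    def __init__(self) -> None:
        self._subs: list[tuple[Predicate, Handler]] = []

    def subscribe(self, predicate: Predicate, handler: Handler) -> None:
        self._subs.append((predicate, handler))

    def publish(self, message: dict) -> int:
        """Deliver to every subscriber whose predicate accepts the message.

        Returns the number of predicates evaluated, which is always the
        number of subscribers: there is no index to prune the scan.
        """
        for predicate, handler in self._subs:
            if predicate(message):
                handler(message)
        return len(self._subs)


router = ContentRouter()
big_orders: list[dict] = []
eu_orders: list[dict] = []
router.subscribe(lambda m: m.get("amount", 0) > 100, big_orders.append)
router.subscribe(lambda m: m.get("region") == "eu", eu_orders.append)

evaluated = router.publish({"amount": 250, "region": "us"})
```

Both predicates run even though only one accepts the message, which is the scaling cost that a topic trie avoids.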
Interview Context
Pub/sub design is asked at messaging companies (Confluent, IBM MQ, Solace, AWS SNS), at real-time platforms (Pusher, Ably, Twilio), and broadly at any senior+ design-coding round. The interview wants both code and the design reasoning around routing, backpressure, and subscription matching.
Problem Statement
Design PubSub:
- subscribe(topic_pattern, on_message) -> subscription_id: topic_pattern may include * (single segment) or # (multi-segment, must be last).
- unsubscribe(subscription_id)
- publish(topic, message): call on_message(topic, message) on every matching subscriber.
- Per-subscriber callback wrapping for backpressure (queue + drop policy).
Constraints
- Up to 10^4 active subscriptions
- Up to 10^5 publishes / second
- Subscription matching: < 100 µs per publish
- Per-subscriber callbacks may be slow; must not block the publisher
Clarifying Questions
- Wildcard syntax: MQTT (+/#), AMQP (*/#), or other? (Pick AMQP-style: * = one segment, # = ≥ 0 segments at end.)
- Synchronous or async callback delivery? (Async with per-subscriber queue is the production answer; simpler synchronous version is acceptable for the basic case.)
- Topic separator: . or /? (. is the AMQP convention; either is fine.)
- Ordering guarantees? (Per-subscriber: messages arrive in publish-order. Across subscribers: not guaranteed.)
- Replay / retain / persistence? (No by default; pure in-memory.)
Examples
ps = PubSub()
sid = ps.subscribe("orders.*", lambda topic, msg: print(f"got {topic}: {msg}"))
ps.publish("orders.created", {"id": 1}) # fires
ps.publish("orders.created.fraud", {"id": 2}) # does NOT fire (* is one segment)
ps.subscribe("orders.#", lambda t, m: log(t, m))  # log: any user-supplied function
ps.publish("orders.created.fraud", {}) # fires (# matches multi)
ps.unsubscribe(sid)
ps.publish("orders.created", {}) # only the # subscription fires
Initial Brute Force
Store subscriptions in a dict[topic_pattern, list[callback]]. On publish, iterate over all subscriptions and regex-match each pattern against the topic: O(N · pattern-cost) per publish, where N is the subscription count.
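The brute-force approach can be sketched as follows. This is a minimal sketch under stated assumptions: the names pattern_to_regex and BruteForcePubSub are hypothetical, subscriptions are keyed by id rather than by pattern so that unsubscribe stays simple, and each pattern is compiled once at subscribe time so that publish pays only for matching.

```python
import re
from typing import Any, Callable


def pattern_to_regex(pattern: str, sep: str = ".") -> "re.Pattern[str]":
    """Translate an AMQP-style pattern into a compiled regex.

    '*' matches exactly one segment; a trailing '#' matches zero or more.
    """
    esc = re.escape(sep)
    segments = pattern.split(sep)
    has_hash = segments[-1] == "#"
    if has_hash:
        segments = segments[:-1]
    if "#" in segments:
        raise ValueError("# must be the last segment")
    # '*' becomes "any run of non-separator characters" (one segment).
    parts = [f"[^{esc}]*" if seg == "*" else re.escape(seg) for seg in segments]
    body = esc.join(parts)
    if has_hash:
        # "a.#" must match "a", "a.b", "a.b.c"; a bare "#" matches everything.
        body = f"{body}(?:{esc}.*)?" if parts else ".*"
    return re.compile(body, re.DOTALL)


class BruteForcePubSub:
    """Synchronous O(N)-per-publish baseline: test every subscription."""

    def __init__(self) -> None:
        self._subs: dict[int, tuple["re.Pattern[str]", Callable[[str, Any], None]]] = {}
        self._next_id = 1

    def subscribe(self, pattern: str, callback: Callable[[str, Any], None]) -> int:
        sid = self._next_id
        self._next_id += 1
        self._subs[sid] = (pattern_to_regex(pattern), callback)
        return sid

    def unsubscribe(self, sid: int) -> None:
        self._subs.pop(sid, None)

    def publish(self, topic: str, message: Any) -> None:
        # Every publish touches every subscription, matching or not.
        for regex, callback in list(self._subs.values()):
            if regex.fullmatch(topic):
                callback(topic, message)


seen: list[tuple[str, str]] = []
ps = BruteForcePubSub()
ps.subscribe("orders.*", lambda t, m: seen.append(("star", t)))
ps.subscribe("orders.#", lambda t, m: seen.append(("hash", t)))
ps.publish("orders.created", 1)
ps.publish("orders.created.fraud", 2)
```

A baseline like this is also useful as a test oracle: the trie and the regex matcher should agree on every (pattern, topic) pair.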
Brute Force Complexity
At N = 10^4 subscriptions and 10^5 publishes/s, this is 10^9 regex matches per second, which is too slow: the 100 µs matching budget per publish leaves only 10 ns per pattern. Regex compilation and matching dominate the cost.
Optimization Path
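A topic trie: each node represents a segment. Children include literal-segment children plus a * child; # subscriptions are stored as a catch-all list on the node where the # appears. Match by walking the trie segment-by-segment: at each visited node, fire that node’s # catch-all subscriptions, then descend into the literal child for the next segment and into the * child, if present. When all segments are consumed, fire the exact-match subscriptions at the current node.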
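Per-publish cost becomes O(L · branching factor) ≈ O(L) for typical trees, where L is the topic depth. Each step follows at most two children (the literal and *), so in the worst case, with a * sibling next to a matching literal at every level, the walk can visit up to 2^L nodes.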
Final Expected Approach
Build a topic trie. Each node has:
- children: dict[str, Node] (literal subsegments)
- star: Node | None (single-segment wildcard)
- hash_subs: list[Sub] (# catch-all; matches this node and everything below it)
- subs: list[Sub] (exact matches at this node)

Publishing walks the trie segment-by-segment, at each step checking the literal child and the star child, and collects the matching subs at the nodes reached once all segments are consumed. It also collects hash_subs from every node visited along the way.
Each subscriber owns a per-subscriber bounded queue; publish enqueues to the queue (non-blocking, drops on full); a worker thread per subscriber drains the queue and calls the callback.
Data Structures Used
| Structure | Purpose |
|---|---|
| Topic trie | O(L) routing |
| dict[sub_id, Subscription] | unregister lookup |
| deque per subscriber + lock | per-subscriber queue |
| Worker thread per subscriber | invoke callback off the publish path |
Correctness Argument
Routing: a subscription A.B.C matches publish topic A.B.C iff the trie walk reaches the node carrying that subscription with all segments consumed. A * matches any single segment (one node-level wildcard step). A # at a node matches any zero-or-more remaining segments — equivalent to attaching a list of “catch-all” subscribers to that node.
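Per-subscriber ordering: publish enqueues under the broker lock, each subscriber’s queue is FIFO, and the worker drains in FIFO order. Therefore each subscriber sees messages in publish order, minus any messages dropped while its queue was full.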
Publisher non-blocking: publish only enqueues; no subscriber callback runs on the publish thread. Even a callback that takes 1 second doesn’t slow publish.
Complexity
- subscribe: O(L) for trie insert
- publish: O(L · F + M) where L = topic depth, F = trie branching explored per level, M = matching subscribers
- unsubscribe: O(L) to walk to the trie node and remove
Implementation Requirements
import threading, itertools
from collections import deque
from typing import Callable, Any, Optional


class _Sub:
    __slots__ = ("sub_id", "callback", "queue", "lock", "cond", "capacity", "drops",
                 "alive", "thread")

    def __init__(self, sub_id: int, callback: Callable, capacity: int = 1024):
        self.sub_id = sub_id
        self.callback = callback
        self.queue: deque = deque()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.capacity = capacity
        self.drops = 0
        self.alive = True
        self.thread: Optional[threading.Thread] = None

    def deliver(self, topic: str, msg: Any) -> None:
        with self.lock:
            if len(self.queue) >= self.capacity:
                self.queue.popleft()
                self.drops += 1
            self.queue.append((topic, msg))
            self.cond.notify()

    def stop(self) -> None:
        with self.lock:
            self.alive = False
            self.cond.notify()

    def run(self) -> None:
        while True:
            with self.lock:
                while self.alive and not self.queue:
                    self.cond.wait()
                if not self.alive and not self.queue:
                    return
                topic, msg = self.queue.popleft()
            try:
                self.callback(topic, msg)
            except Exception:
                pass  # don't kill worker on bad callback


class _Node:
    __slots__ = ("children", "star", "subs", "hash_subs")

    def __init__(self):
        self.children: dict[str, _Node] = {}
        self.star: Optional[_Node] = None
        self.subs: list[_Sub] = []
        self.hash_subs: list[_Sub] = []


class PubSub:
    def __init__(self, separator: str = "."):
        self._sep = separator
        self._root = _Node()
        self._subs: dict[int, tuple[_Sub, list[str]]] = {}  # id -> (sub, pattern segments)
        self._next_id = itertools.count(1)
        self._lock = threading.RLock()

    def subscribe(self, pattern: str, callback: Callable[[str, Any], None],
                  queue_capacity: int = 1024) -> int:
        segments = pattern.split(self._sep)
        sid = next(self._next_id)
        sub = _Sub(sid, callback, queue_capacity)
        with self._lock:
            node = self._root
            for i, seg in enumerate(segments):
                if seg == "#":
                    if i != len(segments) - 1:
                        raise ValueError("# must be the last segment")
                    node.hash_subs.append(sub)
                    break
                if seg == "*":
                    if node.star is None:
                        node.star = _Node()
                    node = node.star
                else:
                    node = node.children.setdefault(seg, _Node())
            else:
                node.subs.append(sub)
            self._subs[sid] = (sub, segments)
        sub.thread = threading.Thread(target=sub.run, daemon=True)
        sub.thread.start()
        return sid

    def unsubscribe(self, sub_id: int) -> None:
        with self._lock:
            entry = self._subs.pop(sub_id, None)
            if entry is None:
                return
            sub, segments = entry
            self._remove_from_trie(self._root, segments, 0, sub)
        sub.stop()
        sub.thread.join(timeout=1.0)

    def _remove_from_trie(self, node: _Node, segments: list[str],
                          i: int, sub: _Sub) -> None:
        if i == len(segments):
            try:
                node.subs.remove(sub)
            except ValueError:
                pass
            return
        seg = segments[i]
        if seg == "#":
            try:
                node.hash_subs.remove(sub)
            except ValueError:
                pass
            return
        nxt = node.star if seg == "*" else node.children.get(seg)
        if nxt is not None:
            self._remove_from_trie(nxt, segments, i + 1, sub)

    def publish(self, topic: str, message: Any) -> None:
        segments = topic.split(self._sep)
        with self._lock:
            self._match(self._root, segments, 0, topic, message)

    def _match(self, node: _Node, segments: list[str], i: int,
               topic: str, msg: Any) -> None:
        # # at this node matches everything below — fire now
        for s in node.hash_subs:
            s.deliver(topic, msg)
        if i == len(segments):
            for s in node.subs:
                s.deliver(topic, msg)
            return
        seg = segments[i]
        child = node.children.get(seg)
        if child is not None:
            self._match(child, segments, i + 1, topic, msg)
        if node.star is not None:
            self._match(node.star, segments, i + 1, topic, msg)
Tests
import unittest, time


class TestPubSub(unittest.TestCase):
    def _collect(self, ps, pattern):
        # Subscribe with a callback that records every delivery in buf.
        buf = []
        def cb(t, m): buf.append((t, m))
        sid = ps.subscribe(pattern, cb)
        return sid, buf

    def test_exact_match(self):
        ps = PubSub()
        sid, buf = self._collect(ps, "a.b")
        ps.publish("a.b", 1); ps.publish("a.b.c", 2); ps.publish("a", 3)
        time.sleep(0.05)  # give the worker thread time to drain
        self.assertEqual(buf, [("a.b", 1)])
        ps.unsubscribe(sid)

    def test_star_one_segment(self):
        ps = PubSub()
        sid, buf = self._collect(ps, "a.*")
        ps.publish("a.b", 1); ps.publish("a.c", 2)
        ps.publish("a.b.c", 3); ps.publish("a", 4)
        time.sleep(0.05)
        self.assertEqual(sorted(buf), [("a.b", 1), ("a.c", 2)])
        ps.unsubscribe(sid)

    def test_hash_multi_segment(self):
        ps = PubSub()
        sid, buf = self._collect(ps, "a.#")
        ps.publish("a", 0)
        ps.publish("a.b", 1); ps.publish("a.b.c", 2); ps.publish("x.y", 3)
        time.sleep(0.05)
        # # matches zero or more, so a, a.b, a.b.c all match
        self.assertEqual(sorted(buf), [("a", 0), ("a.b", 1), ("a.b.c", 2)])
        ps.unsubscribe(sid)

    def test_multiple_subscribers(self):
        ps = PubSub()
        sid1, b1 = self._collect(ps, "topic")
        sid2, b2 = self._collect(ps, "topic")
        ps.publish("topic", "msg")
        time.sleep(0.05)
        self.assertEqual(b1, [("topic", "msg")])
        self.assertEqual(b2, [("topic", "msg")])
        ps.unsubscribe(sid1); ps.unsubscribe(sid2)
Follow-up Questions
(7) Backpressure? Per-subscriber bounded queue with drop-oldest on full (shown). Alternatives: block the publisher (rejected — one slow subscriber stalls the world), drop-newest (loses recent state — rarely the right answer for pub/sub).
(3) Scale to N nodes? Distributed pub/sub is its own discipline. Models: (a) broker-based (Redis Pub/Sub, NATS): central broker fans out. (b) broker-less mesh (gossip): peers gossip subscriptions; each publish goes to relevant peers. (c) partitioned log (Kafka): no fan-out; consumers tail logs. The trie matcher works locally in any model; the network layer is the harder design.
(2) Persist state? Pure pub/sub is volatile — late subscribers miss messages. To persist, layer a replay log: every publish appends to a durable log; new subscribers can opt-in to read from offset 0 or “latest”. This is essentially Kafka’s design.
(4) Observe / monitor? Per-subscriber drop count, queue depth, throughput. Subscription count gauge. Publish rate counter. p99 publish-to-deliver latency histogram (for the per-subscriber path).
(11) Configuration knobs? Per-subscription queue capacity, on-full policy. Global: max subscriptions, separator character. Knobs not to expose: trie internal layout.
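The on-full policy from follow-ups (7) and (11) can be made a per-subscription knob. The sketch below is one way to do it, not the lab's reference solution: the names OnFull and BoundedMailbox are hypothetical, and only the two non-blocking policies are shown because blocking the publisher was rejected above.

```python
import threading
from collections import deque
from enum import Enum
from typing import Any


class OnFull(Enum):
    DROP_OLDEST = "drop_oldest"   # keep the freshest messages
    DROP_NEWEST = "drop_newest"   # keep what is already queued


class BoundedMailbox:
    """Hypothetical per-subscriber queue with a configurable on-full policy."""

    def __init__(self, capacity: int, on_full: OnFull = OnFull.DROP_OLDEST) -> None:
        if capacity < 1:
            raise ValueError("capacity must be >= 1")
        self._items: deque = deque()
        self._capacity = capacity
        self._on_full = on_full
        self._lock = threading.Lock()
        self.drops = 0  # exposed for monitoring

    def offer(self, item: Any) -> bool:
        """Enqueue without blocking. Returns False if `item` itself was dropped."""
        with self._lock:
            if len(self._items) >= self._capacity:
                self.drops += 1
                if self._on_full is OnFull.DROP_NEWEST:
                    return False
                self._items.popleft()  # DROP_OLDEST: evict the head
            self._items.append(item)
            return True

    def drain(self) -> list:
        """Remove and return everything currently queued, in FIFO order."""
        with self._lock:
            items = list(self._items)
            self._items.clear()
            return items


oldest = BoundedMailbox(2, OnFull.DROP_OLDEST)
newest = BoundedMailbox(2, OnFull.DROP_NEWEST)
for n in (1, 2, 3):
    oldest.offer(n)
    newest.offer(n)
```

With capacity 2 and three offers, the drop-oldest mailbox ends up holding the last two items and the drop-newest mailbox the first two; each records one drop.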
Product Extension
MQTT brokers, AMQP exchanges, Redis Pub/Sub, NATS, and ZeroMQ all use topic-based routing with some wildcard or prefix-matching syntax. Cloud Pub/Sub products (AWS SNS, GCP Pub/Sub, Azure Event Grid) add durability, retries, and ordering. The difference between MQTT and AMQP wildcards (+/# vs */#) is largely syntactic; MQTT also uses / rather than . as the separator.
Language/Runtime Follow-ups
- Python: this implementation. The per-subscriber thread approach scales to ~1000 subscribers; beyond, switch to an event loop (asyncio) with a single dispatcher coroutine.
- Java: EventBus (Guava) is the lightweight in-process pub/sub. For wildcards, MQTT clients (Paho) or Kafka.
- Go: channels per subscriber; idiomatic. nats-server is the production-grade Go choice.
- C++: Boost.Signals2 is the in-process equivalent; no wildcards.
- JS/TS: Node’s EventEmitter is the in-process equivalent; no wildcards. RxJS for reactive streams.
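For the Python note above, an event-loop variant can be sketched with asyncio. Note the assumption: this sketch uses one lightweight task per subscriber rather than the single dispatcher coroutine mentioned in the list, because it maps most directly onto the thread-based design. The class name AsyncSubscriber and its methods are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable

AsyncCallback = Callable[[str, Any], Awaitable[None]]


class AsyncSubscriber:
    """Hypothetical asyncio counterpart of a per-subscriber worker thread.

    Must be constructed inside a running event loop.
    """

    def __init__(self, callback: AsyncCallback, capacity: int = 1024) -> None:
        self._callback = callback
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
        self.drops = 0
        self._task = asyncio.create_task(self._run())

    def deliver(self, topic: str, msg: Any) -> None:
        """Non-blocking enqueue with drop-oldest when the queue is full."""
        if self._queue.full():
            self._queue.get_nowait()
            self._queue.task_done()  # keep join() accounting balanced
            self.drops += 1
        self._queue.put_nowait((topic, msg))

    async def _run(self) -> None:
        while True:
            topic, msg = await self._queue.get()
            try:
                await self._callback(topic, msg)
            except Exception:
                pass  # a failing callback must not kill the consumer task
            finally:
                self._queue.task_done()

    async def drain(self) -> None:
        """Wait until every queued message has been handled."""
        await self._queue.join()

    async def close(self) -> None:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def demo() -> tuple:
    received = []

    async def slow(topic: str, msg: Any) -> None:
        await asyncio.sleep(0.01)
        received.append((topic, msg))

    sub = AsyncSubscriber(slow, capacity=2)
    for n in range(5):
        sub.deliver("orders.created", n)  # never awaits, so never blocks
    await sub.drain()
    await sub.close()
    return received, sub.drops


received, drops = asyncio.run(demo())
```

Because deliver never awaits, the five publishes complete before the consumer task runs at all, so only the last two messages survive a capacity of 2.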
Common Bugs
- Synchronous callback dispatch from publish: one slow subscriber stalls everyone. Always use per-subscriber worker threads.
- Trie cleanup on unsubscribe: removing from the leaf but leaving empty intermediate nodes. Memory leak; matters at high churn.
- # not at end: validate at subscription time.
- Not propagating exceptions out of subscriber callbacks (silent failures). Log them.
- Race: subscribing during a publish — the subscription’s callback may or may not see the in-flight message. Document the semantics.
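The trie-cleanup bug in the list above can be avoided by pruning on the way back up the recursion. The following is a minimal sketch on a stand-in trie: Node, insert, and remove_and_prune are illustrative names, and insert assumes the pattern was already validated (# only in the last position).

```python
from typing import Optional


class Node:
    """Minimal stand-in for the lab's trie node."""
    __slots__ = ("children", "star", "subs", "hash_subs")

    def __init__(self) -> None:
        self.children: dict[str, "Node"] = {}
        self.star: Optional["Node"] = None
        self.subs: list = []
        self.hash_subs: list = []

    def is_empty(self) -> bool:
        return (not self.children and self.star is None
                and not self.subs and not self.hash_subs)


def insert(root: Node, segments: list[str], sub: object) -> None:
    node = root
    for seg in segments:
        if seg == "#":
            node.hash_subs.append(sub)
            return
        if seg == "*":
            if node.star is None:
                node.star = Node()
            node = node.star
        else:
            node = node.children.setdefault(seg, Node())
    node.subs.append(sub)


def remove_and_prune(node: Node, segments: list[str], sub: object, i: int = 0) -> bool:
    """Remove `sub`; return True if `node` is now empty and may be unlinked."""
    if i == len(segments):
        if sub in node.subs:
            node.subs.remove(sub)
    elif segments[i] == "#":
        if sub in node.hash_subs:
            node.hash_subs.remove(sub)
    elif segments[i] == "*":
        if node.star is not None and remove_and_prune(node.star, segments, sub, i + 1):
            node.star = None  # unlink the now-empty wildcard child
    else:
        child = node.children.get(segments[i])
        if child is not None and remove_and_prune(child, segments, sub, i + 1):
            del node.children[segments[i]]  # unlink the now-empty literal child
    return node.is_empty()


root = Node()
insert(root, ["orders", "*", "fraud"], "sub-1")
insert(root, ["orders", "created"], "sub-2")
remove_and_prune(root, ["orders", "*", "fraud"], "sub-1")
```

After the removal, the * branch under orders is gone while the orders.created branch, which still carries a subscription, is untouched.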
Debugging Strategy
For “missed message”: print the trie state at the matching point and the topic segments. For wildcard surprises, hand-trace the match: which child did we descend into? Did we visit *? Did # fire at the right level? For “callback didn’t run”: check that the worker thread is alive (sub.thread.is_alive()); a callback exception kills the worker if not caught.
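For hand-tracing wildcard surprises, a small reference matcher that records each decision can serve as an oracle to compare against the trie. This is only a sketch: the function name explain_match and its trace format are hypothetical, and it assumes the pattern has already been validated (# only in the last position).

```python
def explain_match(pattern: str, topic: str, sep: str = ".") -> tuple[bool, list[str]]:
    """Match one pattern against one topic and record every decision made."""
    p = pattern.split(sep)
    t = topic.split(sep)
    trace: list[str] = []
    for i, seg in enumerate(p):
        if seg == "#":
            # '#' is checked before exhaustion so that "a.#" matches "a".
            trace.append(f"[{i}] '#' absorbs remaining segments {t[i:]}")
            return True, trace
        if i >= len(t):
            trace.append(f"[{i}] pattern expects {seg!r} but topic is exhausted")
            return False, trace
        if seg == "*":
            trace.append(f"[{i}] '*' consumes {t[i]!r}")
        elif seg == t[i]:
            trace.append(f"[{i}] literal {seg!r} matches")
        else:
            trace.append(f"[{i}] literal {seg!r} != {t[i]!r}")
            return False, trace
    if len(t) > len(p):
        trace.append(f"pattern exhausted; topic has extra segments {t[len(p):]}")
        return False, trace
    trace.append("all segments consumed")
    return True, trace


ok, trace = explain_match("orders.*", "orders.created.fraud")
for line in trace:
    print(line)
```

If the trie delivers a message that this function rejects, or the reverse, the trace shows the segment at which the two disagree.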
Mastery Criteria
- Implemented topic trie + wildcard matching in <30 minutes.
- All four tests pass first run.
- Stated trie-vs-regex tradeoff (trie wins at scale; regex is simpler for few subscriptions).
- Articulated per-subscriber queue isolates slow subscribers.
- Answered follow-ups #2 (replay log), #3, #4, #7, #11.
- Compared topic vs content-based routing and stated when to use each.