Lab 12 — In-Memory Pub/Sub

Goal

Implement an in-process publish-subscribe system with topic-based routing, wildcard subscriptions (a.b.*, a.#), and per-subscriber backpressure. After this lab you should be able to write a clean pub/sub broker in under 30 minutes and articulate the topic-matching design tradeoffs.

Background Concepts

Pub/sub differs from a job queue: subscribers don’t compete for messages; each subscriber receives every matching message. Two routing models:

  1. Topic-based (channel name as a string): "orders.created", "users.signup". Wildcards (* = exactly one segment, # = zero or more trailing segments) follow the AMQP convention; MQTT uses + and # for the same roles.
  2. Content-based: subscribers register a predicate over message content. More flexible, much harder to scale (every message must be evaluated against every subscriber’s predicate).
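To make the scaling cost of content-based routing concrete, here is a minimal sketch. ContentRouter and its method names are hypothetical (they are not part of this lab's API); the point it illustrates is that publish has to call every registered predicate.

```python
from typing import Callable

Predicate = Callable[[dict], bool]
Handler = Callable[[dict], None]


class ContentRouter:
    """Hypothetical content-based router: each publish tests every predicate."""

    def __init__(self) -> None:
        self._subs: list[tuple[Predicate, Handler]] = []

    def subscribe(self, predicate: Predicate, handler: Handler) -> None:
        self._subs.append((predicate, handler))

    def publish(self, message: dict) -> int:
        """Deliver to every subscriber whose predicate accepts the message."""
        delivered = 0
        for predicate, handler in self._subs:  # N predicate calls per publish
            if predicate(message):
                handler(message)
                delivered += 1
        return delivered


router = ContentRouter()
large_orders: list = []
router.subscribe(
    lambda m: m.get("type") == "order" and m.get("total", 0) > 100,
    large_orders.append,
)
router.publish({"type": "order", "total": 250})  # accepted by the predicate
router.publish({"type": "order", "total": 20})   # rejected by the predicate
```

There is no shared structure to index here, which is why the lab continues with topic strings.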

Topic-based with wildcards is the standard. The implementation challenge is the wildcard matcher: a subscription on "orders.*" should match "orders.created" but not "orders.created.fraud". We can solve this with a topic trie (segment-by-segment) for O(L) per match where L is segment count, or a regex per subscription for O(N) per dispatch where N is subscription count. The trie is the production answer for systems with many subscriptions.
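The matching rule itself fits in a few lines. The function below is a reference sketch for a single pattern, useful as an oracle when testing a trie; topic_matches is a hypothetical name, and it assumes AMQP-style wildcards with # only in the last position.

```python
def topic_matches(pattern: str, topic: str, sep: str = ".") -> bool:
    """Return True if topic matches pattern (* = one segment, # = zero or more)."""
    p = pattern.split(sep)
    t = topic.split(sep)
    for i, seg in enumerate(p):
        if seg == "#":
            # "#" absorbs whatever is left, including nothing at all.
            # A "#" that is not the last segment is treated as a non-match here.
            return i == len(p) - 1
        if i >= len(t):
            return False  # pattern is longer than the topic
        if seg != "*" and seg != t[i]:
            return False  # literal segment mismatch
    return len(p) == len(t)  # no unmatched trailing topic segments


print(topic_matches("orders.*", "orders.created"))        # True
print(topic_matches("orders.*", "orders.created.fraud"))  # False
print(topic_matches("orders.#", "orders.created.fraud"))  # True
```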

Interview Context

Pub/sub design questions come up at messaging vendors and product teams (Confluent, IBM MQ, Solace, AWS SNS), at real-time platforms (Pusher, Ably, Twilio), and more broadly in senior-level design-and-coding rounds. Interviewers want both code and the design reasoning around routing, backpressure, and subscription matching.

Problem Statement

Design PubSub:

  • subscribe(topic_pattern, on_message) -> subscription_id. topic_pattern may include * (single segment) or # (multi-segment, must be last).
  • unsubscribe(subscription_id)
  • publish(topic, message) — call on_message(topic, message) on every matching subscriber.
  • Per-subscriber callback wrapping for backpressure (queue + drop policy).

Constraints

  • Up to 10^4 active subscriptions
  • Up to 10^5 publishes / second
  • Subscription matching: < 100 µs per publish
  • Per-subscriber callbacks may be slow; must not block the publisher

Clarifying Questions

  1. Wildcard syntax — MQTT (+/#), AMQP (*/#), or other? (Pick AMQP-style: * = one segment, # = ≥ 0 segments at end.)
  2. Synchronous or async callback delivery? (Async with per-subscriber queue is the production answer; simpler synchronous version is acceptable for the basic case.)
  3. Topic separator: . or /? (. is the AMQP convention; either is fine.)
  4. Ordering guarantees? (Per-subscriber: messages arrive in publish-order. Across subscribers: not guaranteed.)
  5. Replay / retain / persistence? (No by default; pure in-memory.)

Examples

ps = PubSub()
sid = ps.subscribe("orders.*", lambda topic, msg: print(f"got {topic}: {msg}"))
ps.publish("orders.created", {"id": 1})         # fires
ps.publish("orders.created.fraud", {"id": 2})   # does NOT fire (* is one segment)

ps.subscribe("orders.#", lambda t, m: print(f"audit {t}: {m}"))
ps.publish("orders.created.fraud", {})          # fires (# matches multi)

ps.unsubscribe(sid)
ps.publish("orders.created", {})                # only the # subscription fires

Initial Brute Force

dict[topic_pattern, list[callback]]. On publish, iterate all subscriptions, regex-match each pattern against the topic. O(N · pattern-cost) per publish where N is subscription count.
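A minimal sketch of this brute-force version follows. BruteForcePubSub and pattern_to_regex are hypothetical names, delivery is synchronous for brevity, and the pattern-to-regex translation is one plausible choice rather than a prescribed one.

```python
import re
from typing import Any, Callable


def pattern_to_regex(pattern: str, sep: str = ".") -> "re.Pattern":
    """Compile an AMQP-style pattern; use with fullmatch()."""
    esc = re.escape(sep)
    segs = pattern.split(sep)
    trailing_hash = segs[-1] == "#"
    if trailing_hash:
        segs = segs[:-1]
    parts = []
    for seg in segs:
        if seg == "#":
            raise ValueError("# must be the last segment")
        # "*" matches one segment (possibly empty, mirroring str.split).
        parts.append(f"[^{esc}]*" if seg == "*" else re.escape(seg))
    body = esc.join(parts)
    if trailing_hash:
        # Zero or more further segments after the fixed prefix.
        body = body + f"(?:{esc}.*)?" if parts else ".*"
    return re.compile(body)


class BruteForcePubSub:
    def __init__(self) -> None:
        self._subs: dict = {}  # sub_id -> (compiled regex, callback)
        self._next_id = 1

    def subscribe(self, pattern: str, callback: Callable[[str, Any], None]) -> int:
        sid = self._next_id
        self._next_id += 1
        self._subs[sid] = (pattern_to_regex(pattern), callback)
        return sid

    def unsubscribe(self, sid: int) -> None:
        self._subs.pop(sid, None)

    def publish(self, topic: str, message: Any) -> int:
        """Test every subscription: N regex matches per publish."""
        hits = 0
        for regex, callback in list(self._subs.values()):
            if regex.fullmatch(topic):
                callback(topic, message)
                hits += 1
        return hits
```

Compiling once at subscribe time removes the compile cost from publish, but the linear scan over all subscriptions remains.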

Brute Force Complexity

At N=10^4 subscriptions and 10^5 pub/s, this is 10^9 regex matches / sec — too slow. Wildcard regex compile and match dominate.
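The same arithmetic, written out against the stated constraints. The variable names are illustrative only.

```python
subscriptions = 10**4        # N, from the constraints
publishes_per_sec = 10**5
match_budget_us = 100        # per-publish matching budget, in microseconds

# Brute force checks every subscription on every publish.
matches_per_sec = subscriptions * publishes_per_sec

# Time available for each pattern check if one publish must finish in budget.
ns_per_match = match_budget_us * 1_000 / subscriptions

print(f"{matches_per_sec:.0e} pattern checks per second")    # 1e+09 pattern checks per second
print(f"{ns_per_match:.0f} ns available per pattern check")  # 10 ns available per pattern check
```

Ten nanoseconds per check is less than the cost of a single Python function call, so the scan cannot meet the budget regardless of how fast each regex is.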

Optimization Path

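A topic trie: each node represents a segment. A node has literal-segment children, an optional * child, and a list of # subscriptions. Match by walking the trie segment-by-segment: at each node visited, fire the # subscriptions stored there (they match everything from that point down), then descend into the matching literal child and into the * child if they exist. When the segments run out, fire the exact subscriptions stored at the current node.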

Per-publish cost becomes O(L · branching factor) ≈ O(L) for typical trees, where L is the topic depth.

Final Expected Approach

Build a topic trie. Each node has:

  • children: dict[str, Node] — literal subsegments
  • star: Node | None — single-segment wildcard
  • hash_subscriptions: list[Sub], the # catch-all (matches everything at or below this node)
  • subs: list[Sub] — exact matches at this node

Publishing walks the trie segment-by-segment, at each step checking the literal child and the star child; collect matching subs at terminal nodes. Collect hash_subscriptions along the entire path.

Each subscriber owns a per-subscriber bounded queue; publish enqueues to the queue (non-blocking, drops on full); a worker thread per subscriber drains the queue and calls the callback.

Data Structures Used

Structure                       Purpose
Topic trie                      O(L) routing
dict[sub_id, Subscription]      unregister lookup
deque per subscriber + lock     per-subscriber queue
Worker thread per subscriber    invoke callback off the publish path

Correctness Argument

Routing: a subscription A.B.C matches publish topic A.B.C iff the trie walk reaches the node carrying that subscription with all segments consumed. A * matches any single segment (one node-level wildcard step). A # at a node matches any zero-or-more remaining segments — equivalent to attaching a list of “catch-all” subscribers to that node.

Per-subscriber ordering: each subscriber’s queue is FIFO and its worker drains it in FIFO order. Therefore each subscriber sees its messages in publish order; drop-oldest may discard messages but never reorders them.

Publisher non-blocking: publish only enqueues; no subscriber callback runs on the publish thread. Even a callback that takes 1 second doesn’t slow publish.

Complexity

  • subscribe: O(L) for trie insert
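  • publish: O(V + M), where V = trie nodes visited and M = matching subscribers. Each step follows at most one literal child and one * child, so V is about L (the topic depth) when few patterns share * prefixes and at most 2^L in the worst case
  • unsubscribe: O(L) to walk to the trie node, plus a linear scan of the subscriber list at that node to remove the entry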

Implementation Requirements

import itertools
import logging
import threading
from collections import deque
from typing import Any, Callable, Optional

_log = logging.getLogger(__name__)

class _Sub:
    """One subscription: a bounded FIFO queue drained by its own worker thread."""
    __slots__ = ("sub_id", "callback", "queue", "lock", "cond", "capacity", "drops",
                 "alive", "thread")
    def __init__(self, sub_id: int, callback: Callable, capacity: int = 1024):
        self.sub_id = sub_id
        self.callback = callback
        self.queue: deque = deque()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.capacity = capacity
        self.drops = 0          # messages discarded because the queue was full
        self.alive = True
        self.thread: Optional[threading.Thread] = None

    def deliver(self, topic: str, msg: Any) -> None:
        """Enqueue without blocking; when the queue is full, drop the oldest message."""
        with self.lock:
            if len(self.queue) >= self.capacity:
                self.queue.popleft()
                self.drops += 1
            self.queue.append((topic, msg))
            self.cond.notify()

    def stop(self) -> None:
        """Ask the worker to exit once it has drained the queue."""
        with self.lock:
            self.alive = False
            self.cond.notify()

    def run(self) -> None:
        """Worker loop: pop in FIFO order and run the callback outside the lock."""
        while True:
            with self.lock:
                while self.alive and not self.queue:
                    self.cond.wait()
                if not self.alive and not self.queue:
                    return
                topic, msg = self.queue.popleft()
            try:
                self.callback(topic, msg)
            except Exception:
                # Log and continue: a failing callback must not kill the worker.
                _log.exception("subscriber %d: callback failed on topic %r",
                               self.sub_id, topic)


class _Node:
    __slots__ = ("children", "star", "subs", "hash_subs")
    def __init__(self):
        self.children: dict[str, _Node] = {}
        self.star: Optional[_Node] = None
        self.subs: list[_Sub] = []
        self.hash_subs: list[_Sub] = []


class PubSub:
    def __init__(self, separator: str = "."):
        self._sep = separator
        self._root = _Node()
        self._subs: dict[int, tuple[_Sub, list[str]]] = {}  # id -> (sub, pattern segments)
        self._next_id = itertools.count(1)
        self._lock = threading.RLock()

    def subscribe(self, pattern: str, callback: Callable[[str, Any], None],
                  queue_capacity: int = 1024) -> int:
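        segments = pattern.split(self._sep)
        # Validate before touching the trie so a bad pattern leaves no empty nodes behind.
        if "#" in segments[:-1]:
            raise ValueError("# must be the last segment")
        sid = next(self._next_id)
        sub = _Sub(sid, callback, queue_capacity)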
        with self._lock:
            node = self._root
            for i, seg in enumerate(segments):
                if seg == "#":
                    if i != len(segments) - 1:
                        raise ValueError("# must be the last segment")
                    node.hash_subs.append(sub)
                    break
                if seg == "*":
                    if node.star is None: node.star = _Node()
                    node = node.star
                else:
                    node = node.children.setdefault(seg, _Node())
            else:
                node.subs.append(sub)
            self._subs[sid] = (sub, segments)
        sub.thread = threading.Thread(target=sub.run, daemon=True)
        sub.thread.start()
        return sid

    def unsubscribe(self, sub_id: int) -> None:
        with self._lock:
            entry = self._subs.pop(sub_id, None)
            if entry is None: return
            sub, segments = entry
            self._remove_from_trie(self._root, segments, 0, sub)
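        sub.stop()
        # A callback may unsubscribe itself; joining the current thread would
        # raise RuntimeError, so only join from other threads.
        if sub.thread is not None and sub.thread is not threading.current_thread():
            sub.thread.join(timeout=1.0)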

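    def _remove_from_trie(self, node: _Node, segments: list[str],
                          i: int, sub: _Sub) -> bool:
        """Remove sub along the pattern path; return True if node is now empty.

        Empty nodes are pruned on the way back up so that heavy
        subscribe/unsubscribe churn does not leak trie nodes.
        """
        if i == len(segments):
            if sub in node.subs:
                node.subs.remove(sub)
        elif segments[i] == "#":
            if sub in node.hash_subs:
                node.hash_subs.remove(sub)
        elif segments[i] == "*":
            if node.star is not None and self._remove_from_trie(node.star, segments, i + 1, sub):
                node.star = None
        else:
            child = node.children.get(segments[i])
            if child is not None and self._remove_from_trie(child, segments, i + 1, sub):
                del node.children[segments[i]]
        return not (node.children or node.star or node.subs or node.hash_subs)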

    def publish(self, topic: str, message: Any) -> None:
        segments = topic.split(self._sep)
        with self._lock:
            self._match(self._root, segments, 0, topic, message)

    def _match(self, node: _Node, segments: list[str], i: int,
               topic: str, msg: Any) -> None:
        # # at this node matches everything below — fire now
        for s in node.hash_subs:
            s.deliver(topic, msg)
        if i == len(segments):
            for s in node.subs:
                s.deliver(topic, msg)
            return
        seg = segments[i]
        child = node.children.get(seg)
        if child is not None:
            self._match(child, segments, i + 1, topic, msg)
        if node.star is not None:
            self._match(node.star, segments, i + 1, topic, msg)

Tests

import unittest, time

class TestPubSub(unittest.TestCase):
    def _collect(self, ps, pattern):
        """Subscribe to pattern; return (subscription id, list that receives deliveries)."""
        buf = []
        def cb(t, m): buf.append((t, m))
        sid = ps.subscribe(pattern, cb)
        return sid, buf

    def test_exact_match(self):
        ps = PubSub()
        sid, buf = self._collect(ps, "a.b")
        ps.publish("a.b", 1); ps.publish("a.b.c", 2); ps.publish("a", 3)
        time.sleep(0.05)
        self.assertEqual(buf, [("a.b", 1)])
        ps.unsubscribe(sid)

    def test_star_one_segment(self):
        ps = PubSub()
        sid, buf = self._collect(ps, "a.*")
        ps.publish("a.b", 1); ps.publish("a.c", 2)
        ps.publish("a.b.c", 3); ps.publish("a", 4)
        time.sleep(0.05)
        self.assertEqual(sorted(buf), [("a.b", 1), ("a.c", 2)])
        ps.unsubscribe(sid)

    def test_hash_multi_segment(self):
        ps = PubSub()
        sid, buf = self._collect(ps, "a.#")
        ps.publish("a", 0)
        ps.publish("a.b", 1); ps.publish("a.b.c", 2); ps.publish("x.y", 3)
        time.sleep(0.05)
        # # matches zero or more, so a, a.b, a.b.c all match
        self.assertEqual(sorted(buf), [("a", 0), ("a.b", 1), ("a.b.c", 2)])
        ps.unsubscribe(sid)

    def test_multiple_subscribers(self):
        ps = PubSub()
        sid1, b1 = self._collect(ps, "topic")
        sid2, b2 = self._collect(ps, "topic")
        ps.publish("topic", "msg")
        time.sleep(0.05)
        self.assertEqual(b1, [("topic", "msg")])
        self.assertEqual(b2, [("topic", "msg")])
        ps.unsubscribe(sid1); ps.unsubscribe(sid2)

Follow-up Questions

(7) Backpressure? Per-subscriber bounded queue with drop-oldest on full (shown). Alternatives: block the publisher (rejected — one slow subscriber stalls the world), drop-newest (loses recent state — rarely the right answer for pub/sub).
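The two non-blocking policies differ only in which message is sacrificed. The sketch below isolates that choice; BoundedBuffer and the policy strings are hypothetical names, and locking is omitted to keep the comparison short.

```python
from collections import deque
from typing import Any


class BoundedBuffer:
    """Fixed-capacity FIFO with a selectable on-full policy (not thread-safe)."""

    def __init__(self, capacity: int, policy: str = "drop_oldest") -> None:
        if policy not in ("drop_oldest", "drop_newest"):
            raise ValueError(f"unknown policy: {policy}")
        self.items: deque = deque()
        self.capacity = capacity
        self.policy = policy
        self.drops = 0

    def offer(self, item: Any) -> bool:
        """Try to store item; return False if the item itself was dropped."""
        if len(self.items) >= self.capacity:
            self.drops += 1
            if self.policy == "drop_newest":
                return False          # keep old backlog, lose the fresh message
            self.items.popleft()      # drop_oldest: evict the stalest message
        self.items.append(item)
        return True


oldest = BoundedBuffer(3, "drop_oldest")
newest = BoundedBuffer(3, "drop_newest")
for n in range(1, 6):
    oldest.offer(n)
    newest.offer(n)

print(list(oldest.items))  # [3, 4, 5]
print(list(newest.items))  # [1, 2, 3]
```

With drop-oldest a slow subscriber catches up on the most recent state, which is why it is the default here.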

(3) Scale to N nodes? Distributed pub/sub is its own discipline. Models: (a) broker-based (Redis Pub/Sub, NATS): central broker fans out. (b) broker-less mesh (gossip protocols): peers gossip subscriptions; each publish goes to relevant peers. (c) partitioned log (Kafka): no fan-out; consumers tail logs. The trie matcher works locally in any model; the network layer is the harder design.

(2) Persist state? Pure pub/sub is volatile: late subscribers miss messages. To persist, layer a replay log: every publish appends to a durable log; new subscribers can opt in to read from offset 0 or “latest”. This is essentially Kafka’s design.
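A small sketch of the offset idea. ReplayLog is a hypothetical class, and an in-memory list stands in for the durable log, so this shows only the offset semantics, not persistence.

```python
from typing import Any


class ReplayLog:
    """Append-only log; a Python list stands in for durable storage (assumption)."""

    def __init__(self) -> None:
        self._entries: list = []  # (topic, message) pairs, index = offset

    def append(self, topic: str, message: Any) -> int:
        """Record a publish and return the offset it was stored at."""
        self._entries.append((topic, message))
        return len(self._entries) - 1

    @property
    def latest(self) -> int:
        """Offset the next append will get; starting here skips all history."""
        return len(self._entries)

    def read_from(self, offset: int) -> list:
        """Return (offset, topic, message) for every entry at or after offset."""
        if offset < 0:
            raise ValueError("offset must be >= 0")
        return [(i, topic, msg)
                for i, (topic, msg) in enumerate(self._entries[offset:], start=offset)]


replay = ReplayLog()
replay.append("orders.created", {"id": 1})
replay.append("orders.created", {"id": 2})
late_joiner_offset = replay.latest          # a subscriber joining now, at "latest"
replay.append("orders.shipped", {"id": 1})

print(len(replay.read_from(0)))               # 3 (full history)
print(replay.read_from(late_joiner_offset))   # [(2, 'orders.shipped', {'id': 1})]
```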

(4) Observe / monitor? Per-subscriber drop count, queue depth, throughput. Subscription count gauge. Publish rate counter. p99 publish-to-deliver latency histogram (for the per-subscriber path).
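One way these per-subscriber numbers could be tracked is sketched below. SubscriberStats and percentile are hypothetical helpers; a real deployment would more likely export counters to an existing metrics library than compute percentiles by hand.

```python
from dataclasses import dataclass


@dataclass
class SubscriberStats:
    """Counters a subscriber worker would update (hypothetical structure)."""
    enqueued: int = 0    # messages accepted into the queue
    delivered: int = 0   # callbacks completed
    drops: int = 0       # messages evicted because the queue was full

    @property
    def queue_depth(self) -> int:
        # Everything enqueued is either delivered, dropped, or still waiting.
        return self.enqueued - self.delivered - self.drops


def percentile(samples: list, p: int) -> float:
    """Nearest-rank percentile for an integer p in 1..100."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, -(-p * len(ordered) // 100))  # ceil(p * n / 100) with integers
    return ordered[rank - 1]


stats = SubscriberStats(enqueued=10, delivered=6, drops=1)
latencies_ms = [float(n) for n in range(1, 101)]  # made-up sample data

print(stats.queue_depth)              # 3
print(percentile(latencies_ms, 99))   # 99.0
```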

(11) Configuration knobs? Per-subscription queue capacity, on-full policy. Global: max subscriptions, separator character. Knobs not to expose: trie internal layout.
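These knobs could be grouped into two small immutable config objects, one per scope. All names and defaults below are illustrative assumptions; the defaults simply echo numbers used elsewhere in this lab.

```python
from dataclasses import dataclass
from enum import Enum


class OnFull(Enum):
    DROP_OLDEST = "drop_oldest"
    DROP_NEWEST = "drop_newest"


@dataclass(frozen=True)
class SubscriptionConfig:
    """Per-subscription knobs (hypothetical)."""
    queue_capacity: int = 1024
    on_full: OnFull = OnFull.DROP_OLDEST

    def __post_init__(self) -> None:
        if self.queue_capacity < 1:
            raise ValueError("queue_capacity must be at least 1")


@dataclass(frozen=True)
class BrokerConfig:
    """Global knobs (hypothetical). Trie layout is deliberately absent."""
    max_subscriptions: int = 10_000
    separator: str = "."

    def __post_init__(self) -> None:
        if len(self.separator) != 1:
            raise ValueError("separator must be a single character")


broker_cfg = BrokerConfig(separator="/")
sub_cfg = SubscriptionConfig(queue_capacity=256, on_full=OnFull.DROP_NEWEST)
```

Validating in __post_init__ rejects bad values at construction time rather than at the first publish.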

Product Extension

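MQTT brokers, AMQP topic exchanges, Redis Pub/Sub, NATS, and ZeroMQ all route on topic strings, with varying pattern support: MQTT, AMQP, and NATS use segment wildcards, Redis PSUBSCRIBE uses glob patterns, and ZeroMQ SUB sockets match on prefixes only. Cloud Pub/Sub products (AWS SNS, GCP Pub/Sub, Azure Event Grid) add durability, retries, and ordering. The difference between MQTT and AMQP wildcards (+/# vs */#) is mostly syntactic; the separators also differ (/ vs .), and AMQP topic exchanges accept # in the middle of a pattern while MQTT requires it to be last.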
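For the wildcard subset this lab uses (# only in the last position), converting an MQTT filter to the AMQP-style pattern is a token swap. mqtt_to_amqp is a hypothetical helper, and it rejects literal segments that would change meaning after conversion.

```python
def mqtt_to_amqp(mqtt_filter: str) -> str:
    """Convert an MQTT topic filter (/, +, #) to an AMQP-style pattern (., *, #)."""
    segments = mqtt_filter.split("/")
    out = []
    for i, seg in enumerate(segments):
        if seg == "+":
            out.append("*")               # single-level wildcard
        elif seg == "#":
            if i != len(segments) - 1:
                raise ValueError("# must be the last level in an MQTT filter")
            out.append("#")               # multi-level wildcard
        elif "." in seg or seg == "*":
            # A literal "." or "*" would become a separator or wildcard.
            raise ValueError(f"segment {seg!r} is not representable")
        else:
            out.append(seg)
    return ".".join(out)


print(mqtt_to_amqp("orders/+/fraud"))  # orders.*.fraud
print(mqtt_to_amqp("orders/#"))        # orders.#
```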

Language/Runtime Follow-ups

  • Python: this implementation. The thread-per-subscriber approach scales to roughly 1,000 subscribers; beyond that, switch to an event loop (asyncio) with a single dispatcher coroutine.
  • Java: EventBus (Guava) is the lightweight in-process pub/sub. For wildcards, MQTT clients (Paho) or Kafka.
  • Go: channels per subscriber; idiomatic. nats-server is the production-grade Go choice.
  • C++: Boost.Signals2 is the in-process equivalent; no wildcards.
  • JS/TS: Node’s EventEmitter is the in-process equivalent; no wildcards. RxJS for reactive streams.
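For the Python event-loop variant, a rough sketch of the subscriber side is shown below. It deliberately uses one lightweight task per subscriber instead of a single dispatcher coroutine, so that a slow callback still delays only its own queue; AsyncSubscriber and demo are hypothetical names, and callbacks are assumed to be coroutine functions.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable


class AsyncSubscriber:
    """Bounded asyncio queue with drop-oldest, drained by one task."""

    def __init__(self, callback: Callable[[str, Any], Awaitable[None]],
                 capacity: int = 1024) -> None:
        self.callback = callback
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
        self.drops = 0

    def deliver(self, topic: str, msg: Any) -> None:
        """Non-blocking enqueue; must be called from the event loop thread."""
        if self.queue.full():
            self.queue.get_nowait()   # evict the oldest message
            self.queue.task_done()    # it will never be processed
            self.drops += 1
        self.queue.put_nowait((topic, msg))

    async def run(self) -> None:
        while True:
            topic, msg = await self.queue.get()
            try:
                await self.callback(topic, msg)
            except Exception:
                logging.exception("callback failed on topic %r", topic)
            finally:
                self.queue.task_done()


async def demo():
    seen = []

    async def on_message(topic: str, msg: Any) -> None:
        seen.append((topic, msg))

    sub = AsyncSubscriber(on_message, capacity=2)
    for i in range(3):
        sub.deliver("orders.created", i)   # the third deliver evicts message 0
    worker = asyncio.create_task(sub.run())
    await sub.queue.join()                 # wait until the queue is drained
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return seen, sub.drops


seen, drops = asyncio.run(demo())
print(seen, drops)  # [('orders.created', 1), ('orders.created', 2)] 1
```

Tasks are far cheaper than threads, so this keeps per-subscriber isolation without the thread count growing with the number of subscriptions.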

Common Bugs

  1. Synchronous callback dispatch from publish — one slow subscriber stalls everyone. Always use per-subscriber worker threads.
  2. Trie cleanup on unsubscribe: removing from the leaf but leaving empty intermediate nodes. Memory leak; matters at high churn.
  3. # not at end: validate at subscription time.
  4. Swallowing exceptions raised in subscriber callbacks (silent failures). Catch them so the worker survives, but log them.
  5. Race: subscribing during a publish — the subscription’s callback may or may not see the in-flight message. Document the semantics.

Debugging Strategy

For “missed message”: print the trie state at the matching point and the topic segments. For wildcard surprises, hand-trace the match: which child did we descend into? Did we visit *? Did # fire at the right level? For “callback didn’t run”: check that the worker thread is alive (sub.thread.is_alive()); a callback exception kills the worker if not caught.
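Hand-tracing can be partly automated for a single pattern. The helper below is a debugging sketch, not part of the broker: trace_match is a hypothetical name, it works on one pattern at a time rather than on the trie, and it assumes # appears only as the last segment.

```python
def trace_match(pattern: str, topic: str, sep: str = "."):
    """Return (matched, steps) describing how pattern was compared to topic."""
    p, t = pattern.split(sep), topic.split(sep)
    if "#" in p[:-1]:
        raise ValueError("# must be the last segment")
    steps = []
    for i, seg in enumerate(p):
        if seg == "#":
            steps.append(f"[{i}] '#' absorbs remaining segments {t[i:]}")
            return True, steps
        if i >= len(t):
            steps.append(f"[{i}] pattern expects {seg!r} but the topic has ended")
            return False, steps
        if seg == "*":
            steps.append(f"[{i}] '*' matches {t[i]!r}")
        elif seg == t[i]:
            steps.append(f"[{i}] literal {seg!r} matches")
        else:
            steps.append(f"[{i}] literal {seg!r} does not match {t[i]!r}")
            return False, steps
    if len(t) > len(p):
        steps.append(f"[{len(p)}] unmatched trailing segments {t[len(p):]}")
        return False, steps
    return True, steps


matched, steps = trace_match("orders.*", "orders.created.fraud")
print(matched)  # False
for line in steps:
    print(line)
```

Comparing this trace with what the trie actually delivered narrows a missed message down to either the matcher or the delivery path.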

Mastery Criteria

  • Implemented topic trie + wildcard matching in <30 minutes.
  • All four tests pass first run.
  • Stated trie-vs-regex tradeoff (trie wins at scale; regex is simpler for few subscriptions).
  • Articulated how the per-subscriber queue isolates slow subscribers.
  • Answered follow-ups #2 (replay log), #3, #4, #7, #11.
  • Compared topic vs content-based routing and stated when to use each.