Lab 06 — Durable Job Queue

Goal

Implement a job queue with at-least-once delivery semantics, idempotency keys, and visibility timeouts. After this lab you should be able to articulate why exactly-once delivery is impractical, design at-least-once with idempotency on the consumer, and implement an in-memory queue that simulates SQS-style semantics in under 35 minutes.

Background Concepts

A durable job queue accepts jobs from producers and delivers them to consumers, surviving consumer crashes without losing work. The three classical delivery semantics:

  • At-most-once: deliver, forget. Fast but loses jobs on crash. Acceptable for fire-and-forget telemetry.
  • At-least-once: deliver, retry until acknowledged. Jobs may be delivered multiple times. Requires consumers to be idempotent.
  • Exactly-once: impossible in a distributed system without two-phase commit between queue and consumer. The “exactly-once” branding in real systems (Kafka, Pulsar) means “exactly-once processing semantics given idempotent consumers” — which is at-least-once + idempotency.

The standard primitive that enables at-least-once is the visibility timeout: when a consumer dequeues a job, the queue marks it “in-flight” with a TTL. If the consumer acks before the TTL expires, the job is deleted. If the TTL expires first (the consumer crashed, hung, or was simply too slow), the job becomes visible again and is redelivered. The consumer must be idempotent because the same job may be processed twice: the ack can be lost in transit, or a slow consumer can finish after the job has already been redelivered.

Interview Context

This problem comes up regularly in infrastructure interviews, particularly for teams that build or operate queues such as AWS SQS, GCP Pub/Sub, or Confluent's Kafka. The interviewer wants to hear: “exactly-once is impractical because of the two-generals problem; at-least-once with idempotency keys is the production answer; visibility timeout is how we implement it.” Then they want to see you build a small version that demonstrates understanding.

Problem Statement

Design JobQueue:

  • enqueue(payload, idempotency_key=None) -> job_id — push. If idempotency_key is non-None and matches a recent job, deduplicate.
  • dequeue(visibility_timeout=30.0) -> Job | None — pop a visible job; mark in-flight with TTL.
  • ack(job_id) — confirm successful processing; permanently delete.
  • nack(job_id, requeue_delay=0) — release back; optionally with a delay.
  • Background scavenger: jobs whose visibility TTL has expired return to visible state.

Constraints

  • Up to 10^5 in-flight jobs
  • 10^4 enqueues / second
  • Single-process in-memory; persistence is a follow-up

Clarifying Questions

  1. FIFO or best-effort ordering? (Best-effort is standard SQS; FIFO costs more.)
  2. Visibility timeout: per-call or queue default? (Per-call, with queue default.)
  3. Idempotency key TTL — how long do we dedupe? (Typically 5 minutes; configurable.)
  4. Max retries before DLQ? (Often a follow-up; default unlimited.)
  5. What happens on nack? (Requeue, optionally with a delay; this is the natural retry path.)

Examples

q = JobQueue()
job_id = q.enqueue("send-email-123", idempotency_key="email-abc")
q.enqueue("send-email-123", idempotency_key="email-abc")  # dedup; same job_id

job = q.dequeue(visibility_timeout=10)
# … process …
q.ack(job.job_id)  # done

# Crash scenario:
job = q.dequeue(visibility_timeout=10)
# consumer crashes, never acks
# 11 seconds later:
job_again = q.dequeue()    # same payload, same job_id, delivery_count=2

Initial Brute Force

A list of jobs and a single mutex. dequeue pops the head, sets in-flight; ack removes; nack re-prepends. No visibility timeout. Loses jobs on crash.
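
The brute-force design above can be sketched as follows. This is a minimal illustration, not part of the lab's reference solution; the class name NaiveQueue and the tuple-based job representation are assumptions made for the example. The last three lines show the failure mode: a consumer that takes a job and never acks leaves it stranded.

```python
import threading
from collections import deque
from typing import Any, Optional


class NaiveQueue:
    """Brute-force queue (hypothetical name): in-flight jobs have no TTL."""

    def __init__(self) -> None:
        self._jobs: deque = deque()      # (job_id, payload) pairs awaiting a consumer
        self._in_flight: dict = {}       # job_id -> payload, held until ack or nack
        self._lock = threading.Lock()
        self._next_id = 1

    def enqueue(self, payload: Any) -> int:
        with self._lock:
            job_id = self._next_id
            self._next_id += 1
            self._jobs.append((job_id, payload))
            return job_id

    def dequeue(self) -> Optional[tuple]:
        with self._lock:
            if not self._jobs:
                return None
            job_id, payload = self._jobs.popleft()
            self._in_flight[job_id] = payload
            return job_id, payload

    def ack(self, job_id: int) -> bool:
        with self._lock:
            return self._in_flight.pop(job_id, None) is not None

    def nack(self, job_id: int) -> bool:
        with self._lock:
            if job_id not in self._in_flight:
                return False
            # Re-prepend so the job is the next one handed out.
            self._jobs.appendleft((job_id, self._in_flight.pop(job_id)))
            return True


q = NaiveQueue()
q.enqueue("send-email-123")
taken = q.dequeue()        # consumer takes the job, then "crashes" without ack or nack
stranded = q.dequeue()     # None: nothing ever moves the job out of _in_flight
```

Because no component ever inspects _in_flight on a timer or on dequeue, the job is only recoverable if the crashed consumer comes back and calls nack, which is exactly what a crash prevents.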

Brute Force Complexity

O(1) per op for a deque, O(N) if implemented over a list with re-prepend. Fundamentally wrong for at-least-once because there’s no scavenger.

Optimization Path

Add: (a) visible: deque[Job] — jobs ready to be dequeued; (b) in_flight: dict[job_id, (Job, expires_at)] — taken but not acked; (c) idempotency: dict[key, job_id] with TTL for dedup; (d) a background scavenger that moves expired in-flight jobs back to visible. The scavenger can be lazy (check on each dequeue) instead of a dedicated thread.

Final Expected Approach

Use a deque for visible jobs, a dict for in-flight entries of the form (job, expires_at), and a dict for the idempotency cache. Each dequeue first sweeps in_flight for expired entries and moves them to the front of visible. All operations run under a single lock; critical sections stay sub-millisecond as long as the in-flight set is small enough to scan quickly.

Data Structures Used

Structure                          Purpose
deque[Job]                         visible jobs, FIFO best-effort
dict[job_id, (Job, expires_at)]    in-flight tracking
dict[idempotency_key, job_id]      dedup cache
Lock                               single-process atomicity

Correctness Argument

No lost jobs (at-least-once): every job is in exactly one of three states: visible, in_flight, or acked (deleted). Transitions: enqueue → visible; dequeue → in_flight; ack → deleted; nack → visible; scavenger → visible (from in_flight on TTL expiry). No transition discards a job before ack. Therefore, until acked, every job remains in the system and will eventually be redelivered.

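Idempotency dedup: if idempotency_key matches a job enqueued within the dedup window, whether that job is still visible, in flight, or already acked, enqueue returns the existing job_id without creating a new job. This makes producer retries safe. The window is time-based rather than state-based: dedup is keyed on when the job was enqueued, not on whether it is still unacked.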

At-least-once, not exactly-once: a consumer that successfully processes the job and crashes before sending ack will see the same job redelivered. The consumer must therefore key its side effects on an idempotency key so that a repeat delivery is a no-op (e.g., the email service must dedupe by email-abc).
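
A consumer-side dedup layer might look like the sketch below. The class name IdempotentConsumer and the in-memory set are assumptions for illustration; a real consumer would need the set of handled keys in durable storage, ideally updated atomically with the side effect itself.

```python
from typing import Callable


class IdempotentConsumer:
    """Wraps a side-effecting handler so a redelivered job is applied once."""

    def __init__(self, handler: Callable[[str], None]) -> None:
        self._handler = handler
        self._done: set = set()      # keys whose side effect has completed

    def process(self, key: str, payload: str) -> bool:
        """Return True if the handler ran, False if the key was already handled."""
        if key in self._done:
            return False
        self._handler(payload)
        self._done.add(key)          # record only after the side effect succeeds
        return True


sent = []                            # stands in for "emails actually sent"
consumer = IdempotentConsumer(sent.append)
first = consumer.process("email-abc", "send-email-123")
second = consumer.process("email-abc", "send-email-123")   # simulated redelivery
```

Recording the key after the handler runs means a crash between the two steps still causes a repeat, which is why production systems try to commit the key and the side effect in one transaction.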

Complexity

  • enqueue: O(1)
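  • dequeue: O(F) as implemented, where F is the number of in-flight entries, because the lazy sweep scans all of them. Indexing in-flight entries by expires_at (for example, with a min-heap) reduces this to O(K log F) for K expired entries.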
  • ack / nack: O(1)
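
One way to make the sweep cost proportional to the number of expired entries is a min-heap keyed on expires_at with lazy deletion. The sketch below is an assumption-laden illustration, not part of the lab's reference code: ExpiryIndex and its method names are hypothetical, and it tracks job ids only.

```python
import heapq


class ExpiryIndex:
    """Tracks in-flight deadlines; pop_expired pays O(log F) per popped heap entry."""

    def __init__(self) -> None:
        self._heap: list = []        # (expires_at, job_id); may contain stale entries
        self._deadline: dict = {}    # job_id -> current expires_at

    def add(self, job_id: int, expires_at: float) -> None:
        self._deadline[job_id] = expires_at
        heapq.heappush(self._heap, (expires_at, job_id))

    def remove(self, job_id: int) -> bool:
        # Ack path: drop the dict entry only; the heap entry is discarded lazily.
        return self._deadline.pop(job_id, None) is not None

    def pop_expired(self, now: float) -> list:
        expired = []
        while self._heap and self._heap[0][0] <= now:
            expires_at, job_id = heapq.heappop(self._heap)
            # Skip entries that were acked or re-added with a different deadline.
            if self._deadline.get(job_id) == expires_at:
                del self._deadline[job_id]
                expired.append(job_id)
        return expired


index = ExpiryIndex()
index.add(1, expires_at=10.0)
index.add(2, expires_at=5.0)
index.add(3, expires_at=20.0)
index.remove(2)                          # job 2 was acked before its deadline
expired_ids = index.pop_expired(now=12.0)
```

The trade-off is that acked jobs leave stale heap entries behind until their deadline passes, so heap size is bounded by deliveries within one timeout window rather than by the live in-flight count.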

Implementation Requirements

import threading, time, itertools
from collections import deque
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class Job:
    job_id: int
    payload: Any
    delivery_count: int
    enqueued_at: float

class JobQueue:
    def __init__(self, default_visibility_timeout: float = 30.0,
                 idempotency_ttl: float = 300.0):
        self._visible: deque[Job] = deque()
        self._in_flight: dict[int, tuple[Job, float]] = {}     # id -> (job, expires_at)
        self._idem: dict[str, tuple[int, float]] = {}          # key -> (job_id, expires_at)
        self._default_vt = default_visibility_timeout
        self._idem_ttl = idempotency_ttl
        self._lock = threading.Lock()
        self._next_id = itertools.count(1)

    def enqueue(self, payload: Any, idempotency_key: Optional[str] = None) -> int:
        now = time.monotonic()
        with self._lock:
            self._sweep_idem(now)
            if idempotency_key is not None:
                entry = self._idem.get(idempotency_key)
                # Honor the dedup window: an expired key must not suppress a new
                # job, even if the lazy sweep has not removed it yet.
                if entry is not None and entry[1] > now:
                    return entry[0]
            job_id = next(self._next_id)
            job = Job(job_id, payload, delivery_count=0, enqueued_at=now)
            self._visible.append(job)
            if idempotency_key is not None:
                self._idem[idempotency_key] = (job_id, now + self._idem_ttl)
            return job_id

    def dequeue(self, visibility_timeout: Optional[float] = None) -> Optional[Job]:
        vt = visibility_timeout if visibility_timeout is not None else self._default_vt
        now = time.monotonic()
        with self._lock:
            self._sweep_in_flight(now)
            if not self._visible:
                return None
            job = self._visible.popleft()
            job.delivery_count += 1
            self._in_flight[job.job_id] = (job, now + vt)
            return job

    def ack(self, job_id: int) -> bool:
        with self._lock:
            return self._in_flight.pop(job_id, None) is not None

    def nack(self, job_id: int, requeue_delay: float = 0.0) -> bool:
        now = time.monotonic()
        with self._lock:
            entry = self._in_flight.pop(job_id, None)
            if entry is None:
                return False
            job, _ = entry
            if requeue_delay > 0:
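                # For simplicity, treat the delay as a visibility timeout:
                # park the job as in-flight with expires_at = now + delay.
                # Caveat: until then the job still counts as in-flight, so
                # stats() includes it and a late ack() would delete it.
                self._in_flight[job_id] = (job, now + requeue_delay)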
            else:
                self._visible.appendleft(job)   # head, so it's seen first
            return True

    def stats(self) -> dict:
        with self._lock:
            return {
                "visible": len(self._visible),
                "in_flight": len(self._in_flight),
                "dedup_keys": len(self._idem),
            }

    def _sweep_in_flight(self, now: float) -> None:
        # Full scan of in-flight entries: O(F) per call. Caller must hold self._lock.
        expired = [(jid, j) for jid, (j, t) in self._in_flight.items() if t <= now]
        for jid, j in expired:
            del self._in_flight[jid]
            self._visible.appendleft(j)        # redelivery: front-load

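    def _sweep_idem(self, now: float) -> None:
        # Lazy: prune expired keys only once the cache exceeds 4096 entries.
        # Above that threshold every enqueue pays an O(N) rebuild, so a
        # production version would expire keys incrementally instead.
        if len(self._idem) > 4096:
            self._idem = {k: v for k, v in self._idem.items() if v[1] > now}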

Tests

import unittest, time

class TestJobQueue(unittest.TestCase):
    def test_basic_enqueue_ack(self):
        q = JobQueue()
        jid = q.enqueue("hello")
        job = q.dequeue()
        self.assertEqual(job.job_id, jid)
        self.assertTrue(q.ack(jid))
        self.assertIsNone(q.dequeue())

    def test_idempotency_dedup(self):
        q = JobQueue()
        a = q.enqueue("x", idempotency_key="k1")
        b = q.enqueue("x", idempotency_key="k1")
        self.assertEqual(a, b)
        self.assertEqual(q.stats()["visible"], 1)

    def test_visibility_timeout_redelivery(self):
        q = JobQueue(default_visibility_timeout=0.1)
        jid = q.enqueue("x")
        job1 = q.dequeue()
        time.sleep(0.15)
        job2 = q.dequeue()
        self.assertEqual(job1.job_id, job2.job_id)
        self.assertEqual(job2.delivery_count, 2)
        q.ack(job2.job_id)

    def test_nack_requeue(self):
        q = JobQueue()
        jid = q.enqueue("x")
        job = q.dequeue()
        q.nack(jid)
        job2 = q.dequeue()
        self.assertEqual(job.job_id, job2.job_id)
        self.assertEqual(job2.delivery_count, 2)

if __name__ == "__main__":
    unittest.main()

Follow-up Questions

(2) Persist state across restarts? Three layers: (a) WAL: every enqueue, ack, nack is appended to a log; on boot, replay. (b) Snapshot: periodic full state dump. (c) Combined: snapshot every N seconds, WAL between snapshots; recovery = latest snapshot + log replay since. SQS uses a replicated multi-AZ store; for an interview, WAL is the right answer.
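
The WAL layer in (a) can be sketched as append-only JSON lines plus a replay function. The record shape ({"op": ..., "job_id": ..., "payload": ...}), the function names, and the file name are all assumptions for this example; it also ignores torn final lines and log compaction, which a real implementation would have to handle.

```python
import json
import os
import tempfile


def append_record(path: str, record: dict) -> None:
    """Append one JSON line and fsync so the record survives a process crash."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
        f.flush()
        os.fsync(f.fileno())


def replay(path: str) -> dict:
    """Rebuild job_id -> payload for every job enqueued but never acked."""
    pending: dict = {}
    if not os.path.exists(path):
        return pending
    with open(path, encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            if rec["op"] == "enqueue":
                pending[rec["job_id"]] = rec["payload"]
            elif rec["op"] == "ack":
                pending.pop(rec["job_id"], None)
            # A nack changes visibility only, so the job stays pending.
    return pending


wal_path = os.path.join(tempfile.mkdtemp(), "queue.wal")
append_record(wal_path, {"op": "enqueue", "job_id": 1, "payload": "a"})
append_record(wal_path, {"op": "enqueue", "job_id": 2, "payload": "b"})
append_record(wal_path, {"op": "ack", "job_id": 1})
recovered = replay(wal_path)     # only job 2 is still owed to a consumer
```

On recovery every pending job can be treated as visible: in-flight deadlines from before the crash are meaningless, and at-least-once semantics already permit the resulting redelivery.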

(8) Partial failure? That’s the entire point of visibility timeout. Consumer crashes mid-processing → TTL expires → job redelivered. The consumer is responsible for idempotency. The queue is responsible for delivering at-least-once.

(9) Eviction / cleanup? Stale in-flight entries (consumer crashed and never acked) are swept on every dequeue. The idempotency cache TTL bounds dedup memory. DLQ (not implemented above) would catch jobs after N redeliveries — a follow-up to add.
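
A DLQ hook could be as small as the routing function below, called wherever a job would otherwise return to visible (the expiry sweep and nack). This is a sketch under assumptions: the function name requeue_or_dead_letter is hypothetical, the Job class is a trimmed copy for self-containment, and max_redeliveries is interpreted as the delivery count at which a job is dead-lettered.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class Job:
    job_id: int
    payload: Any
    delivery_count: int = 0


def requeue_or_dead_letter(job: Job, visible: deque, dlq: list,
                           max_redeliveries: int) -> str:
    """Route a job whose visibility timeout expired or that was nacked."""
    if job.delivery_count >= max_redeliveries:
        dlq.append(job)              # parked for inspection; never redelivered
        return "dlq"
    visible.appendleft(job)          # front-load, as the lab's sweep does
    return "visible"


visible: deque = deque()
dlq: list = []
poison = Job(job_id=7, payload="bad-input")
outcomes = []
for _ in range(3):
    poison.delivery_count += 1       # what dequeue does on each delivery
    outcomes.append(requeue_or_dead_letter(poison, visible, dlq, max_redeliveries=3))
    if visible:
        visible.popleft()            # the next consumer picks the job up again
```

Keeping the DLQ as a separate collection means a poison-pill job stops consuming consumer capacity while remaining available for debugging or manual replay.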

(10) Consistency model? Linearizable per-job in a single process; redelivery breaks “exactly-once” but preserves “every job is processed at least once”. Replicated: consensus (Raft) for the metadata, leader-based delivery, replicated log for durability.

(11) Configuration knobs? default_visibility_timeout, idempotency_ttl, max_redeliveries (DLQ trigger), dlq_handler. Knobs not to expose: internal sweep cadence.

(12) Shutdown? On shutdown, refuse new enqueues, sweep in-flight back to visible (so the restarted queue does not inherit stale TTLs), persist state, exit. The graceful invariant: no in-flight at shutdown time.

Product Extension

This is a simplified SQS / Cloud Pub/Sub / Azure Service Bus. Real systems add: replication for durability (across hosts/AZs), partitioning for throughput (multiple shards), DLQ as a separate queue with its own retention, FIFO ordering as an opt-in higher-cost mode, and ordering keys (per-key FIFO with cross-key parallelism — Kafka’s model). Kafka explicitly doesn’t have visibility timeouts; it uses offset-based delivery with consumer-managed checkpoints, which is a different design point.

Language/Runtime Follow-ups

  • Python: this implementation. For high-throughput, sharded queues with per-shard locks scale better than the single global lock.
  • Java: ArrayBlockingQueue is too simple (no visibility timeout). The right reference is java.util.concurrent.DelayQueue for visibility, plus a ConcurrentHashMap for in-flight tracking. Production brokers commonly used from Java: ActiveMQ, RabbitMQ.
  • Go: a channel-based implementation works for visible queue; in-flight is a sync.Map; sweeper is a goroutine. NATS JetStream is the production-grade Go choice.
  • C++: roll-your-own with std::deque + std::unordered_map + std::mutex. Boost has thread-safe queue templates.
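  • JS/TS: BullMQ (Redis-backed) is the de-facto Node choice; its closest analogue to a visibility timeout is a per-job lock that the worker must keep renewing, with jobs whose lock lapses treated as stalled and retried.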
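
The sharding idea from the Python bullet can be sketched as follows. This covers the visible queue only (no in-flight tracking or dedup), and the names ShardedQueue and routing_key are assumptions for the example rather than part of the lab's API.

```python
import threading
import zlib
from collections import deque
from typing import Any, Optional


class ShardedQueue:
    """Visible-queue sharding only: each shard has its own lock and deque."""

    def __init__(self, num_shards: int = 4) -> None:
        self._shards = [deque() for _ in range(num_shards)]
        self._locks = [threading.Lock() for _ in range(num_shards)]
        self._cursor = 0                     # round-robin start for dequeue
        self._cursor_lock = threading.Lock()

    def _shard_for(self, routing_key: str) -> int:
        # crc32 is stable across processes, unlike the built-in hash() for str.
        return zlib.crc32(routing_key.encode("utf-8")) % len(self._shards)

    def enqueue(self, routing_key: str, payload: Any) -> int:
        i = self._shard_for(routing_key)
        with self._locks[i]:
            self._shards[i].append(payload)
        return i

    def dequeue(self) -> Optional[Any]:
        with self._cursor_lock:
            start = self._cursor
            self._cursor = (self._cursor + 1) % len(self._shards)
        # Probe each shard once, starting from the rotating cursor.
        for offset in range(len(self._shards)):
            i = (start + offset) % len(self._shards)
            with self._locks[i]:
                if self._shards[i]:
                    return self._shards[i].popleft()
        return None


sq = ShardedQueue(num_shards=4)
shard_a = sq.enqueue("user-1", "job-a")
shard_b = sq.enqueue("user-1", "job-b")      # same key, same shard, FIFO within it
drained = [sq.dequeue(), sq.dequeue(), sq.dequeue()]
```

Producers with different routing keys contend only when they hash to the same shard, while ordering is preserved per key; global ordering across keys is given up in exchange.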

Common Bugs

  1. Idempotency cache that never expires — memory leak.
  2. Sweeping in-flight in a separate thread without coordinating the lock — races with dequeue. Lazy sweep on dequeue (as shown) avoids the extra thread.
  3. Forgetting to increment delivery_count on redelivery — alerting can’t detect poison-pill jobs (jobs that always crash consumers).
  4. nack with delay implemented by sleeping — blocks the consumer that called nack instead of just delaying re-visibility.
  5. Treating idempotency dedup as global / forever — if the dedup window is too long, retries after intentional re-submission are silently dropped.

Debugging Strategy

Print stats() periodically to track visible / in-flight counts. A growing in-flight count without acks → consumers are crashing or hanging. Stuck visible count → no consumers are running. For “duplicate processing” complaints, capture the distribution of delivery_count across jobs; a heavy tail means consumers are crashing or the visibility timeout is too short for the work being done.

Mastery Criteria

  • Implemented in <40 minutes from blank screen.
  • Stated “exactly-once is impractical; at-least-once + idempotency is the answer” without prompting.
  • All four tests pass first run.
  • Articulated visibility timeout, idempotency keys, and DLQ design in <90 seconds.
  • Answered follow-ups #2, #8, #10, #11, #12.
  • Compared SQS-style (visibility timeout) vs Kafka-style (offset-based) delivery in <60 seconds.