Lab 04 — Task Scheduler

Goal

Implement a TaskScheduler that accepts tasks with priorities, executes them in priority order via a worker pool, retries on failure with exponential backoff, and routes permanently-failed tasks to a dead-letter queue. After this lab you should be able to design and implement a small in-memory task queue with retry semantics in under 35 minutes.

Background Concepts

A task scheduler is the in-memory cousin of Celery / Sidekiq / RQ. The four moving parts are:

  1. Priority queue of pending tasks (heap keyed on priority + scheduled-execution-time).
  2. Worker pool that pops tasks and runs them.
  3. Retry policy that decides if and when a failed task is re-enqueued (with delayed visibility).
  4. Dead-letter queue (DLQ) for tasks that have exhausted retries.

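The non-trivial design question is how to handle delayed re-enqueue for retries. The answer used in this lab is a single priority queue keyed by (ready_at, priority), with workers waiting until the head of the queue is ready. A newly submitted task gets a shared “ready now” sentinel as its ready_at, so fresh tasks tie on the first key and come out in priority order. A retry gets ready_at = now + backoff and becomes eligible once that time passes. This puts “high-priority now” and “low-priority retry-in-30-seconds” under one data structure.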
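A quick check of why the first key matters: the same three submissions (priorities 3, 1, 2, in that order) popped from a heap of (ready_at, priority, seq) tuples, once with ready_at stamped at submit time and once with a shared “ready now” value. The tuples stand in for real heap entries, and `pop_order` is an illustrative helper, not part of the lab's API.

```python
import heapq

def pop_order(entries):
    """Heapify (ready_at, priority, seq) tuples and return priorities in pop order."""
    heap = list(entries)
    heapq.heapify(heap)
    return [heapq.heappop(heap)[1] for _ in range(len(heap))]

priorities = [3, 1, 2]  # submission order

# ready_at stamped at (fake) submit times: every entry differs on the first key,
# so priority never gets a say and the heap behaves as a FIFO.
stamped = [(100.0 + i, p, i) for i, p in enumerate(priorities)]

# Shared "ready now" sentinel: all fresh tasks tie on ready_at, so priority decides.
READY_NOW = float("-inf")
fresh = [(READY_NOW, p, i) for i, p in enumerate(priorities)]

print(pop_order(stamped))  # [3, 1, 2]
print(pop_order(fresh))    # [1, 2, 3]
```

A retry entry with a real timestamp sorts after every sentinel entry, which is what lets one heap hold both kinds.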

Interview Context

Task scheduler problems are popular at infrastructure-heavy companies (Uber, Cloudflare, AWS Lambda team, Datadog) because they touch concurrency, priority queues, retry semantics, and DLQ design — the building blocks of every async-job system. The interviewer will probe whether you’ve thought about idempotency, exactly-once vs at-least-once, and observability.

Problem Statement

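Design TaskScheduler(n_workers, base_backoff), where base_backoff is the first retry delay in seconds and the retry limit is set per task via max_attempts: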

  • submit(task: Callable, priority: int, max_attempts: int = 3) -> task_id — enqueue a task with given priority. Lower numeric priority = runs first.
  • start() — start the worker pool.
  • shutdown(timeout) — stop accepting new tasks; finish in-flight up to timeout; return.
  • dead_letters() -> list[FailedTask] — return tasks that exhausted retries.

Behavior: failures (raised exception) → exponential backoff retry up to max_attempts; permanent failure → DLQ.

Constraints

  • Up to 10^4 pending tasks
  • Up to 100 workers
  • Per-task max execution time: 60s (a configurable per-task timeout is a follow-up)
  • Tasks are zero-argument callables, assumed to be roughly deterministic (a retry should behave like the original attempt)

Clarifying Questions

  1. Are tasks idempotent? (We’ll assume yes; idempotency is the user’s responsibility for at-least-once correctness.)
  2. Priority semantics: lower = higher? (Yes by convention, like a min-heap.)
  3. What does retry mean — the same task is re-run, or a new attempt object? (Same callable, same args, attempt counter incremented.)
  4. Should retries preserve original priority? (Yes by convention.)
  5. Cancellation? (Often a follow-up; default no.)

Examples

def failing_task():
    raise RuntimeError("boom")

sched = TaskScheduler(n_workers=2)
sched.submit(lambda: print("a"), priority=1)
sched.submit(lambda: print("b"), priority=0)   # dequeued first
sched.submit(failing_task, priority=2, max_attempts=2)
                                                # 1st attempt fails; retry after backoff
                                                # 2nd attempt fails; → DLQ
sched.start()                                   # start after submitting so the pop order is deterministic
sched.shutdown(timeout=5.0)
sched.dead_letters()  # [FailedTask(task_id=3, callable_repr='<function failing_task at 0x...>', attempts=2, last_error='RuntimeError: boom')]

Initial Brute Force

A single thread polling a list sorted by priority. Run, retry inline. Single worker, no parallelism. O(N log N) per submit.
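A minimal sketch of that brute force, assuming tasks are zero-argument callables passed as (priority, fn) pairs; `run_brute_force` is a hypothetical helper. It re-sorts on every submit and retries inline with a blocking sleep, which is exactly what the optimized design removes.

```python
import time

def run_brute_force(tasks, max_attempts=3, base_backoff=0.0):
    """tasks: iterable of (priority, fn). Returns the fns that exhausted their attempts."""
    pending = []
    for priority, fn in tasks:
        pending.append((priority, fn))
        pending.sort(key=lambda t: t[0])        # full re-sort on every submit: O(N log N)
    dead = []
    while pending:
        _, fn = pending.pop(0)                   # pop from the front of a list: O(N)
        for attempt in range(1, max_attempts + 1):
            try:
                fn()
                break
            except Exception:
                if attempt == max_attempts:
                    dead.append(fn)
                else:
                    # Inline retry: the only worker sleeps through the backoff.
                    time.sleep(base_backoff * 2 ** (attempt - 1))
    return dead
```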

Brute Force Complexity

Per submit: O(N log N) on the sort. Per dispatch: O(N) on the linear scan. Acceptable only for tens of tasks.

Optimization Path

Replace the sorted list with a heapq. Replace the single thread with a worker pool of n_workers threads. Add a Condition variable so workers block when the queue is empty. Add a delayed-execution facility: instead of time.sleep(backoff) in the worker, push the task back with ready_at = now + backoff and key the heap on (ready_at, priority).

Final Expected Approach

A single heap of entries keyed by (ready_at, priority, seq); each entry also carries task_id, the callable, attempt, and max_attempts. A condition variable wakes workers when something becomes available. Workers loop: peek at the heap head; if its ready_at > now, wait until ready_at (or until notified) and re-check. Otherwise pop and run it. On success: done. On failure: increment attempt; if attempt < max_attempts, push back with ready_at = now + backoff(attempt), else append to the DLQ list. shutdown sets a flag, broadcasts the condition, and joins all workers against a shared deadline.

Data Structures Used

Structure                      Purpose
heapq of _Heap_Entry           priority + delayed readiness
threading.Condition            wait/notify for empty queue and ready time
list (DLQ)                     failed tasks
attempt field on each entry    retry tracking

Correctness Argument

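Priority ordering: the heap orders entries by (ready_at, priority, seq), and a worker pops the smallest one, first waiting if its ready_at is still in the future. That yields priority order only among entries that tie on ready_at, so every freshly submitted task must share a single ready_at value (a “ready now” sentinel such as -inf). If submit stamped ready_at = time.monotonic() instead, the heap would be FIFO by submission time and priority would only break exact timestamp ties. The cost of the single key: a retry’s real timestamp sorts it after every fresh task, so under sustained load a due high-priority retry waits behind low-priority fresh work.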

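Retry semantics: failure → increment attempt and push back with ready_at = now + base * 2^(attempt-1) * U(0.5, 1.5), where attempt is the incremented count. Once attempt reaches max_attempts, the task goes to the DLQ instead. While its worker stays alive, the task is never lost: it is either running, in the heap, or in the DLQ, and every transition preserves that invariant. A worker that crashes mid-run breaks it (see follow-up #8).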
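The retry schedule can be tabulated with a small helper. `backoff_delay` is a hypothetical name; it mirrors the worker’s formula (base * 2^(attempt-1), scaled by a uniform factor in [0.5, 1.5)), and `jitter=False` gives the un-jittered midpoint.

```python
import random

def backoff_delay(attempt, base=0.1, jitter=True, rng=random):
    """Delay before the retry that follows failed attempt number `attempt` (1-based)."""
    delay = base * 2 ** (attempt - 1)
    if jitter:
        delay *= 0.5 + rng.random()   # uniform factor in [0.5, 1.5)
    return delay

# max_attempts = 4 means three retries, scheduled after failures 1, 2 and 3.
print([backoff_delay(a, jitter=False) for a in (1, 2, 3)])  # [0.1, 0.2, 0.4]
```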

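Shutdown: setting _stopping = True and broadcasting wakes every blocked worker. A worker exits only once _stopping is set and the heap is empty, so queued tasks (including pending retries) drain first. shutdown joins each worker against one shared deadline, which bounds total shutdown time; workers still busy at the deadline are daemon threads and are abandoned.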
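A single (ready_at, priority) key cannot order ready tasks by priority once their ready_at values differ, which is the normal state of affairs for due retries. If that matters, one alternative (not the design implemented in this lab) is two heaps: delayed entries keyed by ready_at, ready entries keyed by (priority, seq), with due entries promoted before each pop. A minimal single-threaded sketch; `TwoHeapQueue` and its methods are hypothetical, and `now` is passed in so the behavior is easy to check.

```python
import heapq
import itertools

class TwoHeapQueue:
    """Ready heap ordered by (priority, seq); delayed heap ordered by (ready_at, seq)."""

    def __init__(self):
        self._ready = []    # (priority, seq, item)
        self._delayed = []  # (ready_at, seq, priority, item)
        self._seq = itertools.count()

    def push(self, item, priority, ready_at=None):
        seq = next(self._seq)
        if ready_at is None:
            heapq.heappush(self._ready, (priority, seq, item))
        else:
            heapq.heappush(self._delayed, (ready_at, seq, priority, item))

    def _promote(self, now):
        # Move every entry whose delay has expired into the ready heap.
        while self._delayed and self._delayed[0][0] <= now:
            _, seq, priority, item = heapq.heappop(self._delayed)
            heapq.heappush(self._ready, (priority, seq, item))

    def pop_ready(self, now):
        """Return the highest-priority ready item, or None if nothing is due yet."""
        self._promote(now)
        if self._ready:
            return heapq.heappop(self._ready)[2]
        return None

    def next_wakeup(self):
        """Earliest ready_at among delayed entries: how long an idle worker should wait."""
        return self._delayed[0][0] if self._delayed else None
```

In a scheduler, a worker would call pop_ready under the condition’s lock and, on None, wait until next_wakeup() or a notify.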

Complexity

  • submit: O(log N)
  • Worker dispatch: O(log N) per task
  • Memory: O(pending + dlq)

Implementation Requirements

import heapq, threading, time, itertools, random
from dataclasses import dataclass, field
from typing import Callable, Any

@dataclass
class FailedTask:
    task_id: int
    callable_repr: str
    attempts: int
    last_error: str

@dataclass(order=True)
class _Heap_Entry:
    ready_at: float
    priority: int
    seq: int
    task_id: int = field(compare=False)
    fn: Callable = field(compare=False)
    attempt: int = field(compare=False)
    max_attempts: int = field(compare=False)


class TaskScheduler:
    def __init__(self, n_workers: int = 4, base_backoff: float = 0.1):
        self._n_workers = n_workers
        self._base = base_backoff
        self._heap: list[_Heap_Entry] = []
        self._dlq: list[FailedTask] = []
        self._cond = threading.Condition()
        self._stopping = False
        self._workers: list[threading.Thread] = []
        self._seq = itertools.count()
        self._next_id = itertools.count(1)

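    def submit(self, fn: Callable, priority: int = 5, max_attempts: int = 3) -> int:
        if max_attempts <= 0:
            raise ValueError("max_attempts must be positive")
        tid = next(self._next_id)
        # Fresh tasks share ready_at = -inf ("ready now"), so they tie on the first
        # key and the heap orders them by (priority, seq). Stamping time.monotonic()
        # here would make the heap FIFO by submit time. Retries get a real timestamp.
        e = _Heap_Entry(float("-inf"), priority, next(self._seq),
                        tid, fn, attempt=0, max_attempts=max_attempts)
        with self._cond:
            if self._stopping:
                raise RuntimeError("scheduler is shutting down; not accepting tasks")
            heapq.heappush(self._heap, e)
            self._cond.notify()
        return tid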

    def start(self) -> None:
        for i in range(self._n_workers):
            t = threading.Thread(target=self._run_worker, name=f"w{i}", daemon=True)
            t.start()
            self._workers.append(t)

    def shutdown(self, timeout: float = 5.0) -> None:
        with self._cond:
            self._stopping = True
            self._cond.notify_all()
        deadline = time.monotonic() + timeout
        for w in self._workers:
            w.join(timeout=max(0.0, deadline - time.monotonic()))

    def dead_letters(self) -> list[FailedTask]:
        with self._cond:
            return list(self._dlq)

    def _run_worker(self) -> None:
        while True:
            with self._cond:
                while not self._heap and not self._stopping:
                    self._cond.wait()
                if self._stopping and not self._heap:
                    return
                head = self._heap[0]
                wait = head.ready_at - time.monotonic()
                if wait > 0:
                    self._cond.wait(timeout=wait)
                    continue
                e = heapq.heappop(self._heap)
            try:
                e.fn()
            except Exception as ex:
                e.attempt += 1
                if e.attempt >= e.max_attempts:
                    with self._cond:
                        self._dlq.append(FailedTask(
                            e.task_id, repr(e.fn), e.attempt,
                            f"{type(ex).__name__}: {ex}"))
                else:
                    backoff = self._base * (2 ** (e.attempt - 1))
                    backoff *= 0.5 + random.random()      # jitter [0.5, 1.5]
                    e.ready_at = time.monotonic() + backoff
                    e.seq = next(self._seq)
                    with self._cond:
                        heapq.heappush(self._heap, e)
                        self._cond.notify()

Tests

import unittest, time, threading

class TestScheduler(unittest.TestCase):
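    def test_priority_order(self):
        order = []
        sched = TaskScheduler(n_workers=1)
        # Enqueue everything before start(); otherwise the single worker may
        # pop priority 3 before priorities 1 and 2 have been submitted.
        for p in [3, 1, 2]:
            sched.submit((lambda x=p: order.append(x)), priority=p)
        sched.start()
        time.sleep(0.5)
        sched.shutdown(timeout=2.0)
        self.assertEqual(order, [1, 2, 3])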

    def test_retry_then_dlq(self):
        attempts = []
        def always_fail():
            attempts.append(1)
            raise RuntimeError("boom")
        sched = TaskScheduler(n_workers=1, base_backoff=0.01)
        sched.start()
        sched.submit(always_fail, priority=0, max_attempts=3)
        time.sleep(1.0)
        sched.shutdown(timeout=2.0)
        self.assertEqual(len(attempts), 3)
        self.assertEqual(len(sched.dead_letters()), 1)

    def test_concurrent_submit(self):
        results = []
        sched = TaskScheduler(n_workers=4)
        sched.start()
        def push(i):
            for j in range(50):
                sched.submit((lambda x=(i, j): results.append(x)), priority=0)
        threads = [threading.Thread(target=push, args=(i,)) for i in range(4)]
        for t in threads: t.start()
        for t in threads: t.join()
        time.sleep(0.5)
        sched.shutdown(timeout=2.0)
        self.assertEqual(len(results), 200)

Follow-up Questions

(2) Persist state across restarts? Tasks live in memory and are lost on restart. To persist, choose: (a) write each submit to a WAL; on boot, replay; on completion, append a “done” marker. (b) Snapshot the heap periodically and write a delta log. Either way, lambdas and closures cannot be serialized, so the log has to record a task name plus arguments and re-resolve the callable from a registry on replay. The DLQ should be persisted regardless: losing failed tasks is the worst outcome because nobody knows why a job didn’t run.
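A minimal sketch of option (a), assuming tasks are identified by a registry name plus JSON-serializable args; the class `TaskWAL` and its record format are hypothetical. Each append is fsynced before returning, and replay returns every submit that has no matching done marker.

```python
import json
import os
import tempfile

class TaskWAL:
    """Append-only JSON-lines log: one 'submit' record per task, one 'done' per completion."""

    def __init__(self, path):
        self._path = path

    def _append(self, record):
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())   # durable before the caller is acknowledged

    def log_submit(self, task_id, name, args, priority):
        self._append({"op": "submit", "id": task_id, "name": name,
                      "args": args, "priority": priority})

    def log_done(self, task_id):
        self._append({"op": "done", "id": task_id})

    def replay(self):
        """Submit records with no matching 'done' marker, in log order."""
        if not os.path.exists(self._path):
            return []
        pending = {}
        with open(self._path, encoding="utf-8") as f:
            for line in f:
                rec = json.loads(line)
                if rec["op"] == "submit":
                    pending[rec["id"]] = rec
                else:
                    pending.pop(rec["id"], None)
        return list(pending.values())

with tempfile.TemporaryDirectory() as d:
    wal = TaskWAL(os.path.join(d, "tasks.wal"))
    wal.log_submit(1, "send_email", ["a@example.com"], priority=1)
    wal.log_done(1)
    print(wal.replay())  # []
```

On boot, each replayed record would be looked up in the registry by name and resubmitted.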

(8) Partial failure? The interesting case: a worker pops a task and crashes mid-execution. The task is now lost (it’s not in the heap and it didn’t complete). Solution: at-least-once via visibility timeout — the heap pops the task to an “in-flight” map with a TTL; if the worker doesn’t ack before TTL, the task returns to the heap. Idempotency keys make this safe. This is the SQS / Cloud Tasks model.
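A minimal sketch of the in-flight map behind a visibility timeout. `InFlightTracker` is a hypothetical name; the caller passes `now` explicitly (for example time.monotonic()) and is assumed to hold the scheduler’s lock around every call.

```python
class InFlightTracker:
    """Tasks handed to a worker stay here until acked; unacked ones expire back to the queue."""

    def __init__(self, visibility_timeout):
        self._timeout = visibility_timeout
        self._leases = {}   # task_id -> (deadline, entry)

    def lease(self, task_id, entry, now):
        self._leases[task_id] = (now + self._timeout, entry)

    def ack(self, task_id):
        # False means the lease already expired and the task may have been handed out again.
        return self._leases.pop(task_id, None) is not None

    def reclaim_expired(self, now):
        """Remove and return entries whose lease ran out; the caller re-pushes them.

        This scans every lease (O(in-flight)); a heap keyed by deadline would avoid that.
        """
        expired = [tid for tid, (deadline, _) in self._leases.items() if deadline <= now]
        return [self._leases.pop(tid)[1] for tid in expired]
```

If a lease expires while the original worker is still running, the task executes twice; that duplicate is what the idempotency keys have to absorb.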

(9) Eviction / cleanup? The DLQ grows unbounded. Either: cap its size and drop oldest, retain a sliding-window of the last N failures, or persist to durable storage and prune from memory after a TTL. Always emit a per-task DLQ event so downstream alerting can fire.
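One way to cap the DLQ while still emitting an event per failure, sketched with collections.deque(maxlen=...), which discards the oldest item when a full deque is appended to. `BoundedDLQ` and the `on_dead_letter` hook are hypothetical names.

```python
from collections import deque

class BoundedDLQ:
    """Keeps the last `capacity` failures in memory and reports every failure to a hook."""

    def __init__(self, capacity, on_dead_letter=None):
        self._items = deque(maxlen=capacity)   # appending when full drops the oldest
        self._on_dead_letter = on_dead_letter
        self.dropped = 0                        # how many failures were evicted from memory

    def append(self, failed):
        if len(self._items) == self._items.maxlen:
            self.dropped += 1
        self._items.append(failed)
        if self._on_dead_letter is not None:
            self._on_dead_letter(failed)   # e.g. emit a metric or an alerting event

    def snapshot(self):
        return list(self._items)
```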

(11) Configuration knobs? n_workers, base_backoff, default max_attempts. Per-task: priority, max_attempts, optionally timeout. Knobs not to expose: jitter strategy (use decorrelated jitter), heap implementation.
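The decorrelated-jitter rule usually cited for retry backoff draws each delay uniformly from [base, 3 * previous delay] and caps it. A sketch with illustrative names and defaults (`decorrelated_jitter`, `delay_sequence` and `cap=30.0` are assumptions, not values from this lab):

```python
import random

def decorrelated_jitter(prev_delay, base=0.1, cap=30.0, rng=random):
    """Next delay drawn uniformly from [base, 3 * prev_delay], capped at `cap`."""
    return min(cap, rng.uniform(base, prev_delay * 3))

def delay_sequence(n, base=0.1, cap=30.0, seed=None):
    """The first n delays for one task, starting from prev_delay = base."""
    rng = random.Random(seed)
    delay, out = base, []
    for _ in range(n):
        delay = decorrelated_jitter(delay, base, cap, rng)
        out.append(delay)
    return out
```

Each delay is drawn relative to the previous random delay rather than from a fixed exponential schedule, so tasks that failed at the same moment tend to drift apart instead of retrying in lockstep.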

(12) Shutdown / draining? Two modes: graceful (stop accepting; wait for in-flight; return) and forceful (stop accepting; abandon in-flight; return immediately). Always offer both. Default to graceful with a deadline.

(13) Poison pill? A task that always crashes the worker (segfault, unhandled OS exception, infinite loop). Run tasks in subprocess isolation (or with a cooperative timeout). Blacklist by hash of (callable, args) after N consecutive crashes.
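A minimal sketch of both halves: a crash counter that quarantines a task signature after N consecutive crashes, and a runner that executes work in a child interpreter with a hard timeout. `PoisonGuard` and `run_isolated` are hypothetical names. The child runs a Python source string only to keep the sketch self-contained; a real runner would invoke a task entry point instead.

```python
import hashlib
import subprocess
import sys

class PoisonGuard:
    """Quarantine a task signature after `threshold` consecutive crashes or timeouts."""

    def __init__(self, threshold=3):
        self._threshold = threshold
        self._crashes = {}   # signature -> consecutive crash count

    @staticmethod
    def signature(name, args):
        # Assumes tasks are identified by a registry name and repr(args) is stable;
        # repr() of a function object includes its address, so it is not hashed.
        return hashlib.sha256(f"{name}:{args!r}".encode()).hexdigest()

    def is_quarantined(self, sig):
        return self._crashes.get(sig, 0) >= self._threshold

    def record(self, sig, crashed):
        # Any clean run resets the streak: only consecutive crashes count.
        self._crashes[sig] = self._crashes.get(sig, 0) + 1 if crashed else 0


def run_isolated(code, timeout):
    """Run Python source in a child interpreter; True means it exited with status 0 in time."""
    try:
        result = subprocess.run([sys.executable, "-c", code],
                                capture_output=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return False   # subprocess.run has already killed and reaped the child
    return result.returncode == 0
```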

Product Extension

This is the heart of Celery, Sidekiq, RQ, AWS SQS + Lambda, GCP Cloud Tasks, and Temporal. Real systems add: visibility timeouts (the in-flight TTL), distributed coordination (multiple workers across hosts), durable storage (RDBMS or Redis with persistence), scheduling (cron-like time-based triggers), and workflow orchestration (Temporal). The core is what you wrote here.

Language/Runtime Follow-ups

  • Python: GIL means worker threads don’t parallelize CPU work. For CPU-bound tasks, use a ProcessPoolExecutor instead. The implementation above is fine for I/O-bound tasks.
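  • Java: ScheduledThreadPoolExecutor covers delayed execution (schedule with a delay; retry by re-scheduling), but it orders work by delay only; for priority, back a ThreadPoolExecutor with a PriorityBlockingQueue. RetryTemplate (Spring Retry) for the policy. DeadLetterPublishingRecoverer (Spring for Apache Kafka) for the DLQ.
  • Go: N goroutines reading from a channel of tasks. A channel is FIFO, so priority needs container/heap behind a mutex (or one channel per priority level). For delayed retry, use time.AfterFunc to push the task back. golang.org/x/sync/errgroup can manage the worker goroutines.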
  • C++: a std::priority_queue plus condition variable. Tasks as std::function<void()>.
  • JS/TS: a single event loop gives concurrency but no parallelism for JS code (worker_threads aside); BullMQ (Redis-backed) is the de facto Node task queue.

Common Bugs

  1. Workers spinning when the heap head is in the future: wait on the condition with timeout = ready_at - now instead of a poll loop.
  2. Using notify on submit and notify_all on shutdown is fine, but check the timed-wait case: a worker that saw head.ready_at > now is sleeping until that time, and a task that becomes ready sooner must wake it. Pushing under the lock and calling notify handles this, because a woken worker re-reads the heap head before deciding how long to wait.
  3. No unique tie-breaker in the heap key. With plain-tuple entries, two entries that tie on (ready_at, priority) fall through to comparing the callables, which raises TypeError. Put a unique, monotonically increasing seq before any non-comparable field (the dataclass above also marks those fields compare=False). Bumping seq on every re-push also keeps entries with equal keys in push order.
  4. Catching Exception lets other BaseException subclasses (SystemExit, KeyboardInterrupt) escape: the worker thread dies, the pool shrinks by one, and the task is neither retried nor dead-lettered. threading.excepthook prints a traceback, except for SystemExit, which it ignores silently. Catch BaseException around the task call and route it to the DLQ, or at minimum log such escapes.
  5. DLQ growing forever — see follow-up #9.
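The tie-breaking failure behind bug 3 is easy to reproduce with plain tuples. The task functions below are placeholders; the point is which heap entries can be compared at all.

```python
import heapq
import itertools

def send_email():
    pass

def resize_image():
    pass

def heap_from(entries):
    heap = []
    for entry in entries:
        heapq.heappush(heap, entry)
    return heap

# (ready_at, priority, fn): the two entries tie on the first two fields,
# so heapq ends up comparing the two functions with <, which raises TypeError.
try:
    heap_from([(0.0, 1, send_email), (0.0, 1, resize_image)])
    tie_error = None
except TypeError as exc:
    tie_error = exc

# (ready_at, priority, seq, fn): seq is unique, so comparison never reaches fn.
seq = itertools.count()
heap = heap_from([(0.0, 1, next(seq), send_email), (0.0, 1, next(seq), resize_image)])
pop_order = [heapq.heappop(heap)[3].__name__ for _ in range(2)]
print(type(tie_error).__name__, pop_order)
```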

Debugging Strategy

Add a worker trace: every transition (pop, run-start, run-end, retry, dlq) gets a log line with (worker_id, task_id, ts). Replay the log to see the timeline. For “task didn’t run” bugs, walk the heap state at submit time and check that notify was called. For shutdown deadlocks, take a thread dump (Python: faulthandler.dump_traceback(all_threads=True), or arm faulthandler.dump_traceback_later(timeout) before calling shutdown). Usually a worker is blocked in wait because a notify or notify_all was missed.
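The trace hooks can be this small. `TraceLog` is a hypothetical name, its event names mirror the transitions listed above, and wiring `emit` calls into `_run_worker` is left as an assumption.

```python
import threading
import time

class TraceLog:
    """Thread-safe (ts, worker, event, task_id) records for replaying a task's timeline."""

    EVENTS = ("pop", "run-start", "run-end", "retry", "dlq")

    def __init__(self):
        self._lock = threading.Lock()
        self._records = []

    def emit(self, event, task_id):
        if event not in self.EVENTS:
            raise ValueError(f"unknown trace event {event!r}")
        record = (time.monotonic(), threading.current_thread().name, event, task_id)
        with self._lock:
            self._records.append(record)

    def timeline(self, task_id):
        """(worker, event) pairs for one task, ordered by timestamp (stable for ties)."""
        with self._lock:
            mine = [r for r in self._records if r[3] == task_id]
        mine.sort(key=lambda r: r[0])
        return [(worker, event) for _, worker, event, _ in mine]
```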

Mastery Criteria

  • Implemented in <40 minutes; <30 on second attempt.
  • All three tests pass.
  • Articulated visibility-timeout / at-least-once vs lost-on-crash tradeoff in <90 seconds.
  • Answered follow-ups #2, #8, #9, #12, #13 crisply.
  • Added at-least-once semantics in <15 minutes when prompted.
  • Stated why decorrelated jitter beats fixed jitter in retry backoff.