Lab 04 — Task Scheduler
Goal
Implement a TaskScheduler that accepts tasks with priorities, executes them in priority order via a worker pool, retries on failure with exponential backoff, and routes permanently failed tasks to a dead-letter queue. After this lab you should be able to design and implement a small in-memory task queue with retry semantics in under 35 minutes.
Background Concepts
A task scheduler is the in-memory cousin of Celery / Sidekiq / RQ. The four moving parts are:
- Priority queue of pending tasks (heap keyed on priority + scheduled-execution-time).
- Worker pool that pops tasks and runs them.
- Retry policy that decides if and when a failed task is re-enqueued (with delayed visibility).
- Dead-letter queue (DLQ) for tasks that have exhausted retries.
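The non-trivial design question is how to handle delayed re-enqueue for retries. The clean answer is a single priority queue keyed by (ready_at, priority), with workers waiting until the head of the queue is ready. Putting ready_at first matters: keyed the other way round, a high-priority retry that is not yet due would sit at the head and block lower-priority tasks that are ready now. This unifies “high-priority now” and “low-priority retry-in-30-seconds” under one data structure.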
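To see why ready_at leads the key, here is a minimal sketch with plain tuples and heapq. It assumes fresh submissions are stamped with a hypothetical READY_NOW sentinel (`float("-inf")`) instead of the current time, so they tie on ready_at and fall back to priority, while a retry carries a real future timestamp. Stopping at the first unready head is safe because nothing behind it can become ready sooner.

```python
import heapq
import itertools

READY_NOW = float("-inf")  # hypothetical sentinel: sorts before any real timestamp
seq = itertools.count()    # unique tie-breaker so equal keys never compare the payload

heap = []
now = 100.0  # pretend reading of time.monotonic()

# Two fresh tasks and one retry that becomes visible 30 s from now.
heapq.heappush(heap, (READY_NOW, 5, next(seq), "low-priority, now"))
heapq.heappush(heap, (READY_NOW, 0, next(seq), "high-priority, now"))
heapq.heappush(heap, (now + 30.0, 0, next(seq), "high-priority retry in 30 s"))

# Pop everything that is ready; stop at the first head whose ready_at is in the future.
ready = []
while heap and heap[0][0] <= now:
    ready.append(heapq.heappop(heap)[3])

print(ready)       # fresh tasks come out in priority order
print(heap[0][3])  # the retry stays parked at the head until it is due
```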
Interview Context
Task scheduler problems are popular at infrastructure-heavy companies (Uber, Cloudflare, AWS Lambda team, Datadog) because they touch concurrency, priority queues, retry semantics, and DLQ design — the building blocks of every async-job system. The interviewer will probe whether you’ve thought about idempotency, exactly-once vs at-least-once, and observability.
Problem Statement
Design TaskScheduler(n_workers, base_backoff):
- submit(task: Callable, priority: int, max_attempts: int = 3) -> task_id: enqueue a task with the given priority. Lower numeric priority runs first.
- start(): start the worker pool.
- shutdown(timeout): stop accepting new tasks; finish in-flight work for at most timeout seconds; return.
- dead_letters() -> list[FailedTask]: return tasks that exhausted retries.
Behavior: failures (raised exception) → exponential backoff retry up to max_attempts; permanent failure → DLQ.
Constraints
- Up to 10^4 pending tasks
- Up to 100 workers
- Per-task max execution time: 60s (a configurable per-task timeout is a follow-up)
- Tasks are arbitrary zero-argument callables, assumed to be roughly deterministic (a retry behaves like the first attempt)
Clarifying Questions
- Are tasks idempotent? (We’ll assume yes; idempotency is the user’s responsibility for at-least-once correctness.)
- Priority semantics: does a lower number mean higher priority? (Yes by convention, like a min-heap.)
- What does retry mean — the same task is re-run, or a new attempt object? (Same callable, same args, attempt counter incremented.)
- Should retries preserve original priority? (Yes by convention.)
- Cancellation? (Often a follow-up; default no.)
Examples
sched = TaskScheduler(n_workers=2, base_backoff=0.1)
sched.submit(lambda: print("a"), priority=1)
sched.submit(lambda: print("b"), priority=0)  # lowest number: dispatched first
sched.submit(failing_task, priority=2, max_attempts=2)  # failing_task: any zero-arg callable that raises
sched.start()  # starting after the submits makes the dispatch order deterministic
# 1st attempt fails; retry after backoff
# 2nd attempt fails; → DLQ
sched.shutdown(timeout=5.0)
sched.dead_letters()  # [FailedTask(task_id=3, callable_repr=..., attempts=2, last_error=...)]
Initial Brute Force
A single thread polling a list sorted by priority. Run, retry inline. Single worker, no parallelism. O(N log N) per submit.
Brute Force Complexity
Per submit: O(N log N) on the sort. Per dispatch: O(N) on the linear scan. Acceptable only for tens of tasks.
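A minimal sketch of that baseline, assuming zero-argument callables. BruteForceScheduler and run_all are illustrative names, and run_all drains the list once instead of polling forever; the point is where the costs and the blocking come from.

```python
import time


class BruteForceScheduler:
    """Baseline: one thread, a list re-sorted on every submit, retries run inline."""

    def __init__(self):
        self._tasks = []  # (priority, seq, fn, max_attempts); seq keeps fn out of comparisons
        self._seq = 0
        self.dead = []

    def submit(self, fn, priority=5, max_attempts=3):
        self._tasks.append((priority, self._seq, fn, max_attempts))
        self._seq += 1
        self._tasks.sort()  # full re-sort: O(N log N) per submit

    def run_all(self, base_backoff=0.0):
        while self._tasks:
            _, _, fn, max_attempts = self._tasks.pop(0)  # pop(0) shifts the list: O(N)
            for attempt in range(1, max_attempts + 1):
                try:
                    fn()
                    break
                except Exception:
                    if attempt == max_attempts:
                        self.dead.append(fn)
                    else:
                        # Inline backoff: the only thread sleeps, so nothing else runs.
                        time.sleep(base_backoff * 2 ** (attempt - 1))
```

The inline sleep is the real problem: one slow retry stalls every other task, which is what the heap-plus-ready_at design removes.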
Optimization Path
Replace the sorted list with a heapq. Replace the single thread with a worker pool of n_workers threads. Add a Condition variable so workers block when the queue is empty. Add a delayed-execution facility: instead of time.sleep(backoff) in the worker, push the task back with ready_at = now + backoff and key the heap on (ready_at, priority).
Final Expected Approach
Single heap of entries ordered by (ready_at, priority, seq); each entry also carries task_id, the callable, attempt, and max_attempts. One condition variable wakes workers when something becomes available. Workers loop: peek heap → if ready_at > now, wait until ready_at (or until notified). Pop, run; on success, done. On failure: increment attempt; if under max_attempts, push back with ready_at = now + backoff(attempt); else append to the DLQ list. shutdown sets a flag, broadcasts the condition, and joins all workers with a deadline.
Data Structures Used
| Structure | Purpose |
|---|---|
| heapq of entries ordered by (ready_at, priority, seq) | priority + delayed readiness |
| threading.Condition | wait/notify for empty queue and ready time |
| list (DLQ) | failed tasks |
| attempt field on each heap entry | retry tracking |
| itertools.count | unique seq tie-breaker and task ids |
Correctness Argument
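Priority ordering: the heap orders entries by (ready_at, priority, seq), and a worker pops only when the head's ready_at <= now. Because ready_at is the primary key, an unready head means nothing behind it is ready either, so waiting on the head never strands a runnable task. Priority decides only among entries with equal ready_at, so fresh submissions must share one "ready now" value (a constant that sorts before every real timestamp); stamping each with time.monotonic() at submit time silently turns the heap into a FIFO. seq breaks any remaining ties in submission order. The price of a single heap: once a retry is due, it is ordered by its ready_at rather than by its priority. If strict priority among all runnable tasks matters, split into a delay heap keyed on ready_at and a ready heap keyed on (priority, seq).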
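A minimal sketch of the two-heap alternative, assuming you need a due retry to compete with fresh work on priority alone. TwoHeapQueue, put, and get are hypothetical names, and only the queue is shown, not the worker pool: delayed entries wait in a heap keyed on ready_at and are promoted into a heap keyed on (priority, seq) once due.

```python
import heapq
import itertools
import threading
import time


class TwoHeapQueue:
    """Sketch: delayed entries wait in one heap, runnable entries in another."""

    def __init__(self):
        self._delayed = []  # (ready_at, seq, priority, item)
        self._ready = []    # (priority, seq, item)
        self._seq = itertools.count()
        self._cond = threading.Condition()

    def put(self, item, priority, delay=0.0):
        with self._cond:
            seq = next(self._seq)
            if delay <= 0:
                heapq.heappush(self._ready, (priority, seq, item))
            else:
                heapq.heappush(self._delayed,
                               (time.monotonic() + delay, seq, priority, item))
            self._cond.notify()

    def _promote(self, now):
        # Move every entry whose ready_at has passed into the priority-ordered heap.
        while self._delayed and self._delayed[0][0] <= now:
            _, seq, priority, item = heapq.heappop(self._delayed)
            heapq.heappush(self._ready, (priority, seq, item))

    def get(self, timeout=None):
        """Block until a runnable item exists; return it, or None on timeout."""
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._cond:
            while True:
                now = time.monotonic()
                self._promote(now)
                if self._ready:
                    return heapq.heappop(self._ready)[2]
                # Sleep until the next delayed entry is due, a put() arrives, or the deadline.
                waits = []
                if self._delayed:
                    waits.append(self._delayed[0][0] - now)
                if deadline is not None:
                    if now >= deadline:
                        return None
                    waits.append(deadline - now)
                self._cond.wait(timeout=min(waits) if waits else None)
```

Each delayed entry pays one extra pop and push when promoted, so dispatch stays O(log N).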
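Retry semantics: on failure the worker increments attempt and, if attempt < max_attempts, pushes the entry back with ready_at = now + base * 2^(attempt-1) * j, where j is a jitter factor drawn uniformly from [0.5, 1.5). Once attempt reaches max_attempts, the task goes to the DLQ. As long as the callable raises only Exception subclasses and the process stays up, a task is never lost: it is always running, in the heap, or in the DLQ, and every transition preserves that invariant. A worker crash mid-task breaks it (see follow-up #8).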
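The backoff rule above, pulled out as a standalone helper so the schedule is easy to inspect. backoff_delay is an illustrative name; the formula mirrors the reference worker, with an injected random.Random so it can be seeded.

```python
import random


def backoff_delay(attempt: int, base: float, rng: random.Random) -> float:
    """Delay after failed attempt number `attempt` (1-based):
    base * 2**(attempt - 1), scaled by a jitter factor drawn from [0.5, 1.5)."""
    return base * (2 ** (attempt - 1)) * (0.5 + rng.random())


rng = random.Random(42)  # seeded only so the printout is repeatable
for attempt in range(1, 4):
    nominal = 0.1 * 2 ** (attempt - 1)
    delay = backoff_delay(attempt, base=0.1, rng=rng)
    print(f"after failure {attempt}: {delay:.3f}s "
          f"(range {0.5 * nominal:.3f}-{1.5 * nominal:.3f}s)")
```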
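Shutdown: setting _stopping = True and calling notify_all wakes every blocked worker. A worker exits only when _stopping is set and the heap is empty, so pending tasks (including delayed retries) are drained first. The shared deadline passed to each join(timeout) bounds how long shutdown() blocks; workers still running after it are daemon threads and are abandoned, not stopped.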
Complexity
- submit: O(log N)
- Worker dispatch: O(log N) per task
- Memory: O(pending + dlq)
Implementation Requirements
import heapq
import itertools
import random
import threading
import time
from dataclasses import dataclass, field
from typing import Callable

# ready_at for fresh submissions: sorts before any real time.monotonic()
# reading, so all ready-now tasks tie on ready_at and are ordered by priority.
_READY_NOW = float("-inf")


@dataclass
class FailedTask:
    task_id: int
    callable_repr: str
    attempts: int
    last_error: str


@dataclass(order=True)
class _HeapEntry:
    # Ordering uses only (ready_at, priority, seq). seq is unique, so two
    # entries never compare equal and the callable is never compared.
    ready_at: float
    priority: int
    seq: int
    task_id: int = field(compare=False)
    fn: Callable[[], object] = field(compare=False)
    attempt: int = field(compare=False)
    max_attempts: int = field(compare=False)


class TaskScheduler:
    def __init__(self, n_workers: int = 4, base_backoff: float = 0.1):
        self._n_workers = n_workers
        self._base = base_backoff
        self._heap: list[_HeapEntry] = []
        self._dlq: list[FailedTask] = []
        self._cond = threading.Condition()
        self._stopping = False
        self._workers: list[threading.Thread] = []
        self._seq = itertools.count()
        self._next_id = itertools.count(1)

    def submit(self, fn: Callable[[], object], priority: int = 5,
               max_attempts: int = 3) -> int:
        if max_attempts <= 0:
            raise ValueError("max_attempts must be positive")
        with self._cond:
            if self._stopping:  # shutdown() stops accepting new tasks
                raise RuntimeError("scheduler is shutting down")
            tid = next(self._next_id)
            e = _HeapEntry(_READY_NOW, priority, next(self._seq),
                           tid, fn, attempt=0, max_attempts=max_attempts)
            heapq.heappush(self._heap, e)
            self._cond.notify()  # wake one idle or sleeping worker
        return tid

    def start(self) -> None:
        for i in range(self._n_workers):
            t = threading.Thread(target=self._run_worker, name=f"w{i}", daemon=True)
            t.start()
            self._workers.append(t)

    def shutdown(self, timeout: float = 5.0) -> None:
        with self._cond:
            self._stopping = True
            self._cond.notify_all()  # every worker must re-check _stopping
        # Workers drain the heap, then exit; one shared deadline bounds how long we block.
        deadline = time.monotonic() + timeout
        for w in self._workers:
            w.join(timeout=max(0.0, deadline - time.monotonic()))

    def dead_letters(self) -> list[FailedTask]:
        with self._cond:
            return list(self._dlq)

    def _run_worker(self) -> None:
        while True:
            with self._cond:
                while not self._heap and not self._stopping:
                    self._cond.wait()
                if self._stopping and not self._heap:
                    return  # drained, and no new submissions are accepted
                head = self._heap[0]
                wait = head.ready_at - time.monotonic()
                if wait > 0:
                    # The head has the smallest ready_at, so nothing is runnable
                    # yet; sleep until it is due or a submit() notifies us.
                    self._cond.wait(timeout=wait)
                    continue
                e = heapq.heappop(self._heap)
            # Run outside the lock so other workers keep dispatching.
            try:
                e.fn()
            except Exception as ex:  # a BaseException still kills this worker
                e.attempt += 1
                if e.attempt >= e.max_attempts:
                    with self._cond:
                        self._dlq.append(FailedTask(
                            e.task_id, repr(e.fn), e.attempt,
                            f"{type(ex).__name__}: {ex}"))
                else:
                    # Exponential backoff: base * 2^(attempt-1), times jitter in [0.5, 1.5).
                    backoff = self._base * (2 ** (e.attempt - 1))
                    backoff *= 0.5 + random.random()
                    e.ready_at = time.monotonic() + backoff
                    e.seq = next(self._seq)  # fresh tie-breaker: FIFO among equal keys
                    with self._cond:
                        heapq.heappush(self._heap, e)
                        self._cond.notify()
Tests
import threading
import time
import unittest


class TestScheduler(unittest.TestCase):
    def test_priority_order(self):
        order = []
        sched = TaskScheduler(n_workers=1)
        # Submit before start() so the single worker sees all three at once;
        # otherwise it may pop priority 3 before 1 and 2 are enqueued.
        for p in [3, 1, 2]:
            sched.submit((lambda x=p: order.append(x)), priority=p)
        sched.start()
        time.sleep(0.5)
        sched.shutdown(timeout=2.0)
        self.assertEqual(order, [1, 2, 3])

    def test_retry_then_dlq(self):
        attempts = []

        def always_fail():
            attempts.append(1)
            raise RuntimeError("boom")

        sched = TaskScheduler(n_workers=1, base_backoff=0.01)
        sched.start()
        sched.submit(always_fail, priority=0, max_attempts=3)
        time.sleep(1.0)
        sched.shutdown(timeout=2.0)
        self.assertEqual(len(attempts), 3)
        dlq = sched.dead_letters()
        self.assertEqual(len(dlq), 1)
        self.assertEqual(dlq[0].attempts, 3)

    def test_concurrent_submit(self):
        results = []
        sched = TaskScheduler(n_workers=4)
        sched.start()

        def push(i):
            for j in range(50):
                sched.submit((lambda x=(i, j): results.append(x)), priority=0)

        threads = [threading.Thread(target=push, args=(i,)) for i in range(4)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        time.sleep(0.5)
        sched.shutdown(timeout=2.0)
        self.assertEqual(len(results), 200)


if __name__ == "__main__":
    unittest.main()
Follow-up Questions
(2) Persist state across restarts? Tasks live in memory and are lost on restart. Persistence first requires serializable tasks (for example, a registered task name plus arguments) rather than arbitrary closures. Then choose: (a) write each submit to a WAL; on boot, replay; on completion, append a “done” marker. (b) Snapshot the heap periodically, plus a delta log of changes since the last snapshot. The DLQ should be persisted regardless: losing failed tasks is the worst outcome because nobody knows why a job didn’t run.
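A minimal sketch of option (a), assuming tasks are identified by a registered name plus JSON-serializable args. TaskWAL and its JSON-lines record format are hypothetical; replay returns every submit that has no matching "done" marker and tolerates a torn final line from a crash mid-write.

```python
import json
import os


class TaskWAL:
    """Hypothetical append-only write-ahead log, one JSON object per line."""

    def __init__(self, path: str):
        self._path = path

    def _append(self, record: dict) -> None:
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())  # on disk before the caller acknowledges the submit

    def log_submit(self, task_id, name, args, priority) -> None:
        self._append({"op": "submit", "id": task_id, "name": name,
                      "args": args, "priority": priority})

    def log_done(self, task_id) -> None:
        self._append({"op": "done", "id": task_id})

    def replay(self) -> list:
        """Return submit records with no matching 'done' marker, in log order."""
        if not os.path.exists(self._path):
            return []
        pending = {}  # dicts preserve insertion order
        with open(self._path, encoding="utf-8") as f:
            for line in f:
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError:
                    break  # torn final write from a crash: ignore the tail
                if rec["op"] == "submit":
                    pending[rec["id"]] = rec
                elif rec["op"] == "done":
                    pending.pop(rec["id"], None)
        return list(pending.values())
```

On boot, each replayed record would be resolved to its registered callable and re-submitted; a DLQ move could be logged as another record type the same way.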
(8) Partial failure? The interesting case: a worker pops a task and crashes mid-execution. The task is now lost (it’s not in the heap and it didn’t complete). Solution: at-least-once delivery via a visibility timeout. Popping moves the task into an “in-flight” map with a TTL; if the worker doesn’t ack before the TTL expires, the task returns to the heap. Because a slow but still-alive worker can then run the task a second time, idempotency keys are what make this safe. This is the SQS / Cloud Tasks model.
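A minimal sketch of the in-flight map with a visibility timeout. VisibilityQueue, receive, and ack are hypothetical names; it is FIFO rather than priority-ordered and single-threaded for brevity (guard each method with one lock to share it across workers). Each delivery gets a fresh receipt, similar in spirit to SQS receipt handles, so a late ack from a presumed-dead worker cannot cancel the redelivered copy.

```python
import itertools
import time
from collections import deque


class VisibilityQueue:
    """Sketch of at-least-once delivery with a visibility timeout."""

    def __init__(self, visibility_timeout: float, clock=time.monotonic):
        self._timeout = visibility_timeout
        self._clock = clock
        self._pending = deque()  # (task_id, payload) waiting to be handed out
        self._in_flight = {}     # receipt -> (deadline, task_id, payload)
        self._receipts = itertools.count()

    def put(self, task_id, payload) -> None:
        self._pending.append((task_id, payload))

    def _requeue_expired(self) -> None:
        now = self._clock()
        for receipt, (deadline, task_id, payload) in list(self._in_flight.items()):
            if deadline <= now:  # no ack in time: assume the worker died
                del self._in_flight[receipt]
                self._pending.append((task_id, payload))

    def receive(self):
        """Return (receipt, task_id, payload) and start its timer, or None if nothing is visible."""
        self._requeue_expired()
        if not self._pending:
            return None
        task_id, payload = self._pending.popleft()
        receipt = next(self._receipts)
        self._in_flight[receipt] = (self._clock() + self._timeout, task_id, payload)
        return receipt, task_id, payload

    def ack(self, receipt) -> bool:
        """Mark done. Returns False for a stale receipt (the task was already redelivered)."""
        return self._in_flight.pop(receipt, None) is not None
```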
(9) Eviction / cleanup? The DLQ grows unbounded. Either cap its size and drop the oldest entries (a sliding window of the last N failures), or persist to durable storage and prune from memory after a TTL. Always emit a per-task DLQ event so downstream alerting can fire.
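A minimal sketch of the capped option, assuming the DLQ is swapped for a small wrapper. BoundedDLQ and on_dead_letter are hypothetical names; collections.deque with maxlen drops the oldest entry on overflow, and the callback fires for every failure, including ones later evicted.

```python
from collections import deque


class BoundedDLQ:
    """Keeps only the most recent `capacity` failures; reports every arrival."""

    def __init__(self, capacity: int, on_dead_letter=None):
        self._items = deque(maxlen=capacity)  # appending to a full deque drops the oldest
        self._on_dead_letter = on_dead_letter
        self.dropped = 0

    def append(self, failed) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # count evictions so the cap is observable
        self._items.append(failed)
        if self._on_dead_letter is not None:
            self._on_dead_letter(failed)  # per-task event for alerting

    def snapshot(self) -> list:
        return list(self._items)
```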
(11) Configuration knobs? n_workers, base_backoff, default max_attempts. Per-task: priority, max_attempts, optionally timeout. Knobs not to expose: jitter strategy (use decorrelated jitter), heap implementation.
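Per-attempt jitter only spreads retries inside a fixed window for each attempt number; decorrelated jitter makes each delay a random draw between base and three times the previous delay, so the retry timelines of clients that failed together drift apart instead of staying aligned on attempt boundaries. A sketch of the formula popularized by the AWS Architecture Blog post on exponential backoff and jitter; the function name and cap parameter are assumptions, and using it in the scheduler would mean storing the previous delay on each heap entry.

```python
import random


def decorrelated_jitter(base: float, cap: float, rng=random):
    """Yield successive retry delays: sleep = min(cap, uniform(base, previous * 3)),
    starting from previous = base."""
    sleep = base
    while True:
        sleep = min(cap, rng.uniform(base, sleep * 3))
        yield sleep


delays = decorrelated_jitter(base=0.1, cap=2.0, rng=random.Random(7))
first_five = [next(delays) for _ in range(5)]
```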
(12) Shutdown / draining? Two modes: graceful (stop accepting; wait for in-flight; return) and forceful (stop accepting; abandon in-flight; return immediately). Always offer both. Default to graceful with a deadline.
(13) Poison pill? A task that always crashes or hangs the worker (segfault, unhandled OS exception, infinite loop). Run tasks in subprocess isolation (or with a cooperative timeout) so a crash or hang takes down only the child, not the worker. Blacklist by hash of (callable, args) after N consecutive crashes.
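A minimal sketch of the blacklist step. PoisonTracker is a hypothetical name, and it keys on a registered task name plus args rather than the callable object itself, since a function's repr includes a memory address that changes across runs. Any success resets the streak, so only consecutive crashes count.

```python
import hashlib


class PoisonTracker:
    """Quarantine a task key after `threshold` consecutive crashes."""

    def __init__(self, threshold: int = 3):
        self._threshold = threshold
        self._streak = {}  # key -> consecutive crash count
        self.quarantined = set()

    @staticmethod
    def key(name: str, args: tuple) -> str:
        return hashlib.sha256(repr((name, args)).encode()).hexdigest()

    def allow(self, k: str) -> bool:
        return k not in self.quarantined

    def record(self, k: str, crashed: bool) -> None:
        if not crashed:
            self._streak.pop(k, None)  # a success resets the streak
            return
        self._streak[k] = self._streak.get(k, 0) + 1
        if self._streak[k] >= self._threshold:
            self.quarantined.add(k)
```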
Product Extension
This is the heart of Celery, Sidekiq, RQ, AWS SQS + Lambda, GCP Cloud Tasks, and Temporal. Real systems add: visibility timeouts (the in-flight TTL), distributed coordination (multiple workers across hosts), durable storage (RDBMS or Redis with persistence), scheduling (cron-like time-based triggers), and workflow orchestration (Temporal). The core is what you wrote here.
Language/Runtime Follow-ups
- Python: the GIL means worker threads don’t parallelize CPU work. For CPU-bound tasks, use a ProcessPoolExecutor instead. The implementation above is fine for I/O-bound tasks.
- Java: ScheduledThreadPoolExecutor is the textbook fit for the delay half (submit with a delay; retries via re-submission), though it orders by delay only, not by priority. RetryTemplate (Spring Retry) for the policy; DeadLetterPublishingRecoverer (Spring Kafka) for dead-lettering.
- Go: a single channel of tasks plus N goroutines; for delayed retry, use time.AfterFunc to push back to the channel. Or use golang.org/x/sync/errgroup for the worker pool.
- C++: a std::priority_queue plus a condition variable. Tasks as std::function<void()>.
- JS/TS: a single event loop gives concurrency without a thread pool, so there is no direct equivalent; BullMQ (Redis-backed) is the de facto Node task queue.
Common Bugs
- Workers spinning when the heap head is in the future: wait exactly ready_at - now instead of poll-looping.
- Notifying only one worker on submit (notify) but all of them on shutdown (notify_all): fine, but check that the timed-wait case (a worker sees head.ready_at > now and sleeps) doesn’t miss a wakeup when a more urgent task is pushed during the sleep. It doesn’t here, because every worker waits on the same condition and re-checks the head after waking, so whichever worker the notify reaches picks up the new task.
- Omitting a unique tie-breaker: heapq compares plain tuples element by element, so two entries with equal (ready_at, priority) fall through to the callable and raise TypeError. A monotonically increasing seq prevents this (the dataclass above also marks the payload fields compare=False); giving a re-pushed entry a fresh seq keeps it behind entries already waiting with the same key.
- Catching Exception but letting BaseException (e.g., SystemExit raised by a task) escape: the worker thread dies silently and the task is neither retried nor dead-lettered. Catch BaseException with care, or at minimum catch Exception and log unexpected escapes.
- DLQ growing forever: see follow-up #9.
Debugging Strategy
Add a worker trace: every transition (pop, run-start, run-end, retry, dlq) gets a log line with (worker_id, task_id, ts). Replay the log to see the timeline. For “task didn’t run” bugs, walk the heap state at submit time and check that notify was called. For shutdown deadlocks, take a thread dump (Python: faulthandler.dump_traceback(all_threads=True), or arm faulthandler.dump_traceback_later(timeout) before calling shutdown so a hang dumps automatically); usually a worker is blocked on wait because notify_all was missed.
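A minimal sketch of the trace line, assuming the standard logging module. The logger name and the trace helper are hypothetical; %(threadName)s supplies the worker id (the reference implementation names its threads w0, w1, ...), and a monotonic ts lets you sort lines from different workers into one timeline.

```python
import logging
import time

log = logging.getLogger("scheduler.trace")  # hypothetical logger name
log.setLevel(logging.DEBUG)
_handler = logging.StreamHandler()
_handler.setFormatter(logging.Formatter("%(threadName)s %(message)s"))
log.addHandler(_handler)


def trace(event: str, task_id: int, **extra) -> None:
    """One line per transition: pop, run-start, run-end, retry, dlq."""
    details = " ".join(f"{k}={v}" for k, v in sorted(extra.items()))
    log.debug("event=%s task=%s ts=%.6f %s", event, task_id, time.monotonic(), details)
```

Inside the worker loop you would call, for example, trace("pop", e.task_id) right after heappop and trace("retry", e.task_id, attempt=e.attempt, backoff=backoff) before the re-push.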
Mastery Criteria
- Implemented in <40 minutes; <30 on second attempt.
- All three tests pass.
- Articulated visibility-timeout / at-least-once vs lost-on-crash tradeoff in <90 seconds.
- Answered follow-ups #2, #8, #9, #12, #13 crisply.
- Added at-least-once semantics in <15 minutes when prompted.
- Stated why decorrelated jitter beats fixed jitter in retry backoff.