Lab 05 — Thread Pool

Goal

Implement a bounded ThreadPool with a fixed number of worker threads, a bounded work queue, configurable rejection policy, and graceful shutdown. After this lab you should be able to write a clean ThreadPoolExecutor clone in under 25 minutes and answer the standard concurrency follow-ups.

Background Concepts

A thread pool decouples task submission from task execution by introducing a queue of work items processed by N worker threads. The four design decisions are:

  1. Pool sizing: fixed-size, dynamic (grow/shrink), or bounded with min/max?
  2. Queue policy: bounded (block / reject / drop) or unbounded (memory risk)?
  3. Rejection policy when the queue is full: throw, drop newest, drop oldest, or run-on-caller’s-thread?
  4. Shutdown semantics: stop accepting and finish queue (shutdown), or stop accepting and abandon queue (shutdown_now)?

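The textbook design is a fixed-size pool + bounded blocking queue + an explicit rejection policy + graceful shutdown. Java's ThreadPoolExecutor expresses the fixed size with corePoolSize == maximumPoolSize. Java's default rejection policy is abort (AbortPolicy throws RejectedExecutionException); caller-runs is the common alternative when you want the submitter throttled. This is the answer the interviewer wants by default.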

Interview Context

Thread pool is a classic concurrency interview question. It tests whether you understand condition variables / blocking queues, can reason about producer-consumer with backpressure, and can structure shutdown so that submit after shutdown is rejected and in-flight tasks complete cleanly. Java candidates are expected to know that ThreadPoolExecutor’s seven-parameter constructor encodes most of these decisions.

Problem Statement

Implement ThreadPool(n_workers, queue_capacity, on_reject):

  • submit(fn, *args, **kwargs) -> Future: enqueue. If the pool is shut down, raise RejectedExecutionError; if the queue is full, apply on_reject.
  • shutdown(wait=True, timeout=None): stop accepting; tasks already queued still run. If wait, join the workers (bounded by timeout).
  • shutdown_now() -> list[Callable]: stop accepting; abandon queued tasks and return them.

A Future exposes .result(timeout) to retrieve the task’s return value or raise its exception.

Constraints

  • 1 ≤ n_workers ≤ 1000
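  • 0 ≤ queue_capacity ≤ 10^4 (0 = direct hand-off, like Java's SynchronousQueue. Python's queue.Queue(maxsize=0) is unbounded, so 0 needs special handling and must not be passed through.)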
  • Submission rate up to 10^5 / s
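
The capacity-0 case is worth checking against the standard queue module. There, a maxsize of zero or less means an unbounded queue, not a Java-style hand-off. A quick sketch of both behaviours (the sizes are arbitrary):

```python
import queue

# maxsize <= 0 means "no limit" in queue.Queue, NOT "direct hand-off".
unbounded = queue.Queue(maxsize=0)
for i in range(10_000):
    unbounded.put_nowait(i)          # never raises queue.Full

# A positive maxsize is a real bound: the (maxsize + 1)-th put_nowait fails.
bounded = queue.Queue(maxsize=2)
bounded.put_nowait("a")
bounded.put_nowait("b")
overflowed = False
try:
    bounded.put_nowait("c")
except queue.Full:
    overflowed = True

print(unbounded.qsize(), bounded.qsize(), overflowed)  # 10000 2 True
```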

Clarifying Questions

  1. Is the queue bounded? (Yes by default; “unbounded queue” is a known antipattern that masks production bugs.)
  2. What happens on full queue? (Reject by default; offer caller-runs as alternative.)
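  3. Should workers be daemons? (Depends on language. In Python, daemon workers let the interpreter exit even if shutdown() is never called, but any in-flight task is killed abruptly at exit. Non-daemon workers require an explicit shutdown or the program hangs.)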
  4. Returns a Future? (Yes — async result is the standard contract.)
  5. Re-entrancy: can a task submit more tasks? (Yes — must not deadlock on a full queue from inside a worker.)

Examples

pool = ThreadPool(n_workers=2, queue_capacity=5)
fut = pool.submit(lambda: 1 + 1)
fut.result()           -> 2
pool.shutdown(wait=True)
pool.submit(lambda: 1) -> raises RejectedExecutionError (a RuntimeError subclass: pool shut down)

Initial Brute Force

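for fn in tasks: threading.Thread(target=fn).start(). No bound, no reuse, no result tracking. Each task pays the full thread-creation cost (budgeted here at a pessimistic ~1 ms; native thread creation on Linux is usually cheaper). At 10^4+ live threads, the process can hit OS thread or memory limits.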

Brute Force Complexity

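Per task: O(thread creation), budgeted at ≈ 1 ms in Python. Total: O(N · 1 ms); at N = 10^5 that is 100 seconds, far too slow. Memory: O(N) thread stacks in the worst case (all threads alive at once). Under the common Linux default, each reserves about 8 MB of virtual address space, though only pages actually touched are committed.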
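
The 1 ms per-thread figure is a budgeting assumption, not a measurement. Actual thread cost depends on OS, hardware and Python version. A rough sketch to measure it on your machine compares a fresh thread per task (started and joined one at a time) with one reused worker draining a queue. N = 2,000 is an arbitrary sample size:

```python
import queue
import threading
import time

N = 2_000  # arbitrary sample size

def noop() -> None:
    pass

# Thread-per-task: create, start and join a fresh thread for each task (serially).
t0 = time.perf_counter()
for _ in range(N):
    t = threading.Thread(target=noop)
    t.start()
    t.join()
per_thread_s = (time.perf_counter() - t0) / N

# Pooled: one long-lived worker pulls tasks from a queue until it sees None.
q = queue.Queue()

def worker() -> None:
    while (fn := q.get()) is not None:
        fn()

w = threading.Thread(target=worker)
w.start()
t0 = time.perf_counter()
for _ in range(N):
    q.put(noop)
q.put(None)  # sentinel: tells the worker to stop
w.join()
per_task_s = (time.perf_counter() - t0) / N

print(f"thread-per-task: {per_thread_s * 1e6:.1f} us, pooled: {per_task_s * 1e6:.1f} us")
```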

Optimization Path

Pool the threads. A fixed set of workers blocks (no busy-waiting) on a shared queue. submit enqueues; when the queue is full it either blocks or rejects. Per-task overhead drops to microseconds (one queue push and one pop). Memory is O(n_workers · stack_size + queue_capacity).
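
Here is the pooled design in miniature. This sketch starts a fixed set of workers that block on one bounded queue. It applies backpressure by letting put() block when the queue is full, and after the tasks it stops the workers with one sentinel each. worker_loop, run_pooled and _STOP are illustrative names, not the lab's API:

```python
import queue
import threading
from typing import Callable, List

_STOP = object()  # unique sentinel; cannot collide with a real task

def worker_loop(q: queue.Queue) -> None:
    while True:
        task = q.get()               # blocks (no spinning) until an item arrives
        if task is _STOP:
            return
        task()

def run_pooled(tasks: List[Callable[[], None]], n_workers: int, capacity: int) -> None:
    q: queue.Queue = queue.Queue(maxsize=capacity)
    workers = [threading.Thread(target=worker_loop, args=(q,)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for t in tasks:
        q.put(t)                     # blocks while the queue is full: backpressure
    for _ in workers:
        q.put(_STOP)                 # FIFO: every task is ahead of every sentinel
    for w in workers:
        w.join()

# Usage: 100 tiny tasks through 4 workers and an 8-slot queue.
done: List[int] = []
done_lock = threading.Lock()

def make_task(i: int) -> Callable[[], None]:
    def task() -> None:
        with done_lock:
            done.append(i)
    return task

run_pooled([make_task(i) for i in range(100)], n_workers=4, capacity=8)
print(len(done))  # 100
```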

Final Expected Approach

Use a queue.Queue(maxsize=queue_capacity) (Python's queue.Queue is thread-safe and supports timeouts). N worker threads loop on q.get(), run the task, set its Future, and repeat. A unique sentinel object, posted once per worker, signals shutdown. submit checks the shutdown flag, then enqueues with q.put_nowait, which raises queue.Full when the queue is full; in that case submit invokes on_reject. A blocking variant would call q.put instead (optionally with a timeout), pushing backpressure onto the caller.
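
The three enqueue modes map onto standard queue.Queue calls. A small sketch on a one-slot queue follows, with a threading.Timer standing in for a worker that eventually frees the slot. The 0.05 s timeouts are arbitrary:

```python
import queue
import threading
import time

q = queue.Queue(maxsize=1)
q.put_nowait("task-1")                  # fills the only slot

# Mode 1: non-blocking. Fails at once; submit would now call on_reject.
fail_fast = False
try:
    q.put_nowait("task-2")
except queue.Full:
    fail_fast = True

# Mode 2: blocking with a bound. Waits up to 0.05 s, then gives up.
waited_s = None
start = time.monotonic()
try:
    q.put("task-2", timeout=0.05)
except queue.Full:
    waited_s = time.monotonic() - start

# Mode 3: unbounded blocking. put() waits until a consumer frees a slot;
# here a Timer thread plays the consumer and removes "task-1" after 0.05 s.
threading.Timer(0.05, q.get).start()
q.put("task-3")

print(fail_fast, list(q.queue))         # True ['task-3']
```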

Data Structures Used

Structure                                     | Purpose
queue.Queue(maxsize=…)                        | producer-consumer with bounded blocking
Future (custom or concurrent.futures.Future)  | result + exception delivery
Sentinel object posted N times                | shutdown signal (one per worker)
_shutdown: bool flag                          | reject post-shutdown submissions

Correctness Argument

Liveness: when a task is enqueued and at least one worker is idle, that worker will dequeue and run it. Provided by queue.Queue’s internal Condition (notify on put, wait on get).
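
Interviewers often follow up by asking for the blocking queue itself. This minimal sketch mirrors, in simplified form, the structure queue.Queue uses internally: one lock shared by a not_empty and a not_full condition. put waits on not_full, get waits on not_empty, and each side notifies the other after changing the deque. BoundedBlockingQueue is an illustrative name:

```python
import collections
import threading
from typing import Any, Deque

class BoundedBlockingQueue:
    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._items: Deque[Any] = collections.deque()
        self._capacity = capacity
        lock = threading.Lock()                        # one lock, two wait sets
        self._not_empty = threading.Condition(lock)    # consumers wait here
        self._not_full = threading.Condition(lock)     # producers wait here

    def put(self, item: Any) -> None:
        with self._not_full:
            while len(self._items) >= self._capacity:  # loop: re-check after every wakeup
                self._not_full.wait()
            self._items.append(item)
            self._not_empty.notify()                   # wake one waiting consumer

    def get(self) -> Any:
        with self._not_empty:
            while not self._items:
                self._not_empty.wait()
            item = self._items.popleft()
            self._not_full.notify()                    # wake one waiting producer
            return item

# Usage: one producer pushes 100 items through a 2-slot queue.
q = BoundedBlockingQueue(capacity=2)

def produce() -> None:
    for i in range(100):
        q.put(i)

producer = threading.Thread(target=produce)
producer.start()
received = [q.get() for _ in range(100)]
producer.join()
print(received == list(range(100)))  # True
```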

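Safety / no lost tasks: every put either succeeds (the task will be dequeued by some worker) or is explicitly rejected. The shutdown protocol must ensure that no put succeeds after _shutdown=True, which requires submit to check the flag and enqueue inside the same critical section. Then every accepted task sits ahead of all N sentinels in the FIFO queue, so it is dequeued before any worker can exit. When shutdown(wait=True) returns without hitting its timeout, the queue is empty and all workers have exited. Each worker exits on the first sentinel it dequeues, and there are exactly N sentinels, so all N workers terminate.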
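
The ordering step matters. Consider a task enqueued after the sentinel, which is what happens when submit checks the flag, releases the lock, and only then enqueues. The worker exits first, and the task's Future never completes. A deterministic sketch of that failure, with a single worker and illustrative names:

```python
import queue
import threading
from concurrent.futures import Future

SENTINEL = object()
q: queue.Queue = queue.Queue(maxsize=4)

def worker() -> None:
    while True:
        item = q.get()
        if item is SENTINEL:
            return                        # worker exits on its sentinel
        fn, fut = item
        fut.set_result(fn())

w = threading.Thread(target=worker)
w.start()
q.put(SENTINEL)                           # shutdown() posts its sentinel...
late: Future = Future()
q.put((lambda: "ran", late))              # ...then a racing submit enqueues behind it
w.join()                                  # returns: the worker took the sentinel first

print(late.done(), q.qsize())             # False 1: the task is stranded forever
```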

Future correctness: the worker's try/except calls exactly one of set_result(value) or set_exception(ex). Future.result() blocks on a Condition until one of the two has happened, so every caller of result() observes the same single outcome.
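
The Future contract can be exercised on its own with concurrent.futures.Future. In this sketch, a helper thread (fake_worker, a stand-in for a pool worker) completes one future with a value and another with an exception. Meanwhile, the calling thread blocks in result():

```python
import threading
from concurrent.futures import Future

ok: Future = Future()
bad: Future = Future()

def fake_worker() -> None:
    for fut, fn in ((ok, lambda: 21 * 2), (bad, lambda: 1 / 0)):
        if not fut.set_running_or_notify_cancel():   # skip cancelled futures
            continue
        try:
            fut.set_result(fn())
        except BaseException as ex:
            fut.set_exception(ex)

threading.Thread(target=fake_worker).start()
value = ok.result(timeout=2.0)          # blocks until set_result runs
error = None
try:
    bad.result(timeout=2.0)             # re-raises the task's exception here
except ZeroDivisionError as ex:
    error = ex

print(value, type(error).__name__)      # 42 ZeroDivisionError
```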

Complexity

  • submit: O(1) amortized
  • Worker step: O(1) plus task duration
  • Memory: O(n_workers + queue_capacity)

Implementation Requirements

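import functools, queue, threading, time
from typing import Callable, Optional
from concurrent.futures import Future

_SENTINEL = object()  # unique shutdown marker; one per worker

class RejectedExecutionError(RuntimeError):
    pass

class ThreadPool:
    def __init__(self, n_workers: int, queue_capacity: int = 1024,
                 on_reject: Optional[Callable] = None):
        if n_workers <= 0:
            raise ValueError("n_workers must be positive")
        # queue.Queue treats maxsize <= 0 as UNBOUNDED, so 0 must not be passed
        # through; true direct hand-off would need a different design.
        if queue_capacity <= 0:
            raise ValueError("queue_capacity must be positive")
        self._q: queue.Queue = queue.Queue(maxsize=queue_capacity)
        self._workers: list[threading.Thread] = []
        self._shutdown = False
        self._lock = threading.Lock()  # guards _shutdown; orders enqueue vs. shutdown
        self._on_reject = on_reject or self._default_reject
        for i in range(n_workers):
            t = threading.Thread(target=self._run, name=f"pool-w{i}", daemon=True)
            t.start()
            self._workers.append(t)

    @staticmethod
    def _default_reject(fn, *args, **kwargs):
        # Abort policy: the error is delivered through the returned Future.
        raise RejectedExecutionError("queue full")

    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        fut: Future = Future()
        # Check the flag and enqueue in ONE critical section; otherwise a task
        # can land behind the shutdown sentinels and never run.
        with self._lock:
            if self._shutdown:
                raise RejectedExecutionError("pool shut down")
            try:
                self._q.put_nowait((fn, args, kwargs, fut))  # never blocks under the lock
                return fut
            except queue.Full:
                pass
        # Queue full: apply the policy outside the lock (caller-runs may be slow).
        # Contract: the policy raises to reject; a normal return becomes the result.
        try:
            value = self._on_reject(fn, *args, **kwargs)
        except Exception as ex:
            fut.set_exception(ex)
        else:
            fut.set_result(value)
        return fut

    def shutdown(self, wait: bool = True, timeout: Optional[float] = None) -> None:
        with self._lock:
            already = self._shutdown
            self._shutdown = True
        if not already:
            # FIFO: sentinels queue up behind every accepted task, so workers
            # drain the queue first. put() may block until a worker frees a slot.
            for _ in self._workers:
                self._q.put(_SENTINEL)
        if wait:
            # timeout bounds the total wait, not each join separately.
            deadline = None if timeout is None else time.monotonic() + timeout
            for w in self._workers:
                remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
                w.join(timeout=remaining)

    def shutdown_now(self) -> list[Callable]:
        """Stop accepting; abandon queued tasks; return them as re-runnable callables."""
        with self._lock:
            already = self._shutdown
            self._shutdown = True
        abandoned: list[Callable] = []
        drained_sentinels = 0
        try:
            while True:
                item = self._q.get_nowait()
                if item is _SENTINEL:          # posted by an earlier shutdown()
                    drained_sentinels += 1
                    continue
                fn, args, kwargs, fut = item
                # False means the caller already cancelled it: nothing to report.
                if fut.set_running_or_notify_cancel():
                    fut.set_exception(RejectedExecutionError("shutdown_now"))
                    abandoned.append(functools.partial(fn, *args, **kwargs))  # keep args
        except queue.Empty:
            pass
        # Put back the sentinels removed above, or post N if none were posted yet.
        for _ in range(drained_sentinels if already else len(self._workers)):
            self._q.put(_SENTINEL)
        return abandoned

    def _run(self) -> None:
        while True:
            item = self._q.get()               # blocks until a task or sentinel arrives
            if item is _SENTINEL:
                return                         # each worker consumes exactly one sentinel
            fn, args, kwargs, fut = item
            if not fut.set_running_or_notify_cancel():
                continue                       # cancelled while queued: skip it
            try:
                result = fn(*args, **kwargs)
            except BaseException as ex:        # never let a task kill the worker
                fut.set_exception(ex)
            else:
                fut.set_result(result)


# A useful policy: caller-runs (executes inline on the submitting thread when the
# queue is full); its return value becomes the Future's result.
def caller_runs(fn, *args, **kwargs):
    return fn(*args, **kwargs)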

Tests

import unittest, time, threading

class TestPool(unittest.TestCase):
    def test_basic(self):
        pool = ThreadPool(n_workers=2, queue_capacity=10)
        futs = [pool.submit(lambda x=i: x * 2) for i in range(10)]
        results = [f.result(timeout=2.0) for f in futs]
        self.assertEqual(sorted(results), [0, 2, 4, 6, 8, 10, 12, 14, 16, 18])
        pool.shutdown()

    def test_exception_propagated(self):
        pool = ThreadPool(n_workers=1, queue_capacity=10)
        f = pool.submit(lambda: 1 / 0)
        with self.assertRaises(ZeroDivisionError):
            f.result(timeout=2.0)
        pool.shutdown()

    def test_rejection_when_queue_full(self):
        started, block = threading.Event(), threading.Event()
        def occupy():
            started.set()      # the worker has dequeued this task
            block.wait()
        pool = ThreadPool(n_workers=1, queue_capacity=1)
        pool.submit(occupy)                            # occupies the worker...
        self.assertTrue(started.wait(timeout=2.0))     # ...once it is actually running
        pool.submit(lambda: None)                      # fills the queue
        f = pool.submit(lambda: None)                  # rejected
        with self.assertRaises(RejectedExecutionError):
            f.result(timeout=1.0)
        block.set()
        pool.shutdown()

    def test_shutdown_rejects_new(self):
        pool = ThreadPool(n_workers=1)
        pool.shutdown()
        with self.assertRaises(RejectedExecutionError):
            pool.submit(lambda: None)

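    def test_concurrent_submit(self):
        pool = ThreadPool(n_workers=8, queue_capacity=200)
        results, futs = [], []
        lock = threading.Lock()
        def task(x):
            with lock: results.append(x)
        def producer(start):               # 4 producer threads, 50 tasks each
            for i in range(start, start + 50):
                f = pool.submit(task, i)
                with lock: futs.append(f)
        producers = [threading.Thread(target=producer, args=(s,)) for s in range(0, 200, 50)]
        for p in producers: p.start()
        for p in producers: p.join()
        for f in futs: f.result(timeout=2.0)
        pool.shutdown()
        self.assertEqual(sorted(results), list(range(200)))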

Follow-up Questions

(1) Thread-safe? Yes, by design: the Queue handles producer-consumer atomicity. The _shutdown check and the enqueue must happen under the same lock. Checking the flag under the lock and then enqueuing after releasing it still races with shutdown.

(7) Backpressure? The bounded queue is the backpressure. Four policies: (a) block the producer (plain queue.put), pushing backpressure onto the caller; (b) reject (raise) and let the caller decide; (c) caller-runs, where the caller does the work itself and is throttled naturally; (d) drop oldest, for non-critical telemetry-style work. Pick one explicitly per pool.
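
The four policies differ only in what happens after a failed non-blocking put. The following sketch applies them to a bare queue.Queue; offer and the policy strings are illustrative names. Discard-oldest evicts the head with get_nowait and retries if another producer grabs the freed slot first:

```python
import queue
from typing import Any, Callable, List

Task = Callable[[], Any]

def offer(q: queue.Queue, task: Task, policy: str) -> List[Task]:
    """Enqueue task under a backpressure policy; return any tasks that were dropped."""
    if policy == "block":                    # (a) producer waits for free space
        q.put(task)
        return []
    try:
        q.put_nowait(task)
        return []
    except queue.Full:
        pass
    if policy == "abort":                    # (b) reject: the caller decides
        raise RuntimeError("queue full")
    if policy == "caller_runs":              # (c) producer does the work itself
        task()
        return []
    if policy == "discard_oldest":           # (d) evict the head to make room
        dropped: List[Task] = []
        while True:
            try:
                dropped.append(q.get_nowait())
            except queue.Empty:
                pass                         # a consumer already freed a slot
            try:
                q.put_nowait(task)
                return dropped
            except queue.Full:
                continue                     # another producer took the slot; evict again
    raise ValueError(f"unknown policy: {policy!r}")

# Usage on a one-slot queue.
q = queue.Queue(maxsize=1)

def first() -> str:
    return "first"

def second() -> str:
    return "second"

offer(q, first, "abort")                                 # fits: queue now full
aborted = False
try:
    offer(q, second, "abort")
except RuntimeError:
    aborted = True

ran: List[str] = []
offer(q, lambda: ran.append("inline"), "caller_runs")   # runs on this thread
dropped = offer(q, second, "discard_oldest")             # evicts first
head = q.get_nowait()
print(aborted, ran, [t() for t in dropped], head())      # True ['inline'] ['first'] second
```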

(12) Shutdown / draining? shutdown(wait=True) stops accepting and lets the workers drain the queue (graceful). shutdown_now() abandons the queue and returns the abandoned tasks (forceful). The graceful path is the production default. On SIGTERM, call shutdown() first and fall back to shutdown_now() if draining misses a deadline.
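
The graceful-then-forceful escalation can be sketched against the stdlib executor, whose ThreadPoolExecutor.shutdown accepts cancel_futures (Python 3.9+). drain_then_force is an illustrative helper. Wiring it to signal.signal(signal.SIGTERM, ...) is omitted; Python runs signal handlers on the main thread only:

```python
import concurrent.futures as cf
import threading
from typing import Iterable, Tuple

def drain_then_force(ex: cf.ThreadPoolExecutor, futures: Iterable[cf.Future],
                     deadline_s: float) -> Tuple[int, int]:
    """Shut down gracefully; if work is still pending at the deadline, cancel what
    has not started. Returns the (done, not_done) counts observed at the deadline."""
    fs = list(futures)
    ex.shutdown(wait=False)                    # stop accepting; queued work keeps running
    done, not_done = cf.wait(fs, timeout=deadline_s)
    if not_done:
        # Python 3.9+: cancel_futures drops every queued, not-yet-started item.
        ex.shutdown(wait=False, cancel_futures=True)
    return len(done), len(not_done)

# Usage: one worker stuck on a slow task, three tasks queued behind it.
started, release = threading.Event(), threading.Event()

def slow_task() -> str:
    started.set()
    release.wait(5.0)
    return "slow"

ex = cf.ThreadPoolExecutor(max_workers=1)
slow = ex.submit(slow_task)
queued = [ex.submit(lambda i=i: i) for i in range(3)]
started.wait(2.0)                              # make sure slow_task is running
n_done, n_pending = drain_then_force(ex, [slow, *queued], deadline_s=0.2)
release.set()                                  # let the running task finish normally
print(n_done, n_pending, [f.cancelled() for f in queued], slow.result(timeout=2.0))
```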

(8) Partial failure? A worker thread that crashes on an uncaught exception leaves the pool with N-1 workers permanently. Solutions: (a) catch BaseException around the task body (shown), (b) supervise — periodically check live worker count and respawn dead workers. The simplest production design catches and logs, never lets the worker die.
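
A supervisor can be as small as a periodic liveness check. This sketch replaces any worker thread that is no longer alive. It skips the check once shutdown has begun, because workers exiting on sentinels are not failures. respawn_dead is a hypothetical helper, not part of the lab's ThreadPool:

```python
import threading
from typing import Callable, List

def respawn_dead(workers: List[threading.Thread], target: Callable[[], None],
                 shutting_down: threading.Event) -> int:
    """Replace each dead worker with a fresh thread running target; return the count.

    Assumes every thread in workers was already started: an unstarted thread
    also reports is_alive() == False."""
    if shutting_down.is_set():
        return 0                               # sentinel exits are expected, not crashes
    respawned = 0
    for i, t in enumerate(workers):
        if not t.is_alive():
            fresh = threading.Thread(target=target, name=t.name, daemon=True)
            fresh.start()
            workers[i] = fresh
            respawned += 1
    return respawned

# Usage: one worker has died, one is healthy; a supervisor thread would call
# respawn_dead periodically (e.g. every few seconds).
stop = threading.Event()
shutting_down = threading.Event()
crashed = threading.Thread(target=lambda: None, name="pool-w0")  # stands in for a dead worker
crashed.start()
crashed.join()
healthy = threading.Thread(target=stop.wait, name="pool-w1", daemon=True)
healthy.start()
workers = [crashed, healthy]
respawned = respawn_dead(workers, stop.wait, shutting_down)
all_alive = all(w.is_alive() for w in workers)
stop.set()                                     # let the demo threads exit
```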

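(13) Poison pill? Here this means a bad task, not the shutdown sentinel (which is also often called a poison pill). A task that runs forever or consumes all memory blocks one worker permanently. Mitigations: a per-task timeout, memory accounting (rare in Python), or running untrusted tasks in subprocesses, which can be killed. In Python the timeout has to be cooperative: a watchdog thread signals the task, and the task must check the signal, because Python threads cannot be killed from outside. Stating this awareness is the bar.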
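
Here is a cooperative per-task timeout, sketched under the assumption that tasks accept a threading.Event they poll. A threading.Timer sets the event after the deadline; the task notices and raises. run_with_deadline and slow_task are illustrative names. Inside the pool, the worker would call run_with_deadline where it currently calls fn(*args, **kwargs):

```python
import threading
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def run_with_deadline(task: Callable[[threading.Event], T], timeout_s: float) -> T:
    """Run task(stop) on the current thread; a watchdog Timer sets stop after timeout_s.

    Cooperative only: the task must check stop itself."""
    stop = threading.Event()
    watchdog = threading.Timer(timeout_s, stop.set)
    watchdog.daemon = True
    watchdog.start()
    try:
        return task(stop)
    finally:
        watchdog.cancel()                 # no-op if it already fired

def slow_task(stop: threading.Event) -> str:
    for _ in range(1_000):                # ~10 s of work in 10 ms slices
        if stop.is_set():
            raise TimeoutError("task exceeded its deadline")
        time.sleep(0.01)
    return "finished"

# Usage: the slow task is stopped after ~50 ms; a quick task is unaffected.
timed_out = False
try:
    run_with_deadline(slow_task, timeout_s=0.05)
except TimeoutError:
    timed_out = True
quick = run_with_deadline(lambda stop: 42, timeout_s=1.0)
print(timed_out, quick)  # True 42
```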

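(11) Configuration knobs? n_workers: often CPU_count for CPU-bound work, and 2 * CPU_count or more for I/O-bound work. For reference, Python 3.8+ defaults ThreadPoolExecutor's max_workers to min(32, CPU count + 4). queue_capacity: rule of thumb, enough to absorb a ~1 second burst. on_reject: the rejection policy. Knobs not to expose: queue type, worker thread name (auto-generate).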
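
Both sizing rules reduce to simple arithmetic. This sketch assumes a burst is a period of burst_s seconds during which tasks arrive at peak_rate_per_s while the workers complete drain_rate_per_s. The worker formula mirrors the default documented for concurrent.futures.ThreadPoolExecutor since Python 3.8. Both helper names are illustrative:

```python
import math
import os

def default_io_workers() -> int:
    # Mirrors ThreadPoolExecutor's max_workers=None default formula (Python 3.8+).
    return min(32, (os.cpu_count() or 1) + 4)

def burst_capacity(peak_rate_per_s: float, drain_rate_per_s: float,
                   burst_s: float = 1.0) -> int:
    """Queue slots needed to hold the backlog built up during one burst (at least 1)."""
    backlog = (peak_rate_per_s - drain_rate_per_s) * burst_s
    return max(1, math.ceil(backlog))

# Usage: a 1 s burst at 10^4 tasks/s against workers that finish 2,000 tasks/s.
print(burst_capacity(10_000, 2_000))  # 8000
```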

Product Extension

java.util.concurrent.ThreadPoolExecutor is the textbook reference. Python's concurrent.futures.ThreadPoolExecutor is the canonical stdlib equivalent, but its work queue is unbounded and the constructor offers no way to bound it. Your bounded queue therefore adds backpressure the stdlib lacks. Many servers use variants of this pattern; for example, NGINX's thread_pool directive pairs a fixed number of threads with a bounded max_queue to offload blocking file I/O.

Language/Runtime Follow-ups

  • Python: the GIL serializes CPU work, so ThreadPool is for I/O-bound tasks; use ProcessPoolExecutor for CPU-bound work. concurrent.futures.ThreadPoolExecutor ships with Python, but its work queue is unbounded with no option to bound it, which is a footgun.
  • Java: new ThreadPoolExecutor(core, max, keepAlive, unit, queue, factory, rejectedHandler). Memorize the seven parameters and the four built-in RejectedExecutionHandler policies (Abort, CallerRuns, Discard, DiscardOldest).
  • Go: idiomatic Go does not use thread pools — goroutines are cheap. The pattern is a worker-pool of N goroutines reading from a channel of work items. The bounded channel is the bounded queue.
  • C++: std::thread per worker; std::condition_variable + std::queue for the work queue. Boost.Asio’s thread pool is production-ready.
  • JS/TS: single event loop; use worker_threads for CPU work. Libraries: piscina (worker pool for Node).

Common Bugs

  1. Unbounded queue: Queue() without maxsize — masks production overload as memory growth.
  2. Daemons vs non-daemons: in Python, daemon workers die abruptly on interpreter exit, abandoning in-flight tasks. Non-daemons require explicit shutdown or the program hangs. Pick deliberately.
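  3. Catching Exception but not BaseException lets a task that raises SystemExit (e.g. calls sys.exit()) or another BaseException kill its worker, and threading's default excepthook ignores SystemExit silently. (KeyboardInterrupt is delivered to the main thread in CPython, so it rarely reaches workers.) Catch BaseException around the task so the worker survives.
  4. submit-after-shutdown race: check the flag and enqueue in the same critical section, using put_nowait so the lock is never held while blocking. If the check and the put are separated, a task can land behind the shutdown sentinels. Every worker then exits before reaching it, so its Future never completes, and a flag check inside the worker cannot help.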
  5. Forgetting to set_running_or_notify_cancel() on the Future — cancelled futures still get run.
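
Why must the worker call set_running_or_notify_cancel()? It is the atomic PENDING-to-RUNNING transition that decides the race with Future.cancel(). A sketch of both outcomes with plain concurrent.futures.Future objects:

```python
from concurrent.futures import Future

# Cancelled while still queued: the worker's check returns False, so it skips the task.
queued: Future = Future()
cancelled_first = queued.cancel()                     # True: PENDING -> CANCELLED
should_run = queued.set_running_or_notify_cancel()    # False: do not run it

# Claimed by a worker first: cancel() now fails, so the result will be delivered.
claimed: Future = Future()
claimed_ok = claimed.set_running_or_notify_cancel()   # True: PENDING -> RUNNING
late_cancel = claimed.cancel()                        # False: already running
claimed.set_result("done")

print(cancelled_first, should_run, claimed_ok, late_cancel, claimed.result())
# True False True False done
```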

Debugging Strategy

For deadlocks, take a thread dump: in Python, import faulthandler; faulthandler.dump_traceback_later(5) then trigger the hang. For lost tasks, instrument every put/get/set_result/set_exception with a sequence number and replay. For worker death, log every worker exit with its reason.
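
For an in-process dump you can log or serve from a debug endpoint, sys._current_frames() maps each thread id to its current frame. The docs flag it as intended for internal and specialized purposes. A sketch; thread_dump is an illustrative name, and "pool-w0" matches the lab's worker naming:

```python
import sys
import threading
import traceback

def thread_dump() -> str:
    """Return the current stack of every thread, labelled with its thread name."""
    names = {t.ident: t.name for t in threading.enumerate()}
    lines = []
    for ident, frame in sys._current_frames().items():
        lines.append(f"--- {names.get(ident, '<unknown>')} (ident={ident}) ---\n")
        lines.extend(traceback.format_stack(frame))
    return "".join(lines)

# Usage: a worker parked on an Event shows up under its name.
gate = threading.Event()
w = threading.Thread(target=gate.wait, name="pool-w0", daemon=True)
w.start()
dump = thread_dump()
gate.set()
w.join()
print(dump)
```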

Mastery Criteria

  • Implemented ThreadPool in <30 minutes; tests pass first run.
  • Articulated bounded-queue + rejection-policy design without prompting.
  • Listed the four standard rejection policies (abort / caller-runs / discard / discard-oldest).
  • Answered follow-ups #7, #8, #12, #13 in <90 seconds each.
  • Stated when to use ThreadPool vs ProcessPool in Python in <30 seconds.
  • Refactored to add per-task timeout in <10 minutes when prompted.