Lab 16 — Circuit Breaker

Goal

Implement a thread-safe circuit breaker with three states (CLOSED, OPEN, HALF_OPEN) that protects a downstream call: it fails fast once the number of failures in a sliding time window reaches a threshold, then probes for recovery after a cooldown. After this lab you should be able to draw the state diagram, name every transition, write the implementation in <30 minutes, and answer “what’s the difference between a retry and a circuit breaker?” in <30 seconds.

Background Concepts

A circuit breaker is the operational dual of a retry. A retry keeps trying until the downstream is probably up; a circuit breaker stops trying once the downstream is probably down. Without a breaker, every caller retries the full schedule and amplifies the outage. With one, callers fail fast for a cooldown window and only a single probe call is sent during recovery — preventing the retry storm that otherwise prolongs outages.

The three states:

  • CLOSED — normal operation; calls go through; failures are counted in a sliding window.
  • OPEN — the failure threshold was crossed; all calls are short-circuited with CircuitOpenError for cooldown_s seconds.
  • HALF_OPEN — cooldown elapsed; a single probe call is allowed. If it succeeds, transition to CLOSED and reset counters. If it fails, transition back to OPEN and start a fresh cooldown.
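The transitions can be written down as a lookup table, which is a handy self-check that you can name every trigger. A sketch; the trigger strings are illustrative labels rather than part of any API, and self-loops (a success, or a failure below the threshold, while CLOSED) are omitted.

```python
from enum import Enum


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


# (current state, trigger) -> next state; trigger names are illustrative labels.
TRANSITIONS = {
    (State.CLOSED, "failure_threshold_reached"): State.OPEN,
    (State.OPEN, "cooldown_elapsed"): State.HALF_OPEN,
    (State.HALF_OPEN, "probe_success"): State.CLOSED,
    (State.HALF_OPEN, "probe_failure"): State.OPEN,
}


def next_state(state: State, trigger: str) -> State:
    """Return the next state, or raise ValueError for an edge the diagram lacks."""
    try:
        return TRANSITIONS[(state, trigger)]
    except KeyError:
        raise ValueError(f"no transition for {trigger!r} in {state.name}") from None


# Walk one outage: a failed probe, a second cooldown, then a successful probe.
s = State.CLOSED
for trigger in ("failure_threshold_reached", "cooldown_elapsed", "probe_failure",
                "cooldown_elapsed", "probe_success"):
    s = next_state(s, trigger)
```

The walk ends back in State.CLOSED; asking for an edge the diagram lacks, such as probe_success while CLOSED, raises ValueError.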

Two failure-counting windows are common: count-based (the last N calls) and time-based (the last T seconds). Time-based is preferred for low-traffic services, because a count-based window only advances when calls arrive and can keep reporting old failures indefinitely. Both fit naturally on a deque: a bounded deque of call outcomes for count-based, a deque of failure timestamps for time-based.
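Both windows side by side, as a minimal sketch (CountWindow and TimeWindow are hypothetical names). The count-based window keeps a running total so each record is O(1); the time-based window takes the current time as a parameter so a fake clock can drive it.

```python
from collections import deque


class CountWindow:
    """Count-based: failures among the last n calls, kept as a running total."""

    def __init__(self, n: int):
        self._outcomes = deque(maxlen=n)  # True = failure, False = success
        self._failures = 0

    def record(self, failed: bool) -> None:
        if len(self._outcomes) == self._outcomes.maxlen:
            self._failures -= self._outcomes[0]  # oldest outcome is about to be evicted
        self._outcomes.append(failed)
        self._failures += failed

    def failures(self) -> int:
        return self._failures


class TimeWindow:
    """Time-based: failures recorded within the last window_s seconds."""

    def __init__(self, window_s: float):
        self._window_s = window_s
        self._stamps = deque()  # failure timestamps, oldest first

    def record_failure(self, now: float) -> None:
        self._stamps.append(now)

    def failures(self, now: float) -> int:
        cutoff = now - self._window_s
        while self._stamps and self._stamps[0] < cutoff:
            self._stamps.popleft()  # age out failures older than the window
        return len(self._stamps)
```

After three failures and then an hour of silence, CountWindow still reports 3, because it has no notion of time, while TimeWindow.failures(3600.0) reports 0.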

Interview Context

This is the canonical follow-up to Lab 15 (retry) and a top-15 practical problem at Stripe, Netflix, Uber, and any team with a microservice mesh. The Hystrix library popularized this pattern; its successor Resilience4j is the modern reference. Candidates often hand-roll only the state transitions and miss the half-open single-probe constraint — a clear signal of “knows the diagram, hasn’t operated one in production”.

Problem Statement

Implement CircuitBreaker(failure_threshold, window_s, cooldown_s) with method call(fn) that either calls fn() (and updates the breaker state from the result) or raises CircuitOpenError if the breaker is open. Internally track failure count over the last window_s seconds; transition to OPEN when the count reaches failure_threshold. After cooldown_s in OPEN, the next call enters HALF_OPEN and is the sole probe; success → CLOSED, failure → OPEN again.

Constraints

  • Thread-safe; multiple goroutines/threads may call call() concurrently.
  • In HALF_OPEN, exactly one probe is in flight. Concurrent callers see CircuitOpenError until the probe completes.
  • failure_threshold ≥ 1, window_s > 0, cooldown_s > 0.
  • Successful calls in CLOSED decrement (or do not affect) the failure window — choose and document.

Clarifying Questions

  1. Are timeouts counted as failures? (Default yes; they almost always indicate an unhealthy downstream.)
  2. Are application errors (4xx vs 5xx) treated identically? (No. A 4xx is the caller’s fault; only 5xx responses and timeouts should trip the breaker. Inject an is_failure(exception) predicate.)
  3. What are the recovery semantics: strict half-open (a single probe) or “let N requests through”? (Single probe by default; expose a configurable RECOVERY_QUOTA only if needed.)
  4. Do we need per-resource breakers or a global one? (Per-resource is correct — a breaker per downstream identity.)
  5. Should successes in CLOSED reset the failure count? (Most implementations don’t reset; only the sliding window aging removes failures. Tunable.)
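Clarifying questions 1 and 2 translate directly into the is_failure predicate the implementation accepts. A sketch, assuming a hypothetical HTTPError exception that carries a status code; which exception types count is a policy choice, and this one counts only 5xx responses, timeouts, and connection errors.

```python
class HTTPError(Exception):
    """Hypothetical client exception carrying the response status code."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def is_failure(exc: BaseException) -> bool:
    """Count only evidence of an unhealthy downstream toward the threshold."""
    if isinstance(exc, HTTPError):
        return exc.status >= 500  # 5xx: downstream's fault; 4xx: caller's fault
    # Timeouts and connection errors count; caller bugs (ValueError, ...) and
    # non-Exception signals (KeyboardInterrupt, SystemExit) do not.
    return isinstance(exc, (TimeoutError, ConnectionError))
```

Pass it as CircuitBreaker(..., is_failure=is_failure).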

Examples

breaker = CircuitBreaker(failure_threshold=5, window_s=10, cooldown_s=30)
breaker.call(lambda: http_get(url))   # raises if downstream raises
# After 5 failures within 10s: state -> OPEN
breaker.call(...)                      # raises CircuitOpenError immediately for 30s
# After 30s cooldown: next call -> HALF_OPEN probe
# Probe success -> CLOSED, fresh window
# Probe failure -> OPEN, fresh 30s cooldown

Initial Brute Force

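import time


class CircuitOpenError(Exception):
    pass


# Deliberately naive; its bugs are listed after the code.
class NaiveBreaker: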
    def __init__(self, threshold, cooldown_s):
        self.failures = 0
        self.opened_at = None
        self.threshold = threshold
        self.cooldown_s = cooldown_s
    def call(self, fn):
        if self.opened_at and time.time() - self.opened_at < self.cooldown_s:
            raise CircuitOpenError()
        try:
            r = fn()
            self.failures = 0
            return r
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.time()
            raise

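This naive version has six bugs:

  1. It is not thread-safe.
  2. The failure count never ages out with time; there is no window.
  3. There is no HALF_OPEN state, so once the cooldown elapses every concurrent caller becomes a probe.
  4. Any success resets the failure count, even a straggler success that lands just after the breaker opened, which masks intermittent failures.
  5. It reads the wall clock (time.time()) instead of a monotonic clock.
  6. It treats every exception as a failure.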

Brute Force Complexity

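call() is O(1) time and O(1) space. The failure counter has no time bound, though: failures spread over hours (with no success in between) trip the breaker exactly as a one-second burst would, so sparse, intermittent failures can open it when they should not.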

Optimization Path

  1. Add a sliding window: a deque of failure timestamps, aged out on each call.
  2. Add a HALF_OPEN state with a probe_in_flight flag.
  3. Add a Lock to serialize state transitions.
  4. Inject the failure predicate so only real failures count.
  5. Switch all time reads to time.monotonic().
  6. Emit metrics on each transition.

Final Expected Approach

State machine guarded by a threading.Lock. On each call(), read the state under the lock. If OPEN and the cooldown has elapsed → transition to HALF_OPEN and grant the probe to this caller (set probe_in_flight=True). If OPEN and not elapsed → raise CircuitOpenError. If HALF_OPEN and a probe is in flight → raise (concurrent callers see the breaker as open). If CLOSED → age the window and proceed. Release the lock, call fn(), then reacquire the lock to record the result. Probe success → CLOSED, clear failures. Probe failure → OPEN with a fresh cooldown. Counted failure in CLOSED → append its timestamp, age the window, and transition to OPEN if the window now holds failure_threshold entries.

Data Structures Used

  • deque[float] for failure timestamps in the sliding window.
  • threading.Lock for state transitions.
  • An enum for State.
  • A monotonic clock for all time reads.

Correctness Argument

The state diagram is a closed graph: CLOSED → OPEN → HALF_OPEN → {CLOSED | OPEN}. Every transition happens under the lock, so two threads cannot disagree on the current state. The half-open invariant is enforced by probe_in_flight: only the thread that set the flag under the lock holds the probe right, and every other caller sees CircuitOpenError until the flag is cleared. The sliding window is aged under the lock immediately before each threshold check, so failures older than window_s are evicted before they can be counted.
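The closed-graph claim can also be checked mechanically in tests. A sketch: is_valid_history is a hypothetical helper that replays (timestamp, from_state, to_state) tuples, the shape the implementation appends to its private _transitions attribute, and rejects any edge outside the diagram or any step that does not start where the previous one ended.

```python
from enum import Enum


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


# The only edges in the diagram: CLOSED -> OPEN -> HALF_OPEN -> {CLOSED | OPEN}.
ALLOWED = {
    (State.CLOSED, State.OPEN),
    (State.OPEN, State.HALF_OPEN),
    (State.HALF_OPEN, State.CLOSED),
    (State.HALF_OPEN, State.OPEN),
}


def is_valid_history(transitions, initial: State = State.CLOSED) -> bool:
    """transitions: (timestamp, from_state, to_state) tuples in recorded order.
    Valid iff every edge is allowed and each step starts where the last ended."""
    current = initial
    for _, frm, to in transitions:
        if frm is not current or (frm, to) not in ALLOWED:
            return False
        current = to
    return True
```

In a concurrency test, running is_valid_history over the recorded transitions after all threads join catches both illegal edges and lost updates.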

Complexity

Operation              | Time           | Space
call (CLOSED, success) | O(1) amortized | O(failure_threshold) for the failure deque
call (OPEN, fast-fail) | O(1)           | O(1)
call (HALF_OPEN probe) | O(1) plus fn   | O(1)

Window aging is amortized O(1) per call: each failure timestamp is appended once and popped at most once.

Implementation Requirements

import threading
import time
from collections import deque
from enum import Enum
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


class CircuitOpenError(Exception):
    pass


class CircuitBreaker:
    def __init__(self,
                 failure_threshold: int = 5,
                 window_s: float = 10.0,
                 cooldown_s: float = 30.0,
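                 # default: only Exception subclasses count; KeyboardInterrupt/SystemExit never do
                 is_failure: Callable[[BaseException], bool] = lambda e: isinstance(e, Exception),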
                 *,
                 clock=time.monotonic):
        if failure_threshold < 1:
            raise ValueError("failure_threshold must be >= 1")
        if window_s <= 0 or cooldown_s <= 0:
            raise ValueError("window_s and cooldown_s must be > 0")
        self._threshold = failure_threshold
        self._window_s = window_s
        self._cooldown_s = cooldown_s
        self._is_failure = is_failure
        self._clock = clock
        self._lock = threading.Lock()
        self._state = State.CLOSED
        self._failures: deque[float] = deque()
        self._opened_at: Optional[float] = None
        self._probe_in_flight = False
        # observability: bounded transition history (the cap of 1000 is arbitrary)
        self._transitions: deque[tuple[float, State, State]] = deque(maxlen=1000)

    def _age_failures(self, now: float):
        cutoff = now - self._window_s
        while self._failures and self._failures[0] < cutoff:
            self._failures.popleft()

    def _transition(self, new: State, now: float):
        self._transitions.append((now, self._state, new))
        self._state = new

    def _try_acquire_probe(self, now: float) -> bool:
        """Called under lock. True if this caller becomes the probe."""
        if self._state == State.OPEN and self._opened_at is not None \
                and now - self._opened_at >= self._cooldown_s:
            self._transition(State.HALF_OPEN, now)
            self._probe_in_flight = True
            return True
        return False

    def call(self, fn: Callable[[], T]) -> T:
        now = self._clock()
        is_probe = False
        with self._lock:
            if self._state == State.CLOSED:
                self._age_failures(now)
            elif self._state == State.OPEN:
                if not self._try_acquire_probe(now):
                    raise CircuitOpenError("breaker is OPEN")
                is_probe = True
            elif self._state == State.HALF_OPEN:
                if not self._probe_in_flight:
                    # the previous probe ended with a non-failure exception and
                    # released the probe without a verdict; this caller takes over
                    self._probe_in_flight = True
                    is_probe = True
                else:
                    raise CircuitOpenError("probe in flight")
        # invoke without holding the lock
        try:
            result = fn()
        except BaseException as e:
            # catch BaseException so an interrupted probe still clears
            # probe_in_flight; is_failure decides what counts against downstream
            failed = self._is_failure(e)
            with self._lock:
                now = self._clock()
                if is_probe:
                    self._probe_in_flight = False
                    if failed:
                        # probe failed: back to OPEN with a fresh cooldown
                        self._transition(State.OPEN, now)
                        self._opened_at = now
                    # otherwise stay HALF_OPEN; the next caller becomes the probe
                elif failed:
                    self._failures.append(now)
                    self._age_failures(now)
                    if len(self._failures) >= self._threshold and self._state == State.CLOSED:
                        self._transition(State.OPEN, now)
                        self._opened_at = now
                        self._failures.clear()
            raise
        with self._lock:
            if is_probe:
                # probe succeeded: downstream recovered
                self._probe_in_flight = False
                self._transition(State.CLOSED, self._clock())
                self._failures.clear()
                self._opened_at = None
        return result

    def state(self) -> State:
        with self._lock:
            return self._state

Tests

def test_closed_passes_through():
    b = CircuitBreaker(failure_threshold=3, window_s=10, cooldown_s=5)
    assert b.call(lambda: 42) == 42
    assert b.state() == State.CLOSED

def test_opens_after_threshold():
    b = CircuitBreaker(failure_threshold=3, window_s=10, cooldown_s=5)
    for _ in range(3):
        try: b.call(lambda: (_ for _ in ()).throw(RuntimeError()))
        except RuntimeError: pass
    assert b.state() == State.OPEN
    try:
        b.call(lambda: 42)
    except CircuitOpenError:
        pass
    else:
        raise AssertionError("expected CircuitOpenError while OPEN")

def test_half_open_success_closes():
    fake = [0.0]
    b = CircuitBreaker(failure_threshold=2, window_s=10, cooldown_s=5, clock=lambda: fake[0])
    for _ in range(2):
        try: b.call(lambda: (_ for _ in ()).throw(RuntimeError()))
        except RuntimeError: pass
    assert b.state() == State.OPEN
    fake[0] = 6
    assert b.call(lambda: "ok") == "ok"
    assert b.state() == State.CLOSED

def test_half_open_failure_reopens():
    fake = [0.0]
    b = CircuitBreaker(failure_threshold=2, window_s=10, cooldown_s=5, clock=lambda: fake[0])
    for _ in range(2):
        try: b.call(lambda: (_ for _ in ()).throw(RuntimeError()))
        except RuntimeError: pass
    fake[0] = 6
    try: b.call(lambda: (_ for _ in ()).throw(RuntimeError()))
    except RuntimeError: pass
    assert b.state() == State.OPEN

def test_concurrent_only_one_probe():
    import threading
    fake = [0.0]
    b = CircuitBreaker(failure_threshold=1, window_s=10, cooldown_s=5, clock=lambda: fake[0])
    try: b.call(lambda: (_ for _ in ()).throw(RuntimeError()))
    except RuntimeError: pass
    fake[0] = 6
    seen_states = []
    barrier = threading.Barrier(10)
    def worker():
        barrier.wait()
        try:
            b.call(lambda: time.sleep(0.05) or "ok")
            seen_states.append("ok")
        except CircuitOpenError:
            seen_states.append("open")
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads: t.start()
    for t in threads: t.join()
    assert seen_states.count("ok") == 1
    assert seen_states.count("open") == 9

Follow-up Questions

  1. How would you make it thread-safe? A single threading.Lock around state transitions and counter updates is sufficient and is what the implementation does. The fn() call is invoked outside the lock so a slow downstream does not block other callers from seeing CircuitOpenError. The half-open probe race is resolved by probe_in_flight flipping atomically under the lock.
  2. What metrics would you emit? State-transition counter (labels: from_state, to_state); current state gauge; per-call outcome counter (success, failure, short_circuit, probe_success, probe_failure); failure-window gauge (current count); time-in-state histogram.
  3. What is the consistency model? Linearizable on the breaker’s state — all state() reads observe transitions in a total order consistent with the lock acquisition order. The probe invariant (“at most one probe in flight at any time”) is strict.
  4. How would you handle a poison-pill input? A request that always raises a retryable failure will trip the breaker quickly — that’s the breaker’s job. The risk is the opposite: a probe with a poison input perpetually fails the half-open probe and never recovers. Mitigation: pick the probe payload from a known-safe traffic pool (synthetic health-check), or use a periodic health probe instead of in-line traffic.
  5. What configuration knobs would you expose? failure_threshold, window_s, cooldown_s, is_failure predicate. Don’t expose the half-open probe quota — keep it 1 unless you have a strong reason. Defaults: 5 failures / 10s window / 30s cooldown.
  6. How would you scale to N nodes? Per-process breakers are local: each instance learns about downstream health independently. This is correct for most use cases (each instance’s view of latency varies), but it reacts slowly to a sudden downstream collapse, because each of the N instances must see failure_threshold failures on its own, so roughly N × failure_threshold calls fail before every breaker is open. The next step is a coordinated breaker via a shared registry, but only at very high scale.
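Follow-up 2’s transition counter, current-state gauge, and time-in-state figures can all be derived from the (timestamp, from_state, to_state) tuples the implementation records in _transitions. A minimal sketch; summarize is a hypothetical helper, it returns per-state totals rather than a histogram, and exporting to Prometheus or StatsD is left out.

```python
from collections import Counter, defaultdict
from enum import Enum


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


def summarize(transitions, start: float, now: float, initial: State = State.CLOSED):
    """Return (transition counts, seconds spent per state, current state)
    from (timestamp, from_state, to_state) tuples in recorded order."""
    counts = Counter((frm.name, to.name) for _, frm, to in transitions)
    time_in = defaultdict(float)
    state, since = initial, start
    for t, _, to in transitions:
        time_in[state.name] += t - since  # close out the interval spent in `state`
        state, since = to, t
    time_in[state.name] += now - since  # the still-open interval up to `now`
    return counts, dict(time_in), state


# One outage: opened at t=10, probed at t=40, closed half a second later.
history = [
    (10.0, State.CLOSED, State.OPEN),
    (40.0, State.OPEN, State.HALF_OPEN),
    (40.5, State.HALF_OPEN, State.CLOSED),
]
counts, time_in, current = summarize(history, start=0.0, now=100.0)
```

For this history the breaker spent 69.5 s CLOSED, 30 s OPEN, and 0.5 s HALF_OPEN, and the gauge reads CLOSED.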

Product Extension

Real-world breakers (Hystrix, Resilience4j, Polly) layer on top of this core: bulkheads (concurrent-call limit), rate limiters, fallbacks (return cached value when open), and metric emission to Prometheus / StatsD. The state machine is the same; the bookkeeping around it varies by framework.

Language/Runtime Follow-ups

  • Python: as above. For async, replace Lock with asyncio.Lock and make call an async def.
  • Java: prefer Resilience4j in production. Hand-rolled: AtomicReference<State>, LongAdder for counters, ScheduledExecutorService for cooldown timeouts.
  • Go: a struct guarded by sync.Mutex. The probe flag is a bool. Use time.Now() (monotonic on Go 1.9+).
  • C++: std::mutex + std::condition_variable if you want concurrent callers to wait for the probe rather than fail fast (a different policy, called “blocking breaker”).
  • JS/TS: in single-threaded Node, no lock is needed — the state-transition logic is naturally atomic across awaits as long as you do not await in the middle of a transition block.
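The Python async bullet, sketched: AsyncCircuitBreaker is a hypothetical port of the implementation above that awaits fn() outside an asyncio.Lock. Two simplifications, both assumptions: it drops the transition log, and it ignores failures reported after the breaker has left CLOSED. Because no transition block awaits anything, the lock is never contended on a single event loop; it is kept only to mirror the threaded version. On Python < 3.10, construct it inside a running loop.

```python
import asyncio
import time
from collections import deque
from enum import Enum
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


class CircuitOpenError(Exception):
    pass


class AsyncCircuitBreaker:
    """Hypothetical asyncio port: same states and transitions as the threaded version."""

    def __init__(self, failure_threshold: int = 5, window_s: float = 10.0,
                 cooldown_s: float = 30.0,
                 is_failure: Callable[[BaseException], bool] = lambda e: isinstance(e, Exception),
                 *, clock: Callable[[], float] = time.monotonic):
        self._threshold = failure_threshold
        self._window_s = window_s
        self._cooldown_s = cooldown_s
        self._is_failure = is_failure
        self._clock = clock
        self._lock = asyncio.Lock()
        self.state = State.CLOSED
        self._failures = deque()  # timestamps of counted failures
        self._opened_at = 0.0
        self._probe_in_flight = False

    def _open(self, now: float) -> None:
        self.state = State.OPEN
        self._opened_at = now
        self._failures.clear()

    async def call(self, fn: Callable[[], Awaitable[T]]) -> T:
        is_probe = False
        async with self._lock:
            now = self._clock()
            if self.state is State.OPEN:
                if now - self._opened_at < self._cooldown_s:
                    raise CircuitOpenError("breaker is OPEN")
                self.state = State.HALF_OPEN  # cooldown elapsed
            if self.state is State.HALF_OPEN:
                if self._probe_in_flight:
                    raise CircuitOpenError("probe in flight")
                self._probe_in_flight = is_probe = True
        try:
            result = await fn()  # never awaited while holding the lock
        except BaseException as e:  # includes CancelledError, so the probe is released
            failed = self._is_failure(e)
            async with self._lock:
                now = self._clock()
                if is_probe:
                    self._probe_in_flight = False
                    if failed:
                        self._open(now)  # fresh cooldown
                elif failed and self.state is State.CLOSED:
                    self._failures.append(now)
                    while self._failures and self._failures[0] < now - self._window_s:
                        self._failures.popleft()
                    if len(self._failures) >= self._threshold:
                        self._open(now)
            raise
        async with self._lock:
            if is_probe:
                self._probe_in_flight = False
                self.state = State.CLOSED
                self._failures.clear()
        return result
```

Awaited concurrently (for example with asyncio.gather) once the cooldown has elapsed, the first task becomes the probe and the rest see CircuitOpenError until it finishes, just as in the threaded test.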

Common Bugs

  1. Holding the lock while calling fn() — a slow downstream blocks every other caller.
  2. Forgetting to clear probe_in_flight on probe failure — breaker stays in HALF_OPEN forever, all calls fail.
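  3. Using time.time(): a backward wall-clock step (an NTP correction or a manual change) shrinks now - opened_at, possibly below zero, which stretches the cooldown by the size of the step; a forward step cuts it short.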
  4. Counting non-failure exceptions (KeyboardInterrupt, ValueError from caller side) toward the threshold.
  5. Resetting failures on every successful call — masks intermittent failures.
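  6. Aging the window only when a failure is recorded: any read of the failure count between failures (for example the failure-window gauge) reports failures that have already expired.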
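Bug 5 is easy to reproduce with a steady 50% error rate. A sketch comparing a reset-on-success counter with a time-based window, assuming one call per second; trips_with_reset and trips_with_window are hypothetical helpers.

```python
from collections import deque


def trips_with_reset(outcomes, threshold):
    """Bug 5: any success resets the counter. Returns the index of the call
    that trips the breaker, or None if it never trips."""
    failures = 0
    for i, failed in enumerate(outcomes):
        failures = failures + 1 if failed else 0
        if failures >= threshold:
            return i
    return None


def trips_with_window(outcomes, threshold, window_s, spacing_s=1.0):
    """Time-based window: successes leave earlier failures in place until they age out."""
    stamps = deque()
    for i, failed in enumerate(outcomes):
        now = i * spacing_s
        while stamps and stamps[0] < now - window_s:
            stamps.popleft()
        if failed:
            stamps.append(now)
            if len(stamps) >= threshold:
                return i
    return None


pattern = [True, False] * 10  # fail, succeed, fail, succeed, ...
```

With threshold 3 and a 10 s window, the reset-on-success counter never trips on the alternating pattern, while the windowed counter trips on the fifth call (index 4), when its third failure lands inside the window.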

Debugging Strategy

When the breaker won’t open: log the failure count after each call; check is_failure(e) returns True for the actual exception. When it won’t close after recovery: log the state and probe_in_flight flag — almost always the probe-flag-stuck-True bug. When concurrent tests are flaky: add a barrier so all callers race in lockstep, then assert exactly one probe-success and N-1 short-circuits.

Mastery Criteria

  • Drew the three-state diagram from memory in <30 seconds.
  • Listed every transition trigger (failure-threshold, cooldown-elapsed, probe-success, probe-failure) without prompting.
  • Wrote a thread-safe implementation in <30 minutes from a blank screen.
  • Wrote a concurrent test that catches the multiple-probe bug.
  • Articulated the retry × circuit-breaker composition in <60 seconds.
  • Named four metrics you’d emit for a production breaker.
  • Explained why fn() must not be called under the lock.