Lab 18 — Concurrent Web Crawler

Goal

Implement a concurrent web crawler that traverses the link graph breadth-first from a starting URL while enforcing a depth limit, per-host politeness (a cap on in-flight requests per domain plus a minimum inter-request delay), dedup (visit each URL once), and a bounded worker pool. After this lab you should be able to write the crawler from a blank screen in under 40 minutes and answer the politeness and backpressure follow-ups crisply.

Background Concepts

A web crawler is a BFS over the web graph where nodes are URLs and edges are anchor links extracted from the HTML. The interesting engineering is not the BFS — it’s the constraints layered on top:

  1. Politeness: never overload a single host. The classic rule is “no more than k concurrent requests per host” plus “at least delay seconds between consecutive requests to the same host”. Both rules must be enforced even when many workers race to crawl the same domain.
  2. Dedup: the web has cycles. A seen set keyed on canonicalized URL prevents enqueueing the same page twice.
  3. Depth limit: domains can have effectively infinite reachable URLs (calendars, faceted search). Hard-cap depth.
  4. Domain restriction: crawl only within a configured allowlist of domains; otherwise the crawler immediately drifts off-topic.
  5. Bounded workers: limit total concurrency to N. Without this, the crawler can saturate its own machine's network link and crash from file-descriptor exhaustion.
  6. Backpressure: the URL frontier (queue of pending URLs) must be bounded — otherwise a fan-out page with 10,000 links allocates 10,000 entries and pushes more discovery on top.
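
The backpressure rule in item 6 can be seen in isolation with the standard library's queue.Queue. This is a minimal sketch: the helper name try_enqueue, the capacity of 2, and the example URLs are illustrative assumptions, not part of the lab's API.

```python
import queue


def try_enqueue(frontier: queue.Queue, item, timeout_s: float = 0.05) -> bool:
    """Try to add item to a bounded frontier and report whether it fit.

    Hypothetical helper. A real crawler would usually block without a
    timeout so discovery slows to the rate at which workers drain the queue.
    """
    try:
        frontier.put(item, timeout=timeout_s)
        return True
    except queue.Full:
        return False


frontier = queue.Queue(maxsize=2)  # bounded: at most 2 pending URLs
accepted = [try_enqueue(frontier, f"https://example.com/{i}") for i in range(3)]
# The third put times out because nothing has drained the queue yet.

frontier.get()  # a worker takes one URL, which frees a slot
accepted_after_drain = try_enqueue(frontier, "https://example.com/3")
```

The timeout exists only so the sketch terminates; with a plain blocking put the producer simply waits until a consumer makes room.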

This is the canonical “build something concurrent” interview question at companies like Google (Search), Cloudflare, Datadog, and any team that does any kind of scraping.

Interview Context

A 40-to-60-minute round at senior+ practical interviews. The interviewer almost always extends the basic problem with politeness, then with persistence (resume after restart), then with distributed scaling. Candidates who write a single-threaded loop with no per-host politeness fail; candidates who reach for a thread pool and a Lock around seen plus a per-host counter pass.

Problem Statement

Implement crawl(start_url, *, max_depth, max_workers, per_host_concurrency, per_host_delay_s, allow_domains, http_get, extract_links) that returns a list (or yields a stream) of (url, depth, content) tuples. Visit each canonical URL at most once. Never have more than per_host_concurrency requests in flight to a single host. Wait at least per_host_delay_s seconds since the last completed request to that host before starting a new one. Stop when the frontier is empty and no request is still in flight.

Constraints

  • Thread-safe; many workers race for URLs from the frontier.
  • Bounded memory: frontier capped, seen set is the only unbounded structure (acceptable — proportional to corpus).
  • Graceful shutdown on Ctrl-C or external cancellation.
  • http_get is injected so tests don’t hit the network.

Clarifying Questions

  1. URL canonicalization rules? (Lowercase host, drop fragment, sort query params, default port elision.)
  2. Should robots.txt be honored? (Yes in production; mock it in this lab via is_allowed predicate.)
  3. What’s a “host”? (Either the registered domain, so that www.example.com and images.example.com share one budget under example.com, or the exact hostname. This lab's host_of uses the hostname; document the choice.)
  4. Should depth-0 (the start URL) count toward the depth limit? (No — depth-0 is always crawled.)
  5. Should we follow redirects? (Yes, but the redirect target counts as the visited URL.)
  6. Output order — does it need to be deterministic? (No — concurrency makes determinism hard. Document.)

Examples

results = crawl(
    "https://example.com/",
    max_depth=3,
    max_workers=8,
    per_host_concurrency=2,
    per_host_delay_s=0.5,
    allow_domains={"example.com"},
    http_get=fake_http_get,
    extract_links=fake_extract_links,  # required by the implementation below
)
# returns ~30 (url, depth, body) tuples, never more than 2 in-flight to example.com.

Initial Brute Force

def crawl_naive(url, max_depth):
    seen = {url}
    frontier = [(url, 0)]
    out = []
    while frontier:
        u, d = frontier.pop(0)
        body = http_get(u)
        out.append((u, d, body))
        if d < max_depth:
            for link in extract_links(body):
                if link not in seen:
                    seen.add(link)
                    frontier.append((link, d + 1))
    return out

This is single-threaded (slow), has no politeness (will get IP-banned), and grows the frontier unboundedly.

Brute Force Complexity

Time: O(V) HTTP requests issued serially, where V is the number of unique pages. At 1 s/page and 10k pages that is 10,000 s, or roughly 2.8 hours. Space: O(V) for seen, plus a frontier with no upper bound.
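
The estimate is easy to check with a few lines of arithmetic. The parallel figure below is an idealized assumption (perfect speedup across 8 workers and no politeness delay), so it is a best case rather than a prediction. Both function names are made up for this sketch.

```python
def serial_crawl_hours(pages: int, seconds_per_page: float) -> float:
    """Wall-clock hours to fetch `pages` one at a time."""
    return pages * seconds_per_page / 3600


def ideal_parallel_hours(pages: int, seconds_per_page: float, workers: int) -> float:
    """Best case: `workers` fetch concurrently with no per-host throttling."""
    return serial_crawl_hours(pages, seconds_per_page) / workers


serial = serial_crawl_hours(10_000, 1.0)         # a little under 3 hours
parallel = ideal_parallel_hours(10_000, 1.0, 8)  # roughly 21 minutes
```

Once per_host_delay_s is nonzero and the crawl stays on one host, the delay rather than max_workers is likely to dominate the wall-clock time.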

Optimization Path

Add a thread pool of max_workers. Add a Lock-guarded seen set. Add per-host concurrency via a Semaphore keyed on host. Track each host's last-completed time in a dict of (timestamp, lock) entries. Cap the frontier with a bounded queue.Queue(maxsize=...). Add a stop_event for graceful shutdown.
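
The Lock-guarded seen set is the step most often written incorrectly, so here is a minimal sketch of the atomic check-and-add. The class name SeenSet and the method add_if_new are assumptions made for illustration; the lab's implementation inlines the same logic.

```python
import threading


class SeenSet:
    """Set with an atomic check-and-add, so exactly one caller wins per URL."""

    def __init__(self):
        self._seen: set[str] = set()
        self._lock = threading.Lock()

    def add_if_new(self, url: str) -> bool:
        # The membership test and the insert share one lock acquisition.
        # Splitting them would let two threads both observe "not present".
        with self._lock:
            if url in self._seen:
                return False
            self._seen.add(url)
            return True


seen = SeenSet()
urls = [f"https://example.com/{i}" for i in range(100)]
wins = []
wins_lock = threading.Lock()


def racer():
    # Every thread tries every URL; only the first caller per URL gets True.
    for u in urls:
        if seen.add_if_new(u):
            with wins_lock:
                wins.append(u)


threads = [threading.Thread(target=racer) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Eight threads race over the same 100 URLs, and each URL is claimed exactly once.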

Final Expected Approach

A ThreadPoolExecutor(max_workers) runs crawl_one(url, depth). The frontier is a queue.Queue(maxsize=...). A seen set guarded by a Lock ensures each URL is enqueued once. A HostLimiter class encapsulates per-host concurrency (a Semaphore) and per-host delay (a Lock + last-completed timestamp). Workers pull URLs, acquire the host limiter (which may block on the semaphore or sleep for the delay), call http_get, extract links, check the depth limit, check-and-add each link to the seen set under the lock, and enqueue the new URLs.

Data Structures Used

  • queue.Queue for the frontier (bounded).
  • set[str] for seen (guarded by Lock).
  • dict[str, _HostState] for per-host limiters.
  • _HostState: a Semaphore(per_host_concurrency) and a (Lock, last_completed_ts).
  • ThreadPoolExecutor for workers.

Correctness Argument

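Each URL is enqueued at most once: the membership check and the add to seen happen atomically under seen_lock. Each URL is dequeued and crawled at most once: Queue.get hands every item to exactly one worker, and items are never re-enqueued. The depth limit is enforced before enqueueing children, not before crawling parents, which matches natural BFS semantics. With concurrent workers the visit order is only approximately breadth-first, so a page's recorded depth is the depth at which it was first discovered, not necessarily its shortest distance from the start URL. Per-host politeness: a worker holds the host’s semaphore for the duration of the request, so at most per_host_concurrency requests are in flight to that host. The inter-request delay is computed from last_completed under the host’s lock. With per_host_concurrency = 1 this guarantees that every request starts at least delay after the previous one completed, no matter how many workers exist. With per_host_concurrency > 1, another request to the same host can complete while a worker is waiting out its delay, so the spacing holds only relative to the most recent completion that worker observed before starting.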
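
The delay argument for the per_host_concurrency = 1 case can be checked deterministically with an injected clock. This is a sketch under stated assumptions: FakeClock and DelayGate are hypothetical names, the gate handles a single host, and the 0.2 s request duration is made up.

```python
import threading


class FakeClock:
    """Deterministic clock for tests: sleeping just advances the time."""

    def __init__(self):
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


class DelayGate:
    """Hypothetical single-host limiter: one request at a time, spaced by delay_s."""

    def __init__(self, delay_s: float, clock, sleep):
        self._delay = delay_s
        self._clock = clock
        self._sleep = sleep
        self._sem = threading.Semaphore(1)  # concurrency 1 serializes callers
        self._last_completed = None         # no request has completed yet

    def acquire(self) -> None:
        self._sem.acquire()
        if self._last_completed is not None:
            wait = self._last_completed + self._delay - self._clock()
            if wait > 0:
                self._sleep(wait)

    def release(self) -> None:
        # Stamp completion before releasing, so the next holder sees it.
        self._last_completed = self._clock()
        self._sem.release()


clock = FakeClock()
gate = DelayGate(delay_s=0.5, clock=clock.time, sleep=clock.sleep)
starts, completions = [], []
for _ in range(3):
    gate.acquire()
    starts.append(clock.time())
    clock.sleep(0.2)  # simulate a request that takes 0.2 s
    gate.release()
    completions.append(clock.time())
```

Each start lands about 0.5 s after the previous completion (up to floating-point rounding). Because the semaphore admits one holder at a time, last_completed needs no separate lock in this reduced version.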

Complexity

Aspect                  Cost
HTTP requests           O(V) total, parallel by max_workers
seen lookup             O(1) average
Per-host serialization  bounded by per_host_concurrency and per_host_delay_s
Memory                  O(V) for seen plus frontier_capacity for the queue

Implementation Requirements

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty
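from urllib.parse import parse_qsl, urldefrag, urlencode, urlparse


_DEFAULT_PORTS = {"http": 80, "https": 443}


def canonicalize(url: str) -> str:
    """Lowercase scheme and host, drop the fragment, elide default ports, sort query params."""
    u, _ = urldefrag(url)
    p = urlparse(u)
    scheme = p.scheme.lower()
    host = p.hostname.lower() if p.hostname else ""
    # Keep the port only when it differs from the scheme's default.
    port = f":{p.port}" if p.port and p.port != _DEFAULT_PORTS.get(scheme) else ""
    path = p.path or "/"
    # Sort query parameters so ?a=1&b=2 and ?b=2&a=1 dedup to one entry.
    query = urlencode(sorted(parse_qsl(p.query, keep_blank_values=True)))
    return f"{scheme}://{host}{port}{path}" + (f"?{query}" if query else "")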


def host_of(url: str) -> str:
    return (urlparse(url).hostname or "").lower()


class _HostState:
    __slots__ = ("sem", "lock", "last_completed")
    def __init__(self, concurrency: int):
        self.sem = threading.Semaphore(concurrency)
        self.lock = threading.Lock()
        self.last_completed = 0.0


class HostLimiter:
    def __init__(self, per_host_concurrency: int, per_host_delay_s: float, *, clock=time.monotonic, sleep=time.sleep):
        self._concurrency = per_host_concurrency
        self._delay = per_host_delay_s
        self._states: dict[str, _HostState] = {}
        self._guard = threading.Lock()
        self._clock = clock
        self._sleep = sleep

    def _state(self, host: str) -> _HostState:
        with self._guard:
            s = self._states.get(host)
            if s is None:
                s = _HostState(self._concurrency)
                self._states[host] = s
            return s

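    def acquire(self, host: str):
        s = self._state(host)
        s.sem.acquire()  # caps in-flight requests for this host
        while True:
            with s.lock:
                wait = s.last_completed + self._delay - self._clock()
            if wait <= 0:
                return s
            # Sleep outside the lock, then re-check: another request to this
            # host may have completed (moving last_completed) while we slept.
            self._sleep(wait)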

    def release(self, s: _HostState):
        with s.lock:
            s.last_completed = self._clock()
        s.sem.release()


def crawl(start_url: str, *,
          max_depth: int = 3,
          max_workers: int = 8,
          per_host_concurrency: int = 2,
          per_host_delay_s: float = 0.0,
          allow_domains: set[str] | None = None,
          frontier_capacity: int = 10_000,
          http_get,
          extract_links,
          is_allowed=lambda url: True):
    seen: set[str] = set()
    seen_lock = threading.Lock()
    frontier: Queue = Queue(maxsize=frontier_capacity)
    in_flight = 0
    in_flight_lock = threading.Lock()
    inflight_zero = threading.Event()
    inflight_zero.set()
    stop_event = threading.Event()
    results: list[tuple[str, int, str]] = []
    results_lock = threading.Lock()
    limiter = HostLimiter(per_host_concurrency, per_host_delay_s)

    def _allow(url: str) -> bool:
        if not is_allowed(url):
            return False
        if allow_domains is None:
            return True
        h = host_of(url)
        return any(h == d or h.endswith("." + d) for d in allow_domains)

    def _enqueue(url: str, depth: int):
        nonlocal in_flight
        canon = canonicalize(url)
        if not _allow(canon):
            return
        with seen_lock:
            if canon in seen:
                return
            seen.add(canon)
        # Count the URL as pending before it becomes visible to workers, so
        # in_flight (queued + being processed) is never 0 while work remains.
        with in_flight_lock:
            in_flight += 1
            inflight_zero.clear()
        # frontier put outside the locks; bounded queue applies backpressure
        frontier.put((canon, depth))

    def _worker():
        nonlocal in_flight
        while not stop_event.is_set():
            try:
                url, depth = frontier.get(timeout=0.1)
            except Empty:
                continue  # idle; the main thread decides when the crawl is done
            try:
                state = limiter.acquire(host_of(url))
                try:
                    body = http_get(url)
                finally:
                    limiter.release(state)
                if body is None:
                    continue  # nothing fetched; the finally block still runs
                with results_lock:
                    results.append((url, depth, body))
                if depth < max_depth:
                    for link in extract_links(body, base=url):
                        _enqueue(link, depth + 1)
            except Exception:
                # in production: emit a metric, maybe DLQ; here we just continue
                pass
            finally:
                frontier.task_done()
                # Children were counted in _enqueue before this decrement, so
                # the counter reaches 0 only when the whole crawl is finished.
                with in_flight_lock:
                    in_flight -= 1
                    if in_flight == 0:
                        inflight_zero.set()

    _enqueue(start_url, 0)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_worker) for _ in range(max_workers)]
        try:
            # inflight_zero is set exactly when nothing is queued or being processed.
            while not inflight_zero.wait(timeout=0.5):
                pass
        except KeyboardInterrupt:
            pass  # return whatever was collected so far
        finally:
            stop_event.set()
        for f in futures:
            f.result()
    return results

Tests

def make_site(graph: dict[str, list[str]]):
    def http_get(url): return graph.get(url)
    def extract_links(body, base): return body if isinstance(body, list) else []
    return http_get, extract_links

def test_basic_bfs():
    graph = {
        "https://e.com/a": ["https://e.com/b", "https://e.com/c"],
        "https://e.com/b": ["https://e.com/d"],
        "https://e.com/c": ["https://e.com/d"],
        "https://e.com/d": [],
    }
    http_get, extract_links = make_site(graph)
    out = crawl("https://e.com/a", max_depth=5, max_workers=4,
                per_host_concurrency=4, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    visited = {u for u, _, _ in out}
    assert visited == set(graph.keys())

def test_depth_limit():
    chain = {f"https://e.com/{i}": [f"https://e.com/{i+1}"] for i in range(10)}
    chain["https://e.com/10"] = []
    http_get, extract_links = make_site(chain)
    out = crawl("https://e.com/0", max_depth=2, max_workers=2,
                per_host_concurrency=2, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    assert len({u for u, _, _ in out}) == 3  # depths 0, 1, 2

def test_dedup_on_cycle():
    g = {"https://e.com/a": ["https://e.com/b"],
         "https://e.com/b": ["https://e.com/a", "https://e.com/c"],
         "https://e.com/c": []}
    http_get, extract_links = make_site(g)
    out = crawl("https://e.com/a", max_depth=10, max_workers=4,
                per_host_concurrency=4, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    urls = [u for u, _, _ in out]
    assert len(urls) == len(set(urls)) == 3

def test_domain_restriction():
    g = {"https://e.com/a": ["https://other.com/x"], "https://other.com/x": []}
    http_get, extract_links = make_site(g)
    out = crawl("https://e.com/a", max_depth=5, max_workers=2,
                per_host_concurrency=2, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    urls = {u for u, _, _ in out}
    assert "https://other.com/x" not in urls

def test_per_host_concurrency():
    import threading
    import time
    in_flight = {"max": 0, "now": 0}
    lock = threading.Lock()
    # Fan-out site: the start page links to 20 leaves, so many workers
    # contend for the same host at once.
    leaves = [f"https://e.com/{i}" for i in range(20)]
    graph = {"https://e.com/start": leaves, **{u: [] for u in leaves}}
    def http_get(url):
        with lock:
            in_flight["now"] += 1
            in_flight["max"] = max(in_flight["max"], in_flight["now"])
        time.sleep(0.05)
        with lock:
            in_flight["now"] -= 1
        return graph.get(url)
    out = crawl("https://e.com/start", max_depth=1, max_workers=10,
                per_host_concurrency=3, allow_domains={"e.com"},
                http_get=http_get, extract_links=lambda body, base: body)
    assert len(out) == 21  # start page plus 20 leaves
    assert 1 <= in_flight["max"] <= 3

Follow-up Questions

  1. How would you make it thread-safe? The implementation uses a handful of narrowly scoped locks: seen_lock (guards the URL set), in_flight_lock (guards the in-flight counter and the termination signal), results_lock (guards the output list), and, inside HostLimiter, a guard lock for the host table plus one lock per host. No lock is held across http_get. The Queue is internally thread-safe. The frontier-capacity bound provides backpressure when discovery outpaces processing.
  2. How would you persist state across restarts? Periodically snapshot seen to disk (or write each visit through to Redis/RocksDB). On restart, load seen and re-enqueue any URLs that were queued or in flight but not completed, which requires tracking them in a separate pending set.
  3. How would you scale to N nodes? Shard the URL space by hash(host) % N — each node owns a fixed slice. Cross-node enqueues go via a message bus. The seen set is replicated or sharded the same way. Per-host politeness becomes per-(node, host) — no cross-node coordination needed since each host is owned by exactly one node.
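  4. How would you handle backpressure? The bounded frontier (Queue(maxsize=...)) blocks workers that try to enqueue when it is full. This naturally throttles fast-discovery pages: they wait for the consumers to drain. Drop-on-overflow is wrong for crawlers (you’d lose URLs); blocking is right. One caveat: here the workers are both producers and consumers, so if every worker blocks in put at the same time, nothing drains the queue. Size the frontier well above max_workers times the expected fan-out, or spill overflow to disk.
  5. What is the shutdown / draining behavior? On stop_event, workers stop pulling from the queue. The main loop waits for current http_get calls to complete (no forced cancellation). Any URLs still in the frontier are abandoned, and because their canonical forms are already in seen, a restart from the seen snapshot alone would skip them. Persist the unprocessed frontier (the pending set from question 2) alongside seen so they can be re-enqueued.
  6. How would you handle a poison-pill input? Example: a URL whose response yields an endless supply of new links (a calendar with a next-year link on every page) or stalls link extraction. Mitigations: depth limit (already there), per-host hit cap (e.g., max 100 URLs per host), URL length cap, a link-extraction time cap enforced from a watchdog thread or subprocess (signal.alarm handlers run only on the main thread, so they cannot interrupt a pool worker), and a content-size cap on http_get.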
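
The sharding rule from question 3 can be sketched as follows. The function name owner_node is hypothetical, and SHA-256 is an assumption swapped in for the hash(host) shorthand used above: Python randomizes str hashes per process by default, so the built-in hash would assign the same host to different nodes on different machines.

```python
import hashlib
from urllib.parse import urlparse


def owner_node(url: str, num_nodes: int) -> int:
    """Map a URL to the index of the node that owns its host (illustrative)."""
    host = (urlparse(url).hostname or "").lower()
    # A stable digest gives every process the same answer for the same host.
    digest = hashlib.sha256(host.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_nodes


# Same host (case-insensitive), different paths: both land on one node.
a = owner_node("https://example.com/a", 4)
b = owner_node("https://EXAMPLE.com/b?x=1", 4)
```

Plain modulo sharding reassigns most hosts when N changes; consistent hashing is the usual remedy if nodes are added or removed often.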

Product Extension

Real crawlers (Googlebot, Bingbot) layer on top: robots.txt parsing per host, sitemap.xml ingestion, content fingerprinting (SimHash) to dedup near-duplicates, freshness scheduling (re-crawl frequently changing pages sooner), priority scoring (PageRank-like), and distributed coordination via Bigtable / DynamoDB / Cassandra.
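
For the robots.txt layer, the standard library's urllib.robotparser can back the is_allowed predicate that this lab mocks. The sketch below is one possible wiring, not a production design: the factory name make_is_allowed, the user-agent string, and the sample rules are all assumptions, and it covers a single host.

```python
from urllib.robotparser import RobotFileParser


def make_is_allowed(robots_txt: str, user_agent: str = "lab18-crawler"):
    """Build an is_allowed(url) predicate from robots.txt text for ONE host.

    Illustrative only: a real crawler would fetch and cache one robots.txt
    per host instead of sharing a single parser across hosts.
    """
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return lambda url: parser.can_fetch(user_agent, url)


# Made-up rules for the example.
ROBOTS = """\
User-agent: *
Disallow: /private/
"""

is_allowed = make_is_allowed(ROBOTS)
public_ok = is_allowed("https://example.com/index.html")
private_ok = is_allowed("https://example.com/private/report")
```

The resulting predicate has the shape crawl expects for its is_allowed argument, so it can be passed in directly for a single-host crawl.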

Language/Runtime Follow-ups

  • Python: as above. For very high concurrency switch to asyncio with aiohttp and asyncio.Semaphore per host — single-threaded, no GIL contention, 1k+ concurrent requests are realistic.
  • Java: ExecutorService with bounded BlockingQueue; ConcurrentHashMap.newKeySet() for seen; Semaphore per host. Alternatively, CompletableFuture chains, or virtual threads (Project Loom) for high concurrency with plain blocking code.
  • Go: a worker-pool of goroutines reading from a buffered channel; sync.Map for seen; per-host chan struct{} of size concurrency as a semaphore. The idiom is exceptionally clean in Go.
  • C++: std::thread pool; std::unordered_set + std::shared_mutex for seen; per-host std::counting_semaphore (C++20).
  • JS/TS: Node + p-limit per host; single-threaded so no locks. The “global” concurrency is enforced by an outer p-limit.
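
The asyncio variant mentioned in the Python bullet might look like the sketch below. It assumes a caller-supplied coroutine fetch(url) in place of a real aiohttp call, and the names fetch_all and fake_fetch are made up for illustration.

```python
import asyncio
from collections import defaultdict
from urllib.parse import urlparse


async def fetch_all(urls, per_host_concurrency: int, fetch):
    """Fetch urls concurrently with at most per_host_concurrency in flight per host."""
    # One semaphore per host, created lazily and then reused (memoized).
    limits = defaultdict(lambda: asyncio.Semaphore(per_host_concurrency))

    async def one(url):
        host = (urlparse(url).hostname or "").lower()
        async with limits[host]:
            return url, await fetch(url)

    return await asyncio.gather(*(one(u) for u in urls))


# Fake fetch that records peak concurrency instead of touching the network.
stats = {"now": 0, "max": 0}


async def fake_fetch(url):
    stats["now"] += 1
    stats["max"] = max(stats["max"], stats["now"])
    await asyncio.sleep(0.01)
    stats["now"] -= 1
    return f"body of {url}"


pages = asyncio.run(
    fetch_all([f"https://example.com/{i}" for i in range(10)], 3, fake_fetch)
)
```

No locks are needed around stats because all coroutines run on one thread and only yield at await points.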

Common Bugs

  1. Adding to seen after http_get returns — multiple workers crawl the same URL.
  2. Holding seen_lock while calling http_get — blocks every other worker.
  3. Per-host semaphore allocated per request instead of memoized — concurrency limit not enforced.
  4. Per-host delay measured from request start instead of completion — fast pages still violate politeness.
  5. Forgetting urldefrag in canonicalization: /page#section1 and /page#section2 count as different URLs.
  6. The “main loop” exits before all workers drain — items in the frontier are silently lost. Use a counter + condition or Queue.join().
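
Bug 6 is the easiest to demonstrate outside a crawler. The sketch below uses Queue.join() so the main thread returns only after every queued item, including items discovered along the way, has been processed. The integer "pages" and the function name process_tree are assumptions for illustration, and the queue is left unbounded to keep the example short.

```python
import queue
import threading


def process_tree(root: int, limit: int, workers: int = 4) -> list[int]:
    """Visit root and its children 2n+1 and 2n+2 (while below limit) with a pool."""
    tasks: queue.Queue = queue.Queue()
    visited: list[int] = []
    visited_lock = threading.Lock()

    def worker():
        while True:
            n = tasks.get()
            try:
                with visited_lock:
                    visited.append(n)
                for child in (2 * n + 1, 2 * n + 2):
                    if child < limit:
                        tasks.put(child)  # queued BEFORE task_done below
            finally:
                tasks.task_done()

    # Daemon threads: they stay blocked in get() and die with the process.
    for _ in range(workers):
        threading.Thread(target=worker, daemon=True).start()
    tasks.put(root)
    tasks.join()  # returns only when every put() has a matching task_done()
    return sorted(visited)


result = process_tree(0, limit=15)
```

The ordering inside the worker is what makes this safe: children are put on the queue before the parent calls task_done, so the unfinished-task count cannot touch zero while work is still being discovered.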

Debugging Strategy

When dedup fails: log every seen.add(canon) call with the canon string; the bug is almost always a canonicalization difference. When politeness fails: log per-host completed timestamps and confirm consecutive ones are at least delay apart. When the crawler hangs at end: print in_flight and frontier.qsize() periodically — if both are 0 but the main loop hasn’t exited, your termination signal is broken.

Mastery Criteria

  • Wrote a crawler with thread pool, bounded frontier, dedup, and depth limit in <40 minutes.
  • Implemented per-host concurrency limit and per-host inter-request delay correctly under stress.
  • Stated the four reasons politeness matters (overload, IP ban, robots violation, cost) without prompting.
  • Articulated the sharding strategy for scaling to N nodes.
  • Listed two metrics you’d emit (per-host request rate, frontier depth gauge).
  • Identified the canonicalization bug class (fragment / param order / case).
  • Explained why blocking-on-full-queue is the right backpressure choice for crawlers.