Lab 18 — Concurrent Web Crawler
Goal
Implement a concurrent web crawler that runs a BFS from a starting URL and enforces a depth limit, per-host politeness (a cap on in-flight requests per host plus a minimum inter-request delay), dedup (each URL is visited once), and a bounded worker pool. After this lab you should be able to write the crawler from a blank screen in under 40 minutes and answer the politeness/backpressure follow-ups crisply.
Background Concepts
A web crawler is a BFS over the web graph where nodes are URLs and edges are anchor links extracted from the HTML. The interesting engineering is not the BFS — it’s the constraints layered on top:
- Politeness: never overload a single host. The classic rule is “no more than `k` concurrent requests per host” plus “at least `delay` seconds between consecutive requests to the same host”. Both rules must hold even when many workers race to crawl the same domain.
- Dedup: the web has cycles. A `seen` set keyed on the canonicalized URL prevents enqueueing the same page twice.
- Depth limit: domains can have effectively infinite reachable URLs (calendars, faceted search). Hard-cap depth.
- Domain restriction: crawl only within a configured allowlist of domains; otherwise the crawler quickly drifts off-topic.
- Bounded workers: limit total concurrency to `N`. Without this, the crawler can saturate the local network link and fail with file-descriptor exhaustion.
- Backpressure: the URL frontier (the queue of pending URLs) must be bounded; otherwise a fan-out page with 10,000 links allocates 10,000 entries at once, and each of those pages pushes more discovery on top.
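To make the backpressure point concrete: in Python a bounded frontier can be a plain `queue.Queue(maxsize=...)`. A `put()` on a full queue blocks until a consumer calls `get()`, or raises `queue.Full` if its timeout expires first. The capacity of 2 below is only for illustration.

```python
from queue import Queue, Full

frontier = Queue(maxsize=2)              # room for two pending (url, depth) items
frontier.put(("https://e.com/a", 0))
frontier.put(("https://e.com/b", 1))

# A third put() would block; with a timeout it gives up and raises queue.Full.
try:
    frontier.put(("https://e.com/c", 1), timeout=0.1)
    accepted = True
except Full:
    accepted = False

print(accepted, frontier.qsize())        # False 2

frontier.get()                           # a consumer drains one item...
frontier.put(("https://e.com/c", 1), timeout=0.1)  # ...and the producer proceeds
print(frontier.qsize())                  # 2
```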
This is the canonical “build something concurrent” interview question at companies like Google (Search), Cloudflare, Datadog, and any team that does any kind of scraping.
Interview Context
Typically a 40-to-60-minute round in senior+ practical interviews. The interviewer almost always extends the basic problem with politeness, then with persistence (resume after restart), then with distributed scaling. Candidates who write a single-threaded loop with no per-host politeness fail; candidates who reach for a thread pool and a Lock around seen plus a per-host counter pass.
Problem Statement
Implement crawl(start_url, *, max_depth, max_workers, per_host_concurrency, per_host_delay_s, allow_domains, http_get, extract_links) that returns a list (or yields a stream) of (url, depth, content) tuples. Visit each canonical URL at most once. Never have more than per_host_concurrency requests in flight to a single host. Wait at least per_host_delay_s seconds after the last completed request to a host before starting a new one. Stop when the frontier is empty and no request is still in flight.
Constraints
- Thread-safe: many workers race for URLs from the frontier.
- Bounded memory: the frontier is capped; the `seen` set is the only unbounded structure (acceptable, since it grows in proportion to the corpus).
- Graceful shutdown on `Ctrl-C` or external cancellation.
- `http_get` is injected so tests don’t hit the network.
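The implementation also injects the link extractor as `extract_links(body, base=...)`, which should hand back absolute URLs for the crawler to canonicalize. One possible stdlib-only version, assuming `body` is an HTML string (`_AnchorParser` is a hypothetical name), collects `<a href>` values with `html.parser.HTMLParser` and resolves them against the page URL with `urljoin`:

```python
from html.parser import HTMLParser
from urllib.parse import urljoin


class _AnchorParser(HTMLParser):
    """Collects the raw href value of every <a> start tag."""

    def __init__(self):
        super().__init__()
        self.hrefs: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":                       # tag names arrive lowercased
            for name, value in attrs:
                if name == "href" and value:
                    self.hrefs.append(value)


def extract_links(body: str, base: str) -> list[str]:
    parser = _AnchorParser()
    parser.feed(body)
    parser.close()
    links = []
    for href in parser.hrefs:
        absolute = urljoin(base, href)       # resolve relative links
        if absolute.startswith(("http://", "https://")):  # skip mailto:, javascript:, ...
            links.append(absolute)
    return links


html = '<p><a href="/b">B</a> <a href="c?x=1#top">C</a> <a href="mailto:x@e.com">mail</a></p>'
print(extract_links(html, base="https://e.com/a/"))
# ['https://e.com/b', 'https://e.com/a/c?x=1#top']
```

Fragments are left in place here on purpose; the crawler's canonicalization step is what strips them.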
Clarifying Questions
- URL canonicalization rules? (Lowercase host, drop fragment, sort query params, elide default ports.)
- Should `robots.txt` be honored? (Yes in production; mock it in this lab via the `is_allowed` predicate.)
- What’s a “host”? (Either the registered domain, so `www.example.com` and `images.example.com` share the `example.com` budget, or just the hostname. Document the choice; `host_of` in this lab uses the hostname.)
- Should depth-0 (the start URL) count toward the depth limit? (No: depth-0 is always crawled.)
- Should we follow redirects? (Yes, but the redirect target counts as the visited URL.)
- Output order: does it need to be deterministic? (No; concurrency makes determinism hard. Document it.)
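For the `robots.txt` question, Python's standard library already ships a parser, `urllib.robotparser.RobotFileParser`. A minimal sketch of turning one host's `robots.txt` text into the lab's `is_allowed` predicate; `make_is_allowed` is a hypothetical helper, and fetching the file plus caching one parser per host are left out.

```python
from urllib.robotparser import RobotFileParser

ROBOTS_TXT = """\
User-agent: *
Disallow: /private
"""


def make_is_allowed(robots_txt: str, user_agent: str = "*"):
    """Hypothetical helper: build an is_allowed(url) predicate from robots.txt text."""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())        # parse() also marks the rules as loaded
    return lambda url: rp.can_fetch(user_agent, url)


is_allowed = make_is_allowed(ROBOTS_TXT)
print(is_allowed("https://e.com/public/page"))   # True
print(is_allowed("https://e.com/private/page"))  # False
```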
Examples
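```python
# fake_http_get / fake_extract_links are test doubles standing in for the network and HTML parsing.
results = crawl(
    "https://example.com/",
    max_depth=3,
    max_workers=8,
    per_host_concurrency=2,
    per_host_delay_s=0.5,
    allow_domains={"example.com"},
    http_get=fake_http_get,
    extract_links=fake_extract_links,
)
# returns ~30 (url, depth, body) tuples, never more than 2 in-flight to example.com.
```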
Initial Brute Force
```python
from collections import deque


def crawl_naive(url, max_depth, http_get, extract_links):
    seen = {url}
    frontier = deque([(url, 0)])   # unbounded FIFO
    out = []
    while frontier:
        u, d = frontier.popleft()
        body = http_get(u)         # one request at a time, no politeness
        out.append((u, d, body))
        if d < max_depth:
            for link in extract_links(body, base=u):
                if link not in seen:
                    seen.add(link)
                    frontier.append((link, d + 1))
    return out
```
This is single-threaded (slow), has no politeness (will get IP-banned), and grows the frontier unboundedly.
Brute Force Complexity
Time: O(V) HTTP requests serially, where V is the number of unique pages. With 1s/page and 10k pages, ~3 hours.
Optimization Path
Add a thread pool of max_workers. Add a Lock-guarded seen set. Add per-host concurrency via a Semaphore keyed on host. Add a per-host last-completed timestamp via a dict of (timestamp, lock) pairs. Cap the frontier with a bounded queue.Queue(maxsize=...). Add a stop_event for graceful shutdown.
Final Expected Approach
A ThreadPoolExecutor(max_workers) runs max_workers long-lived worker loops (not one task per URL). The frontier is a queue.Queue(maxsize=...). A seen set guarded by a Lock ensures each URL is enqueued once. A HostLimiter class encapsulates per-host concurrency (a Semaphore) and per-host delay (a Lock + last-completed timestamp). Each worker pulls a (url, depth) item, acquires the host limiter (which may block on the semaphore or sleep out the delay), calls http_get, releases the limiter, extracts links, and enqueues each child that passes the depth limit, the domain allowlist, and the seen check under the lock.
Data Structures Used
- `queue.Queue` for the frontier (bounded).
- `set[str]` for `seen` (guarded by a `Lock`).
- `dict[str, _HostState]` for per-host limiters.
- `_HostState`: a `Semaphore(per_host_concurrency)` plus a `(Lock, last_completed)` pair.
- `ThreadPoolExecutor` for workers.
Correctness Argument
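Each URL is enqueued at most once (the seen set is checked under the global lock atomically with the add). Each URL is dequeued and crawled at most once: Queue.get hands every item to exactly one worker, and items are never re-enqueued. The depth limit is enforced before enqueueing children, not before crawling parents, which matches the natural BFS semantics. Per-host politeness: a worker holds the host’s semaphore for the whole request, so at most per_host_concurrency requests to a host are in flight; before starting, it waits under the host’s lock until now - last_completed >= delay. With per_host_concurrency = 1 this guarantees at least delay seconds between one request’s completion and the next request’s start for the same host, no matter how many workers there are. With per_host_concurrency > 1 the delay is only measured against the last completion recorded when the worker took the host lock, so two requests to the same host can still start (or finish) less than delay apart.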
Complexity
| Aspect | Cost |
|---|---|
| HTTP requests | O(V) total, parallel by max_workers |
| seen lookup | O(1) average |
| Per-host serialization | bounded by per_host_concurrency and per_host_delay_s |
| Memory | O(V) for seen plus frontier_capacity for the queue |
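A rough worked bound for the per-host row, assuming per_host_concurrency = 1 and an average request latency L (an assumption; real latencies vary). The limiter then serializes a host completely and inserts at least d = per_host_delay_s between one completion and the next start, so crawling V_h pages on one host takes at least

$$T_h \;\ge\; V_h\,L + (V_h - 1)\,d$$

With V_h = 10^4, L = 1 s and d = 0.5 s that is 14,999.5 s, roughly 4.2 hours, however large max_workers is. Extra workers only pay off when the frontier spans many hosts.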
Implementation Requirements
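```python
from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty
from urllib.parse import urlparse, urldefrag

_DEFAULT_PORTS = {"http": 80, "https": 443}


def canonicalize(url: str) -> str:
    u, _ = urldefrag(url)                      # drop "#fragment"
    p = urlparse(u)
    scheme = p.scheme.lower()
    host = p.hostname.lower() if p.hostname else ""
    # Keep an explicit port only when it is not the scheme's default.
    port = f":{p.port}" if p.port and p.port != _DEFAULT_PORTS.get(scheme) else ""
    path = p.path or "/"
    # Sort query parameters so ?b=2&a=1 and ?a=1&b=2 map to the same key.
    query = "&".join(sorted(p.query.split("&"))) if p.query else ""
    return f"{scheme}://{host}{port}{path}" + (f"?{query}" if query else "")


def host_of(url: str) -> str:
    return (urlparse(url).hostname or "").lower()


class _HostState:
    __slots__ = ("sem", "lock", "last_completed")

    def __init__(self, concurrency: int):
        self.sem = threading.Semaphore(concurrency)
        self.lock = threading.Lock()
        # -inf: the first request to a host never waits, whatever the clock's epoch.
        self.last_completed = float("-inf")
```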
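```python
class HostLimiter:
    """Per-host politeness: at most `per_host_concurrency` requests in flight per
    host, and each new request waits until `per_host_delay_s` has passed since the
    last completion recorded for that host."""

    def __init__(self, per_host_concurrency: int, per_host_delay_s: float, *,
                 clock=time.monotonic, sleep=time.sleep):
        self._concurrency = per_host_concurrency
        self._delay = per_host_delay_s
        self._states: dict[str, _HostState] = {}
        self._guard = threading.Lock()   # protects _states (one memoized state per host)
        self._clock = clock              # injectable for deterministic tests
        self._sleep = sleep

    def _state(self, host: str) -> _HostState:
        with self._guard:
            s = self._states.get(host)
            if s is None:
                s = _HostState(self._concurrency)
                self._states[host] = s
            return s

    def acquire(self, host: str) -> _HostState:
        s = self._state(host)
        s.sem.acquire()                  # blocks while the host is at its concurrency cap
        with s.lock:
            # Sleeping while holding s.lock makes other acquirers (and release())
            # for this host wait behind us: simple, and errs on the polite side.
            wait = s.last_completed + self._delay - self._clock()
            if wait > 0:
                self._sleep(wait)
        return s

    def release(self, s: _HostState):
        with s.lock:
            s.last_completed = self._clock()
        s.sem.release()
```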
```python
def crawl(start_url: str, *,
          max_depth: int = 3,
          max_workers: int = 8,
          per_host_concurrency: int = 2,
          per_host_delay_s: float = 0.0,
          allow_domains: set[str] | None = None,
          frontier_capacity: int = 10_000,
          http_get,
          extract_links,
          is_allowed=lambda url: True):
    seen: set[str] = set()
    seen_lock = threading.Lock()
    frontier: Queue = Queue(maxsize=frontier_capacity)
    # in_flight counts URLs that are queued OR being processed. It is bumped in
    # _enqueue (before put) and dropped only after a URL's children have been
    # enqueued, so in_flight == 0 really means "no work left". Counting at
    # dequeue time instead leaves a window where a just-dequeued URL is
    # invisible and the crawl can stop early.
    in_flight = 0
    in_flight_lock = threading.Lock()
    inflight_zero = threading.Event()
    stop_event = threading.Event()
    results: list[tuple[str, int, str]] = []
    results_lock = threading.Lock()
    limiter = HostLimiter(per_host_concurrency, per_host_delay_s)

    def _allow(url: str) -> bool:
        if not is_allowed(url):
            return False
        if allow_domains is None:
            return True
        h = host_of(url)
        return any(h == d or h.endswith("." + d) for d in allow_domains)

    def _enqueue(url: str, depth: int):
        nonlocal in_flight
        canon = canonicalize(url)
        if not _allow(canon):
            return
        with seen_lock:                  # check-and-add is atomic: each URL enqueued once
            if canon in seen:
                return
            seen.add(canon)
        with in_flight_lock:
            in_flight += 1
        # put() runs outside every lock; a full queue blocks the caller (backpressure).
        # Caveat: workers are also the only consumers, so if all of them block here
        # nobody drains the queue. Keep frontier_capacity generous or spill to disk.
        frontier.put((canon, depth))

    def _done_with_one():
        nonlocal in_flight
        with in_flight_lock:
            in_flight -= 1
            if in_flight == 0:
                inflight_zero.set()

    def _worker():
        while not stop_event.is_set() and not inflight_zero.is_set():
            try:
                url, depth = frontier.get(timeout=0.1)
            except Empty:
                continue
            try:
                state = limiter.acquire(host_of(url))
                try:
                    body = http_get(url)
                finally:
                    limiter.release(state)
                if body is None:
                    continue             # the finally block below still runs
                with results_lock:
                    results.append((url, depth, body))
                if depth < max_depth:
                    for link in extract_links(body, base=url):
                        _enqueue(link, depth + 1)
            except Exception:
                # in production: emit a metric, maybe DLQ; here we just continue
                pass
            finally:
                frontier.task_done()
                _done_with_one()

    _enqueue(start_url, 0)
    if in_flight == 0:                   # start URL rejected by allow_domains / is_allowed
        return results
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_worker) for _ in range(max_workers)]
        try:
            # Short waits keep the main thread responsive to Ctrl-C.
            while not inflight_zero.wait(timeout=0.5):
                pass
        except KeyboardInterrupt:
            pass                         # graceful shutdown: return what we have so far
        finally:
            stop_event.set()             # workers exit after their current request
        for f in futures:
            f.result()
    return results
```
Tests
```python
def make_site(graph: dict[str, list[str]]):
    def http_get(url):
        return graph.get(url)

    def extract_links(body, base):
        return body if isinstance(body, list) else []

    return http_get, extract_links


def test_basic_bfs():
    graph = {
        "https://e.com/a": ["https://e.com/b", "https://e.com/c"],
        "https://e.com/b": ["https://e.com/d"],
        "https://e.com/c": ["https://e.com/d"],
        "https://e.com/d": [],
    }
    http_get, extract_links = make_site(graph)
    out = crawl("https://e.com/a", max_depth=5, max_workers=4,
                per_host_concurrency=4, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    visited = {u for u, _, _ in out}
    assert visited == set(graph.keys())


def test_depth_limit():
    chain = {f"https://e.com/{i}": [f"https://e.com/{i+1}"] for i in range(10)}
    chain["https://e.com/10"] = []
    http_get, extract_links = make_site(chain)
    out = crawl("https://e.com/0", max_depth=2, max_workers=2,
                per_host_concurrency=2, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    assert len({u for u, _, _ in out}) == 3  # depths 0, 1, 2


def test_dedup_on_cycle():
    g = {"https://e.com/a": ["https://e.com/b"],
         "https://e.com/b": ["https://e.com/a", "https://e.com/c"],
         "https://e.com/c": []}
    http_get, extract_links = make_site(g)
    out = crawl("https://e.com/a", max_depth=10, max_workers=4,
                per_host_concurrency=4, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    urls = [u for u, _, _ in out]
    assert len(urls) == len(set(urls)) == 3


def test_domain_restriction():
    g = {"https://e.com/a": ["https://other.com/x"], "https://other.com/x": []}
    http_get, extract_links = make_site(g)
    out = crawl("https://e.com/a", max_depth=5, max_workers=2,
                per_host_concurrency=2, allow_domains={"e.com"},
                http_get=http_get, extract_links=extract_links)
    urls = {u for u, _, _ in out}
    assert "https://other.com/x" not in urls
```
```python
def test_per_host_concurrency():
    import threading
    import time

    # A star-shaped site: the start page links to 20 children on the same host.
    children = [f"https://e.com/{i}" for i in range(20)]
    in_flight = {"max": 0, "now": 0}
    lock = threading.Lock()

    def http_get(url):
        with lock:
            in_flight["now"] += 1
            in_flight["max"] = max(in_flight["max"], in_flight["now"])
        time.sleep(0.05)  # keep requests overlapping long enough to observe the cap
        with lock:
            in_flight["now"] -= 1
        return children if url == "https://e.com/a" else []

    out = crawl("https://e.com/a", max_depth=1, max_workers=10,
                per_host_concurrency=3, allow_domains={"e.com"},
                http_get=http_get, extract_links=lambda body, base: body)
    assert len(out) == 21           # start page + 20 children
    assert in_flight["max"] <= 3    # never more than 3 concurrent requests to e.com
```
Follow-up Questions
- How would you make it thread-safe? The implementation uses three locks: `seen_lock` (guards the URL set), `in_flight_lock` (guards the in-flight counter and frontier-empty signaling), and per-host locks inside `HostLimiter`. The `Queue` is internally thread-safe. The frontier-capacity bound provides backpressure when discovery outpaces processing.
- How would you persist state across restarts? Periodically snapshot `seen` to disk (or push to Redis/RocksDB on every visit), together with a `pending` set of URLs that were enqueued but not yet completed. On restart, load both and re-enqueue everything in `pending`. Loading `seen` alone is not enough: URLs that were still queued at shutdown are already in `seen`, so they would never be crawled.
- How would you scale to N nodes? Shard the URL space by a stable hash of the host modulo `N`, so each node owns a fixed slice. (In Python, don’t use the built-in `hash()` for this: string hashes are randomized per process.) Cross-node enqueues go via a message bus. The `seen` set is sharded the same way (or replicated). Per-host politeness stays local: each host is owned by exactly one node, so no cross-node coordination is needed.
- How would you handle backpressure? The bounded frontier (`Queue(maxsize=...)`) blocks workers that try to enqueue when it is full. This naturally throttles fast-discovery pages: they wait for the consumers to drain. Drop-on-overflow is wrong for crawlers (you’d lose URLs); blocking is right, with one caveat: the workers are also the only consumers, so if every worker is blocked on `put()`, nobody drains the queue and the crawl deadlocks. Size the capacity generously or spill overflow to disk.
- What is the shutdown / draining behavior? On `stop_event`, workers stop pulling from the queue. The main thread waits for in-progress `http_get` calls to complete (no forced cancellation). URLs still in the frontier are abandoned; because they are already in `seen`, recovering them requires the persisted `pending` set described above.
- How would you handle a poison-pill input? A URL whose responses keep generating new links forever (e.g., a calendar with year=∞). Mitigations: the depth limit (already there), a per-host hit cap (e.g. max 100 URLs per host), a URL length cap, a link-extraction time budget, and a content-size cap on `http_get`. `signal.alarm` cannot enforce the time budget inside worker threads, because Python runs signal handlers only in the main thread; check a budget inside the extractor or run extraction in a subprocess instead.
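The sharding answer needs a hash that every node computes identically. A minimal sketch, assuming SHA-256 of the lowercased hostname and a fixed node count (`owner_node` is a hypothetical name):

```python
import hashlib
from urllib.parse import urlparse


def owner_node(url: str, num_nodes: int) -> int:
    """Hypothetical helper: map a URL to the node that owns its host.

    SHA-256 gives the same answer in every process on every machine, unlike the
    built-in hash() on str, which is salted per process.
    """
    host = (urlparse(url).hostname or "").lower()
    digest = hashlib.sha256(host.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_nodes


# Every URL on a host lands on the same node, so that node alone enforces politeness.
a = owner_node("https://example.com/a", 4)
b = owner_node("https://EXAMPLE.com:443/b?x=1", 4)
print(a == b, 0 <= a < 4)   # True True
```

Plain modulo reassigns most hosts whenever `num_nodes` changes; consistent hashing is the usual alternative if the cluster resizes.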
Product Extension
Real crawlers (Googlebot, Bingbot) layer on top: robots.txt parsing per host, sitemap.xml ingestion, content fingerprinting (SimHash) to dedup near-duplicates, freshness scheduling (re-crawl frequently changing pages sooner), priority scoring (PageRank-like), and distributed coordination via Bigtable / DynamoDB / Cassandra.
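SimHash maps a document to a 64-bit fingerprint such that near-identical documents get fingerprints that differ in only a few bits, so a crawler can treat pages within a small Hamming distance as near-duplicates. A minimal sketch, assuming unweighted word tokens and BLAKE2b as the per-token hash (both arbitrary choices here); `simhash64`, `hamming`, and the duplicate threshold you would pick are illustrative, not part of the lab.

```python
import hashlib
import re


def simhash64(text: str) -> int:
    """64-bit SimHash over lowercase word tokens, each occurrence weighted 1."""
    counts = [0] * 64
    for token in re.findall(r"\w+", text.lower()):
        h = int.from_bytes(hashlib.blake2b(token.encode("utf-8"), digest_size=8).digest(), "big")
        for bit in range(64):
            counts[bit] += 1 if (h >> bit) & 1 else -1   # vote per bit position
    fingerprint = 0
    for bit, c in enumerate(counts):
        if c > 0:
            fingerprint |= 1 << bit
    return fingerprint


def hamming(a: int, b: int) -> int:
    return bin(a ^ b).count("1")


page1 = "Concurrent web crawler lab: politeness, dedup, depth limit."
page2 = "concurrent WEB crawler lab -- politeness, dedup, depth limit"
print(hamming(simhash64(page1), simhash64(page2)))  # 0 (identical token streams)
```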
Language/Runtime Follow-ups
- Python: as above. For very high concurrency, switch to `asyncio` with `aiohttp` and an `asyncio.Semaphore` per host: single-threaded, no GIL contention, and 1k+ concurrent requests are realistic.
- Java: `ExecutorService` with a bounded `BlockingQueue`; `ConcurrentHashMap.newKeySet()` for `seen`; a `Semaphore` per host. Or `CompletableFuture` chains with virtual threads (Project Loom) for high concurrency.
- Go: a worker pool of goroutines reading from a buffered channel; `sync.Map` for `seen`; a per-host `chan struct{}` of size `concurrency` as a semaphore. The idiom is exceptionally clean in Go.
- C++: a `std::thread` pool; `std::unordered_set` + `std::shared_mutex` for `seen`; a per-host `std::counting_semaphore` (C++20).
- JS/TS: Node + `p-limit` per host; single-threaded, so no locks. The “global” concurrency is enforced by an outer `p-limit`.
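For the Python `asyncio` follow-up, a minimal sketch of the per-host `asyncio.Semaphore` idea. To stay self-contained it takes an injected async `fetch` coroutine instead of a real `aiohttp` session, and it deliberately omits canonicalization, the per-host delay, domain filtering and a bounded frontier; `crawl_async` and its parameters are hypothetical names.

```python
import asyncio
from collections import defaultdict
from urllib.parse import urlparse


async def crawl_async(start_url, *, fetch, extract_links, max_depth=2,
                      max_concurrency=100, per_host_concurrency=2):
    global_limit = asyncio.Semaphore(max_concurrency)
    host_limits = defaultdict(lambda: asyncio.Semaphore(per_host_concurrency))
    seen = {start_url}
    results = []

    async def visit(url, depth):
        host = urlparse(url).hostname or ""
        # Take the per-host slot first so a task waiting on a busy host
        # does not sit on one of the global slots.
        async with host_limits[host]:
            async with global_limit:
                body = await fetch(url)
        results.append((url, depth, body))
        if depth >= max_depth:
            return
        children = []
        for link in extract_links(body, base=url):
            # No await between check and add, so this is atomic on the event loop.
            if link not in seen:
                seen.add(link)
                children.append(visit(link, depth + 1))
        await asyncio.gather(*children)

    await visit(start_url, 0)
    return results


async def _demo():
    graph = {
        "https://e.com/a": ["https://e.com/b", "https://e.com/c"],
        "https://e.com/b": ["https://e.com/d"],
        "https://e.com/c": ["https://e.com/d"],
        "https://e.com/d": [],
    }

    async def fetch(url):
        await asyncio.sleep(0.01)        # stand-in for network latency
        return graph.get(url, [])

    return await crawl_async("https://e.com/a", fetch=fetch,
                             extract_links=lambda body, base: body, max_depth=5)


results = asyncio.run(_demo())
print(sorted(url for url, _, _ in results))
# ['https://e.com/a', 'https://e.com/b', 'https://e.com/c', 'https://e.com/d']
```

Because children are visited concurrently, discovery is not strictly breadth-first: a URL's recorded depth is that of the path that reached it first, which is not always the shortest.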
Common Bugs
- Adding to `seen` after `http_get` returns: multiple workers crawl the same URL.
- Holding `seen_lock` while calling `http_get`: blocks every other worker.
- Per-host semaphore allocated per request instead of memoized: the concurrency limit is never enforced.
- Per-host delay measured from request start instead of completion: a response slower than the delay is followed immediately by the next request, so a struggling host gets no breathing room.
- Forgetting `urldefrag` in canonicalization: `/page#section1` and `/page#section2` count as different URLs.
- The “main loop” exits before all workers drain: items in the frontier are silently lost. Use a counter that is incremented when a URL is enqueued (not when it is dequeued) plus an event or condition, or `Queue.join()`.
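The fragment bug is easy to see with `urldefrag` from `urllib.parse`: two links that differ only after `#` collapse to one key once the fragment is stripped.

```python
from urllib.parse import urldefrag

links = ["https://e.com/page#section1", "https://e.com/page#section2"]
naive_keys = set(links)                                  # fragments kept: 2 "pages"
canonical_keys = {urldefrag(link).url for link in links}  # fragments dropped: 1 page
print(len(naive_keys), len(canonical_keys))              # 2 1
```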
Debugging Strategy
When dedup fails: log every seen.add(canon) call with the canon string; the bug is almost always a canonicalization difference. When politeness fails: log each request’s host, start time and completion time, and confirm that every request to a host starts at least delay after the previous request to that host completed (an exact check when per_host_concurrency is 1, since requests to a host then never overlap). When the crawler hangs at the end: print in_flight and frontier.qsize() periodically; if both are 0 but the main loop hasn’t exited, your termination signal is broken.
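The politeness check can be automated over a request log. A minimal sketch, assuming each completed request is logged as a hypothetical `(host, start_ts, end_ts)` tuple and that per_host_concurrency is 1 (so spans for one host never overlap); `politeness_violations` is an illustrative name.

```python
from collections import defaultdict


def politeness_violations(events, delay):
    """Return (host, prev_end, next_start) wherever a request to a host started
    less than `delay` seconds after the previous request to that host completed."""
    by_host = defaultdict(list)
    for host, start, end in events:
        by_host[host].append((start, end))
    violations = []
    for host, spans in by_host.items():
        spans.sort()                                      # order by start time
        for (_, prev_end), (next_start, _) in zip(spans, spans[1:]):
            if next_start - prev_end < delay:
                violations.append((host, prev_end, next_start))
    return violations


log = [("e.com", 0.0, 1.0), ("e.com", 1.5, 2.0), ("e.com", 2.2, 3.0), ("x.com", 0.0, 0.1)]
print(politeness_violations(log, delay=0.5))   # [('e.com', 2.0, 2.2)]
```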
Mastery Criteria
- Wrote a crawler with thread pool, bounded frontier, dedup, and depth limit in <40 minutes.
- Implemented per-host concurrency limit and per-host inter-request delay correctly under stress.
- Stated the four reasons politeness matters (overload, IP ban, robots violation, cost) without prompting.
- Articulated the sharding strategy for scaling to N nodes.
- Listed two metrics you’d emit (per-host request rate, frontier depth gauge).
- Identified the canonicalization bug class (fragment / param order / case).
- Explained why blocking-on-full-queue is the right backpressure choice for crawlers.