Lab 08 — Log Parser
Goal
Implement a streaming log parser that reads log lines (potentially gigabytes), extracts structured fields via regex, aggregates per-field counts, and emits structured output — all under bounded memory. After this lab you should be able to write a clean streaming text-processing class with bounded memory in under 25 minutes.
Background Concepts
Log parsing has two patterns: batch (load file, parse all, output) and streaming (read one line at a time, emit incremental output). The bar at senior interviews is the streaming variant because real production logs are too large to load — multi-gigabyte files where batch processing would OOM.
The two streaming primitives are:
- Line-by-line iteration with a generator (for line in file: in Python). Memory is O(line size), not O(file size).
- Bounded aggregation: when counting unique IPs over a 1 TB log, you cannot keep all distinct IPs in a dict. Bound the aggregation by either (a) sketch (HyperLogLog for distinct counts, count-min for top-K), or (b) “top-K with eviction” using a min-heap of size K.
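The first primitive can be sketched as below. The helper names iter_lines and count_lines_containing are hypothetical and only illustrate that the consumer never holds more than one line at a time.

```python
from typing import Iterator


def iter_lines(path: str, encoding: str = "utf-8") -> Iterator[str]:
    """Yield lines one at a time without loading the whole file."""
    # errors="replace" keeps a stray bad byte from aborting a long run.
    with open(path, encoding=encoding, errors="replace") as fh:
        for line in fh:  # the file object is itself a lazy iterator
            yield line.rstrip("\n")


def count_lines_containing(path: str, needle: str) -> int:
    """Example consumer: holds one line and one integer in memory."""
    return sum(1 for line in iter_lines(path) if needle in line)
```

Because iter_lines is a generator, nothing is read until the caller asks for the next line, so memory tracks the longest line rather than the file size.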
The regex itself is mundane. The interview signal is the discipline of bounded memory and clean separation between parser, extractor, and aggregator.
Interview Context
Log parsing is a popular question at logging / observability companies (Datadog, Splunk, Honeycomb, Cribl, Elastic) and at any infrastructure company that processes high-volume telemetry. It tests streaming discipline, regex fluency, and bounded-memory awareness. It also exposes weak engineering: a candidate who writes lines = file.readlines() instantly fails the bounded-memory criterion.
Problem Statement
Design LogParser(pattern, top_k=10):
- parse_stream(line_iter) -> Iterator[dict]: yield a dict per line with extracted named fields. Skip malformed lines (count them).
- aggregate(line_iter) -> dict: return per-field top-K aggregates (e.g., top 10 IPs, top 10 paths, top 10 status codes). Bounded memory.
The regex is provided at construction; the parser must use named capture groups.
Constraints
- Input file size: up to 100 GB
- Aggregator memory: ≤ 100 MB
- Target throughput: 50 MB/s on a single core
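A quick back-of-the-envelope check of what these constraints imply. The per-entry cost of 150 bytes is an assumed, illustrative figure for a short string key plus its count in a Python dict, not a measured one; decimal units (1 GB = 10^9 bytes) are also an assumption.

```python
GB = 10**9
MB = 10**6
ASSUMED_BYTES_PER_ENTRY = 150  # hypothetical; measure on real keys


def scan_seconds(file_bytes: int, bytes_per_second: int) -> float:
    """Time for one full pass at the target throughput."""
    return file_bytes / bytes_per_second


def max_exact_keys(budget_bytes: int, bytes_per_entry: int) -> int:
    """How many distinct keys an exact counter can hold within the budget."""
    return budget_bytes // bytes_per_entry


print(scan_seconds(100 * GB, 50 * MB))                     # 2000.0 (about 33 minutes)
print(max_exact_keys(100 * MB, ASSUMED_BYTES_PER_ENTRY))   # 666666
```

Under these assumptions a single pass takes roughly half an hour, and exact counting stays within budget only while the combined distinct keys across all fields remain in the hundreds of thousands.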
Clarifying Questions
- Is the log format known? (Yes — caller provides regex with named groups.)
- Malformed lines: skip, error, or quarantine? (Skip + count by default; quarantine optionally.)
- Aggregation: which fields, what kind (count, distinct, top-K)? (Caller specifies.)
- Time-series: are we computing per-time-window aggregates? (Optional; default is whole-stream.)
- Encoding: UTF-8? Binary? (UTF-8 default; binary is a follow-up.)
Examples
pattern = r'(?P<ip>\S+) - - \[(?P<ts>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) HTTP/[\d.]+" (?P<status>\d+) (?P<bytes>\d+)'
parser = LogParser(pattern, top_k=3)
for record in parser.parse_stream(open("access.log")):
    print(record)
# {'ip': '1.2.3.4', 'ts': '20/May/2026:12:00:00', 'method': 'GET', 'path': '/x', 'status': '200', 'bytes': '1234'}
agg = parser.aggregate(open("access.log"))
# {'ip': [('1.2.3.4', 1500), ('5.6.7.8', 1200), ...],
# 'path': [('/api', 5000), ('/login', 3000), ...],
# 'status': [('200', 90000), ('404', 4000), ('500', 200)],
# 'malformed': 12,
# 'total': 100000}
Initial Brute Force
open(file).read() then re.finditer. Loads everything; OOMs on 100 GB.
Brute Force Complexity
Memory: O(file size). At 100 GB, instant OOM on a 32 GB machine.
Optimization Path
Stream line-by-line with for line in file:. Aggregation memory then depends on the number of unique keys, not on file size, so choose the structure by cardinality: (a) at moderate cardinality (100k unique IPs is often fine), keep exact counts in a dict[ip, count]; (b) at very high cardinality, use HyperLogLog (HLL) for distinct counts and a count-min sketch + min-heap for top-K. For most workloads the dict is fine; at extreme cardinality, sketches are required.
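One way to make the "dict is fine until it is not" decision observable is a counter with a hard key cap. This is a minimal sketch, not part of the lab's reference solution; CappedCounter, max_keys, and overflow are hypothetical names.

```python
from collections import Counter


class CappedCounter:
    """Exact counts for the first max_keys distinct keys.

    Events for keys that arrive after the cap is reached are only tallied
    in `overflow`, so the caller can tell that exactness was lost.
    """

    def __init__(self, max_keys: int):
        self._max_keys = max_keys
        self.counts: Counter = Counter()
        self.overflow = 0  # events whose key was refused admission

    def add(self, key: str) -> None:
        if key in self.counts or len(self.counts) < self._max_keys:
            self.counts[key] += 1
        else:
            self.overflow += 1

    @property
    def exact(self) -> bool:
        return self.overflow == 0
```

A non-zero overflow is the signal that the field's cardinality has outgrown the exact approach and a sketch should take over.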
Final Expected Approach
Compile the regex once. Stream lines via the iterator. For each line, match and yield the groupdict() if matched, else increment malformed count. Aggregator: per configured field, maintain a Counter (which is a dict); at end, most_common(K). For very high cardinality, switch to count-min + heap.
Data Structures Used
| Structure | Purpose |
|---|---|
| Compiled re.Pattern | match each line in O(line length) |
| Counter per field | exact top-K within bounded cardinality |
| Min-heap of (count, key) of size K | bounded top-K when cardinality is unbounded |
| Counters for total, malformed | observability |
Correctness Argument
Streaming: for line in file: reads at most one line buffer at a time. Memory is O(longest line + aggregator state). For 100 GB files with 1 KB lines, memory stays at ~aggregator-state size.
Aggregation: Counter.most_common(K) returns the exact top-K when all keys are tracked. When using a count-min sketch + bounded heap, the result is approximate with bounded error: actual_count ≤ estimate ≤ actual_count + ε · total with probability ≥ 1 − δ. We pick ε, δ to fit memory.
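Picking ε and δ to fit memory can be made concrete with the standard count-min sizing, width = ⌈e/ε⌉ and depth = ⌈ln(1/δ)⌉. The sketch below assumes packed 8-byte counters (for example array('q')); a Python list of ints costs more per cell. The function names are hypothetical.

```python
import math


def cms_dimensions(epsilon: float, delta: float) -> tuple[int, int]:
    """Return (width, depth) giving error <= epsilon * total w.p. >= 1 - delta."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth


def cms_bytes(width: int, depth: int, bytes_per_counter: int = 8) -> int:
    """Table size, assuming fixed-width packed counters."""
    return width * depth * bytes_per_counter


w, d = cms_dimensions(epsilon=0.001, delta=0.01)
print(w, d, cms_bytes(w, d))  # 2719 5 108760
```

So a 0.1% error bound at 99% confidence needs on the order of 100 KB per field under these assumptions, far below the 100 MB budget.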
Complexity
- Per line: O(L · regex-complexity) for parsing + O(F) for F fields aggregated
- Total: O(N · L · regex)
- Memory: O(unique keys per field) for exact aggregation; O(width × depth) for sketch
Implementation Requirements
import re
from collections import Counter
from typing import Iterable, Iterator, Optional


class LogParser:
    def __init__(self, pattern: str, top_k: int = 10,
                 aggregate_fields: Optional[list[str]] = None):
        self._re = re.compile(pattern)  # compile once, reuse for every line
        if not self._re.groupindex:
            raise ValueError("pattern must use named capture groups")
        self._k = top_k
        self._fields = aggregate_fields  # None = aggregate all named groups
        self.malformed = 0  # lines skipped by the most recent parse_stream run

    def parse_stream(self, lines: Iterable[str]) -> Iterator[dict]:
        self.malformed = 0
        for line in lines:
            line = line.rstrip("\n")
            m = self._re.match(line)
            if m is None:
                self.malformed += 1  # skip, but keep the count observable
                continue
            yield m.groupdict()

    def aggregate(self, lines: Iterable[str]) -> dict:
        counters: dict[str, Counter] = {}
        total = malformed = 0
        for line in lines:
            line = line.rstrip("\n")
            total += 1
            m = self._re.match(line)
            if m is None:
                malformed += 1
                continue
            d = m.groupdict()
            fields = self._fields or list(d.keys())
            for f in fields:
                v = d.get(f)
                if v is None:  # optional group that did not participate
                    continue
                counters.setdefault(f, Counter())[v] += 1
        # Exact top-K per field; memory is O(unique keys per field).
        out = {f: c.most_common(self._k) for f, c in counters.items()}
        out["total"] = total
        out["malformed"] = malformed
        return out
# Bounded-memory variant: top-K only via heap
import heapq
import random


class BoundedTopK:
    """Approximate top-K using count-min sketch + min-heap of size K.
    For high-cardinality streams. Use one instance per field in place of
    the per-field Counter in LogParser.aggregate.
    """

    def __init__(self, k: int, width: int = 2048, depth: int = 5):
        if k < 1:
            raise ValueError("k must be >= 1")
        self._k = k
        self._w, self._d = width, depth
        self._table = [[0] * width for _ in range(depth)]
        # One seed per row so the rows hash independently.
        self._seeds = [random.randint(1, 2**31 - 1) for _ in range(depth)]
        self._heap: list[tuple[int, str]] = []  # (count, key); counts may be stale
        self._in_heap: dict[str, int] = {}      # key -> latest estimate

    def add(self, key: str) -> None:
        est = self._increment(key)
        if key in self._in_heap:
            # Record the fresh estimate; the heap tuple is refreshed lazily.
            self._in_heap[key] = est
            return
        if len(self._heap) < self._k:
            heapq.heappush(self._heap, (est, key))
            self._in_heap[key] = est
            return
        # Estimates only grow, so a stale root understates its key. Refresh
        # the root until it is current; it is then the true minimum.
        while self._heap[0][0] != self._in_heap[self._heap[0][1]]:
            stale_key = self._heap[0][1]
            heapq.heapreplace(self._heap, (self._in_heap[stale_key], stale_key))
        if est > self._heap[0][0]:
            _, old_key = heapq.heappushpop(self._heap, (est, key))
            self._in_heap.pop(old_key, None)
            self._in_heap[key] = est

    def _increment(self, key: str) -> int:
        # hash() of a str is stable within one process, which is all we need.
        ests = []
        for i in range(self._d):
            j = hash((self._seeds[i], key)) % self._w
            self._table[i][j] += 1
            ests.append(self._table[i][j])
        return min(ests)  # count-min: the least-collided row is the estimate

    def top_k(self) -> list[tuple[str, int]]:
        # Report from _in_heap, which always holds the latest estimates.
        return sorted(self._in_heap.items(), key=lambda p: -p[1])
Tests
import unittest, io

LOG_PATTERN = (r'(?P<ip>\S+) - - \[(?P<ts>[^\]]+)\] '
               r'"(?P<method>\S+) (?P<path>\S+) HTTP/[\d.]+" '
               r'(?P<status>\d+) (?P<bytes>\d+)')

SAMPLE = """1.1.1.1 - - [01/Jan/2026:00:00:00 +0000] "GET /a HTTP/1.1" 200 100
2.2.2.2 - - [01/Jan/2026:00:00:01 +0000] "GET /b HTTP/1.1" 200 200
1.1.1.1 - - [01/Jan/2026:00:00:02 +0000] "POST /a HTTP/1.1" 500 0
malformed log line junk junk junk
1.1.1.1 - - [01/Jan/2026:00:00:03 +0000] "GET /a HTTP/1.1" 200 100
"""


class TestParser(unittest.TestCase):
    def test_parse_stream(self):
        p = LogParser(LOG_PATTERN, top_k=3)
        recs = list(p.parse_stream(io.StringIO(SAMPLE)))
        self.assertEqual(len(recs), 4)
        self.assertEqual(recs[0]["ip"], "1.1.1.1")
        self.assertEqual(recs[0]["status"], "200")

    def test_aggregate(self):
        p = LogParser(LOG_PATTERN, top_k=3, aggregate_fields=["ip", "status"])
        agg = p.aggregate(io.StringIO(SAMPLE))
        self.assertEqual(agg["total"], 5)
        self.assertEqual(agg["malformed"], 1)
        self.assertEqual(agg["ip"][0], ("1.1.1.1", 3))
        self.assertEqual(dict(agg["status"]), {"200": 3, "500": 1})

    def test_streaming_memory(self):
        # Generate a synthetic stream and ensure parse_stream is lazy
        def gen():
            for i in range(10000):
                yield f'1.1.1.1 - - [now] "GET /p{i % 100} HTTP/1.1" 200 100'

        p = LogParser(LOG_PATTERN)
        # consume one record at a time
        it = p.parse_stream(gen())
        first = next(it)
        self.assertEqual(first["path"], "/p0")
Follow-up Questions
(4) Observe / monitor? Throughput (lines/sec), parse error rate (malformed/total), per-field cardinality (gauge), p99 line size (latency surrogate). Alert on parse error rate spiking — usually means upstream changed the format.
(5) Tests? Unit on regex correctness with hand-crafted lines; property-based tests with random-line generators; smoke on a real prod-shaped sample (1 MB); large-input test that asserts memory stays bounded (tracemalloc.get_traced_memory() in Python).
(7) Backpressure? If the consumer of parse_stream is slow, the iterator naturally pauses — Python generators are pull-based. For the producer side (file reads), no backpressure issue. If shipping to a downstream like Kafka, buffer with a bounded queue and drop on full (with a counter).
(11) Configuration knobs? pattern, top_k, aggregate_fields, bounded_memory: bool (toggle exact vs sketch-based). Knobs not to expose: regex compilation cache.
(13) Poison pill? A line that triggers catastrophic (exponential-time) backtracking in the regex, i.e. regex DoS via crafted input. Mitigation: line length cap (skip lines > N bytes), regex timeout (Python: only available in the third-party regex package, not stdlib re), or anchor the pattern and avoid a leading .* and nested quantifiers.
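The line length cap from the poison-pill answer might look like the sketch below. It uses only the stdlib; parse_capped, the stats dict, and the 8192-character default are hypothetical, and the cap is applied to characters rather than bytes for simplicity.

```python
import re
from typing import Iterable, Iterator

MAX_LINE_CHARS = 8192  # hypothetical default; tune to the real log format


def parse_capped(lines: Iterable[str], pattern: re.Pattern, stats: dict,
                 max_len: int = MAX_LINE_CHARS) -> Iterator[dict]:
    """Yield groupdicts, skipping oversized lines before the regex sees them."""
    for line in lines:
        line = line.rstrip("\n")
        if len(line) > max_len:
            # Never hand a pathological line to a backtracking engine.
            stats["oversized"] = stats.get("oversized", 0) + 1
            continue
        m = pattern.match(line)
        if m is None:
            stats["malformed"] = stats.get("malformed", 0) + 1
            continue
        yield m.groupdict()
```

Counting oversized lines separately from malformed ones keeps the parse-error-rate alert meaningful when an attacker or a buggy producer floods the stream with long lines.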
Product Extension
Production systems use one of: logstash / fluentd (regex-based extraction with field rules), CloudWatch Logs Insights (column-based after extraction), Datadog Logs / Splunk (full pipeline with grok patterns and ingest-time enrichment). The data structure that powers most “top-K-over-stream” dashboards is count-min + heap; HLL powers distinct-count widgets; reservoir sampling powers “show me 100 random matching events”.
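Reservoir sampling, mentioned above for "show me 100 random matching events", fits in a few lines. This is a minimal sketch of the classic Algorithm R; the function name and the optional rng parameter are illustrative choices.

```python
import random
from typing import Iterable, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample(stream: Iterable[T], k: int,
                     rng: Optional[random.Random] = None) -> list[T]:
    """Return up to k items chosen uniformly at random, in O(k) memory."""
    rng = rng or random.Random()
    sample: list[T] = []
    for i, item in enumerate(stream):
        if i < k:
            sample.append(item)       # fill the reservoir first
        else:
            j = rng.randint(0, i)     # inclusive on both ends
            if j < k:
                sample[j] = item      # replace with probability k / (i + 1)
    return sample
```

Passing a seeded random.Random makes a run reproducible, which helps when a sampled event needs to be found again during debugging.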
Language/Runtime Follow-ups
- Python: re is fast enough for most logs but doesn’t compile to DFA, so backtracking is a real risk. Use the regex package for timeout support. For raw speed, pyre2 (re2 binding) avoids backtracking entirely.
- Java: Pattern.compile(...) once; reuse. Matcher holds mutable per-match state, so do not share one across threads. For very high throughput, RE2/J avoids backtracking.
- Go: regexp package is RE2-based, with guaranteed linear time and no catastrophic backtracking. Idiomatic for log parsing.
- C++: std::regex is slow; prefer Boost.Regex or PCRE2 in production.
- JS/TS: V8’s regex is backtracking; same DoS concern as Python’s re. Node has no built-in regex timeout.
Common Bugs
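- Loading the file: open(f).readlines() or f.read().split("\n"). Instant OOM on large files.
- Recompiling the regex per line: pure overhead on the hot path, up to orders of magnitude in runtimes that do not cache compiled patterns (CPython's re module caches recent ones, which softens but does not remove the cost).
- Forgetting to strip \n: a permissive last group (for example a negated character class) can capture the trailing \n and break comparisons.
- Using greedy .* inside the pattern: heavy backtracking on long lines, catastrophic when quantifiers are nested.
- Aggregator dict grows unbounded on high-cardinality fields (e.g., user-agent string with version churn). Cap or use sketch.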
Debugging Strategy
For parse failures: print the first 5 malformed lines and inspect the regex against them. For wrong field values: print m.groupdict() of one matching line. For OOM: tracemalloc.start(); ...; print(tracemalloc.get_traced_memory()) at intervals — find the structure that grows. For slowness: cProfile and check whether the hot spot is regex match or dict update.
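The tracemalloc check can be sketched as follows, assuming a hypothetical two-field line format and a synthetic generator so the example is self-contained. Peak traced memory should stay small and roughly independent of the number of lines.

```python
import re
import tracemalloc
from collections import Counter
from typing import Iterator

LINE_RE = re.compile(r"(?P<ip>\S+) (?P<path>\S+)")  # hypothetical format


def synthetic_lines(n: int) -> Iterator[str]:
    """Lazily generate n lines with 100 distinct paths."""
    for i in range(n):
        yield f"10.0.0.{i % 50} /p{i % 100}\n"


def peak_bytes_while_counting(n: int) -> int:
    """Stream n lines through a Counter and return peak traced bytes."""
    counts: Counter = Counter()
    tracemalloc.start()
    try:
        for line in synthetic_lines(n):
            m = LINE_RE.match(line)
            if m:
                counts[m.group("path")] += 1
        _, peak = tracemalloc.get_traced_memory()  # (current, peak)
    finally:
        tracemalloc.stop()
    return peak
```

If the peak grows with n, some structure is retaining per-line state; tracemalloc.take_snapshot() can then be used to find which allocation site is responsible.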
Mastery Criteria
- Implemented streaming LogParser with bounded aggregation in <25 minutes.
- All three tests pass first run.
- Stated for line in file: lazy iteration without prompting.
- Explained when to switch from Counter to count-min + heap (when unique-key memory exceeds budget).
- Answered follow-ups #4, #7, #11, #13 (regex DoS) crisply.
- Identified backtracking risk in user-supplied regexes.