Lab 08 — Log Parser

Goal

Implement a streaming log parser that reads log lines (potentially gigabytes), extracts structured fields via regex, aggregates per-field counts, and emits structured output — all under bounded memory. After this lab you should be able to write a clean streaming text-processing class with bounded memory in under 25 minutes.

Background Concepts

Log parsing has two patterns: batch (load file, parse all, output) and streaming (read one line at a time, emit incremental output). The bar at senior interviews is the streaming variant because real production logs are too large to load — multi-gigabyte files where batch processing would OOM.

The two streaming primitives are:

  1. Line-by-line iteration over a lazy iterator (for line in file: in Python). Memory is O(line size), not O(file size).
  2. Bounded aggregation: when counting unique IPs over a 1 TB log, you cannot assume that all distinct IPs fit in a dict. Bound the aggregation by either (a) sketch (HyperLogLog for distinct counts, count-min for top-K), or (b) “top-K with eviction” using a min-heap of size K.

The regex itself is mundane. The interview signal is the discipline of bounded memory and clean separation between parser, extractor, and aggregator.
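The separation described above can be sketched as three small stages chained together. This is a minimal illustration, not the lab's reference solution: the function names (read_lines, extract, count_field) and the two-field pattern are assumptions made for the example.

```python
import io
import re
from collections import Counter
from typing import Iterable, Iterator


def read_lines(f: Iterable[str]) -> Iterator[str]:
    """Reader stage: yield one stripped line at a time."""
    for line in f:
        yield line.rstrip("\n")


def extract(lines: Iterable[str], pattern: re.Pattern) -> Iterator[dict]:
    """Extractor stage: yield named groups for lines that match, skip the rest."""
    for line in lines:
        m = pattern.match(line)
        if m is not None:
            yield m.groupdict()


def count_field(records: Iterable[dict], field: str) -> Counter:
    """Aggregator stage: exact counts for one field."""
    counts: Counter = Counter()
    for rec in records:
        counts[rec[field]] += 1
    return counts


# Hypothetical two-field format, used only for this example.
pattern = re.compile(r"(?P<level>[A-Z]+) (?P<msg>.+)")
log = io.StringIO("INFO started\nERROR disk full\nnot a log line\nINFO stopped\n")

# Nothing is read until the aggregator pulls records through the chain.
level_counts = count_field(extract(read_lines(log), pattern), "level")
print(level_counts.most_common())
```

Because each stage only pulls from the one before it, any stage can be swapped (for example, a sketch-based aggregator) without touching the others.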

Interview Context

Log parsing is a popular question at logging / observability companies (Datadog, Splunk, Honeycomb, Cribl, Elastic) and at any infrastructure company that processes high-volume telemetry. It tests streaming discipline, regex fluency, and bounded-memory awareness. It also exposes weak engineering: a candidate who writes lines = file.readlines() instantly fails the bounded-memory criterion.

Problem Statement

Design LogParser(pattern, top_k=10):

  • parse_stream(line_iter) -> Iterator[dict] — yield a dict per line with extracted named fields. Skip malformed lines (count them).
  • aggregate(line_iter) -> dict — return per-field top-K aggregates (e.g., top 10 IPs, top 10 paths, top 10 status codes). Bounded memory.

The regex is provided at construction; the parser must use named capture groups.

Constraints

  • Input file size: up to 100 GB
  • Aggregator memory: ≤ 100 MB
  • Target throughput: 50 MB/s on a single core

Clarifying Questions

  1. Is the log format known? (Yes — caller provides regex with named groups.)
  2. Malformed lines: skip, error, or quarantine? (Skip + count by default; quarantine optionally.)
  3. Aggregation: which fields, what kind (count, distinct, top-K)? (Caller specifies.)
  4. Time-series: are we computing per-time-window aggregates? (Optional; default is whole-stream.)
  5. Encoding: UTF-8? Binary? (UTF-8 default; binary is a follow-up.)

Examples

pattern = r'(?P<ip>\S+) - - \[(?P<ts>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) HTTP/[\d.]+" (?P<status>\d+) (?P<bytes>\d+)'
parser = LogParser(pattern, top_k=3)

for record in parser.parse_stream(open("access.log")):
    print(record)
# {'ip': '1.2.3.4', 'ts': '20/May/2026:12:00:00', 'method': 'GET', 'path': '/x', 'status': '200', 'bytes': '1234'}

agg = parser.aggregate(open("access.log"))
# {'ip': [('1.2.3.4', 1500), ('5.6.7.8', 1200), ...],
#  'path': [('/api', 5000), ('/login', 3000), ...],
#  'status': [('200', 90000), ('404', 4000), ('500', 200)],
#  'malformed': 12,
#  'total': 100000}

Initial Brute Force

open(file).read() then re.finditer. Loads everything; OOMs on 100 GB.

Brute Force Complexity

Memory: O(file size). At 100 GB, instant OOM on a 32 GB machine.

Optimization Path

Stream line-by-line with for line in file:. For aggregation, memory is driven by the number of unique keys, not by the number of lines, so choose the structure by cardinality: (a) at moderate cardinality (100k unique IPs is often fine), keep exact counts in a dict[ip, count] for the whole stream; (b) at very high cardinality, use HyperLogLog (HLL) for distinct counts and count-min sketch + min-heap for top-K. For most workloads the dict is fine; at extreme cardinality, sketches are required.
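For the distinct-count case, a HyperLogLog can be sketched in a few dozen lines. The version below is a simplified illustration, assuming a 64-bit blake2b hash and the standard small-range (linear counting) correction; it omits the bias corrections that production implementations add, and the class and parameter names are made up for this example.

```python
import hashlib
import math


class HyperLogLog:
    """Simplified HLL: 2**p registers, relative error around 1.04 / sqrt(2**p)."""

    def __init__(self, p: int = 12):
        self._p = p
        self._m = 1 << p              # number of registers
        self._reg = [0] * self._m

    def add(self, key: str) -> None:
        digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest()
        h = int.from_bytes(digest, "big")            # 64-bit hash
        idx = h >> (64 - self._p)                    # top p bits pick the register
        rest = h & ((1 << (64 - self._p)) - 1)       # remaining 64 - p bits
        # Rank = 1-based position of the leftmost 1-bit in the remaining bits.
        rank = (64 - self._p) - rest.bit_length() + 1
        if rank > self._reg[idx]:
            self._reg[idx] = rank

    def count(self) -> float:
        m = self._m
        alpha = 0.7213 / (1 + 1.079 / m)             # constant valid for m >= 128
        raw = alpha * m * m / sum(2.0 ** -r for r in self._reg)
        zeros = self._reg.count(0)
        if raw <= 2.5 * m and zeros:
            return m * math.log(m / zeros)           # small-range correction
        return raw


hll = HyperLogLog(p=12)
for repeat in range(2):                # duplicates must not change the estimate
    for i in range(50_000):
        hll.add(f"ip-{i}")
estimate = hll.count()
print(round(estimate))
```

With p=12 the state is 4096 small integers regardless of how many distinct keys are seen, which is the property the bounded-memory constraint needs.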

Final Expected Approach

Compile the regex once. Stream lines via the iterator. For each line, match and yield the groupdict() if matched, else increment malformed count. Aggregator: per configured field, maintain a Counter (which is a dict); at end, most_common(K). For very high cardinality, switch to count-min + heap.

Data Structures Used

Structure                            Purpose
-----------------------------------  --------------------------------------------
Compiled re.Pattern                  match each line in O(line length)
Counter per field                    exact top-K within bounded cardinality
Min-heap of (count, key) of size K   bounded top-K when cardinality is unbounded
Counters for total, malformed        observability

Correctness Argument

Streaming: for line in file: reads at most one line buffer at a time. Memory is O(longest line + aggregator state). For 100 GB files with 1 KB lines, memory stays at ~aggregator-state size.

Aggregation: Counter.most_common(K) returns the exact top-K when all keys are tracked. When using a count-min sketch + bounded heap, the result is approximate with bounded error: actual_count ≤ estimate ≤ actual_count + ε · total with probability ≥ 1 − δ. We pick ε, δ to fit memory.
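Picking ε and δ to fit memory can be made concrete with the usual count-min sizing rule, width = ⌈e/ε⌉ and depth = ⌈ln(1/δ)⌉. The helper below is a small sketch of that arithmetic; the function names are illustrative, and the rule assumes the rows use pairwise-independent hash functions.

```python
import math


def count_min_dimensions(epsilon: float, delta: float) -> tuple[int, int]:
    """Return (width, depth) so that estimate <= actual + epsilon * total
    holds with probability >= 1 - delta."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth


def implied_guarantee(width: int, depth: int) -> tuple[float, float]:
    """Inverse direction: the (epsilon, delta) that a given table size buys."""
    return math.e / width, math.exp(-depth)


width, depth = count_min_dimensions(epsilon=0.001, delta=0.01)
print(width, depth, width * depth)   # counters needed for this guarantee

eps, dlt = implied_guarantee(width=2048, depth=5)
print(f"{eps:.5f} {dlt:.5f}")        # what a 2048 x 5 table provides
```

Under this rule, a 2048 × 5 table corresponds to ε of about 0.0013 and δ of about 0.0067, so on a stream of 100 million lines an estimate may overshoot by up to roughly 130k counts.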

Complexity

  • Per line: O(L · regex-complexity) for parsing + O(F) for F fields aggregated
  • Total: O(N · L · regex)
  • Memory: O(unique keys per field) for exact aggregation; O(width × depth) for sketch

Implementation Requirements

import re
from collections import Counter
from typing import Iterator, Iterable, Optional

class LogParser:
    def __init__(self, pattern: str, top_k: int = 10,
                 aggregate_fields: Optional[list[str]] = None):
        self._re = re.compile(pattern)
        self._k = top_k
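        self._fields = aggregate_fields  # None = aggregate all named groups
        self.malformed_count = 0         # lines skipped by parse_stream so far

    def parse_stream(self, lines: Iterable[str]) -> Iterator[dict]:
        for line in lines:
            line = line.rstrip("\n")
            m = self._re.match(line)
            if m is None:
                self.malformed_count += 1  # skip, but keep the count observable
                continue
            yield m.groupdict()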

    def aggregate(self, lines: Iterable[str]) -> dict:
        counters: dict[str, Counter] = {}
        total = malformed = 0
        for line in lines:
            line = line.rstrip("\n")
            total += 1
            m = self._re.match(line)
            if m is None:
                malformed += 1
                continue
            d = m.groupdict()
            fields = self._fields or list(d.keys())
            for f in fields:
                v = d.get(f)
                if v is None: continue
                counters.setdefault(f, Counter())[v] += 1
        out = {f: c.most_common(self._k) for f, c in counters.items()}
        out["total"] = total
        out["malformed"] = malformed
        return out


# Bounded-memory variant: approximate top-K via count-min sketch + min-heap
import heapq
import random

class BoundedTopK:
    """Approximate top-K using count-min sketch + min-heap of size K.

    For high-cardinality streams. In LogParser.aggregate, use one BoundedTopK
    per field in place of the per-field Counter.
    """
    def __init__(self, k: int, width: int = 2048, depth: int = 5):
        if k < 1:
            raise ValueError("k must be >= 1")
        self._k = k
        self._w, self._d = width, depth
        self._table = [[0] * width for _ in range(depth)]
        # One random seed per row, so each row hashes keys differently.
        self._seeds = [random.randint(1, 2**31 - 1) for _ in range(depth)]
        self._heap: list[tuple[int, str]] = []   # (count, key); counts may be stale
        self._in_heap: dict[str, int] = {}       # key -> latest estimate

    def add(self, key: str) -> None:
        est = self._increment(key)
        if key in self._in_heap:
            # Record the new estimate; the heap entry is refreshed lazily below.
            self._in_heap[key] = est
            return
        if len(self._heap) < self._k:
            heapq.heappush(self._heap, (est, key))
            self._in_heap[key] = est
            return
        # Estimates only grow, so a stale root understates its key. Refresh the
        # root until it is current, so no key is evicted on an outdated count.
        while self._heap[0][0] != self._in_heap[self._heap[0][1]]:
            stale_key = self._heap[0][1]
            heapq.heapreplace(self._heap, (self._in_heap[stale_key], stale_key))
        if est > self._heap[0][0]:
            _, old_key = heapq.heapreplace(self._heap, (est, key))
            del self._in_heap[old_key]
            self._in_heap[key] = est

    def _increment(self, key: str) -> int:
        ests = []
        for i in range(self._d):
            # Built-in hash() is salted per process, so the sketch is only
            # meaningful within a single run.
            j = hash((self._seeds[i], key)) % self._w
            self._table[i][j] += 1
            ests.append(self._table[i][j])
        return min(ests)

    def top_k(self) -> list[tuple[str, int]]:
        # Read counts from _in_heap: the heap tuples may hold stale counts.
        return sorted(self._in_heap.items(), key=lambda p: -p[1])

Tests

import unittest, io

LOG_PATTERN = (r'(?P<ip>\S+) - - \[(?P<ts>[^\]]+)\] '
               r'"(?P<method>\S+) (?P<path>\S+) HTTP/[\d.]+" '
               r'(?P<status>\d+) (?P<bytes>\d+)')

SAMPLE = """1.1.1.1 - - [01/Jan/2026:00:00:00 +0000] "GET /a HTTP/1.1" 200 100
2.2.2.2 - - [01/Jan/2026:00:00:01 +0000] "GET /b HTTP/1.1" 200 200
1.1.1.1 - - [01/Jan/2026:00:00:02 +0000] "POST /a HTTP/1.1" 500 0
malformed log line junk junk junk
1.1.1.1 - - [01/Jan/2026:00:00:03 +0000] "GET /a HTTP/1.1" 200 100
"""

class TestParser(unittest.TestCase):
    def test_parse_stream(self):
        p = LogParser(LOG_PATTERN, top_k=3)
        recs = list(p.parse_stream(io.StringIO(SAMPLE)))
        self.assertEqual(len(recs), 4)
        self.assertEqual(recs[0]["ip"], "1.1.1.1")
        self.assertEqual(recs[0]["status"], "200")

    def test_aggregate(self):
        p = LogParser(LOG_PATTERN, top_k=3, aggregate_fields=["ip", "status"])
        agg = p.aggregate(io.StringIO(SAMPLE))
        self.assertEqual(agg["total"], 5)
        self.assertEqual(agg["malformed"], 1)
        self.assertEqual(agg["ip"][0], ("1.1.1.1", 3))
        self.assertEqual(dict(agg["status"]), {"200": 3, "500": 1})

    def test_streaming_memory(self):
        # Generate a synthetic stream and ensure parse_stream is lazy
        def gen():
            for i in range(10000):
                yield f'1.1.1.1 - - [now] "GET /p{i % 100} HTTP/1.1" 200 100'
        p = LogParser(LOG_PATTERN)
        # consume one record at a time
        it = p.parse_stream(gen())
        first = next(it)
        self.assertEqual(first["path"], "/p0")

Follow-up Questions

(4) Observe / monitor? Throughput (lines/sec), parse error rate (malformed/total), per-field cardinality (gauge), p99 line size (latency surrogate). Alert on parse error rate spiking — usually means upstream changed the format.
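One way to collect these signals without touching the parsing logic is to thread a small stats object through the stream. The sketch below is an assumption-laden illustration: ParseStats and parse_with_stats are hypothetical names, and it tracks the maximum line length as a cheap stand-in for a p99 (a real percentile would need a histogram or quantile sketch).

```python
import re
import time
from dataclasses import dataclass, field
from typing import Iterable, Iterator


@dataclass
class ParseStats:
    total: int = 0
    malformed: int = 0
    max_line_len: int = 0
    started: float = field(default_factory=time.monotonic)

    @property
    def error_rate(self) -> float:
        """malformed / total; the ratio worth alerting on."""
        return self.malformed / self.total if self.total else 0.0

    def lines_per_sec(self) -> float:
        elapsed = time.monotonic() - self.started
        return self.total / elapsed if elapsed > 0 else 0.0


def parse_with_stats(lines: Iterable[str], pattern: re.Pattern,
                     stats: ParseStats) -> Iterator[dict]:
    """Yield parsed records while updating stats as a side effect."""
    for line in lines:
        line = line.rstrip("\n")
        stats.total += 1
        stats.max_line_len = max(stats.max_line_len, len(line))
        m = pattern.match(line)
        if m is None:
            stats.malformed += 1
            continue
        yield m.groupdict()


# Hypothetical key=value format, used only for this example.
kv = re.compile(r"(?P<k>\w+)=(?P<v>\d+)")
stats = ParseStats()
records = list(parse_with_stats(["a=1\n", "junk\n", "b=22\n", "??\n"], kv, stats))
print(stats.total, stats.malformed, stats.error_rate)
```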

(5) Tests? Unit on regex correctness with hand-crafted lines; property-based tests with random-line generators; smoke on a real prod-shaped sample (1 MB); large-input test that asserts memory stays bounded (tracemalloc.get_traced_memory() in Python).
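The large-input memory check might look like the sketch below. It uses a simplified three-field pattern and a synthetic generator instead of a real file, and the 1 MB ceiling in the final comment is an arbitrary assumption to tune for the actual aggregator.

```python
import re
import tracemalloc
from collections import Counter

# Simplified pattern for the example; compiled before tracing starts.
PATTERN = re.compile(r'(?P<ip>\S+) "(?P<path>\S+)" (?P<status>\d+)')


def synthetic_lines(n: int):
    """Lazily generate n log lines with only 100 distinct paths."""
    for i in range(n):
        yield f'10.0.0.{i % 50} "/p{i % 100}" 200\n'


def peak_bytes_while_aggregating(n_lines: int):
    """Return (peak traced bytes, counts) for a streaming aggregation."""
    tracemalloc.start()
    try:
        counts: Counter = Counter()
        for line in synthetic_lines(n_lines):
            m = PATTERN.match(line)
            if m:
                counts[m.group("path")] += 1
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak, counts


peak, counts = peak_bytes_while_aggregating(50_000)
print(f"peak={peak} bytes, distinct paths={len(counts)}")
# A test would assert that peak stays under a fixed budget, e.g. 1 MB,
# no matter how large n_lines grows.
```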

(7) Backpressure? If the consumer of parse_stream is slow, the iterator naturally pauses — Python generators are pull-based. For the producer side (file reads), no backpressure issue. If shipping to a downstream like Kafka, buffer with a bounded queue and drop on full (with a counter).
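The bounded queue with drop-on-full can be sketched with the standard library's queue.Queue. DroppingBuffer is a hypothetical name; in a real pipeline a separate consumer thread would drain the queue and ship batches downstream.

```python
import queue


class DroppingBuffer:
    """Bounded buffer that drops new records when full and counts the drops."""

    def __init__(self, maxsize: int):
        self._q: queue.Queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0

    def offer(self, record) -> bool:
        """Try to enqueue without blocking; return False if the record was dropped."""
        try:
            self._q.put_nowait(record)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def drain(self) -> list:
        """Remove and return everything currently buffered."""
        items = []
        while True:
            try:
                items.append(self._q.get_nowait())
            except queue.Empty:
                return items


buf = DroppingBuffer(maxsize=2)
accepted = [buf.offer(i) for i in range(5)]
drained = buf.drain()
print(accepted, buf.dropped, drained)
```

Dropping the newest record is only one policy; dropping the oldest, or blocking the parser instead, are equally valid choices depending on whether freshness or completeness matters more.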

(11) Configuration knobs? pattern, top_k, aggregate_fields, bounded_memory: bool (toggle exact vs sketch-based). Knobs not to expose: regex compilation cache.

(13) Poison pill? A line that triggers catastrophic backtracking in the regex (regex DoS via specific patterns). Mitigation: line length cap (skip lines > N bytes), regex timeout (in Python, available in the third-party regex package but not in stdlib re), or anchor the pattern and avoid a leading .*.
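The line length cap needs some care: for line in file: reads an entire line into memory before any length check can run, so one huge line with no newline defeats the memory bound. A sketch of a reader that caps the read itself, using readline(size), follows. CappedLineReader is a hypothetical helper, and it measures the cap in characters rather than bytes because it reads a text-mode stream.

```python
import io
from typing import Iterator, TextIO


class CappedLineReader:
    """Yield lines of at most max_chars characters; skip and count longer ones."""

    def __init__(self, f: TextIO, max_chars: int = 8192):
        self._f = f
        self._max = max_chars
        self.skipped = 0

    def __iter__(self) -> Iterator[str]:
        while True:
            # Ask for one character more than the cap to detect overflow.
            chunk = self._f.readline(self._max + 1)
            if not chunk:
                return                      # end of stream
            if len(chunk) > self._max and not chunk.endswith("\n"):
                # Over-long line: discard the rest of it in bounded chunks.
                while chunk and not chunk.endswith("\n"):
                    chunk = self._f.readline(self._max + 1)
                self.skipped += 1
                continue
            yield chunk.rstrip("\n")


source = io.StringIO("short\n" + "x" * 100 + "\nok\n")
reader = CappedLineReader(source, max_chars=10)
kept = list(reader)
print(kept, reader.skipped)
```

The reader never holds more than max_chars + 1 characters of any line, so it also bounds the input length that the regex ever sees.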

Product Extension

Production systems use one of: logstash / fluentd (regex-based extraction with field rules), CloudWatch Logs Insights (column-based after extraction), Datadog Logs / Splunk (full pipeline with grok patterns and ingest-time enrichment). The data structure that powers most “top-K-over-stream” dashboards is count-min + heap; HLL powers distinct-count widgets; reservoir sampling powers “show me 100 random matching events”.
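Of the three structures named above, reservoir sampling is the shortest to write down. The sketch below uses the classic replace-with-probability-k/i scheme (often called Algorithm R); the function name and the optional rng parameter are choices made for this example.

```python
import random
from typing import Iterable, Optional


def reservoir_sample(stream: Iterable, k: int,
                     rng: Optional[random.Random] = None) -> list:
    """Return k items chosen uniformly at random from a stream of unknown length,
    holding at most k items in memory."""
    rng = rng or random.Random()
    sample: list = []
    for i, item in enumerate(stream):
        if i < k:
            sample.append(item)          # fill the reservoir first
        else:
            j = rng.randint(0, i)        # inclusive on both ends
            if j < k:
                sample[j] = item         # keep item with probability k / (i + 1)
    return sample


picked = reservoir_sample(range(10_000), k=100, rng=random.Random(7))
short = reservoir_sample(["a", "b"], k=100)
print(len(picked), short)
```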

Language/Runtime Follow-ups

  • Python: re is fast enough for most logs but doesn’t compile to DFA — backtracking is a real risk. Use the regex package for timeout support. For raw speed, pyre2 (re2 binding) avoids backtracking entirely.
  • Java: Pattern.compile(...) once; reuse. Matcher is mutable per match. For very high throughput, RE2/J avoids backtracking.
  • Go: regexp package is RE2-based — guaranteed linear time, no catastrophic backtracking. Idiomatic for log parsing.
  • C++: std::regex is slow; prefer Boost.Regex or PCRE2 in production.
  • JS/TS: V8’s regex is backtracking; same DoS concern as Python’s re. Node has no built-in regex timeout.

Common Bugs

  1. Loading the file: open(f).readlines() or f.read().split("\n") — instant OOM on large files.
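  2. Calling re.match(pattern, line) with the raw pattern string on every line. Python's re caches compiled patterns, so this costs a cache lookup per call rather than a full recompile, but the overhead still adds up in a hot loop. Compile once and reuse.
  3. Forgetting to strip \n: a trailing group built on a negated character class (e.g. [^ ]+), or on . under re.DOTALL, captures the \n and breaks comparisons.
  4. Stacking several greedy .* or nesting quantifiers (as in (a+)+) inside the pattern: backtracking cost can grow polynomially or exponentially with line length.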
  5. Aggregator dict grows unbounded on high-cardinality fields (e.g., user-agent string with version churn). Cap or use sketch.

Debugging Strategy

For parse failures: print the first 5 malformed lines and inspect the regex against them. For wrong field values: print m.groupdict() of one matching line. For OOM: tracemalloc.start(); ...; print(tracemalloc.get_traced_memory()) at intervals — find the structure that grows. For slowness: cProfile and check whether the hot spot is regex match or dict update.

Mastery Criteria

  • Implemented streaming LogParser with bounded aggregation in <25 minutes.
  • All three tests pass first run.
  • Stated for line in file: lazy iteration without prompting.
  • Explained when to switch from Counter to count-min + heap (when unique-key memory exceeds budget).
  • Answered follow-ups #4, #7, #11, #13 (regex DoS) crisply.
  • Identified backtracking risk in user-supplied regexes.