Lab 14 — Persistent KV Store

Goal

Implement an in-memory key-value store with TTL, snapshot + write-ahead-log (WAL) persistence, and crash recovery. After this lab you should be able to design and implement a Redis-shaped local store in under 40 minutes and articulate the durability tradeoffs.

Background Concepts

A persistent KV store has two storage paths:

  1. In-memory state: a dict[key, value] (plus TTL bookkeeping). Hot path: O(1) read/write.
  2. Durable state: writes go to a WAL (append-only log of mutations); periodically a snapshot captures the current state. On boot, recovery = load latest snapshot + replay WAL since.

The four standard durability levels (no persistence, plus three fsync policies for the WAL):

  • No persistence: pure in-memory. Lost on crash.
  • WAL with no fsync: writes to OS buffer; lost on power-cut, survives process crash.
  • WAL fsync per write: durable per write, slow (one fsync per op, each waiting on the disk).
  • WAL fsync every N ms: hybrid — bounded data loss in exchange for throughput.

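Redis exposes the three WAL policies as appendfsync no / always / everysec (in the order listed above); the first level corresponds to running with persistence disabled. The interview answer is “explain the spectrum and pick a default that matches the workload”.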
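The three WAL policies can be sketched as a small writer class. This is a minimal illustration, not the lab's reference solution: WalWriter, the policy names, interval_ms, and sync_count are all hypothetical, and the timed policy is checked on each append rather than by a background thread, so an idle writer can leave its last entry unsynced for longer than the interval.

```python
import os
import time


class WalWriter:
    """Hypothetical helper: appends bytes and applies one of three fsync policies."""

    def __init__(self, path: str, policy: str = "every_n_ms", interval_ms: int = 1000):
        if policy not in ("none", "per_write", "every_n_ms"):
            raise ValueError(f"unknown fsync policy: {policy}")
        self.policy = policy
        self.interval = interval_ms / 1000.0
        self.fp = open(path, "ab")
        self.last_sync = time.monotonic()
        self.sync_count = 0  # kept only so the effect of each policy is observable

    def append(self, payload: bytes) -> None:
        self.fp.write(payload)
        self.fp.flush()  # user-space buffer -> OS page cache (survives a process crash)
        if self.policy == "per_write":
            self._sync()
        elif self.policy == "every_n_ms":
            if time.monotonic() - self.last_sync >= self.interval:
                self._sync()
        # policy == "none": leave it to the OS to write the page cache back

    def _sync(self) -> None:
        os.fsync(self.fp.fileno())  # OS page cache -> stable storage
        self.last_sync = time.monotonic()
        self.sync_count += 1

    def close(self) -> None:
        self.fp.flush()
        os.fsync(self.fp.fileno())
        self.fp.close()
```

With policy="per_write" the sync counter grows by one per append; with policy="none" it stays at zero until close, which is where the throughput difference comes from.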

Interview Context

This problem hits at infrastructure / database companies and at any senior coding round that wants to test storage fundamentals. The interviewer wants: snapshot + WAL design, fsync tradeoff articulation, working code that survives a simulated crash.

Problem Statement

Implement KVStore:

  • put(key, value, ttl_seconds=None) — store with optional TTL.
  • get(key) -> value | None
  • delete(key) -> bool
  • snapshot() — write current state to snapshot_path and rotate the WAL.
  • Recovery: on construction with wal_path and snapshot_path, replay snapshot + WAL.

Constraints

  • 10^7 keys
  • 10^5 ops / second
  • Crash recovery within seconds
  • Bounded memory (configurable max)

Clarifying Questions

  1. TTL granularity? (Seconds is fine for most workloads.)
  2. fsync policy? (Configurable: none / per-write / every-N-ms.)
  3. Snapshot format: text or binary? (Binary is faster, smaller; pick pickle or msgpack.)
  4. Concurrent reads during snapshot? (Often a follow-up; default block during snapshot.)
  5. Single-threaded or concurrent? (Single-threaded simplifies; lock for concurrency.)

Examples

kv = KVStore(wal_path="wal.log", snapshot_path="snap.pkl")
kv.put("user:1", {"name": "Alice"})
kv.put("session:42", "tok", ttl_seconds=60)
kv.get("user:1")             -> {"name": "Alice"}
# … crash …
kv2 = KVStore(wal_path="wal.log", snapshot_path="snap.pkl")
kv2.get("user:1")            -> {"name": "Alice"}    # recovered
kv2.snapshot()
# After snapshot: WAL is rotated (truncated)

Initial Brute Force

dict[key, value]. No persistence. Lost on crash.

Brute Force Complexity

Per op: O(1). Memory: O(N). Durability: zero.

Optimization Path

Add WAL: append (op, key, value, ttl) per mutation. On boot, replay. Add periodic snapshot: serialize full state; truncate WAL. Add TTL: a dict[key, expires_at] and lazy expiry on get.

The cost is: O(WAL append) per write (serialization + file write); O(snapshot size) per snapshot; O(WAL size) per recovery. Throughput depends on fsync policy.

Final Expected Approach

In-memory dict for values + dict for TTL deadlines + a binary log file. Operations: log first, then update memory (“write-ahead”). Snapshot: pickle the in-memory state to a temp file, fsync it, atomically rename it over the old snapshot, then truncate the WAL. Recovery: load the snapshot, then replay the WAL from its start; because the WAL is truncated at each snapshot, it holds only mutations made since then.

Data Structures Used

Structure                 Purpose
dict[K, V]                hot key-value store
dict[K, float]            TTL deadlines
WAL file (append-only)    durability
Snapshot file (pickle)    bounded recovery time
Lock                      single-threaded mutation under multi-thread access

Correctness Argument

Durability: every mutation is appended to the WAL before updating the in-memory state. After the WAL append (and fsync, if configured), the mutation is durable. On crash, recovery replays exactly what was logged.

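Atomicity of put: the WAL file is opened in append mode, so each write lands at the end of the file. That does not make a write crash-atomic: a crash or power cut can leave a partially written trailing entry, which recovery detects as an unpicklable tail and discards. Snapshot is atomic via write to tmp; fsync tmp; rename(tmp, snap).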

Recovery correctness: applying snapshot first, then replaying WAL entries in order, reconstructs exactly the pre-crash state. The only loss is mutations that were in OS buffers but unsynced at crash time — bounded by fsync policy.
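The recovery argument can be checked on a toy log. The sketch below is self-contained and works on bytes in memory rather than files; replay is a hypothetical helper that mirrors the put/del entry shape used in this lab and ignores TTLs for brevity.

```python
import io
import pickle


def replay(snapshot: dict, wal_bytes: bytes) -> dict:
    """Apply logged mutations, in order, on top of a copy of the snapshot state."""
    state = dict(snapshot)
    stream = io.BytesIO(wal_bytes)
    while True:
        try:
            entry = pickle.load(stream)
        except (EOFError, pickle.UnpicklingError):
            break  # clean end of log, or a torn trailing entry: stop replaying
        if entry["op"] == "put":
            state[entry["k"]] = entry["v"]
        elif entry["op"] == "del":
            state.pop(entry["k"], None)
    return state


wal = b"".join(pickle.dumps(e) for e in [
    {"op": "put", "k": "a", "v": 1},
    {"op": "put", "k": "b", "v": 2},
    {"op": "del", "k": "a"},
])

recovered = replay({"z": 0}, wal)
assert recovered == {"z": 0, "b": 2}

# Replaying the same log over the recovered state changes nothing, which is
# why a crash between "snapshot renamed" and "WAL truncated" is harmless.
assert replay(recovered, wal) == recovered

# Chopping bytes off the final entry simulates a crash mid-append:
# the first two entries still apply, the torn "del" is dropped.
torn = replay({"z": 0}, wal[:-3])
```

Put and delete are last-write-wins per key, so replay is idempotent; a log containing non-idempotent operations (for example an increment) would need sequence numbers to get the same property.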

TTL: lazy expiry on get (check now >= deadline, delete if so). This is correct as long as we never return a value past its TTL. Expired entries that are never read stay in memory until the next recovery sweep; the implementation below has no background sweeper, so a long-running store needs one to reclaim them.
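A sweep pass over the TTL table might look like the following. This is a sketch under assumptions: sweep_expired is a hypothetical free function operating on the two dicts, the now parameter exists only to make it testable, and in the real store it would have to run under the store's lock (for example from a daemon thread that sleeps between passes).

```python
import time
from typing import Optional


def sweep_expired(data: dict, ttl: dict, now: Optional[float] = None) -> int:
    """Remove every key whose absolute deadline has passed; return how many."""
    now = time.time() if now is None else now
    # Collect first, then delete: mutating a dict while iterating it raises.
    expired = [k for k, deadline in ttl.items() if deadline <= now]
    for k in expired:
        data.pop(k, None)
        ttl.pop(k, None)
    return len(expired)


data = {"a": 1, "b": 2, "c": 3}
ttl = {"a": 10.0, "b": 20.0}          # "c" has no TTL
removed = sweep_expired(data, ttl, now=15.0)
assert removed == 1 and data == {"b": 2, "c": 3}
```

Each pass is a full scan of the TTL table, O(number of keys with a TTL). A min-heap of (deadline, key) pairs is a common alternative when that scan becomes too expensive.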

Complexity

  • put: O(1) memory + O(log entry size) disk
  • get: O(1)
  • snapshot: O(N) state size
  • Recovery: O(snapshot + WAL since snapshot)

Implementation Requirements

import os, pickle, time, threading
from typing import Any, Optional

class KVStore:
    def __init__(self, wal_path: str = "kv.wal",
                 snapshot_path: str = "kv.snap",
                 fsync: str = "every_sec"):
        self._wal_path = wal_path
        self._snap_path = snapshot_path
        self._fsync = fsync     # "none" | "per_write" | "every_sec"
        self._data: dict = {}
        self._ttl: dict = {}
        self._lock = threading.RLock()
        self._wal_fp = None
        self._last_fsync = time.monotonic()
        self._recover()
        self._wal_fp = open(self._wal_path, "ab", buffering=0)
        if self._fsync == "every_sec":
            self._fsync_thread = threading.Thread(target=self._fsync_loop, daemon=True)
            self._fsync_thread.start()

    def _recover(self) -> None:
        # 1. Load snapshot if present
        if os.path.exists(self._snap_path):
            with open(self._snap_path, "rb") as f:
                self._data, self._ttl = pickle.load(f)
        # 2. Replay WAL since snapshot
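        if os.path.exists(self._wal_path):
            good = 0  # offset just past the last entry that unpickled cleanly
            with open(self._wal_path, "rb") as f:
                while True:
                    try:
                        entry = pickle.load(f)
                    except (EOFError, pickle.UnpicklingError):
                        break
                    self._apply(entry)
                    good = f.tell()
            # Drop any torn tail so later appends are not stranded behind it
            os.truncate(self._wal_path, good)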
        # Sweep expired
        now = time.time()
        for k in list(self._ttl):
            if self._ttl[k] <= now:
                self._data.pop(k, None); self._ttl.pop(k, None)

    def _apply(self, entry: dict) -> None:
        op = entry["op"]
        if op == "put":
            self._data[entry["k"]] = entry["v"]
            if entry.get("ttl") is not None:
                self._ttl[entry["k"]] = entry["ttl"]
            else:
                self._ttl.pop(entry["k"], None)
        elif op == "del":
            self._data.pop(entry["k"], None)
            self._ttl.pop(entry["k"], None)

    def _wal_write(self, entry: dict) -> None:
        buf = pickle.dumps(entry)
        self._wal_fp.write(buf)
        if self._fsync == "per_write":
            self._wal_fp.flush()
            os.fsync(self._wal_fp.fileno())

    def _fsync_loop(self) -> None:
        while True:
            time.sleep(1.0)
            with self._lock:
                if self._wal_fp:
                    self._wal_fp.flush()
                    os.fsync(self._wal_fp.fileno())

    def put(self, key, value, ttl_seconds: Optional[float] = None) -> None:
        # Compare against None so ttl_seconds=0 means "expire now", not "no TTL"
        deadline = (time.time() + ttl_seconds) if ttl_seconds is not None else None
        with self._lock:
            self._wal_write({"op": "put", "k": key, "v": value, "ttl": deadline})
            self._data[key] = value
            if deadline is not None:
                self._ttl[key] = deadline
            else:
                self._ttl.pop(key, None)

    def get(self, key) -> Any:
        with self._lock:
            deadline = self._ttl.get(key)
            if deadline is not None and time.time() >= deadline:
                self._wal_write({"op": "del", "k": key})
                self._data.pop(key, None); self._ttl.pop(key, None)
                return None
            return self._data.get(key)

    def delete(self, key) -> bool:
        with self._lock:
            existed = key in self._data
            self._wal_write({"op": "del", "k": key})
            self._data.pop(key, None); self._ttl.pop(key, None)
            return existed

    def snapshot(self) -> None:
        with self._lock:
            tmp = self._snap_path + ".tmp"
            with open(tmp, "wb") as f:
                pickle.dump((self._data, self._ttl), f)
                f.flush(); os.fsync(f.fileno())
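            # os.replace overwrites the old snapshot atomically on POSIX and, unlike
            # os.rename, also succeeds on Windows when the destination exists.
            os.replace(tmp, self._snap_path)
            # Rotate WAL. A crash before this point is safe: replaying the old
            # WAL over the new snapshot yields the same state.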
            self._wal_fp.close()
            open(self._wal_path, "wb").close()
            self._wal_fp = open(self._wal_path, "ab", buffering=0)

    def close(self) -> None:
        with self._lock:
            if self._wal_fp:
                self._wal_fp.flush(); os.fsync(self._wal_fp.fileno())
                self._wal_fp.close(); self._wal_fp = None

Tests

import unittest, tempfile, os, time

class TestKV(unittest.TestCase):
    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.wal = os.path.join(self.tmp, "wal.log")
        self.snap = os.path.join(self.tmp, "snap.pkl")

    def tearDown(self):
        import shutil; shutil.rmtree(self.tmp)

    def test_basic(self):
        kv = KVStore(self.wal, self.snap, fsync="none")
        kv.put("a", 1); kv.put("b", "two")
        self.assertEqual(kv.get("a"), 1)
        self.assertEqual(kv.get("b"), "two")
        kv.delete("a")
        self.assertIsNone(kv.get("a"))
        kv.close()

    def test_ttl(self):
        kv = KVStore(self.wal, self.snap, fsync="none")
        kv.put("k", "v", ttl_seconds=0.1)
        self.assertEqual(kv.get("k"), "v")
        time.sleep(0.15)
        self.assertIsNone(kv.get("k"))
        kv.close()

    def test_recovery_from_wal(self):
        kv = KVStore(self.wal, self.snap, fsync="per_write")
        kv.put("x", "y")
        kv.close()
        # Restart after a clean shutdown (a real crash would skip close())
        kv2 = KVStore(self.wal, self.snap, fsync="none")
        self.assertEqual(kv2.get("x"), "y")
        kv2.close()

    def test_snapshot_rotates_wal(self):
        kv = KVStore(self.wal, self.snap, fsync="none")
        for i in range(100):
            kv.put(f"k{i}", i)
        wal_size_before = os.path.getsize(self.wal)
        kv.snapshot()
        wal_size_after = os.path.getsize(self.wal)
        self.assertGreater(wal_size_before, wal_size_after)
        kv.close()
        # Recover
        kv2 = KVStore(self.wal, self.snap, fsync="none")
        self.assertEqual(kv2.get("k99"), 99)
        kv2.close()

Follow-up Questions

(2) Persist state across restarts? That’s what we built. The three fsync policies and their tradeoffs are the answer-bearing detail: per_write (durable per op, slow); every_sec (up to about 1 sec of data loss, fast; the Redis default when AOF is enabled); none (fastest; survives a process crash but loses unsynced writes on power loss).

(10) Consistency model? Linearizable in a single process under the lock. Across processes (or replicas), this becomes a consensus problem — Raft / Paxos. The KV store is the data plane; consensus is the control plane.

(8) Partial failure? Crash mid-write: the WAL ends with a partial entry, which raises EOFError or pickle.UnpicklingError on recovery; replay stops there and the trailing junk is ignored. Pickle parsing alone is not a reliable corruption detector, so a per-entry length prefix plus checksum (CRC32) is the robust way to catch torn or corrupted entries.
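One way to add the per-entry checksum is a fixed header in front of each pickled entry. The layout below (4-byte little-endian length, 4-byte CRC32, then the payload) and the names frame and read_frames are illustrative assumptions, not a format the lab prescribes.

```python
import io
import pickle
import struct
import zlib

_HEADER = struct.Struct("<II")  # payload length, CRC32 of payload


def frame(entry: dict) -> bytes:
    """Serialize one WAL entry as header + pickled payload."""
    payload = pickle.dumps(entry)
    return _HEADER.pack(len(payload), zlib.crc32(payload)) + payload


def read_frames(stream):
    """Yield entries until EOF, a short read, or a checksum mismatch."""
    while True:
        header = stream.read(_HEADER.size)
        if len(header) < _HEADER.size:
            return  # clean EOF or torn header
        length, crc = _HEADER.unpack(header)
        payload = stream.read(length)
        if len(payload) < length or zlib.crc32(payload) != crc:
            return  # torn or corrupted payload: stop before applying it
        yield pickle.loads(payload)


a = {"op": "put", "k": "x", "v": 1}
b = {"op": "del", "k": "x"}
log = frame(a) + frame(b)

assert list(read_frames(io.BytesIO(log))) == [a, b]
assert list(read_frames(io.BytesIO(log[:-2]))) == [a]      # torn tail

flipped = bytearray(log)
flipped[-1] ^= 0xFF                                        # corrupt last payload byte
assert list(read_frames(io.BytesIO(bytes(flipped)))) == [a]
```

The checksum is verified before the payload is unpickled, so a damaged entry is rejected without ever being parsed.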

(9) Eviction / cleanup? TTL provides automatic cleanup, but expired keys still in memory consume RAM until accessed. Background sweeper: periodically scan _ttl for expired keys and delete. For unbounded growth, add an LRU/LFU policy on top: when memory > threshold, evict by policy.
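A minimal LRU layer can be sketched with collections.OrderedDict. LRUDict and max_entries are hypothetical names, and the entry count stands in for a real memory threshold. In the persistent store an eviction would presumably also need to be logged as a delete, otherwise recovery would bring the evicted key back.

```python
from collections import OrderedDict


class LRUDict:
    """Hypothetical bounded map: evicts the least recently used key when full."""

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._d: OrderedDict = OrderedDict()

    def get(self, key, default=None):
        if key not in self._d:
            return default
        self._d.move_to_end(key)  # mark as most recently used
        return self._d[key]

    def put(self, key, value) -> list:
        """Store the value and return the keys evicted to make room."""
        self._d[key] = value
        self._d.move_to_end(key)
        evicted = []
        while len(self._d) > self.max_entries:
            old_key, _ = self._d.popitem(last=False)  # oldest entry first
            evicted.append(old_key)
        return evicted


cache = LRUDict(max_entries=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")                  # "a" is now more recent than "b"
evicted = cache.put("c", 3)
assert evicted == ["b"]
```

Returning the evicted keys from put gives the caller a natural place to write the matching WAL entries.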

(11) Configuration knobs? fsync policy, snapshot_interval, max_memory_bytes, eviction_policy. Knobs not to expose: pickle protocol (use latest).

(12) Shutdown? Graceful: flush WAL, fsync, close file. The close method ensures durability up to the last write.

Product Extension

Redis (RDB = snapshot, AOF = WAL); RocksDB / LevelDB (LSM trees with WAL + memtable + SSTable); Memcached (no persistence — pure cache); etcd / ZooKeeper (snapshot + WAL + Raft for consensus). The pattern you wrote here is the foundation; SSTable + LSM is the next-level optimization for write-heavy + range-query workloads.

Language/Runtime Follow-ups

  • Python: pickle is fine for the snapshot format but not version-safe; for production, use msgpack or Protocol Buffers.
  • Java: RandomAccessFile for the WAL; Java serialization for snapshot (also fragile — prefer Avro or Protobuf).
  • Go: bufio.Writer over os.File; gob for snapshot. BadgerDB and BoltDB are production-grade Go KV stores.
  • C++: write your own framing or use Cap’n Proto. RocksDB is the canonical reference (C++ implementation of LSM + WAL).
  • JS/TS: rare in Node; use level (LevelDB binding) instead of rolling your own.

Common Bugs

  1. Updating in-memory state before WAL append: lose the durability guarantee.
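  2. fsync per write without flush() first: on a buffered file object, os.fsync() only syncs what has already reached the kernel, so bytes still in the user-space buffer are not made durable.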
  3. Snapshot writes to the actual snapshot path before fsync — if crash mid-write, snapshot is corrupt. Always write-tmp + fsync + rename.
  4. WAL not rotated on snapshot — recovery replays the entire history every time, even after snapshot.
  5. TTL stored as duration instead of absolute time — restart shifts deadlines.

Debugging Strategy

For “lost data after restart” bugs: tail the WAL with a pickle reader and check that the missing key was logged. For corrupt-snapshot bugs: check that the temp file and the snapshot are on the same filesystem; os.rename across filesystems can fail on Unix, and a copy-based fallback (such as shutil.move) is not atomic.
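A pickle reader for the WAL can be a few lines. The sketch assumes the WAL is a plain concatenation of pickled dicts with "op" and "k" fields, as in this lab; dump_wal and key_history are hypothetical helper names. Because pickle.load can execute arbitrary code, this should only be pointed at files the store itself wrote.

```python
import pickle
from typing import Iterator


def dump_wal(path: str) -> Iterator[dict]:
    """Yield WAL entries in log order, stopping at EOF or a torn tail."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except (EOFError, pickle.UnpicklingError):
                return


def key_history(path: str, key) -> list:
    """Return every logged mutation that touched the given key."""
    return [entry for entry in dump_wal(path) if entry.get("k") == key]
```

If key_history returns nothing for a key that should exist, the write never reached the log (check the ordering of WAL append and memory update); if the last entry is a "del", look at TTL expiry or an explicit delete.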

Mastery Criteria

  • Implemented KVStore with WAL + snapshot + recovery in <40 minutes.
  • All four tests pass.
  • Articulated three fsync levels and their tradeoffs without prompting.
  • Stated WAL-before-memory as the durability invariant.
  • Answered follow-ups #2, #8 (partial-write tolerance), #9, #10 (single vs replicated consistency), #11.
  • Compared snapshot+WAL vs LSM tree at a high level.