Replication

Replication Architecture

Marmot v2 implements a Percolator-style Two-Phase Commit (2PC) protocol for distributed SQLite replication. This document provides comprehensive technical details on the architecture, data flows, and edge case handling.

Design Philosophy

  • CDC-first: Row-level Change Data Capture, not SQL statement replay
  • Leaderless: Any node can coordinate writes (no master election)
  • Quorum-based: Configurable consistency levels (ONE, QUORUM, ALL)
  • Eventually consistent: Anti-entropy background sync repairs divergence
  • Full replication: All nodes receive all data (no sharding)

Running Example: Users Table

Throughout this document, we'll use a consistent example to trace data through all stages:

-- Schema on all 3 nodes (Node 1, Node 2, Node 3)
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    email TEXT UNIQUE,
    name TEXT,
    balance INTEGER DEFAULT 0
);
 
-- Initial data (already replicated)
-- id=1: alice@example.com, Alice, balance=100
-- id=2: bob@example.com, Bob, balance=50

Scenario: Node 1 executes UPDATE users SET balance = 75 WHERE id = 1

We'll trace this UPDATE through CDC capture, 2PC protocol, conflict detection, and recovery scenarios.


Two-Phase Commit Protocol

Marmot's 2PC differs from traditional database 2PC by using write intents for conflict detection and CDC replay for deterministic replication.

Phase 1: PREPARE

  1. Coordinator executes locally with preupdate hooks capturing CDC data
  2. Write intents created for each affected row (distributed locks)
  3. CDC entries stored in MetaStore (msgpack-serialized row data)
  4. Broadcast to replicas with CDC data (not SQL)
  5. Replicas create write intents and store CDC entries
  6. Return SUCCESS or CONFLICT to coordinator
Coordinator                      Replica Nodes
==========                       =============

Execute with CDC hooks

Create write intents

ReplicateTransaction(PREPARE) →  Create write intents
                                 Store CDC entries
                              ←  Return: Success/Conflict

Phase 2: COMMIT

  1. Wait for quorum of PREPARE responses
  2. Send COMMIT to remote nodes first (not local)
  3. Wait for remote quorum acknowledgment
  4. Commit locally only after remote quorum achieved
  5. Cleanup: Delete write intents and CDC entries
Coordinator                      Replica Nodes
==========                       =============

Remote quorum PREPARE achieved ✓

ReplicateTransaction(COMMIT) →   Apply CDC entries to SQLite
                                 Mark transaction COMMITTED
                              ←  Return: CommitTS

Remote quorum COMMIT achieved ✓

Commit locally

Cleanup intents
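
The coordinator-side control flow above can be sketched in Go. This is a simplified illustration rather than Marmot's actual code: the Replica interface, the request types, and the quorum bookkeeping are assumptions, and the real implementation fans the RPCs out in parallel goroutines instead of a sequential loop.

import (
    "context"
    "errors"
)

// Hypothetical stand-ins for the gRPC client and messages.
type PrepareRequest struct{ /* TxnID, CDC entries, ... */ }
type CommitRequest struct{ /* TxnID, CommitTS, ... */ }

type Replica interface {
    Prepare(ctx context.Context, req PrepareRequest) error
    Commit(ctx context.Context, req CommitRequest) error
}

// replicate2PC: PREPARE everywhere, require a quorum, COMMIT remotely,
// and only then commit on the coordinator itself.
func replicate2PC(ctx context.Context, replicas []Replica, prep PrepareRequest,
    com CommitRequest, quorum int, commitLocal func() error) error {

    prepared := 1 // the coordinator already executed locally and holds its intents
    for _, r := range replicas {
        if r.Prepare(ctx, prep) == nil {
            prepared++
        }
    }
    if prepared < quorum {
        return errors.New("abort: PREPARE quorum not achieved")
    }

    committed := 1 // counts the coordinator's own commit, performed last below
    for _, r := range replicas {
        if r.Commit(ctx, com) == nil {
            committed++
        }
    }
    if committed < quorum {
        return errors.New("abort: COMMIT quorum not achieved")
    }
    return commitLocal() // coordinator commits last
}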

Example: 2PC for UPDATE users SET balance = 75 WHERE id = 1

Step 1: Node 1 (Coordinator) executes locally

Time: T1 (TxnID = 0x0001A2B3C4D50001)
      Physical: 1736000000000ms | NodeID: 1 | Logical: 1

Node 1 executes UPDATE on hookDB connection
  → Preupdate hook fires: SQLITE_UPDATE on "users"
  → Captures: oldValues={balance: 100}, newValues={balance: 75}

Step 2: Coordinator creates write intent

MetaStore Write on Node 1:
  Key:   /intent/users/users:1
  Value: {
    TxnID: 0x0001A2B3C4D50001,
    NodeID: 1,
    StartTS: {Wall: 1736000000000000000, Logical: 1},
    IntentType: UPDATE,
    MarkedForCleanup: false
  }

Step 3: Broadcast PREPARE to Node 2 and Node 3

gRPC ReplicateTransaction (parallel to both):
{
  Phase: PREPARE,
  TxnID: 0x0001A2B3C4D50001,
  NodeID: 1,
  Database: "mydb",
  CDCEntries: [{
    Seq: 1,
    Operation: UPDATE,
    Table: "users",
    IntentKey: "users:1",
    OldValues: {"id": [0x01], "email": [...], "name": [...], "balance": [0x64]},  // 100
    NewValues: {"id": [0x01], "email": [...], "name": [...], "balance": [0x4B]}   // 75
  }]
}

Step 4: Node 2 and Node 3 process PREPARE

Node 2:
  1. Check Cuckoo filter for "users:1" → MISS (no conflict)
  2. Create write intent: /intent/users/users:1
  3. Store CDC entry: /cdc/0x0001A2B3C4D50001/1
  4. Return: {Status: SUCCESS, PrepareTS: ...}

Node 3:
  1. Check Cuckoo filter for "users:1" → MISS (no conflict)
  2. Create write intent: /intent/users/users:1
  3. Store CDC entry: /cdc/0x0001A2B3C4D50001/1
  4. Return: {Status: SUCCESS, PrepareTS: ...}

Step 5: Quorum achieved (2/3), send COMMIT

Node 1 received 2 SUCCESS responses (quorum = 2 for 3-node cluster)

gRPC ReplicateTransaction to Node 2, Node 3:
{
  Phase: COMMIT,
  TxnID: 0x0001A2B3C4D50001,
  CommitTS: {Wall: 1736000000005000000, Logical: 0}
}

Step 6: Replicas apply CDC and commit

Node 2:
  1. Read CDC entry from /cdc/0x0001A2B3C4D50001/1
  2. Generate SQL: UPDATE users SET id=1, email='alice@example.com',
                   name='Alice', balance=75 WHERE id=1
  3. Execute on SQLite
  4. Update txn record: Status=COMMITTED
  5. Delete write intent (background)
  6. Return: {CommitTS: ...}

Node 3: (same process)

Step 7: Coordinator commits locally

Node 1:
  1. Remote quorum (2/3) committed ✓
  2. Apply own CDC entry to SQLite
  3. Update txn record: Status=COMMITTED
  4. Cleanup write intents
  5. Return success to client

Why Coordinator Commits Last

If the coordinator committed locally first and the remote quorum then failed, its local state would diverge from the rest of the cluster with no clean way to undo the commit. By committing remotely first, aborting remains safe at every step.

Quorum Calculation

Critical: Quorum uses total cluster membership, not just alive nodes.

Quorum = floor(TotalMembership / 2) + 1
Cluster Size | Quorum | Survives
3 nodes      | 2      | 1 failure
5 nodes      | 3      | 2 failures
7 nodes      | 4      | 3 failures

Split-brain prevention: In a 6-node cluster split 3x3:

  • Using alive nodes: Each partition sees N=3, quorum=2 → both write (dangerous)
  • Using total membership: N=6, quorum=4 → neither writes (safe)
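
Expressed as code, the rule is a one-liner; the function name is illustrative:

func quorumSize(totalMembership int) int {
    // Derived from total cluster membership, never from the set of nodes
    // currently alive, so two sides of a partition can't both reach quorum.
    return totalMembership/2 + 1
}

// quorumSize(3) == 2, quorumSize(6) == 4: a 3x3 split leaves both sides short.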

CDC (Change Data Capture) Architecture

Why CDC Instead of SQL Replay

SQL Replay                            | CDC Replay
Non-deterministic (UUID, RANDOM, NOW) | Exact byte-for-byte values
Parsing differences between dialects  | Binary msgpack encoding
Auto-increment sequence conflicts     | Coordinator assigns distributed IDs
Slower (parse + execute)              | Faster (direct row application)

CDC Capture Flow

Row changes are captured via SQLite's preupdate hook API:

User Query (INSERT/UPDATE/DELETE)

Execute on dedicated hookDB connection

Preupdate hook fires BEFORE each row modification

Extract: table, intentKey, oldValues, newValues

Serialize to msgpack (type-preserving)

Store in MetaStore: /cdc/{txnID}/{seq}

Release hookDB before 2PC broadcast
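
The serialization step might look roughly like the sketch below. The msgpack package choice (github.com/vmihailenco/msgpack/v5), the helper names, and the key formatting are assumptions for illustration, not Marmot's actual code.

import (
    "fmt"

    "github.com/vmihailenco/msgpack/v5"
)

// encodeRow serializes each captured column independently so replicas can
// rebuild the row byte-for-byte without re-evaluating any SQL expression.
func encodeRow(cols map[string]any) (map[string][]byte, error) {
    out := make(map[string][]byte, len(cols))
    for name, v := range cols {
        b, err := msgpack.Marshal(v)
        if err != nil {
            return nil, fmt.Errorf("encode column %q: %w", name, err)
        }
        out[name] = b
    }
    return out, nil
}

// cdcKey builds the MetaStore key for one CDC entry: /cdc/{txnID}/{seq}.
func cdcKey(txnID, seq uint64) []byte {
    return []byte(fmt.Sprintf("/cdc/%#x/%d", txnID, seq))
}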

CDC Entry Format

type IntentEntry struct {
    TxnID     uint64              // Transaction ID
    Seq       uint64              // Sequence within transaction
    Operation uint8               // INSERT=0, REPLACE=1, UPDATE=2, DELETE=3
    Table     string              // Table name
    IntentKey string              // Serialized primary key
    OldValues map[string][]byte   // Before image (msgpack per column)
    NewValues map[string][]byte   // After image (msgpack per column)
}

Intent Key Format

Single PK: "table:value" or "table:b64:base64_value"

Composite PK: "table:c:b64_col1:b64_col2:..." (columns sorted alphabetically)

NULL handling: "table:\x00NULL\x00" sentinel value
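
A sketch of how these keys could be derived (helper names are illustrative, and the single-PK base64 fallback and NULL sentinel are omitted for brevity):

import (
    "encoding/base64"
    "sort"
    "strings"
)

// singlePKKey: "table:value" for a single-column primary key.
func singlePKKey(table, pk string) string {
    return table + ":" + pk
}

// compositePKKey: "table:c:b64(v1):b64(v2):..." with columns sorted
// alphabetically so every node derives the same key for the same row.
func compositePKKey(table string, pk map[string]string) string {
    cols := make([]string, 0, len(pk))
    for c := range pk {
        cols = append(cols, c)
    }
    sort.Strings(cols)
    parts := make([]string, 0, len(cols))
    for _, c := range cols {
        parts = append(parts, base64.StdEncoding.EncodeToString([]byte(pk[c])))
    }
    return table + ":c:" + strings.Join(parts, ":")
}

// compositePKKey("order_items", map[string]string{"order_id": "100", "item_id": "42"})
//   → "order_items:c:NDI=:MTAw" (item_id sorts before order_id)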

CDC Application on Replicas

Operation | SQL Generated
INSERT    | INSERT OR REPLACE INTO table (...) VALUES (...)
UPDATE    | UPDATE table SET ... WHERE pk=... using OldValues
DELETE    | DELETE FROM table WHERE pk=... using OldValues
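
A rough sketch of the statement generation on replicas, keyed off the Operation codes in IntentEntry above. Values are bound as parameters after msgpack decoding; the helpers, pkCols input, and error handling here are simplified illustrations rather than the real code.

import (
    "fmt"
    "sort"
    "strings"
)

// buildSQL turns a CDC entry into a parameterized statement shape.
func buildSQL(e IntentEntry, pkCols []string) (string, error) {
    switch e.Operation {
    case 0, 1: // INSERT / REPLACE
        cols := sortedKeys(e.NewValues)
        return fmt.Sprintf("INSERT OR REPLACE INTO %s (%s) VALUES (%s)",
            e.Table, strings.Join(cols, ", "), placeholders(len(cols))), nil
    case 2: // UPDATE: SET every captured column, locate the row by primary key
        sets := make([]string, 0, len(e.NewValues))
        for _, c := range sortedKeys(e.NewValues) {
            sets = append(sets, c+" = ?")
        }
        return fmt.Sprintf("UPDATE %s SET %s WHERE %s",
            e.Table, strings.Join(sets, ", "), whereClause(pkCols)), nil
    case 3: // DELETE: locate the row by primary key taken from OldValues
        return fmt.Sprintf("DELETE FROM %s WHERE %s", e.Table, whereClause(pkCols)), nil
    }
    return "", fmt.Errorf("unknown operation %d", e.Operation)
}

func sortedKeys(m map[string][]byte) []string {
    ks := make([]string, 0, len(m))
    for k := range m {
        ks = append(ks, k)
    }
    sort.Strings(ks)
    return ks
}

func placeholders(n int) string {
    return strings.TrimSuffix(strings.Repeat("?, ", n), ", ")
}

func whereClause(pkCols []string) string {
    parts := make([]string, len(pkCols))
    for i, c := range pkCols {
        parts[i] = c + " = ?"
    }
    return strings.Join(parts, " AND ")
}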

Example: CDC Entry for Our UPDATE

When Node 1 executes UPDATE users SET balance = 75 WHERE id = 1:

Raw CDC Entry (msgpack-serialized):

Key: /cdc/0x0001A2B3C4D50001/1

Value (IntentEntry):
{
  TxnID:     0x0001A2B3C4D50001,
  Seq:       1,
  Operation: 2,                    // UPDATE
  Table:     "users",
  IntentKey: "users:1",            // Single PK format
  OldValues: {
    "id":      0x91 0x01,          // msgpack fixint: 1
    "email":   0xB1 "alice@example.com",  // msgpack fixstr
    "name":    0xA5 "Alice",       // msgpack fixstr
    "balance": 0xCC 0x64           // msgpack uint8: 100
  },
  NewValues: {
    "id":      0x91 0x01,          // msgpack fixint: 1
    "email":   0xB1 "alice@example.com",
    "name":    0xA5 "Alice",
    "balance": 0xCC 0x4B           // msgpack uint8: 75
  }
}

Generated SQL on Replicas:

-- Replicas execute this (generated from CDC, not original SQL):
UPDATE users SET
  id = 1,
  email = 'alice@example.com',
  name = 'Alice',
  balance = 75
WHERE id = 1

Example: CDC for INSERT and DELETE

For INSERT INTO users (id, email, name, balance) VALUES (3, 'carol@example.com', 'Carol', 200):

CDC Entry:
{
  Operation: 0,                    // INSERT
  Table:     "users",
  IntentKey: "users:3",
  OldValues: {},                   // Empty for INSERT
  NewValues: {
    "id": 3, "email": "carol@example.com", "name": "Carol", "balance": 200
  }
}

Generated SQL: INSERT OR REPLACE INTO users (id, email, name, balance)
               VALUES (3, 'carol@example.com', 'Carol', 200)

For DELETE FROM users WHERE id = 2:

CDC Entry:
{
  Operation: 3,                    // DELETE
  Table:     "users",
  IntentKey: "users:2",
  OldValues: {
    "id": 2, "email": "bob@example.com", "name": "Bob", "balance": 50
  },
  NewValues: {}                    // Empty for DELETE
}

Generated SQL: DELETE FROM users WHERE id = 2

Example: Composite Primary Key

CREATE TABLE order_items (
  order_id INTEGER,
  item_id INTEGER,
  quantity INTEGER,
  PRIMARY KEY (order_id, item_id)
);
 
UPDATE order_items SET quantity = 5 WHERE order_id = 100 AND item_id = 42;
IntentKey Format: "order_items:c:NDI=:MTAw"
                   ^table      ^composite marker
                                 ^base64(item_id=42):base64(order_id=100)
                  (columns sorted alphabetically: item_id before order_id)

Write Intents & Conflict Detection

Write Intent Structure

Write intents are distributed locks stored in MetaStore:

Key:   /intent/{tableName}/{intentKey}
Value: WriteIntentRecord {
    TxnID, NodeID, StartTS, IntentType,
    Statement, DataSnapshot, MarkedForCleanup
}

Index: /intent_txn/{txnID}/{tableName}/{intentKey}  // For cleanup

Cuckoo Filter Fast Path

A Cuckoo filter provides O(1) conflict detection:

Parameter           | Value
Buckets             | 250,000
Bucket size         | 4 entries
Fingerprint         | 32-bit
Capacity            | ~1 million
False positive rate | ~2.3×10⁻¹⁰

Conflict check flow:

WriteIntent(table, intentKey)

hash = XXH64(table + ":" + intentKey)

filter.Check(hash)

  ├─ MISS → Fast path: write directly to Pebble

  └─ HIT → Slow path: lookup actual intent in Pebble
           ├─ No intent found → False positive, proceed
           ├─ Same txnID → Update in-place
           └─ Different txnID → Check if overwritable:
               ├─ MarkedForCleanup=true → Overwrite
               ├─ Status=COMMITTED/ABORTED → Overwrite
               ├─ Heartbeat timeout → Overwrite (stale)
               └─ Active transaction → Return CONFLICT
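
A sketch of this decision path. The filter and store interfaces, the merged intentInfo struct (which folds the status and heartbeat of the owning transaction record into the intent for brevity), and the hard-coded 10-second timeout are illustrative assumptions.

import (
    "errors"
    "time"

    "github.com/cespare/xxhash/v2"
)

var ErrConflict = errors.New("write intent conflict")

// intentInfo merges the intent and its owning transaction record.
type intentInfo struct {
    TxnID            uint64
    MarkedForCleanup bool
    TxnStatus        string // "PENDING", "COMMITTED", or "ABORTED"
    LastHeartbeat    time.Time
}

type intentFilter interface{ Contains(hash uint64) bool }
type intentStore interface {
    GetIntent(table, intentKey string) (intentInfo, bool)
}

func checkConflict(f intentFilter, s intentStore, table, intentKey string, txnID uint64) error {
    h := xxhash.Sum64String(table + ":" + intentKey)
    if !f.Contains(h) {
        return nil // fast path: filter miss, no possible conflict
    }
    rec, ok := s.GetIntent(table, intentKey)
    if !ok {
        return nil // false positive in the filter
    }
    switch {
    case rec.TxnID == txnID:
        return nil // same transaction: idempotent re-entry, update in place
    case rec.MarkedForCleanup,
        rec.TxnStatus != "PENDING",
        time.Since(rec.LastHeartbeat) > 10*time.Second:
        return nil // finished or stale intent: safe to overwrite
    default:
        return ErrConflict // active intent held by another transaction
    }
}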

Sharded Mutexes

256 sharded mutexes prevent TOCTOU races:

var intentLocks [256]sync.Mutex

// intentLockFor maps a row key to one of 256 mutex shards, so concurrent
// intent operations on different rows rarely contend on the same lock.
func intentLockFor(table, intentKey string) *sync.Mutex {
    return &intentLocks[xxhash.Sum64String(table+":"+intentKey)%256]
}

Example: Conflict Detection Scenarios

Using our users table, here are concrete conflict scenarios:

Scenario 1: No Conflict (Different Rows)

Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
         → Creates intent for "users:1"

Time T2: Node 2 starts UPDATE users SET balance = 30 WHERE id = 2
         → Cuckoo filter check for "users:2" → MISS
         → Creates intent for "users:2"
         → Both transactions succeed (different rows)

Scenario 2: Conflict (Same Row, Different Nodes)

Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
         TxnID: 0x0001A2B3C4D50001
         → Creates intent: /intent/users/users:1 → {TxnID: 0x0001A2B3C4D50001}
         → Adds to Cuckoo filter: hash("users:1")

Time T2: Node 2 starts UPDATE users SET balance = 50 WHERE id = 1
         TxnID: 0x0001A2B3C4D60001
         → Cuckoo filter check for "users:1" → HIT
         → Lookup Pebble: /intent/users/users:1
         → Found intent with different TxnID (0x0001A2B3C4D50001)
         → Check if overwritable:
           - MarkedForCleanup = false
           - Status = PENDING (not COMMITTED/ABORTED)
           - LastHeartbeat = recent (not stale)
         → Return CONFLICT to Node 2
         → Node 2 aborts and retries after backoff

Scenario 3: Stale Intent (Coordinator Crashed)

Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
         → Creates intent: /intent/users/users:1
         → Node 1 crashes before COMMIT

Time T2: (10 seconds later, heartbeat timeout)
         GC on Node 2 scans pending transactions
         → Finds intent with LastHeartbeat > 10s ago
         → Marks intent for cleanup

Time T3: Node 3 starts UPDATE users SET balance = 50 WHERE id = 1
         → Cuckoo filter check → HIT
         → Lookup Pebble → Found intent
         → Check: MarkedForCleanup = true
         → Overwrite allowed → Creates new intent
         → Transaction succeeds

Scenario 4: Same Transaction Re-entry

Time T1: Node 1 PREPARE phase creates intent for "users:1"
         TxnID: 0x0001A2B3C4D50001

Time T2: Network hiccup, Node 1 retries PREPARE
         → Cuckoo filter check → HIT
         → Lookup Pebble → Found intent with SAME TxnID
         → Update in-place (idempotent)
         → No conflict

Scenario 5: Multi-Row Transaction Partial Conflict

-- Node 1 executes:
BEGIN;
UPDATE users SET balance = balance - 25 WHERE id = 1;
UPDATE users SET balance = balance + 25 WHERE id = 2;
COMMIT;
Time T1: Node 1 creates intents for both rows:
         → /intent/users/users:1 → TxnID: 0x...0001
         → /intent/users/users:2 → TxnID: 0x...0001

Time T2: Node 2 tries UPDATE users SET name = 'Robert' WHERE id = 2
         → Cuckoo filter check for "users:2" → HIT
         → Conflict detected!
         → Node 2 returns CONFLICT
         → Node 2's client retries after Node 1 commits

Storage Layer (MetaStore)

PebbleDB Architecture

Each database has a separate MetaStore (CockroachDB Pebble):

data_dir/
├── mydb.db                    # SQLite database
├── mydb.db-wal               # SQLite WAL
├── mydb_meta.pebble/         # PebbleDB MetaStore
│   ├── MANIFEST-*
│   ├── CURRENT
│   └── *.sst                 # LSM-tree files
└── __marmot_system.db        # System database
    └── __marmot_system_meta.pebble/

Key Namespace

Prefix                                  | Purpose
/txn/{txnID}                            | Transaction records (status, timestamps, statements)
/txn_idx/pend/{txnID}                   | Pending transaction index
/txn_idx/seq/{seqNum}/{txnID}           | Sequence-ordered index (gap detection)
/intent/{table}/{intentKey}             | Write intents
/intent_txn/{txnID}/{table}/{intentKey} | Intent index by transaction
/cdc/{txnID}/{seq}                      | CDC intent entries
/repl/{peerID}/{dbName}                 | Replication state per peer
/schema/{dbName}                        | Schema version counter
/seq/{nodeID}                           | Sequence generator (pre-allocated batches)

Transaction Record Format

type TransactionRecord struct {
    TxnID                uint64
    NodeID               uint64
    SeqNum               uint64      // For gap-free replication
    Status               TxnStatus   // PENDING/COMMITTED/ABORTED
    StartTSWall          int64       // HLC wall time
    StartTSLogical       int32       // HLC logical
    CommitTSWall         int64
    CommitTSLogical      int32
    LastHeartbeat        int64       // Stale detection
    DatabaseName         string
}

Group Commit Optimization

Writes to Pebble are batched: up to 100 operations per batch, a 2 ms flush timeout, and a 1,000-operation buffer.
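
A generic sketch of the batching pattern (the op type, channel, and flush callback are illustrative; the real writer targets a Pebble write batch):

import "time"

type pendingOp struct{ key, value []byte }

// groupCommit drains queued operations and flushes them when the batch hits
// batchSize or when the timeout elapses, whichever comes first.
func groupCommit(ops <-chan pendingOp, batchSize int, timeout time.Duration, flush func([]pendingOp)) {
    batch := make([]pendingOp, 0, batchSize)
    ticker := time.NewTicker(timeout)
    defer ticker.Stop()
    for {
        select {
        case op, ok := <-ops:
            if !ok {
                if len(batch) > 0 {
                    flush(batch)
                }
                return
            }
            batch = append(batch, op)
            if len(batch) >= batchSize {
                flush(batch)
                batch = batch[:0]
            }
        case <-ticker.C:
            if len(batch) > 0 {
                flush(batch)
                batch = batch[:0]
            }
        }
    }
}

// Usage sketch: ops := make(chan pendingOp, 1000); go groupCommit(ops, 100, 2*time.Millisecond, writeBatch)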

Example: MetaStore Keys After Our UPDATE

After Node 1's UPDATE users SET balance = 75 WHERE id = 1 completes:

During PREPARE phase (all 3 nodes):

/intent/users/users:1 → WriteIntentRecord{TxnID: 0x0001A2B3C4D50001, ...}
/intent_txn/0x0001A2B3C4D50001/users/users:1 → (index entry)
/cdc/0x0001A2B3C4D50001/1 → IntentEntry{Operation: UPDATE, ...}
/txn/0x0001A2B3C4D50001 → TransactionRecord{Status: PENDING, ...}
/txn_idx/pend/0x0001A2B3C4D50001 → (pending index)

After COMMIT phase:

/txn/0x0001A2B3C4D50001 → TransactionRecord{
  TxnID: 0x0001A2B3C4D50001,
  NodeID: 1,
  SeqNum: 42,
  Status: COMMITTED,
  StartTSWall: 1736000000000000000,
  CommitTSWall: 1736000000005000000,
  DatabaseName: "mydb"
}
/txn_idx/seq/42/0x0001A2B3C4D50001 → (sequence index for gap detection)

-- Deleted (cleanup):
/intent/users/users:1 → (removed)
/intent_txn/0x0001A2B3C4D50001/users/users:1 → (removed)
/txn_idx/pend/0x0001A2B3C4D50001 → (removed)

-- CDC entries kept for anti-entropy (GC'd after 2-24hrs):
/cdc/0x0001A2B3C4D50001/1 → (kept for delta sync)

Replication State (per peer):

-- On Node 2, tracking what Node 1 has sent:
/repl/node1/mydb → {LastAppliedTxnID: 0x0001A2B3C4D50001, LastSeqNum: 42}

-- On Node 3:
/repl/node1/mydb → {LastAppliedTxnID: 0x0001A2B3C4D50001, LastSeqNum: 42}

Hybrid Logical Clock (HLC)

Purpose

  • Causality tracking: Timestamps preserve happens-before relationship
  • Transaction IDs: Unique 64-bit IDs derived from HLC
  • Conflict resolution: Last-Write-Wins using HLC comparison

Clock Structure

type Clock struct {
    nodeID   uint64      // 0-63 (6-bit limit)
    wallTime int64       // Nanoseconds
    logical  int32       // 0-65535 (16-bit limit)
    mu       sync.Mutex
}

Transaction ID Bit Layout

|-------- 42 bits --------|-- 6 bits --|-- 16 bits --|
|     Physical (ms)       |  Node ID   |   Logical   |
  • 42 bits: Milliseconds since epoch (~139 years)
  • 6 bits: Node ID (max 64 nodes)
  • 16 bits: Logical counter (~65k IDs/ms/node)
func (t Timestamp) ToTxnID() uint64 {
    physicalMS := uint64(t.WallTime / 1_000_000)
    return (physicalMS << 22) | (t.NodeID << 16) | uint64(t.Logical)
}

Clock Synchronization

On receiving remote timestamp:

  1. Take max of: local wall, remote wall, physical now
  2. If same wall time: increment logical beyond both local and remote
  3. If new millisecond: reset logical to 0
  4. Overflow protection: Spin-wait if logical exceeds 65535
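
A sketch of that receive-side update against the Clock struct shown above; the overflow spin-wait is left out, and Marmot's exact tie-breaking may differ.

import "time"

// Update advances the clock on receipt of a remote timestamp so the result
// is never behind the local clock, the remote clock, or physical time.
func (c *Clock) Update(remoteWall int64, remoteLogical int32) (int64, int32) {
    c.mu.Lock()
    defer c.mu.Unlock()

    newWall := c.wallTime
    if remoteWall > newWall {
        newWall = remoteWall
    }
    if now := time.Now().UnixNano(); now > newWall {
        newWall = now
    }

    if newWall == c.wallTime {
        // Same wall time: move the logical counter beyond both sides.
        if remoteLogical > c.logical {
            c.logical = remoteLogical
        }
        c.logical++
    } else {
        c.logical = 0 // clock advanced to a new wall time
    }
    c.wallTime = newWall
    return c.wallTime, c.logical
}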

Example: HLC and TxnID Generation

Our UPDATE transaction on Node 1:

Physical time: 1736000000000 ms since epoch (Jan 4, 2025, ~14:13 UTC)
Node ID: 1
Logical: 1 (first transaction this millisecond)

TxnID calculation:
  Physical (42 bits): 1736000000000 ms, placed in bits 63-22
  NodeID   (6 bits):  1, placed in bits 21-16
  Logical  (16 bits): 1, placed in bits 15-0
  ─────────────────────────────────────────────────────────────
  TxnID = (1736000000000 << 22) | (1 << 16) | 1
          (written throughout this document as the placeholder 0x0001A2B3C4D50001)

Binary layout:
  |---- 42 bits ----|-- 6 --|-- 16 --|
  |  1736000000000  |   1   |    1   |

Conflict resolution using HLC (Last-Write-Wins):

Scenario: Two updates to same row arrive at Node 3

Update A (from Node 1):
  TxnID: 0x0001A2B3C4D50001
  Wall:  1736000000000 ms, Logical: 1

Update B (from Node 2):
  TxnID: 0x0001A2B3C4D60002
  Wall:  1736000000001 ms, Logical: 2

Comparison: B.Wall > A.Wall → Update B wins
Result: Node 3 applies Update B (higher timestamp)

Clock advance on message receive:

Node 2 local clock: Wall=1736000000000, Logical=5
Receives PREPARE from Node 1: Wall=1736000000002, Logical=1

Update logic:
  localWall (1736000000000) < remoteWall (1736000000002)
  → Set wall = max(local, remote, physicalNow) = 1736000000002
  → Reset logical = 0 (new millisecond)

Node 2 new clock: Wall=1736000000002, Logical=0

Anti-Entropy Background Sync

Purpose

Repairs divergence caused by:

  • Network partitions healing
  • Node restarts with stale data
  • Missed transactions during temporary failures

Sync Strategy Decision

Lag Detection:
  localMaxTxnID vs peerMaxTxnID
  localTxnCount vs peerTxnCount

Decision:
  IF lag < DeltaSyncThreshold (default 10K txns)
     AND timeLag < DeltaSyncThresholdSeconds (default 1hr)
  THEN → Delta Sync (incremental)
  ELSE → Snapshot Transfer (full)
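
The same decision as a function (names follow the configuration keys listed at the end of this document; the lag inputs are whatever the anti-entropy probe measured):

import "time"

type syncStrategy string

const (
    deltaSync        syncStrategy = "DELTA_SYNC"
    snapshotTransfer syncStrategy = "SNAPSHOT_TRANSFER"
)

// chooseSyncStrategy picks incremental delta sync for small lags and a full
// snapshot transfer otherwise.
func chooseSyncStrategy(txnLag uint64, timeLag time.Duration,
    thresholdTxns uint64, thresholdTime time.Duration) syncStrategy {
    if txnLag < thresholdTxns && timeLag < thresholdTime {
        return deltaSync
    }
    return snapshotTransfer
}

// chooseSyncStrategy(3, 5*time.Minute, 10_000, time.Hour)    → DELTA_SYNC
// chooseSyncStrategy(50_000, 2*time.Hour, 10_000, time.Hour) → SNAPSHOT_TRANSFER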

Delta Sync Flow

  1. Query peer's replication state
  2. Stream transactions via StreamChanges RPC
  3. Gap detection: If event.SeqNum - expectedNext > threshold → snapshot fallback
  4. Apply each transaction via REPLAY phase (bypasses 2PC)
  5. Update local replication state

Gap Detection Algorithm

if event.SeqNum > expectedNextTxn {
    gap := event.SeqNum - expectedNextTxn
    if gap > DeltaSyncThresholdTxns {
        // Transactions GC'd - fall back to snapshot
        return ErrGapDetected
    }
}
expectedNextTxn = event.SeqNum + 1

Snapshot Transfer

  1. Get snapshot info: Files, checksums, txn_id
  2. Stream chunks: 4MB each with MD5 verification
  3. Atomic restore: Replace database files
  4. Resume normal replication from snapshot txn_id
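
A sketch of the receive side: write each chunk, verify its checksum, and only rename the temp file into place once everything matches. The chunk struct and function shape are illustrative, not Marmot's actual streaming code.

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
    "os"
)

// snapshotChunk is an illustrative shape for one streamed chunk.
type snapshotChunk struct {
    Data []byte
    MD5  string // hex digest computed by the sending node
}

// receiveSnapshot writes verified chunks to a temp file, then atomically
// renames it over the target database file.
func receiveSnapshot(chunks <-chan snapshotChunk, tmpPath, finalPath string) error {
    f, err := os.Create(tmpPath)
    if err != nil {
        return err
    }
    defer f.Close()

    for c := range chunks {
        sum := md5.Sum(c.Data)
        if hex.EncodeToString(sum[:]) != c.MD5 {
            return fmt.Errorf("snapshot chunk failed MD5 verification")
        }
        if _, err := f.Write(c.Data); err != nil {
            return err
        }
    }
    if err := f.Sync(); err != nil {
        return err
    }
    return os.Rename(tmpPath, finalPath) // atomic replace
}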

Example: Anti-Entropy Scenarios

Using our 3-node cluster with the users table:

Scenario 1: Node 3 Brief Network Partition (Delta Sync)

Timeline:
  T0: All nodes in sync, SeqNum=40
  T1: Node 3 loses network for 5 minutes
  T2: Node 1, Node 2 continue processing:
      - SeqNum 41: UPDATE users SET balance=75 WHERE id=1
      - SeqNum 42: INSERT INTO users VALUES (3, 'carol@...', 'Carol', 200)
      - SeqNum 43: DELETE FROM users WHERE id=2
  T3: Node 3 reconnects

Anti-Entropy on Node 3:
  1. Query Node 1: "What's your max SeqNum?" → 43
  2. Local state: LastSeqNum=40
  3. Lag = 43 - 40 = 3 transactions (< 10K threshold)
  4. Decision: Delta Sync

Delta Sync Flow:
  Node 3 → Node 1: StreamChanges(fromSeqNum=41)
  Node 1 streams:
    {SeqNum: 41, TxnID: 0x...0001, CDC: [UPDATE users...]}
    {SeqNum: 42, TxnID: 0x...0002, CDC: [INSERT users...]}
    {SeqNum: 43, TxnID: 0x...0003, CDC: [DELETE users...]}

  Node 3 applies each via REPLAY (bypasses 2PC):
    - Execute CDC for SeqNum 41 → balance=75 for id=1
    - Execute CDC for SeqNum 42 → insert id=3
    - Execute CDC for SeqNum 43 → delete id=2

  Node 3 now in sync: SeqNum=43

Scenario 2: New Node Joining (Snapshot Transfer)

Initial state:
  Node 1: SeqNum=50000, users table has 10K rows
  Node 2: SeqNum=50000
  Node 4: New node, empty database

Node 4 joins cluster:
  1. Gossip announces Node 4 as JOINING
  2. Anti-entropy runs on Node 4
  3. Query Node 1: "What's your max SeqNum?" → 50000
  4. Local state: SeqNum=0
  5. Lag = 50000 (> 10K threshold)
  6. Decision: Snapshot Transfer

Snapshot Transfer Flow:
  Node 4 → Node 1: GetSnapshotInfo(db="mydb")
  Node 1 returns:
    {
      Files: ["mydb.db", "mydb.db-wal"],
      TotalSize: 50MB,
      Checksums: {...},
      AtTxnID: 0x...C350,
      AtSeqNum: 50000
    }

  Node 4 → Node 1: StreamSnapshot(db="mydb")
  Node 1 streams 4MB chunks:
    Chunk 1: bytes[0..4MB], MD5=abc123
    Chunk 2: bytes[4MB..8MB], MD5=def456
    ...
    Chunk 13: bytes[48MB..50MB], MD5=xyz789

  Node 4 assembles and verifies:
    - Write chunks to temp file
    - Verify each chunk MD5
    - Atomic rename to mydb.db
    - Set replication state: SeqNum=50000

  Node 4 state: JOINING → ALIVE

Scenario 3: Gap Detection (Partial GC)

Timeline:
  T0: Node 3 offline for 25 hours
  T1: GC runs on Node 1, Node 2:
      - Transactions SeqNum 1-1000 are > 24hrs old
      - Force GC deletes them (gc_max_retention=24hr)
  T2: Node 3 reconnects, has SeqNum=500

Delta Sync Attempt:
  Node 3 → Node 1: StreamChanges(fromSeqNum=501)
  Node 1 scans /txn_idx/seq/:
    - First available: SeqNum=1001
    - Requested: SeqNum=501
    - Gap detected: 501-1000 are missing (GC'd)

  Node 1 returns: ErrGapDetected

  Node 3 falls back to Snapshot Transfer

Gossip Protocol (SWIM)

Node States

State   | Description                        | Replication
ALIVE   | Healthy, responding                | Receives writes
SUSPECT | May be dead, awaiting confirmation | Receives writes
DEAD    | Confirmed failed                   | Excluded
JOINING | Catching up                        | Gossip only
REMOVED | Admin removed                      | Excluded

State Transitions

New node → JOINING → (catch-up complete) → ALIVE
ALIVE → (no response for SuspectTimeout) → SUSPECT
SUSPECT → (no response for DeadTimeout) → DEAD
SUSPECT → (received message) → ALIVE

SWIM Protocol

  • Gossip interval: 1 second
  • Fanout: 3 random peers per round
  • Suspect timeout: 15 seconds
  • Dead timeout: 30 seconds

Incarnation Numbers

Higher incarnation always wins (prevents stale state propagation):

if incoming.Incarnation > local.Incarnation {
    updateLocalState(incoming)
} else if incoming.Incarnation == local.Incarnation {
    // Only allow escalation: ALIVE → SUSPECT → DEAD
}

Example: Gossip and Failure Detection

Scenario: Node 2 crashes during our UPDATE transaction

Initial cluster state (all nodes ALIVE):
  Node 1: {ID: 1, State: ALIVE, Incarnation: 5, Addr: "10.0.0.1:5000"}
  Node 2: {ID: 2, State: ALIVE, Incarnation: 3, Addr: "10.0.0.2:5000"}
  Node 3: {ID: 3, State: ALIVE, Incarnation: 7, Addr: "10.0.0.3:5000"}

T0: Node 2 crashes (process killed)

T1 (+1s): Gossip round on Node 1
  - Node 1 randomly selects Node 2 for ping
  - gRPC Ping("10.0.0.2:5000") → timeout (5s)
  - No response, try indirect probe via Node 3

T2 (+6s): Indirect probe
  - Node 1 → Node 3: "Please ping Node 2 for me"
  - Node 3 → Node 2: Ping → timeout
  - Node 3 → Node 1: "Node 2 didn't respond"

T3 (+7s): Node 1 marks Node 2 as SUSPECT
  Gossip message: {NodeID: 2, State: SUSPECT, Incarnation: 3}

T4 (+8s): Gossip spreads to Node 3
  - Node 3 receives SUSPECT for Node 2
  - Updates local state: Node 2 = SUSPECT

T5 (+22s): Suspect timeout (15s) expires
  - No refutation from Node 2
  - Node 1 marks Node 2 as DEAD
  Gossip message: {NodeID: 2, State: DEAD, Incarnation: 3}

Effect on replication:
  - Quorum calculation now excludes Node 2
  - Total membership still = 3 (for split-brain safety)
  - Required quorum = 2 (floor(3/2)+1)
  - Node 1 + Node 3 can still achieve quorum

Scenario: False positive (Node 2 was just slow)

T0: Node 2 under heavy load (GC pause)

T1 (+5s): Node 1 marks Node 2 as SUSPECT
  Gossip: {NodeID: 2, State: SUSPECT, Incarnation: 3}

T2 (+7s): Node 2 GC pause ends, receives SUSPECT message about itself
  - "I'm being suspected? I'm alive!"
  - Increments incarnation: 3 → 4
  - Broadcasts refutation: {NodeID: 2, State: ALIVE, Incarnation: 4}

T3 (+8s): Node 1 receives refutation
  - Incoming incarnation (4) > local (3)
  - Updates: Node 2 = ALIVE, Incarnation = 4
  - Node 2 stays in cluster

Quorum unaffected: Node 2 was never marked DEAD

DDL Replication

Cluster-Wide Locking

type DDLLock struct {
    Database   string
    NodeID     uint64
    TxnID      uint64
    ExpiresAt  time.Time  // 30-second lease
}
  • One DDL per database at a time
  • Lock expires automatically (handles node crashes)
  • Different databases can have concurrent DDL
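
A sketch of lease-based acquisition over the MetaStore. The store interface is an illustrative stand-in; a real implementation would need an atomic compare-and-set so two nodes cannot both observe the lock as free.

import (
    "fmt"
    "time"
)

// ddlLockStore is an illustrative stand-in for the MetaStore operations used here.
type ddlLockStore interface {
    GetLock(db string) (*DDLLock, bool)
    PutLock(db string, l DDLLock) error
}

// acquireDDLLock takes the per-database DDL lock if it is free or its lease
// has expired; otherwise it reports who holds it.
func acquireDDLLock(s ddlLockStore, db string, nodeID, txnID uint64, lease time.Duration) error {
    if cur, ok := s.GetLock(db); ok && time.Now().Before(cur.ExpiresAt) {
        return fmt.Errorf("DDL lock held by node %d", cur.NodeID)
    }
    return s.PutLock(db, DDLLock{
        Database:  db,
        NodeID:    nodeID,
        TxnID:     txnID,
        ExpiresAt: time.Now().Add(lease), // 30-second lease by default
    })
}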

Idempotent Rewriting

DDL automatically rewritten for safe replay:

Original         | Rewritten
CREATE TABLE foo | CREATE TABLE IF NOT EXISTS foo
DROP TABLE foo   | DROP TABLE IF EXISTS foo
CREATE INDEX idx | CREATE INDEX IF NOT EXISTS idx
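
A minimal sketch of the rewriting shown in the table, using simple prefix matching; a real implementation would parse the statement and cover more statement forms.

import "strings"

// rewriteIdempotent makes common DDL statements safe to replay on nodes that
// may have already applied them.
func rewriteIdempotent(stmt string) string {
    upper := strings.ToUpper(stmt)
    switch {
    case strings.HasPrefix(upper, "CREATE TABLE ") && !strings.Contains(upper, "IF NOT EXISTS"):
        return "CREATE TABLE IF NOT EXISTS " + stmt[len("CREATE TABLE "):]
    case strings.HasPrefix(upper, "DROP TABLE ") && !strings.Contains(upper, "IF EXISTS"):
        return "DROP TABLE IF EXISTS " + stmt[len("DROP TABLE "):]
    case strings.HasPrefix(upper, "CREATE INDEX ") && !strings.Contains(upper, "IF NOT EXISTS"):
        return "CREATE INDEX IF NOT EXISTS " + stmt[len("CREATE INDEX "):]
    }
    return stmt
}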

Schema Version Tracking

Each database maintains a version counter:

  • Incremented on every DDL
  • Exchanged via gossip
  • Delta sync validates schema version before applying DML

Example: DDL Replication for ALTER TABLE

Adding a column to our users table:

-- Node 1 executes:
ALTER TABLE users ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
T0: Node 1 acquires DDL lock
  Key: /ddl_lock/mydb
  Value: {NodeID: 1, TxnID: 0x...0001, ExpiresAt: now()+30s}

T1: Node 1 broadcasts DDL lock acquisition via gossip
  All nodes pause DML on "mydb" database

T2: Node 1 executes locally
  - ALTER TABLE users ADD COLUMN ...
  - Increment schema version: /schema/mydb → 2

T3: Node 1 broadcasts DDL to replicas (not CDC, raw SQL)
  gRPC ReplicateDDL:
  {
    Database: "mydb",
    Statement: "ALTER TABLE IF NOT EXISTS users ADD COLUMN IF NOT EXISTS
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
    SchemaVersion: 2,
    TxnID: 0x...0001
  }

  Note: Statement rewritten for idempotency

T4: Node 2, Node 3 execute DDL
  - Parse and execute SQL directly
  - Update local schema version: 2
  - Return success

T5: Node 1 releases DDL lock
  - Delete /ddl_lock/mydb
  - DML resumes on all nodes

T6: Subsequent DML includes new column
  INSERT INTO users (id, email, name, balance)
  VALUES (4, 'dave@...', 'Dave', 100);

  CDC captures:
  {
    NewValues: {
      "id": 4,
      "email": "dave@example.com",
      "name": "Dave",
      "balance": 100,
      "created_at": "2025-01-05 12:30:00"  ← New column included
    }
  }

Concurrent DDL conflict:

T0: Node 1 tries ALTER TABLE users ADD COLUMN age INT
T0: Node 2 tries ALTER TABLE users ADD COLUMN phone TEXT

Node 1:
  - Acquires /ddl_lock/mydb (wins)
  - Proceeds with DDL

Node 2:
  - Tries to acquire /ddl_lock/mydb
  - Lock exists, owned by Node 1
  - Check expiry: 25 seconds remaining
  - Return error: "DDL lock held by Node 1"
  - Client retries after Node 1 completes

Recovery Scenarios

Network Partition

Scenario           | Behavior
Minority partition | Writes fail (cannot achieve quorum)
Majority partition | Writes succeed
Partition heals    | Delta sync + LWW merges data

Node Failure & Recovery

Scenario            | Recovery Method
Brief outage (<1hr) | Delta sync from peers
Extended outage     | Snapshot transfer
New node joining    | Full snapshot + delta sync

Transaction Failure

Failure Point              | Recovery
PREPARE timeout            | Abort on all nodes
Partial PREPARE success    | Abort if quorum not achieved
COMMIT after remote quorum | Local commit must succeed (already prepared)
Node crash during PREPARE  | Heartbeat timeout → intents cleaned
Node crash during COMMIT   | Anti-entropy replays from peers

Example: Recovery Scenarios for Our UPDATE

Scenario 1: Network Partition During PREPARE

Initial: UPDATE users SET balance = 75 WHERE id = 1 on Node 1

T0: Node 1 executes locally, creates intent
T1: Network partition isolates Node 1 from Node 2, Node 3

  Node 1 (minority partition):
    - Sends PREPARE to Node 2, Node 3
    - Both timeout after 5s
    - Required quorum: 2, achieved: 1 (self)
    - Decision: ABORT

  Node 1 rollback:
    - Delete local intent: /intent/users/users:1
    - Update txn record: Status=ABORTED
    - Return error to client: "quorum not achieved"

  Client retries after partition heals.

Scenario 2: Coordinator Crash After PREPARE, Before COMMIT

T0: Node 1 PREPARE succeeds on all nodes
    All nodes have intent for "users:1"

T1: Node 1 crashes before sending COMMIT

T2 (+10s): Heartbeat timeout on Node 2
    - GC scans /txn_idx/pend/
    - Finds TxnID 0x...0001 with no recent heartbeat
    - Checks txn status: PENDING
    - Marks intents for cleanup

T3: Cleanup on Node 2, Node 3
    - Delete intent: /intent/users/users:1
    - Update txn: Status=ABORTED
    - Remove from Cuckoo filter

Result: Transaction rolled back, row "users:1" unchanged
        Client receives timeout, can retry

Scenario 3: Replica Crash During COMMIT

T0: Node 1 PREPARE succeeds (quorum achieved)
T1: Node 1 sends COMMIT to Node 2, Node 3
T2: Node 3 crashes while applying CDC

State:
  Node 1: COMMITTED (coordinator)
  Node 2: COMMITTED
  Node 3: CRASHED (didn't complete commit)

T3: Node 3 restarts

T4: Anti-entropy on Node 3
  - Local SeqNum = 40 (before crash)
  - Query Node 1: SeqNum = 41
  - Delta sync: get SeqNum 41 (our UPDATE)
  - Apply CDC via REPLAY
  - Node 3 now has balance=75 for id=1

Result: All nodes converged, transaction successful

Scenario 4: Partial PREPARE Success

T0: Node 1 sends PREPARE to Node 2, Node 3

T1: Responses:
  Node 2: SUCCESS (intent created)
  Node 3: CONFLICT (another transaction holds intent)

T2: Quorum check
  Required: 2 (floor(3/2)+1)
  SUCCESS responses: 1 (Node 2) + 1 (self) = 2
  But Node 3 returned CONFLICT

T3: Decision depends on consistency level
  If QUORUM: 2 SUCCESS ≥ 2 required → can proceed
  If ALL: 3 required, only 2 → must ABORT

  With QUORUM consistency:
    - Send COMMIT to Node 2 (achieved remote quorum with just Node 2)
    - Commit locally
    - Node 3 will get update via anti-entropy later

  With ALL consistency:
    - Send ABORT to Node 2
    - Rollback locally
    - Return CONFLICT to client

Garbage Collection

Two-Phase GC

Phase 1 (Critical, always runs):

  • Abort transactions without heartbeat for HeartbeatTimeout (10s)
  • Clean orphaned write intents

Phase 2 (Background, skipped under load):

  • Delete COMMITTED/ABORTED records older than GCMinRetention (2hr)
  • Force delete after GCMaxRetention (24hr) regardless of peer state

GC Coordination

  • Track per-peer replication state
  • Query minimum applied txn_id across peers
  • Only GC transactions all peers have applied
  • Gap detection prevents data loss if GC runs while nodes offline
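
A sketch of the safety rule: only transactions that every peer has already applied are eligible for normal (non-forced) GC. The map shape and function name are illustrative.

// gcCutoff returns the highest sequence number that is safe to garbage
// collect: the minimum LastSeqNum applied across all peers.
func gcCutoff(peerLastSeq map[string]uint64) uint64 {
    first := true
    var lowest uint64
    for _, seq := range peerLastSeq {
        if first || seq < lowest {
            lowest, first = seq, false
        }
    }
    return lowest
}

// Example: peers at {node2: 50, node3: 39} → cutoff 39, so SeqNum 40-50 are
// retained until Node 3 catches up or force GC kicks in (see the lagging-peer
// scenario below).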

Example: Garbage Collection Timeline

Continuing our UPDATE scenario over time:

T0: UPDATE users SET balance = 75 WHERE id = 1 committed
    TxnID: 0x0001A2B3C4D50001, SeqNum: 42
    All nodes: Status=COMMITTED

T0 + 5s: Phase 1 GC runs
    - Scan /txn_idx/pend/ for stale transactions
    - Our transaction is COMMITTED, not pending
    - No action needed

T0 + 1hr: Phase 2 GC considers deletion
    - Transaction age: 1 hour (< gc_min_retention=2hr)
    - Skipped: too recent

T0 + 2.5hr: Phase 2 GC runs again
    - Transaction age: 2.5 hours (> gc_min_retention=2hr)
    - Check peer replication state:
      /repl/node2/mydb → LastSeqNum: 42 ✓
      /repl/node3/mydb → LastSeqNum: 42 ✓
    - All peers have applied SeqNum 42
    - Safe to delete

    Cleanup:
      DELETE /txn/0x0001A2B3C4D50001
      DELETE /txn_idx/seq/42/0x0001A2B3C4D50001
      DELETE /cdc/0x0001A2B3C4D50001/1

T0 + 2.5hr: MetaStore state after GC
    - Transaction record: GONE
    - CDC entries: GONE
    - SQLite data: balance=75 for id=1 (permanent)

GC blocked by lagging peer:

T0: Transactions SeqNum 40-50 committed

T1: Node 3 goes offline

T0 + 3hr: Phase 2 GC on Node 1
    - Check peer states:
      /repl/node2/mydb → LastSeqNum: 50 ✓
      /repl/node3/mydb → LastSeqNum: 39 ✗ (offline, stale)
    - Minimum across peers: 39
    - Cannot GC SeqNum 40-50 (Node 3 hasn't applied them)
    - Skip cleanup to preserve delta sync capability

T0 + 25hr: Force GC kicks in (> gc_max_retention=24hr)
    - Transaction SeqNum 40-50 are > 24 hours old
    - Force delete regardless of peer state
    - Node 3 will need snapshot transfer when it returns

Performance Optimizations

Optimization            | Mechanism
Cuckoo filter           | O(1) conflict detection, FP rate ~2.3×10⁻¹⁰
Sharded mutexes         | 256 shards reduce lock contention
Group commit            | Batch Pebble writes (100 ops or 2ms)
Sequence pre-allocation | Batches of 1000 sequence numbers
Object pooling          | Reuse allocations in hot paths
Early quorum exit       | Don't wait for slow nodes
Parallel replication    | Goroutines for all replica communication
CDC replay              | Binary values, no SQL parsing
NoSync cleanup          | Intent cleanup is idempotent

Limitations

Design Constraints

  1. Single writer per node: SQLite WAL limitation
  2. 64 nodes max: 6-bit NodeID in transaction ID
  3. ~65k txns/ms/node: 16-bit logical counter
  4. Full replication only: No sharding/partitioning
  5. Eventual consistency: No strict serializability across nodes

Not Supported

  • Distributed deadlock detection (client retries)
  • Read repair (anti-entropy only)
  • Cascading rollback (best-effort abort)
  • XA transactions (uses own 2PC)
  • Table-level selective replication

Complete End-to-End Example

This section traces our UPDATE users SET balance = 75 WHERE id = 1 through the entire system lifecycle:

┌─────────────────────────────────────────────────────────────────────────────┐
│  COMPLETE TRANSACTION LIFECYCLE                                              │
│  UPDATE users SET balance = 75 WHERE id = 1                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  T0: CLIENT REQUEST                                                          │
│  ══════════════════                                                          │
│  MySQL client → Node 1 (port 3306)                                          │
│  Query: UPDATE users SET balance = 75 WHERE id = 1                          │
│                                                                              │
│  T1: LOCAL EXECUTION + CDC CAPTURE                                           │
│  ═══════════════════════════════                                            │
│  Node 1 hookDB executes UPDATE                                              │
│    ↓                                                                        │
│  Preupdate hook fires (SQLITE_UPDATE)                                       │
│    ↓                                                                        │
│  CDC Entry created:                                                         │
│    Key: /cdc/0x0001A2B3C4D50001/1                                           │
│    Value: {Op: UPDATE, Table: "users", IntentKey: "users:1",                │
│            OldValues: {balance: 100}, NewValues: {balance: 75}}             │
│                                                                              │
│  T2: WRITE INTENT CREATION                                                   │
│  ══════════════════════════                                                 │
│  Cuckoo filter: Add hash("users:1")                                         │
│  Mutex: Lock shard[hash("users:1") % 256]                                   │
│  MetaStore write:                                                           │
│    /intent/users/users:1 → {TxnID: 0x...0001, Status: PENDING}             │
│    /intent_txn/0x...0001/users/users:1 → (index)                           │
│    /txn/0x...0001 → {Status: PENDING, SeqNum: 42}                          │
│                                                                              │
│  T3: 2PC PREPARE PHASE (parallel to Node 2, Node 3)                         │
│  ═══════════════════════════════════════════════════                        │
│                                                                              │
│  Node 1 ──gRPC──→ Node 2: ReplicateTransaction(PREPARE)                     │
│           │                  ↓                                               │
│           │                Cuckoo check "users:1" → MISS                    │
│           │                Create intent, store CDC                          │
│           │                  ↓                                               │
│           └──────────────← SUCCESS                                           │
│                                                                              │
│  Node 1 ──gRPC──→ Node 3: ReplicateTransaction(PREPARE)                     │
│           │                  ↓                                               │
│           │                Cuckoo check "users:1" → MISS                    │
│           │                Create intent, store CDC                          │
│           │                  ↓                                               │
│           └──────────────← SUCCESS                                           │
│                                                                              │
│  Quorum: 3/3 SUCCESS (need 2 for 3-node cluster) ✓                          │
│                                                                              │
│  T4: 2PC COMMIT PHASE                                                        │
│  ═══════════════════                                                        │
│                                                                              │
│  Node 1 ──gRPC──→ Node 2: ReplicateTransaction(COMMIT)                      │
│           │                  ↓                                               │
│           │                Read /cdc/0x...0001/1                            │
│           │                Generate: UPDATE users SET ... WHERE id=1        │
│           │                Execute on SQLite                                 │
│           │                Update /txn/0x...0001 → COMMITTED                │
│           │                  ↓                                               │
│           └──────────────← CommitTS                                          │
│                                                                              │
│  Node 1 ──gRPC──→ Node 3: ReplicateTransaction(COMMIT)                      │
│           │                  ↓                                               │
│           │                (same as Node 2)                                  │
│           │                  ↓                                               │
│           └──────────────← CommitTS                                          │
│                                                                              │
│  Remote quorum achieved (2/2) ✓                                              │
│                                                                              │
│  T5: COORDINATOR LOCAL COMMIT                                                │
│  ═════════════════════════════                                              │
│  Node 1:                                                                     │
│    - Apply CDC to own SQLite                                                │
│    - Update /txn/0x...0001 → COMMITTED                                      │
│    - Schedule intent cleanup (background)                                   │
│                                                                              │
│  T6: RESPONSE TO CLIENT                                                      │
│  ═══════════════════════                                                    │
│  Node 1 → MySQL client: OK, 1 row affected                                  │
│                                                                              │
│  T7: BACKGROUND CLEANUP (async)                                              │
│  ═════════════════════════════                                              │
│  All nodes:                                                                  │
│    - Delete /intent/users/users:1                                           │
│    - Delete /intent_txn/0x...0001/users/users:1                             │
│    - Remove from Cuckoo filter                                              │
│    - Move /txn_idx/pend/0x...0001 → /txn_idx/seq/42/0x...0001              │
│                                                                              │
│  T8: GOSSIP PROPAGATION                                                      │
│  ═══════════════════════                                                    │
│  Node 1 gossips to random peers:                                            │
│    {Type: TXN_COMMITTED, TxnID: 0x...0001, SeqNum: 42}                      │
│  Replication state updated:                                                 │
│    /repl/node1/mydb → {LastSeqNum: 42}                                      │
│                                                                              │
│  T9: FINAL STATE (all nodes)                                                 │
│  ════════════════════════════                                               │
│  SQLite: users.balance = 75 WHERE id = 1                                    │
│  MetaStore:                                                                 │
│    /txn/0x...0001 → {Status: COMMITTED, SeqNum: 42}                        │
│    /cdc/0x...0001/1 → (kept for anti-entropy)                              │
│  Cuckoo filter: "users:1" removed                                           │
│                                                                              │
│  T10: GC (after 2+ hours)                                                    │
│  ═════════════════════════                                                  │
│  Delete transaction metadata, keep only SQLite data                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Configuration Reference

[replication]
default_write_consistency = "QUORUM"
default_read_consistency = "LOCAL_ONE"
write_timeout_ms = 5000
read_timeout_ms = 2000
 
# Anti-Entropy
enable_anti_entropy = true
anti_entropy_interval_seconds = 60
delta_sync_threshold_transactions = 10000
delta_sync_threshold_seconds = 3600
 
# Garbage Collection
gc_min_retention_hours = 2
gc_max_retention_hours = 24
 
[transaction]
heartbeat_timeout_seconds = 10
conflict_window_seconds = 10
lock_wait_timeout_seconds = 50
 
[ddl]
lock_lease_seconds = 30
enable_idempotent = true
 
[cluster]
gossip_interval_ms = 1000
gossip_fanout = 3
suspect_timeout_ms = 5000
dead_timeout_ms = 10000

Key Files Reference

File                             | Purpose
coordinator/write_coordinator.go | 2PC protocol implementation
coordinator/handler.go           | MySQL query routing, CDC extraction
db/meta_store_pebble.go          | PebbleDB storage, write intents
db/preupdate_hook.go             | CDC capture via SQLite hooks
db/intent_filter.go              | Cuckoo filter for conflict detection
db/transaction.go                | Transaction lifecycle, GC
grpc/replication_handler.go      | Remote 2PC phase handling
grpc/anti_entropy.go             | Background sync coordination
grpc/delta_sync.go               | Incremental catch-up
grpc/gossip.go                   | SWIM protocol, membership
hlc/clock.go                     | Hybrid Logical Clock
publisher/                       | CDC publishing system

Next Steps

  • Integrations - CDC publishing to Kafka, NATS, and external systems