Replication Architecture
Marmot v2 implements a Percolator-style Two-Phase Commit (2PC) protocol for distributed SQLite replication. This document provides comprehensive technical details on the architecture, data flows, and edge case handling.
Design Philosophy
- CDC-first: Row-level Change Data Capture, not SQL statement replay
- Leaderless: Any node can coordinate writes (no master election)
- Quorum-based: Configurable consistency levels (ONE, QUORUM, ALL)
- Eventually consistent: Anti-entropy background sync repairs divergence
- Full replication: All nodes receive all data (no sharding)
Running Example: Users Table
Throughout this document, we'll use a consistent example to trace data through all stages:
-- Schema on all 3 nodes (Node 1, Node 2, Node 3)
CREATE TABLE users (
id INTEGER PRIMARY KEY,
email TEXT UNIQUE,
name TEXT,
balance INTEGER DEFAULT 0
);
-- Initial data (already replicated)
-- id=1: alice@example.com, Alice, balance=100
-- id=2: bob@example.com, Bob, balance=50
Scenario: Node 1 executes UPDATE users SET balance = 75 WHERE id = 1
We'll trace this UPDATE through CDC capture, 2PC protocol, conflict detection, and recovery scenarios.
Two-Phase Commit Protocol
Marmot's 2PC differs from traditional database 2PC by using write intents for conflict detection and CDC replay for deterministic replication.
Phase 1: PREPARE
- Coordinator executes locally with preupdate hooks capturing CDC data
- Write intents created for each affected row (distributed locks)
- CDC entries stored in MetaStore (msgpack-serialized row data)
- Broadcast to replicas with CDC data (not SQL)
- Replicas create write intents and store CDC entries
- Return SUCCESS or CONFLICT to coordinator
Coordinator Replica Nodes
========== =============
Execute with CDC hooks
↓
Create write intents
↓
ReplicateTransaction(PREPARE) → Create write intents
Store CDC entries
← Return: Success/Conflict
Phase 2: COMMIT
- Wait for quorum of PREPARE responses
- Send COMMIT to remote nodes first (not local)
- Wait for remote quorum acknowledgment
- Commit locally only after remote quorum achieved
- Cleanup: Delete write intents and CDC entries
Coordinator Replica Nodes
========== =============
Remote quorum PREPARE achieved ✓
↓
ReplicateTransaction(COMMIT) → Apply CDC entries to SQLite
Mark transaction COMMITTED
← Return: CommitTS
Remote quorum COMMIT achieved ✓
↓
Commit locally
↓
Cleanup intents
Example: 2PC for UPDATE users SET balance = 75 WHERE id = 1
Step 1: Node 1 (Coordinator) executes locally
Time: T1 (TxnID = 0x0001A2B3C4D50001)
Physical: 1736000000000ms | NodeID: 1 | Logical: 1
Node 1 executes UPDATE on hookDB connection
→ Preupdate hook fires: SQLITE_UPDATE on "users"
→ Captures: oldValues={balance: 100}, newValues={balance: 75}
Step 2: Coordinator creates write intent
MetaStore Write on Node 1:
Key: /intent/users/users:1
Value: {
TxnID: 0x0001A2B3C4D50001,
NodeID: 1,
StartTS: {Wall: 1736000000000000000, Logical: 1},
IntentType: UPDATE,
MarkedForCleanup: false
}
Step 3: Broadcast PREPARE to Node 2 and Node 3
gRPC ReplicateTransaction (parallel to both):
{
Phase: PREPARE,
TxnID: 0x0001A2B3C4D50001,
NodeID: 1,
Database: "mydb",
CDCEntries: [{
Seq: 1,
Operation: UPDATE,
Table: "users",
IntentKey: "users:1",
OldValues: {"id": [0x01], "email": [...], "name": [...], "balance": [0x64]}, // 100
NewValues: {"id": [0x01], "email": [...], "name": [...], "balance": [0x4B]} // 75
}]
}
Step 4: Node 2 and Node 3 process PREPARE
Node 2:
1. Check Cuckoo filter for "users:1" → MISS (no conflict)
2. Create write intent: /intent/users/users:1
3. Store CDC entry: /cdc/0x0001A2B3C4D50001/1
4. Return: {Status: SUCCESS, PrepareTS: ...}
Node 3:
1. Check Cuckoo filter for "users:1" → MISS (no conflict)
2. Create write intent: /intent/users/users:1
3. Store CDC entry: /cdc/0x0001A2B3C4D50001/1
4. Return: {Status: SUCCESS, PrepareTS: ...}
Step 5: Quorum achieved (2/3), send COMMIT
Node 1 received 2 SUCCESS responses (quorum = 2 for 3-node cluster)
gRPC ReplicateTransaction to Node 2, Node 3:
{
Phase: COMMIT,
TxnID: 0x0001A2B3C4D50001,
CommitTS: {Wall: 1736000000005000000, Logical: 0}
}
Step 6: Replicas apply CDC and commit
Node 2:
1. Read CDC entry from /cdc/0x0001A2B3C4D50001/1
2. Generate SQL: UPDATE users SET id=1, email='alice@example.com',
name='Alice', balance=75 WHERE id=1
3. Execute on SQLite
4. Update txn record: Status=COMMITTED
5. Delete write intent (background)
6. Return: {CommitTS: ...}
Node 3: (same process)
Step 7: Coordinator commits locally
Node 1:
1. Remote quorum (2/3) committed ✓
2. Apply own CDC entry to SQLite
3. Update txn record: Status=COMMITTED
4. Cleanup write intents
5. Return success to client
Why Coordinator Commits Last
If remote quorum fails after local commit, the coordinator would be in an inconsistent state. By committing remotely first, abort is always safe.
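A minimal Go sketch of this ordering follows; sendCommit and commitLocal are hypothetical stand-ins for the gRPC call and the local SQLite apply, and the quorum accounting is simplified.

```go
import "errors"

// commitRemoteFirst sketches the ordering above: COMMIT goes to every replica
// first, a remote quorum of acknowledgments is required, and only then does
// the coordinator commit locally. sendCommit and commitLocal stand in for
// Marmot's gRPC call and local SQLite apply.
func commitRemoteFirst(
	txnID uint64,
	replicas []string,
	quorum int, // floor(totalMembership/2)+1, counting the coordinator itself
	sendCommit func(peer string, txnID uint64) error,
	commitLocal func(txnID uint64) error,
) error {
	acks := 1 // the coordinator's own vote
	for _, peer := range replicas {
		if err := sendCommit(peer, txnID); err == nil {
			acks++
		}
	}
	if acks < quorum {
		// Nothing has been committed locally yet, so aborting is still safe.
		return errors.New("remote quorum not achieved, aborting")
	}
	return commitLocal(txnID)
}
```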
Quorum Calculation
Critical: Quorum uses total cluster membership, not just alive nodes.
Quorum = floor(TotalMembership / 2) + 1
| Cluster Size | Quorum | Survives |
|---|---|---|
| 3 nodes | 2 | 1 failure |
| 5 nodes | 3 | 2 failures |
| 7 nodes | 4 | 3 failures |
Split-brain prevention: In a 6-node cluster split 3x3:
- Using alive nodes: Each partition sees N=3, quorum=2 → both write (dangerous)
- Using total membership: N=6, quorum=4 → neither writes (safe)
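The rule itself is a one-liner; the sketch below applies it to a reachability check.

```go
// quorumSize implements the rule above: quorum is derived from total cluster
// membership, never from the set of currently-alive nodes.
func quorumSize(totalMembership int) int {
	return totalMembership/2 + 1 // floor(N/2) + 1
}

// canWrite reports whether the reachable nodes (including self) form a quorum.
func canWrite(reachable, totalMembership int) bool {
	return reachable >= quorumSize(totalMembership)
}
```

In the 3x3 split above, quorumSize(6) is 4, so a partition that can reach only 3 nodes refuses writes.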
CDC (Change Data Capture) Architecture
Why CDC Instead of SQL Replay
| SQL Replay | CDC Replay |
|---|---|
| Non-deterministic (UUID, RANDOM, NOW) | Exact byte-for-byte values |
| Parsing differences between dialects | Binary msgpack encoding |
| Auto-increment sequence conflicts | Coordinator assigns distributed IDs |
| Slower (parse + execute) | Faster (direct row application) |
CDC Capture Flow
Row changes are captured via SQLite's preupdate hook API:
User Query (INSERT/UPDATE/DELETE)
↓
Execute on dedicated hookDB connection
↓
Preupdate hook fires BEFORE each row modification
↓
Extract: table, intentKey, oldValues, newValues
↓
Serialize to msgpack (type-preserving)
↓
Store in MetaStore: /cdc/{txnID}/{seq}
↓
Release hookDB before 2PC broadcast
CDC Entry Format
type IntentEntry struct {
TxnID uint64 // Transaction ID
Seq uint64 // Sequence within transaction
Operation uint8 // INSERT=0, REPLACE=1, UPDATE=2, DELETE=3
Table string // Table name
IntentKey string // Serialized primary key
OldValues map[string][]byte // Before image (msgpack per column)
NewValues map[string][]byte // After image (msgpack per column)
}
Intent Key Format
Single PK: "table:value" or "table:b64:base64_value"
Composite PK: "table:c:b64_col1:b64_col2:..." (columns sorted alphabetically)
NULL handling: "table:\x00NULL\x00" sentinel value
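A rough sketch of assembling these keys is shown below, assuming column values are already serialized; the exact rules for when raw values versus base64 are used are Marmot-internal, so treat the encoding choices here as illustrative.

```go
import (
	"encoding/base64"
	"sort"
	"strings"
)

// buildIntentKey approximates the formats listed above. pk maps primary-key
// column names to serialized values; a nil value stands for SQL NULL. Always
// base64-encoding the single-PK case is an assumption for illustration.
func buildIntentKey(table string, pk map[string][]byte) string {
	if len(pk) == 1 {
		for _, v := range pk {
			if v == nil {
				return table + ":\x00NULL\x00" // NULL sentinel
			}
			return table + ":b64:" + base64.StdEncoding.EncodeToString(v)
		}
	}
	// Composite primary key: columns sorted alphabetically, values joined.
	cols := make([]string, 0, len(pk))
	for c := range pk {
		cols = append(cols, c)
	}
	sort.Strings(cols)
	parts := make([]string, 0, len(cols))
	for _, c := range cols {
		parts = append(parts, base64.StdEncoding.EncodeToString(pk[c]))
	}
	return table + ":c:" + strings.Join(parts, ":")
}
```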
CDC Application on Replicas
| Operation | SQL Generated |
|---|---|
| INSERT | INSERT OR REPLACE INTO table (...) VALUES (...) |
| UPDATE | UPDATE table SET ... WHERE pk=... using OldValues |
| DELETE | DELETE FROM table WHERE pk=... using OldValues |
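As a sketch of the UPDATE case, a replica could build a parameterized statement from a CDC entry as below; msgpack decoding is omitted, and the signature is illustrative rather than Marmot's actual generator API.

```go
import (
	"sort"
	"strings"
)

// buildUpdateSQL sketches how a replica could turn an UPDATE IntentEntry into
// a parameterized statement: NewValues drive the SET clause, and the primary
// key (taken from OldValues / the intent key) drives the WHERE clause.
func buildUpdateSQL(table, pkCol string, newValues map[string]any, pkValue any) (string, []any) {
	cols := make([]string, 0, len(newValues))
	for c := range newValues {
		cols = append(cols, c)
	}
	sort.Strings(cols) // deterministic column order

	sets := make([]string, 0, len(cols))
	args := make([]any, 0, len(cols)+1)
	for _, c := range cols {
		sets = append(sets, c+" = ?")
		args = append(args, newValues[c])
	}
	args = append(args, pkValue)

	query := "UPDATE " + table + " SET " + strings.Join(sets, ", ") +
		" WHERE " + pkCol + " = ?"
	return query, args
}
```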
Example: CDC Entry for Our UPDATE
When Node 1 executes UPDATE users SET balance = 75 WHERE id = 1:
Raw CDC Entry (msgpack-serialized):
Key: /cdc/0x0001A2B3C4D50001/1
Value (IntentEntry):
{
TxnID: 0x0001A2B3C4D50001,
Seq: 1,
Operation: 2, // UPDATE
Table: "users",
IntentKey: "users:1", // Single PK format
OldValues: {
"id": 0x01, // msgpack positive fixint: 1
"email": 0xB1 "alice@example.com", // msgpack fixstr
"name": 0xA5 "Alice", // msgpack fixstr
"balance": 0xCC 0x64 // msgpack uint8: 100
},
NewValues: {
"id": 0x01, // msgpack positive fixint: 1
"email": 0xB1 "alice@example.com",
"name": 0xA5 "Alice",
"balance": 0xCC 0x4B // msgpack uint8: 75
}
}
Generated SQL on Replicas:
-- Replicas execute this (generated from CDC, not original SQL):
UPDATE users SET
id = 1,
email = 'alice@example.com',
name = 'Alice',
balance = 75
WHERE id = 1
Example: CDC for INSERT and DELETE
INSERT: INSERT INTO users (id, email, name, balance) VALUES (3, 'carol@example.com', 'Carol', 200)
CDC Entry:
{
Operation: 0, // INSERT
Table: "users",
IntentKey: "users:3",
OldValues: {}, // Empty for INSERT
NewValues: {
"id": 3, "email": "carol@example.com", "name": "Carol", "balance": 200
}
}
Generated SQL: INSERT OR REPLACE INTO users (id, email, name, balance)
VALUES (3, 'carol@example.com', 'Carol', 200)
DELETE: DELETE FROM users WHERE id = 2
CDC Entry:
{
Operation: 3, // DELETE
Table: "users",
IntentKey: "users:2",
OldValues: {
"id": 2, "email": "bob@example.com", "name": "Bob", "balance": 50
},
NewValues: {} // Empty for DELETE
}
Generated SQL: DELETE FROM users WHERE id = 2
Example: Composite Primary Key
CREATE TABLE order_items (
order_id INTEGER,
item_id INTEGER,
quantity INTEGER,
PRIMARY KEY (order_id, item_id)
);
UPDATE order_items SET quantity = 5 WHERE order_id = 100 AND item_id = 42;
IntentKey Format: "order_items:c:MTAw:NDI="
^table ^composite marker
^base64(order_id=100):base64(item_id=42)
(columns sorted alphabetically: item_id, order_id)
Write Intents & Conflict Detection
Write Intent Structure
Write intents are distributed locks stored in MetaStore:
Key: /intent/{tableName}/{intentKey}
Value: WriteIntentRecord {
TxnID, NodeID, StartTS, IntentType,
Statement, DataSnapshot, MarkedForCleanup
}
Index: /intent_txn/{txnID}/{tableName}/{intentKey} // For cleanup
Cuckoo Filter Fast Path
A Cuckoo filter provides O(1) conflict detection:
| Parameter | Value |
|---|---|
| Buckets | 250,000 |
| Bucket size | 4 entries |
| Fingerprint | 32-bit |
| Capacity | ~1 million |
| False positive rate | ~2.3×10⁻¹⁰ |
Conflict check flow:
WriteIntent(table, intentKey)
↓
hash = XXH64(table + ":" + intentKey)
↓
filter.Check(hash)
│
├─ MISS → Fast path: write directly to Pebble
│
└─ HIT → Slow path: lookup actual intent in Pebble
├─ No intent found → False positive, proceed
├─ Same txnID → Update in-place
└─ Different txnID → Check if overwritable:
├─ MarkedForCleanup=true → Overwrite
├─ Status=COMMITTED/ABORTED → Overwrite
├─ Heartbeat timeout → Overwrite (stale)
└─ Active transaction → Return CONFLICT
Sharded Mutexes
256 sharded mutexes prevent TOCTOU races:
var intentLocks [256]sync.Mutex

func intentLockFor(table, intentKey string) *sync.Mutex {
    return &intentLocks[xxhash.Sum64String(table+":"+intentKey) % 256]
}
Example: Conflict Detection Scenarios
Using our users table, here are concrete conflict scenarios:
Scenario 1: No Conflict (Different Rows)
Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
→ Creates intent for "users:1"
Time T2: Node 2 starts UPDATE users SET balance = 30 WHERE id = 2
→ Cuckoo filter check for "users:2" → MISS
→ Creates intent for "users:2"
→ Both transactions succeed (different rows)
Scenario 2: Conflict (Same Row, Different Nodes)
Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
TxnID: 0x0001A2B3C4D50001
→ Creates intent: /intent/users/users:1 → {TxnID: 0x0001A2B3C4D50001}
→ Adds to Cuckoo filter: hash("users:1")
Time T2: Node 2 starts UPDATE users SET balance = 50 WHERE id = 1
TxnID: 0x0001A2B3C4D60001
→ Cuckoo filter check for "users:1" → HIT
→ Lookup Pebble: /intent/users/users:1
→ Found intent with different TxnID (0x0001A2B3C4D50001)
→ Check if overwritable:
- MarkedForCleanup = false
- Status = PENDING (not COMMITTED/ABORTED)
- LastHeartbeat = recent (not stale)
→ Return CONFLICT to Node 2
→ Node 2 aborts and retries after backoff
Scenario 3: Stale Intent (Coordinator Crashed)
Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
→ Creates intent: /intent/users/users:1
→ Node 1 crashes before COMMIT
Time T2: (10 seconds later, heartbeat timeout)
GC on Node 2 scans pending transactions
→ Finds intent with LastHeartbeat > 10s ago
→ Marks intent for cleanup
Time T3: Node 3 starts UPDATE users SET balance = 50 WHERE id = 1
→ Cuckoo filter check → HIT
→ Lookup Pebble → Found intent
→ Check: MarkedForCleanup = true
→ Overwrite allowed → Creates new intent
→ Transaction succeeds
Scenario 4: Same Transaction Re-entry
Time T1: Node 1 PREPARE phase creates intent for "users:1"
TxnID: 0x0001A2B3C4D50001
Time T2: Network hiccup, Node 1 retries PREPARE
→ Cuckoo filter check → HIT
→ Lookup Pebble → Found intent with SAME TxnID
→ Update in-place (idempotent)
→ No conflict
Scenario 5: Multi-Row Transaction Partial Conflict
-- Node 1 executes:
BEGIN;
UPDATE users SET balance = balance - 25 WHERE id = 1;
UPDATE users SET balance = balance + 25 WHERE id = 2;
COMMIT;
Time T1: Node 1 creates intents for both rows:
→ /intent/users/users:1 → TxnID: 0x...0001
→ /intent/users/users:2 → TxnID: 0x...0001
Time T2: Node 2 tries UPDATE users SET name = 'Robert' WHERE id = 2
→ Cuckoo filter check for "users:2" → HIT
→ Conflict detected!
→ Node 2 returns CONFLICT
→ Node 2's client retries after Node 1 commits
Storage Layer (MetaStore)
PebbleDB Architecture
Each database has a separate MetaStore (CockroachDB Pebble):
data_dir/
├── mydb.db # SQLite database
├── mydb.db-wal # SQLite WAL
├── mydb_meta.pebble/ # PebbleDB MetaStore
│ ├── MANIFEST-*
│ ├── CURRENT
│ └── *.sst # LSM-tree files
└── __marmot_system.db # System database
└── __marmot_system_meta.pebble/
Key Namespace
| Prefix | Purpose |
|---|---|
| /txn/{txnID} | Transaction records (status, timestamps, statements) |
| /txn_idx/pend/{txnID} | Pending transaction index |
| /txn_idx/seq/{seqNum}/{txnID} | Sequence-ordered index (gap detection) |
| /intent/{table}/{intentKey} | Write intents |
| /intent_txn/{txnID}/{table}/{intentKey} | Intent index by transaction |
| /cdc/{txnID}/{seq} | CDC intent entries |
| /repl/{peerID}/{dbName} | Replication state per peer |
| /schema/{dbName} | Schema version counter |
| /seq/{nodeID} | Sequence generator (pre-allocated batches) |
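For illustration, keys in this namespace can be composed with small helpers like the ones below; the zero-padded sequence encoding is an assumption made so that Pebble's lexicographic key order matches numeric order for range scans over /txn_idx/seq/.

```go
import "fmt"

// Hypothetical helpers for composing MetaStore keys in the namespace above.
func txnKey(txnID uint64) string      { return fmt.Sprintf("/txn/0x%016x", txnID) }
func cdcKey(txnID, seq uint64) string { return fmt.Sprintf("/cdc/0x%016x/%d", txnID, seq) }

func intentKeyPath(table, intentKey string) string {
	return fmt.Sprintf("/intent/%s/%s", table, intentKey)
}

// Zero-padding keeps sequence-ordered scans in numeric order (assumption).
func seqIdxKey(seq, txnID uint64) string {
	return fmt.Sprintf("/txn_idx/seq/%020d/0x%016x", seq, txnID)
}
```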
Transaction Record Format
type TransactionRecord struct {
TxnID uint64
NodeID uint64
SeqNum uint64 // For gap-free replication
Status TxnStatus // PENDING/COMMITTED/ABORTED
StartTSWall int64 // HLC wall time
StartTSLogical int32 // HLC logical
CommitTSWall int64
CommitTSLogical int32
LastHeartbeat int64 // Stale detection
DatabaseName string
}
Group Commit Optimization
Writes to Pebble are batched: a batch is flushed at 100 operations or after a 2 ms timeout, fed from a 1000-entry buffer.
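A sketch of that batching policy follows; flush stands in for writing a single Pebble batch, and this is an illustration of the policy rather than Marmot's implementation.

```go
import "time"

// groupCommit drains operations from a buffered channel (1000 entries in the
// policy above) and flushes a batch at 100 ops or every 2ms, whichever comes
// first. flush stands in for one Pebble batch write.
func groupCommit(ops <-chan []byte, flush func(batch [][]byte)) {
	const maxBatch = 100
	const maxWait = 2 * time.Millisecond

	batch := make([][]byte, 0, maxBatch)
	ticker := time.NewTicker(maxWait)
	defer ticker.Stop()

	for {
		select {
		case op, ok := <-ops:
			if !ok { // channel closed: flush what's left and stop
				if len(batch) > 0 {
					flush(batch)
				}
				return
			}
			batch = append(batch, op)
			if len(batch) >= maxBatch {
				flush(batch)
				batch = batch[:0]
			}
		case <-ticker.C:
			if len(batch) > 0 {
				flush(batch)
				batch = batch[:0]
			}
		}
	}
}
```

A caller would run this in a goroutine and feed serialized operations into the buffered channel.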
Example: MetaStore Keys After Our UPDATE
After Node 1's UPDATE users SET balance = 75 WHERE id = 1 completes:
During PREPARE phase (all 3 nodes):
/intent/users/users:1 → WriteIntentRecord{TxnID: 0x0001A2B3C4D50001, ...}
/intent_txn/0x0001A2B3C4D50001/users/users:1 → (index entry)
/cdc/0x0001A2B3C4D50001/1 → IntentEntry{Operation: UPDATE, ...}
/txn/0x0001A2B3C4D50001 → TransactionRecord{Status: PENDING, ...}
/txn_idx/pend/0x0001A2B3C4D50001 → (pending index)
After COMMIT phase:
/txn/0x0001A2B3C4D50001 → TransactionRecord{
TxnID: 0x0001A2B3C4D50001,
NodeID: 1,
SeqNum: 42,
Status: COMMITTED,
StartTSWall: 1736000000000000000,
CommitTSWall: 1736000000005000000,
DatabaseName: "mydb"
}
/txn_idx/seq/42/0x0001A2B3C4D50001 → (sequence index for gap detection)
-- Deleted (cleanup):
/intent/users/users:1 → (removed)
/intent_txn/0x0001A2B3C4D50001/users/users:1 → (removed)
/txn_idx/pend/0x0001A2B3C4D50001 → (removed)
-- CDC entries kept for anti-entropy (GC'd after 2-24hrs):
/cdc/0x0001A2B3C4D50001/1 → (kept for delta sync)
Replication State (per peer):
-- On Node 2, tracking what Node 1 has sent:
/repl/node1/mydb → {LastAppliedTxnID: 0x0001A2B3C4D50001, LastSeqNum: 42}
-- On Node 3:
/repl/node1/mydb → {LastAppliedTxnID: 0x0001A2B3C4D50001, LastSeqNum: 42}
Hybrid Logical Clock (HLC)
Purpose
- Causality tracking: Timestamps preserve happens-before relationship
- Transaction IDs: Unique 64-bit IDs derived from HLC
- Conflict resolution: Last-Write-Wins using HLC comparison
Clock Structure
type Clock struct {
nodeID uint64 // 0-63 (6-bit limit)
wallTime int64 // Nanoseconds
logical int32 // 0-65535 (16-bit limit)
mu sync.Mutex
}
Transaction ID Bit Layout
|-------- 42 bits --------|-- 6 bits --|-- 16 bits --|
| Physical (ms) | Node ID | Logical |
- 42 bits: Milliseconds since epoch (~139 years)
- 6 bits: Node ID (max 64 nodes)
- 16 bits: Logical counter (~65k IDs/ms/node)
func (t Timestamp) ToTxnID() uint64 {
physicalMS := uint64(t.WallTime / 1_000_000)
return (physicalMS << 22) | (t.NodeID << 16) | uint64(t.Logical)
}
Clock Synchronization
On receiving remote timestamp:
- Take max of: local wall, remote wall, physical now
- If same wall time: increment logical beyond both local and remote
- If new millisecond: reset logical to 0
- Overflow protection: Spin-wait if logical exceeds 65535
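A sketch of this merge rule, building on the Clock struct above (Timestamp mirrors the fields used by ToTxnID; overflow spin-waiting is omitted, and Go 1.21's built-in max is used):

```go
import "time"

// Timestamp mirrors the fields used by ToTxnID above.
type Timestamp struct {
	WallTime int64 // nanoseconds
	NodeID   uint64
	Logical  int32
}

// Update applies the merge rule described above when a remote timestamp is
// observed: take the maximum wall time, bump the logical counter past both
// clocks when the wall time ties, or reset it when the physical clock is
// strictly ahead.
func (c *Clock) Update(remoteWall int64, remoteLogical int32) Timestamp {
	c.mu.Lock()
	defer c.mu.Unlock()

	now := time.Now().UnixNano()
	wall := max(c.wallTime, remoteWall, now)

	switch {
	case wall == c.wallTime && wall == remoteWall:
		c.logical = max(c.logical, remoteLogical) + 1
	case wall == c.wallTime:
		c.logical++
	case wall == remoteWall:
		c.logical = remoteLogical + 1
	default: // physical clock moved ahead of both: new time, reset counter
		c.logical = 0
	}
	c.wallTime = wall
	return Timestamp{WallTime: c.wallTime, NodeID: c.nodeID, Logical: c.logical}
}
```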
Example: HLC and TxnID Generation
Our UPDATE transaction on Node 1:
Physical time: 1736000000000 ms since epoch (Jan 4, 2025 ~14:13 UTC)
Node ID: 1
Logical: 1 (first transaction this millisecond)
TxnID calculation:
Physical bits (42): 1736000000000 << 22 = 0x0001A2B3C4D50000
NodeID bits (6): 1 << 16 = 0x0000000000010000
Logical bits (16): 1 = 0x0000000000000001
─────────────────────────────────────────────────────────────
TxnID: 0x0001A2B3C4D50001
Binary layout:
|---- 42 bits ----|-- 6 --|-- 16 --|
| 1736000000000 | 1 | 1 |
Conflict resolution using HLC (Last-Write-Wins):
Scenario: Two updates to same row arrive at Node 3
Update A (from Node 1):
TxnID: 0x0001A2B3C4D50001
Wall: 1736000000000 ms, Logical: 1
Update B (from Node 2):
TxnID: 0x0001A2B3C4D60002
Wall: 1736000000001 ms, Logical: 2
Comparison: B.Wall > A.Wall → Update B wins
Result: Node 3 applies Update B (higher timestamp)
Clock advance on message receive:
Node 2 local clock: Wall=1736000000000, Logical=5
Receives PREPARE from Node 1: Wall=1736000000002, Logical=1
Update logic:
localWall (1736000000000) < remoteWall (1736000000002)
→ Set wall = max(local, remote, physicalNow) = 1736000000002
→ Reset logical = 0 (new millisecond)
Node 2 new clock: Wall=1736000000002, Logical=0
Anti-Entropy Background Sync
Purpose
Repairs divergence caused by:
- Network partitions healing
- Node restarts with stale data
- Missed transactions during temporary failures
Sync Strategy Decision
Lag Detection:
localMaxTxnID vs peerMaxTxnID
localTxnCount vs peerTxnCount
Decision:
IF lag < DeltaSyncThreshold (default 10K txns)
AND timeLag < DeltaSyncThresholdSeconds (default 1hr)
THEN → Delta Sync (incremental)
ELSE → Snapshot Transfer (full)
Delta Sync Flow
- Query peer's replication state
- Stream transactions via StreamChanges RPC
- Gap detection: if event.SeqNum - expectedNext > threshold, fall back to snapshot
- Apply each transaction via REPLAY phase (bypasses 2PC)
- Update local replication state
Gap Detection Algorithm
if event.SeqNum > expectedNextTxn {
gap := event.SeqNum - expectedNextTxn
if gap > DeltaSyncThresholdTxns {
// Transactions GC'd - fall back to snapshot
return ErrGapDetected
}
}
expectedNextTxn = event.SeqNum + 1
Snapshot Transfer
- Get snapshot info: Files, checksums, txn_id
- Stream chunks: 4MB each with MD5 verification
- Atomic restore: Replace database files
- Resume normal replication from snapshot txn_id
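A small sketch of the per-chunk verification and the final swap; the helper names and the temp-file handling are illustrative.

```go
import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"os"
)

// writeVerifiedChunk appends one streamed chunk to the temporary snapshot
// file after checking its MD5, mirroring the per-chunk verification above.
func writeVerifiedChunk(tmp *os.File, data []byte, wantMD5 string) error {
	sum := md5.Sum(data)
	if hex.EncodeToString(sum[:]) != wantMD5 {
		return fmt.Errorf("chunk checksum mismatch: got %x", sum)
	}
	_, err := tmp.Write(data)
	return err
}

// atomicRestore swaps the assembled snapshot into place once every chunk has
// been verified; rename is atomic on the same filesystem.
func atomicRestore(tmpPath, dbPath string) error {
	return os.Rename(tmpPath, dbPath)
}
```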
Example: Anti-Entropy Scenarios
Using our 3-node cluster with the users table:
Scenario 1: Node 3 Brief Network Partition (Delta Sync)
Timeline:
T0: All nodes in sync, SeqNum=40
T1: Node 3 loses network for 5 minutes
T2: Node 1, Node 2 continue processing:
- SeqNum 41: UPDATE users SET balance=75 WHERE id=1
- SeqNum 42: INSERT INTO users VALUES (3, 'carol@...', 'Carol', 200)
- SeqNum 43: DELETE FROM users WHERE id=2
T3: Node 3 reconnects
Anti-Entropy on Node 3:
1. Query Node 1: "What's your max SeqNum?" → 43
2. Local state: LastSeqNum=40
3. Lag = 43 - 40 = 3 transactions (< 10K threshold)
4. Decision: Delta Sync
Delta Sync Flow:
Node 3 → Node 1: StreamChanges(fromSeqNum=41)
Node 1 streams:
{SeqNum: 41, TxnID: 0x...0001, CDC: [UPDATE users...]}
{SeqNum: 42, TxnID: 0x...0002, CDC: [INSERT users...]}
{SeqNum: 43, TxnID: 0x...0003, CDC: [DELETE users...]}
Node 3 applies each via REPLAY (bypasses 2PC):
- Execute CDC for SeqNum 41 → balance=75 for id=1
- Execute CDC for SeqNum 42 → insert id=3
- Execute CDC for SeqNum 43 → delete id=2
Node 3 now in sync: SeqNum=43
Scenario 2: New Node Joining (Snapshot Transfer)
Initial state:
Node 1: SeqNum=50000, users table has 10K rows
Node 2: SeqNum=50000
Node 4: New node, empty database
Node 4 joins cluster:
1. Gossip announces Node 4 as JOINING
2. Anti-entropy runs on Node 4
3. Query Node 1: "What's your max SeqNum?" → 50000
4. Local state: SeqNum=0
5. Lag = 50000 (> 10K threshold)
6. Decision: Snapshot Transfer
Snapshot Transfer Flow:
Node 4 → Node 1: GetSnapshotInfo(db="mydb")
Node 1 returns:
{
Files: ["mydb.db", "mydb.db-wal"],
TotalSize: 50MB,
Checksums: {...},
AtTxnID: 0x...C350,
AtSeqNum: 50000
}
Node 4 → Node 1: StreamSnapshot(db="mydb")
Node 1 streams 4MB chunks:
Chunk 1: bytes[0..4MB], MD5=abc123
Chunk 2: bytes[4MB..8MB], MD5=def456
...
Chunk 13: bytes[48MB..50MB], MD5=xyz789
Node 4 assembles and verifies:
- Write chunks to temp file
- Verify each chunk MD5
- Atomic rename to mydb.db
- Set replication state: SeqNum=50000
Node 4 state: JOINING → ALIVE
Scenario 3: Gap Detection (Partial GC)
Timeline:
T0: Node 3 offline for 25 hours
T1: GC runs on Node 1, Node 2:
- Transactions SeqNum 1-1000 are > 24hrs old
- Force GC deletes them (gc_max_retention=24hr)
T2: Node 3 reconnects, has SeqNum=500
Delta Sync Attempt:
Node 3 → Node 1: StreamChanges(fromSeqNum=501)
Node 1 scans /txn_idx/seq/:
- First available: SeqNum=1001
- Requested: SeqNum=501
- Gap detected: 501-1000 are missing (GC'd)
Node 1 returns: ErrGapDetected
Node 3 falls back to Snapshot Transfer
Gossip Protocol (SWIM)
Node States
| State | Description | Replication |
|---|---|---|
| ALIVE | Healthy, responding | Receives writes |
| SUSPECT | May be dead, awaiting confirmation | Receives writes |
| DEAD | Confirmed failed | Excluded |
| JOINING | Catching up | Gossip only |
| REMOVED | Admin removed | Excluded |
State Transitions
New node → JOINING → (catch-up complete) → ALIVE
ALIVE → (no response for SuspectTimeout) → SUSPECT
SUSPECT → (no response for DeadTimeout) → DEAD
SUSPECT → (received message) → ALIVE
SWIM Protocol
- Gossip interval: 1 second
- Fanout: 3 random peers per round
- Suspect timeout: 15 seconds
- Dead timeout: 30 seconds
Incarnation Numbers
Higher incarnation always wins (prevents stale state propagation):
if incoming.Incarnation > local.Incarnation {
updateLocalState(incoming)
} else if incoming.Incarnation == local.Incarnation {
// Only allow escalation: ALIVE → SUSPECT → DEAD
}
Example: Gossip and Failure Detection
Scenario: Node 2 crashes during our UPDATE transaction
Initial cluster state (all nodes ALIVE):
Node 1: {ID: 1, State: ALIVE, Incarnation: 5, Addr: "10.0.0.1:5000"}
Node 2: {ID: 2, State: ALIVE, Incarnation: 3, Addr: "10.0.0.2:5000"}
Node 3: {ID: 3, State: ALIVE, Incarnation: 7, Addr: "10.0.0.3:5000"}
T0: Node 2 crashes (process killed)
T1 (+1s): Gossip round on Node 1
- Node 1 randomly selects Node 2 for ping
- gRPC Ping("10.0.0.2:5000") → timeout (5s)
- No response, try indirect probe via Node 3
T2 (+6s): Indirect probe
- Node 1 → Node 3: "Please ping Node 2 for me"
- Node 3 → Node 2: Ping → timeout
- Node 3 → Node 1: "Node 2 didn't respond"
T3 (+7s): Node 1 marks Node 2 as SUSPECT
Gossip message: {NodeID: 2, State: SUSPECT, Incarnation: 3}
T4 (+8s): Gossip spreads to Node 3
- Node 3 receives SUSPECT for Node 2
- Updates local state: Node 2 = SUSPECT
T5 (+22s): Suspect timeout (15s) expires
- No refutation from Node 2
- Node 1 marks Node 2 as DEAD
Gossip message: {NodeID: 2, State: DEAD, Incarnation: 3}
Effect on replication:
- Quorum calculation now excludes Node 2
- Total membership still = 3 (for split-brain safety)
- Required quorum = 2 (floor(3/2)+1)
- Node 1 + Node 3 can still achieve quorum
Scenario: False positive (Node 2 was just slow)
T0: Node 2 under heavy load (GC pause)
T1 (+5s): Node 1 marks Node 2 as SUSPECT
Gossip: {NodeID: 2, State: SUSPECT, Incarnation: 3}
T2 (+7s): Node 2 GC pause ends, receives SUSPECT message about itself
- "I'm being suspected? I'm alive!"
- Increments incarnation: 3 → 4
- Broadcasts refutation: {NodeID: 2, State: ALIVE, Incarnation: 4}
T3 (+8s): Node 1 receives refutation
- Incoming incarnation (4) > local (3)
- Updates: Node 2 = ALIVE, Incarnation = 4
- Node 2 stays in cluster
Quorum unaffected: Node 2 was never marked DEAD
DDL Replication
Cluster-Wide Locking
type DDLLock struct {
Database string
NodeID uint64
TxnID uint64
ExpiresAt time.Time // 30-second lease
}
- One DDL per database at a time
- Lock expires automatically (handles node crashes)
- Different databases can have concurrent DDL
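A sketch of lease-based acquisition under these rules: getLock and putLock are hypothetical MetaStore accessors, and a real implementation would need an atomic compare-and-set rather than this read-then-write.

```go
import (
	"fmt"
	"time"
)

// tryAcquireDDLLock sketches lease-based acquisition: an existing lock blocks
// new DDL unless its 30-second lease has expired (for example because the
// holding node crashed).
func tryAcquireDDLLock(
	db string, nodeID, txnID uint64,
	getLock func(db string) (DDLLock, bool),
	putLock func(db string, l DDLLock) error,
) error {
	if cur, ok := getLock(db); ok && time.Now().Before(cur.ExpiresAt) {
		return fmt.Errorf("DDL lock held by node %d", cur.NodeID)
	}
	return putLock(db, DDLLock{
		Database:  db,
		NodeID:    nodeID,
		TxnID:     txnID,
		ExpiresAt: time.Now().Add(30 * time.Second),
	})
}
```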
Idempotent Rewriting
DDL automatically rewritten for safe replay:
| Original | Rewritten |
|---|---|
| CREATE TABLE foo | CREATE TABLE IF NOT EXISTS foo |
| DROP TABLE foo | DROP TABLE IF EXISTS foo |
| CREATE INDEX idx | CREATE INDEX IF NOT EXISTS idx |
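A rough sketch of this rewriting with simple prefix matching; a real implementation would use a SQL parser, so treat this as illustrative only.

```go
import "strings"

// rewriteIdempotent applies the rewrites from the table above so replayed DDL
// is harmless when the object already exists (or is already gone).
func rewriteIdempotent(stmt string) string {
	s := strings.TrimSpace(stmt)
	upper := strings.ToUpper(s)
	switch {
	case strings.HasPrefix(upper, "CREATE TABLE IF NOT EXISTS"),
		strings.HasPrefix(upper, "DROP TABLE IF EXISTS"),
		strings.HasPrefix(upper, "CREATE INDEX IF NOT EXISTS"):
		return s // already idempotent
	case strings.HasPrefix(upper, "CREATE TABLE "):
		return "CREATE TABLE IF NOT EXISTS " + s[len("CREATE TABLE "):]
	case strings.HasPrefix(upper, "DROP TABLE "):
		return "DROP TABLE IF EXISTS " + s[len("DROP TABLE "):]
	case strings.HasPrefix(upper, "CREATE INDEX "):
		return "CREATE INDEX IF NOT EXISTS " + s[len("CREATE INDEX "):]
	}
	return s
}
```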
Schema Version Tracking
Each database maintains a version counter:
- Incremented on every DDL
- Exchanged via gossip
- Delta sync validates schema version before applying DML
Example: DDL Replication for ALTER TABLE
Adding a column to our users table:
-- Node 1 executes:
ALTER TABLE users ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
T0: Node 1 acquires DDL lock
Key: /ddl_lock/mydb
Value: {NodeID: 1, TxnID: 0x...0001, ExpiresAt: now()+30s}
T1: Node 1 broadcasts DDL lock acquisition via gossip
All nodes pause DML on "mydb" database
T2: Node 1 executes locally
- ALTER TABLE users ADD COLUMN ...
- Increment schema version: /schema/mydb → 2
T3: Node 1 broadcasts DDL to replicas (not CDC, raw SQL)
gRPC ReplicateDDL:
{
Database: "mydb",
Statement: "ALTER TABLE IF NOT EXISTS users ADD COLUMN IF NOT EXISTS
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
SchemaVersion: 2,
TxnID: 0x...0001
}
Note: Statement rewritten for idempotency
T4: Node 2, Node 3 execute DDL
- Parse and execute SQL directly
- Update local schema version: 2
- Return success
T5: Node 1 releases DDL lock
- Delete /ddl_lock/mydb
- DML resumes on all nodes
T6: Subsequent DML includes new column
INSERT INTO users (id, email, name, balance)
VALUES (4, 'dave@...', 'Dave', 100);
CDC captures:
{
NewValues: {
"id": 4,
"email": "dave@example.com",
"name": "Dave",
"balance": 100,
"created_at": "2025-01-05 12:30:00" ← New column included
}
}
Concurrent DDL conflict:
T0: Node 1 tries ALTER TABLE users ADD COLUMN age INT
T0: Node 2 tries ALTER TABLE users ADD COLUMN phone TEXT
Node 1:
- Acquires /ddl_lock/mydb (wins)
- Proceeds with DDL
Node 2:
- Tries to acquire /ddl_lock/mydb
- Lock exists, owned by Node 1
- Check expiry: 25 seconds remaining
- Return error: "DDL lock held by Node 1"
- Client retries after Node 1 completes
Recovery Scenarios
Network Partition
| Scenario | Behavior |
|---|---|
| Minority partition | Writes fail (cannot achieve quorum) |
| Majority partition | Writes succeed |
| Partition heals | Delta sync + LWW merges data |
Node Failure & Recovery
| Scenario | Recovery Method |
|---|---|
| Brief outage (<1hr) | Delta sync from peers |
| Extended outage | Snapshot transfer |
| New node joining | Full snapshot + delta sync |
Transaction Failure
| Failure Point | Recovery |
|---|---|
| PREPARE timeout | Abort all nodes |
| Partial PREPARE success | Abort if quorum not achieved |
| COMMIT after remote quorum | Local must succeed (prepared) |
| Node crash during PREPARE | Heartbeat timeout → intents cleaned |
| Node crash during COMMIT | Anti-entropy replays from peers |
Example: Recovery Scenarios for Our UPDATE
Scenario 1: Network Partition During PREPARE
Initial: UPDATE users SET balance = 75 WHERE id = 1 on Node 1
T0: Node 1 executes locally, creates intent
T1: Network partition isolates Node 1 from Node 2, Node 3
Node 1 (minority partition):
- Sends PREPARE to Node 2, Node 3
- Both timeout after 5s
- Required quorum: 2, achieved: 1 (self)
- Decision: ABORT
Node 1 rollback:
- Delete local intent: /intent/users/users:1
- Update txn record: Status=ABORTED
- Return error to client: "quorum not achieved"
Client retries after partition heals.
Scenario 2: Coordinator Crash After PREPARE, Before COMMIT
T0: Node 1 PREPARE succeeds on all nodes
All nodes have intent for "users:1"
T1: Node 1 crashes before sending COMMIT
T2 (+10s): Heartbeat timeout on Node 2
- GC scans /txn_idx/pend/
- Finds TxnID 0x...0001 with no recent heartbeat
- Checks txn status: PENDING
- Marks intents for cleanup
T3: Cleanup on Node 2, Node 3
- Delete intent: /intent/users/users:1
- Update txn: Status=ABORTED
- Remove from Cuckoo filter
Result: Transaction rolled back, row "users:1" unchanged
Client receives timeout, can retry
Scenario 3: Replica Crash During COMMIT
T0: Node 1 PREPARE succeeds (quorum achieved)
T1: Node 1 sends COMMIT to Node 2, Node 3
T2: Node 3 crashes while applying CDC
State:
Node 1: COMMITTED (coordinator)
Node 2: COMMITTED
Node 3: CRASHED (didn't complete commit)
T3: Node 3 restarts
T4: Anti-entropy on Node 3
- Local SeqNum = 40 (before crash)
- Query Node 1: SeqNum = 41
- Delta sync: get SeqNum 41 (our UPDATE)
- Apply CDC via REPLAY
- Node 3 now has balance=75 for id=1
Result: All nodes converged, transaction successful
Scenario 4: Partial PREPARE Success
T0: Node 1 sends PREPARE to Node 2, Node 3
T1: Responses:
Node 2: SUCCESS (intent created)
Node 3: CONFLICT (another transaction holds intent)
T2: Quorum check
Required: 2 (floor(3/2)+1)
SUCCESS responses: 1 (Node 2) + 1 (self) = 2
But Node 3 returned CONFLICT
T3: Decision depends on consistency level
If QUORUM: 2 SUCCESS ≥ 2 required → can proceed
If ALL: 3 required, only 2 → must ABORT
With QUORUM consistency:
- Send COMMIT to Node 2 (achieved remote quorum with just Node 2)
- Commit locally
- Node 3 will get update via anti-entropy later
With ALL consistency:
- Send ABORT to Node 2
- Rollback locally
- Return CONFLICT to client
Garbage Collection
Two-Phase GC
Phase 1 (Critical, always runs):
- Abort transactions without heartbeat for HeartbeatTimeout (10s)
- Clean orphaned write intents
Phase 2 (Background, skipped under load):
- Delete COMMITTED/ABORTED records older than GCMinRetention (2hr)
- Force delete after GCMaxRetention (24hr) regardless of peer state
GC Coordination
- Track per-peer replication state
- Query minimum applied txn_id across peers
- Only GC transactions all peers have applied
- Gap detection prevents data loss if GC runs while nodes offline
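A sketch of the peer-state check: the GC horizon is the minimum LastSeqNum across peers.

```go
// gcHorizon returns the highest sequence number that is safe to garbage
// collect: every peer must have applied it. peerLastSeq maps a peer ID to the
// LastSeqNum recorded under /repl/{peerID}/{dbName}.
func gcHorizon(peerLastSeq map[string]uint64) uint64 {
	first := true
	var horizon uint64
	for _, seq := range peerLastSeq {
		if first || seq < horizon {
			horizon = seq
			first = false
		}
	}
	if first {
		return 0 // no peers tracked: nothing can be proven safe to GC
	}
	return horizon
}
```

In the lagging-peer example later in this section, /repl/node3/mydb is stuck at 39, so the horizon is 39 and SeqNum 40-50 are retained until the 24-hour force GC.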
Example: Garbage Collection Timeline
Continuing our UPDATE scenario over time:
T0: UPDATE users SET balance = 75 WHERE id = 1 committed
TxnID: 0x0001A2B3C4D50001, SeqNum: 42
All nodes: Status=COMMITTED
T0 + 5s: Phase 1 GC runs
- Scan /txn_idx/pend/ for stale transactions
- Our transaction is COMMITTED, not pending
- No action needed
T0 + 1hr: Phase 2 GC considers deletion
- Transaction age: 1 hour (< gc_min_retention=2hr)
- Skipped: too recent
T0 + 2.5hr: Phase 2 GC runs again
- Transaction age: 2.5 hours (> gc_min_retention=2hr)
- Check peer replication state:
/repl/node2/mydb → LastSeqNum: 42 ✓
/repl/node3/mydb → LastSeqNum: 42 ✓
- All peers have applied SeqNum 42
- Safe to delete
Cleanup:
DELETE /txn/0x0001A2B3C4D50001
DELETE /txn_idx/seq/42/0x0001A2B3C4D50001
DELETE /cdc/0x0001A2B3C4D50001/1
T0 + 2.5hr: MetaStore state after GC
- Transaction record: GONE
- CDC entries: GONE
- SQLite data: balance=75 for id=1 (permanent)
GC blocked by lagging peer:
T0: Transactions SeqNum 40-50 committed
T1: Node 3 goes offline
T0 + 3hr: Phase 2 GC on Node 1
- Check peer states:
/repl/node2/mydb → LastSeqNum: 50 ✓
/repl/node3/mydb → LastSeqNum: 39 ✗ (offline, stale)
- Minimum across peers: 39
- Cannot GC SeqNum 40-50 (Node 3 hasn't applied them)
- Skip cleanup to preserve delta sync capability
T0 + 25hr: Force GC kicks in (> gc_max_retention=24hr)
- Transaction SeqNum 40-50 are > 24 hours old
- Force delete regardless of peer state
- Node 3 will need snapshot transfer when it returns
Performance Optimizations
| Optimization | Mechanism |
|---|---|
| Cuckoo filter | O(1) conflict detection, FP rate ~2.3×10⁻¹⁰ |
| Sharded mutexes | 256 shards reduce lock contention |
| Group commit | Batch Pebble writes (100 ops or 2ms) |
| Sequence pre-allocation | Batch 1000 sequence numbers |
| Object pooling | Reuse allocations in hot paths |
| Early quorum exit | Don't wait for slow nodes |
| Parallel replication | Goroutines for all replica communication |
| CDC replay | Binary values, no SQL parsing |
| NoSync cleanup | Intent cleanup is idempotent |
Limitations
Design Constraints
- Single writer per node: SQLite WAL limitation
- 64 nodes max: 6-bit NodeID in transaction ID
- ~65k txns/ms/node: 16-bit logical counter
- Full replication only: No sharding/partitioning
- Eventual consistency: No strict serializability across nodes
Not Supported
- Distributed deadlock detection (client retries)
- Read repair (anti-entropy only)
- Cascading rollback (best-effort abort)
- XA transactions (uses own 2PC)
- Table-level selective replication
Complete End-to-End Example
This section traces our UPDATE users SET balance = 75 WHERE id = 1 through the entire system lifecycle:
┌─────────────────────────────────────────────────────────────────────────────┐
│ COMPLETE TRANSACTION LIFECYCLE │
│ UPDATE users SET balance = 75 WHERE id = 1 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ T0: CLIENT REQUEST │
│ ══════════════════ │
│ MySQL client → Node 1 (port 3306) │
│ Query: UPDATE users SET balance = 75 WHERE id = 1 │
│ │
│ T1: LOCAL EXECUTION + CDC CAPTURE │
│ ═══════════════════════════════ │
│ Node 1 hookDB executes UPDATE │
│ ↓ │
│ Preupdate hook fires (SQLITE_UPDATE) │
│ ↓ │
│ CDC Entry created: │
│ Key: /cdc/0x0001A2B3C4D50001/1 │
│ Value: {Op: UPDATE, Table: "users", IntentKey: "users:1", │
│ OldValues: {balance: 100}, NewValues: {balance: 75}} │
│ │
│ T2: WRITE INTENT CREATION │
│ ══════════════════════════ │
│ Cuckoo filter: Add hash("users:1") │
│ Mutex: Lock shard[hash("users:1") % 256] │
│ MetaStore write: │
│ /intent/users/users:1 → {TxnID: 0x...0001, Status: PENDING} │
│ /intent_txn/0x...0001/users/users:1 → (index) │
│ /txn/0x...0001 → {Status: PENDING, SeqNum: 42} │
│ │
│ T3: 2PC PREPARE PHASE (parallel to Node 2, Node 3) │
│ ═══════════════════════════════════════════════════ │
│ │
│ Node 1 ──gRPC──→ Node 2: ReplicateTransaction(PREPARE) │
│ │ ↓ │
│ │ Cuckoo check "users:1" → MISS │
│ │ Create intent, store CDC │
│ │ ↓ │
│ └──────────────← SUCCESS │
│ │
│ Node 1 ──gRPC──→ Node 3: ReplicateTransaction(PREPARE) │
│ │ ↓ │
│ │ Cuckoo check "users:1" → MISS │
│ │ Create intent, store CDC │
│ │ ↓ │
│ └──────────────← SUCCESS │
│ │
│ Quorum: 3/3 SUCCESS (need 2 for 3-node cluster) ✓ │
│ │
│ T4: 2PC COMMIT PHASE │
│ ═══════════════════ │
│ │
│ Node 1 ──gRPC──→ Node 2: ReplicateTransaction(COMMIT) │
│ │ ↓ │
│ │ Read /cdc/0x...0001/1 │
│ │ Generate: UPDATE users SET ... WHERE id=1 │
│ │ Execute on SQLite │
│ │ Update /txn/0x...0001 → COMMITTED │
│ │ ↓ │
│ └──────────────← CommitTS │
│ │
│ Node 1 ──gRPC──→ Node 3: ReplicateTransaction(COMMIT) │
│ │ ↓ │
│ │ (same as Node 2) │
│ │ ↓ │
│ └──────────────← CommitTS │
│ │
│ Remote quorum achieved (2/2) ✓ │
│ │
│ T5: COORDINATOR LOCAL COMMIT │
│ ═════════════════════════════ │
│ Node 1: │
│ - Apply CDC to own SQLite │
│ - Update /txn/0x...0001 → COMMITTED │
│ - Schedule intent cleanup (background) │
│ │
│ T6: RESPONSE TO CLIENT │
│ ═══════════════════════ │
│ Node 1 → MySQL client: OK, 1 row affected │
│ │
│ T7: BACKGROUND CLEANUP (async) │
│ ═════════════════════════════ │
│ All nodes: │
│ - Delete /intent/users/users:1 │
│ - Delete /intent_txn/0x...0001/users/users:1 │
│ - Remove from Cuckoo filter │
│ - Move /txn_idx/pend/0x...0001 → /txn_idx/seq/42/0x...0001 │
│ │
│ T8: GOSSIP PROPAGATION │
│ ═══════════════════════ │
│ Node 1 gossips to random peers: │
│ {Type: TXN_COMMITTED, TxnID: 0x...0001, SeqNum: 42} │
│ Replication state updated: │
│ /repl/node1/mydb → {LastSeqNum: 42} │
│ │
│ T9: FINAL STATE (all nodes) │
│ ════════════════════════════ │
│ SQLite: users.balance = 75 WHERE id = 1 │
│ MetaStore: │
│ /txn/0x...0001 → {Status: COMMITTED, SeqNum: 42} │
│ /cdc/0x...0001/1 → (kept for anti-entropy) │
│ Cuckoo filter: "users:1" removed │
│ │
│ T10: GC (after 2+ hours) │
│ ═════════════════════════ │
│ Delete transaction metadata, keep only SQLite data │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Configuration Reference
[replication]
default_write_consistency = "QUORUM"
default_read_consistency = "LOCAL_ONE"
write_timeout_ms = 5000
read_timeout_ms = 2000
# Anti-Entropy
enable_anti_entropy = true
anti_entropy_interval_seconds = 60
delta_sync_threshold_transactions = 10000
delta_sync_threshold_seconds = 3600
# Garbage Collection
gc_min_retention_hours = 2
gc_max_retention_hours = 24
[transaction]
heartbeat_timeout_seconds = 10
conflict_window_seconds = 10
lock_wait_timeout_seconds = 50
[ddl]
lock_lease_seconds = 30
enable_idempotent = true
[cluster]
gossip_interval_ms = 1000
gossip_fanout = 3
suspect_timeout_ms = 5000
dead_timeout_ms = 10000
Key Files Reference
| File | Purpose |
|---|---|
| coordinator/write_coordinator.go | 2PC protocol implementation |
| coordinator/handler.go | MySQL query routing, CDC extraction |
| db/meta_store_pebble.go | PebbleDB storage, write intents |
| db/preupdate_hook.go | CDC capture via SQLite hooks |
| db/intent_filter.go | Cuckoo filter for conflict detection |
| db/transaction.go | Transaction lifecycle, GC |
| grpc/replication_handler.go | Remote 2PC phase handling |
| grpc/anti_entropy.go | Background sync coordination |
| grpc/delta_sync.go | Incremental catch-up |
| grpc/gossip.go | SWIM protocol, membership |
| hlc/clock.go | Hybrid Logical Clock |
| publisher/ | CDC publishing system |
Next Steps
- Integrations: CDC publishing to Kafka, NATS, and external systems