Replication Architecture
Marmot v2 implements a Percolator-style Two-Phase Commit (2PC) protocol for distributed SQLite replication. This document provides comprehensive technical details on the architecture, data flows, and edge case handling.
Design Philosophy
- CDC-first: Row-level Change Data Capture, not SQL statement replay
- Leaderless: Any node can coordinate writes (no master election)
- Quorum-based: Configurable consistency levels (ONE, QUORUM, ALL)
- Eventually consistent: Anti-entropy background sync repairs divergence
- Full replication: All nodes receive all data (no sharding)
Running Example: Users Table
Throughout this document, we'll use a consistent example to trace data through all stages:
-- Schema on all 3 nodes (Node 1, Node 2, Node 3)
CREATE TABLE users (
id INTEGER PRIMARY KEY,
email TEXT UNIQUE,
name TEXT,
balance INTEGER DEFAULT 0
);
-- Initial data (already replicated)
-- id=1: alice@example.com, Alice, balance=100
-- id=2: bob@example.com, Bob, balance=50
Scenario: Node 1 executes UPDATE users SET balance = 75 WHERE id = 1
We'll trace this UPDATE through CDC capture, 2PC protocol, conflict detection, and recovery scenarios.
Two-Phase Commit Protocol
Marmot's 2PC differs from traditional database 2PC by using write intents for conflict detection and CDC replay for deterministic replication.
Phase 1: PREPARE
- Coordinator executes locally with preupdate hooks capturing CDC data
- Write intents created for each affected row (distributed locks)
- Captured rows encoded as canonical msgpack EncodedCapturedRow records
- DML bytes stored in the local CDC segment log; Pebble tracks transaction metadata and row-lock indexes
- Broadcast to replicas with encoded row bytes (not SQL)
- Replicas create write intents and store the same encoded row bytes in their local CDC segment log
- Return SUCCESS or CONFLICT to coordinator
Coordinator                          Replica Nodes
===========                          =============
Execute with CDC hooks
        ↓
Create write intents
        ↓
ReplicateTransaction(PREPARE)   →    Create write intents
                                     Store encoded rows in CDC segment log
                                ←    Return: Success/Conflict
Phase 2: COMMIT
- Wait for quorum of PREPARE responses
- Send COMMIT to remote nodes first (not local)
- Wait for remote quorum acknowledgment
- Commit locally only after remote quorum achieved
- Cleanup: Delete write intents; CDC manifests are retained for delta sync until GC reclaims them
Coordinator                          Replica Nodes
===========                          =============
Remote quorum PREPARE achieved ✓
        ↓
ReplicateTransaction(COMMIT)    →    Decode and apply CDC rows to SQLite
                                     Mark transaction COMMITTED
                                ←    Return: CommitTS
Remote quorum COMMIT achieved ✓
        ↓
Commit locally
        ↓
Cleanup intents
Special Case: LOAD DATA LOCAL INFILE
LOAD DATA LOCAL INFILE is replicated as a bulk-load operation with payload transfer, not as row-by-row CDC extraction:
- Coordinator receives file bytes from client over MySQL protocol.
- PREPARE broadcasts load metadata (load_id, payload size, chunk hint).
- Replicas pull payload chunks from coordinator and persist durable LOAD intents.
- COMMIT applies the same payload locally on each node.
- Operation is recorded in CDC stream for replay/catch-up.
This keeps LOAD DATA LOCAL INFILE deterministic and durable across cluster members while preserving normal 2PC quorum semantics.
Example: 2PC for UPDATE users SET balance = 75 WHERE id = 1
Step 1: Node 1 (Coordinator) executes locally
Time: T1 (TxnID = 0x0001A2B3C4D50001)
Physical: 1736000000000ms | NodeID: 1 | Logical: 1
Node 1 executes UPDATE on hookDB connection
→ Preupdate hook fires: SQLITE_UPDATE on "users"
→ Captures row images and primary-key intent key
→ Encodes one msgpack EncodedCapturedRow
Step 2: Coordinator creates write intent
MetaStore Write on Node 1:
Key: /intent/users/users:1
Value: {
TxnID: 0x0001A2B3C4D50001,
NodeID: 1,
StartTS: {Wall: 1736000000000000000, Logical: 1},
IntentType: UPDATE,
MarkedForCleanup: false
}
Step 3: Broadcast PREPARE to Node 2 and Node 3
gRPC ReplicateTransaction (parallel to both):
{
Phase: PREPARE,
TxnID: 0x0001A2B3C4D50001,
NodeID: 1,
Database: "mydb",
Statements: [{
Type: UPDATE,
TableName: "users",
RowChange: {
EncodedRowCodec: 1,
EncodedRow: msgpack({
Table: "users",
Op: UPDATE,
IntentKey: binary("users:1"),
OldValues: {"id": msgpack(1), "balance": msgpack(100)},
NewValues: {"id": msgpack(1), "balance": msgpack(75)}
})
}
}]
}
Step 4: Node 2 and Node 3 process PREPARE
Node 2:
1. Check Cuckoo filter for "users:1" → MISS (no conflict)
2. Create write intent: /intent/users/users:1
3. Append encoded row to local cdcseg segment and publish transaction manifest
4. Return: {Status: SUCCESS, PrepareTS: ...}
Node 3:
1. Check Cuckoo filter for "users:1" → MISS (no conflict)
2. Create write intent: /intent/users/users:1
3. Append encoded row to local cdcseg segment and publish transaction manifest
4. Return: {Status: SUCCESS, PrepareTS: ...}
Step 5: Quorum achieved (2/3), send COMMIT
Node 1 received 2 SUCCESS responses (quorum = 2 for 3-node cluster)
gRPC ReplicateTransaction to Node 2, Node 3:
{
Phase: COMMIT,
TxnID: 0x0001A2B3C4D50001,
CommitTS: {Wall: 1736000000005000000, Logical: 0}
}
Step 6: Replicas apply CDC and commit
Node 2:
1. Read encoded row from the local cdcseg transaction manifest
2. Generate SQL: UPDATE users SET id=1, email='alice@example.com',
name='Alice', balance=75 WHERE id=1
3. Execute on SQLite
4. Update txn record: Status=COMMITTED
5. Delete write intent (background)
6. Return: {CommitTS: ...}
Node 3: (same process)
Step 7: Coordinator commits locally
Node 1:
1. Remote quorum (2/3) committed ✓
2. Apply own encoded CDC row to SQLite
3. Update txn record: Status=COMMITTED
4. Cleanup write intents
5. Return success to client
Why Coordinator Commits Last
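If the coordinator committed locally first and the remote quorum then failed, its local state would diverge from the cluster with no safe way to roll back. By committing on the remote quorum first, the coordinator can still abort cleanly whenever that quorum is not reached.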
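The ordering can be sketched in Go as follows. This is a minimal illustration, not Marmot's code: `remoteCommit`, `commitTxn`, and `errNoQuorum` are hypothetical names, replicas are contacted sequentially rather than in parallel, and the sketch assumes the coordinator's own commit counts toward quorum.

```go
package main

import (
	"errors"
	"fmt"
)

// remoteCommit is a hypothetical stand-in for sending COMMIT to one replica.
type remoteCommit func(txnID uint64) error

var errNoQuorum = errors.New("remote commit quorum not reached")

// commitTxn sends COMMIT to the remote replicas first and runs the local
// commit only once enough of them have acknowledged. total is the full
// cluster membership, including the coordinator.
func commitTxn(txnID uint64, total int, remotes []remoteCommit, local func() error) error {
	quorum := total/2 + 1
	needed := quorum - 1 // assumption: the coordinator itself supplies one vote
	acks := 0
	for _, send := range remotes {
		if send(txnID) == nil {
			acks++
		}
	}
	if acks < needed {
		// Nothing was committed locally, so the coordinator can still abort.
		return errNoQuorum
	}
	return local()
}

func main() {
	ok := func(uint64) error { return nil }
	down := func(uint64) error { return errors.New("unreachable") }
	localCommitted := false
	local := func() error {
		localCommitted = true
		return nil
	}

	// 3-node cluster, one replica down: one remote ack is enough.
	err := commitTxn(1, 3, []remoteCommit{ok, down}, local)
	fmt.Println(err, localCommitted)
}
```

When both replicas are unreachable, `commitTxn` returns `errNoQuorum` without ever calling `local`, which is the property the section relies on.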
Quorum Calculation
Critical: Quorum uses total cluster membership, not just alive nodes.
Quorum = floor(TotalMembership / 2) + 1
| Cluster Size | Quorum | Survives |
|---|---|---|
| 3 nodes | 2 | 1 failure |
| 5 nodes | 3 | 2 failures |
| 7 nodes | 4 | 3 failures |
Split-brain prevention: In a 6-node cluster split 3x3:
- Using alive nodes: Each partition sees N=3, quorum=2 → both write (dangerous)
- Using total membership: N=6, quorum=4 → neither writes (safe)
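The arithmetic above fits in a few lines of Go. The function names `quorum` and `canWrite` are chosen for this sketch only.

```go
package main

import "fmt"

// quorum returns the acknowledgements required for a cluster whose total
// membership is total, counting nodes that are currently unreachable.
func quorum(total int) int {
	return total/2 + 1
}

// canWrite reports whether a partition that can reach `reachable` members
// (including itself) may accept writes.
func canWrite(reachable, total int) bool {
	return reachable >= quorum(total)
}

func main() {
	for _, n := range []int{3, 5, 6, 7} {
		fmt.Printf("members=%d quorum=%d tolerated failures=%d\n", n, quorum(n), n-quorum(n))
	}
	// A 6-node cluster split 3/3: neither side reaches the quorum of 4.
	fmt.Println(canWrite(3, 6)) // false
}
```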
CDC (Change Data Capture) Architecture
Why CDC Instead of SQL Replay
| SQL Replay | CDC Replay |
|---|---|
| Non-deterministic (UUID, RANDOM, NOW) | Exact byte-for-byte values |
| Parsing differences between dialects | Binary msgpack encoding |
| Auto-increment sequence conflicts | Coordinator assigns distributed IDs |
| Slower (parse + execute) | Faster (direct row application) |
CDC Capture Flow
Row changes are captured via SQLite's preupdate hook API:
User Query (INSERT/UPDATE/DELETE)
↓
Execute on dedicated hookDB connection
↓
Preupdate hook fires BEFORE each row modification
↓
Extract: table, intentKey, oldValues, newValues
↓
Serialize one EncodedCapturedRow to msgpack
↓
Keep in bounded hook-session memory or spill to cdcseg for oversized transactions
↓
Release hookDB before 2PC broadcast
Encoded Row Format
type EncodedCapturedRow struct {
Table string
Op uint8 // INSERT=0, REPLACE=1, UPDATE=2, DELETE=3
IntentKey []byte // Binary intent key
OldValues map[string][]byte // Before image, msgpack per column
NewValues map[string][]byte // After image, msgpack per column
}
The DML wire payload is the encoded msgpack bytes plus a codec id. Decoded maps are reconstructed locally only for SQLite apply, vector-index hooks, and CDC publishing.
Intent Key Format
Single PK: "table:value" or "table:b64:base64_value"
Composite PK: "table:c:b64_col1:b64_col2:..." (columns sorted alphabetically)
NULL handling: "table:\x00NULL\x00" sentinel value
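A minimal Go sketch of the composite form, under stated assumptions: each primary-key value is base64-encoded from its string form using the standard padded alphabet (consistent with the "MTAw" and "NDI=" fragments in this document's composite-key example), and `compositeIntentKey` is a hypothetical helper name. The single-PK and NULL cases are not covered.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"sort"
	"strings"
)

// compositeIntentKey builds "table:c:<b64>:<b64>..." with the primary-key
// columns visited in alphabetical order. Values are passed as already
// stringified column values, which is an assumption of this sketch.
func compositeIntentKey(table string, pk map[string]string) string {
	cols := make([]string, 0, len(pk))
	for col := range pk {
		cols = append(cols, col)
	}
	sort.Strings(cols)

	parts := []string{table, "c"}
	for _, col := range cols {
		parts = append(parts, base64.StdEncoding.EncodeToString([]byte(pk[col])))
	}
	return strings.Join(parts, ":")
}

func main() {
	key := compositeIntentKey("order_items", map[string]string{
		"order_id": "100",
		"item_id":  "42",
	})
	fmt.Println(key) // order_items:c:NDI=:MTAw
}
```

Sorting the column names makes the key independent of the order in which a statement lists them, so two nodes derive the same intent key for the same row.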
CDC Application on Replicas
| Operation | SQL Generated |
|---|---|
| INSERT | INSERT OR REPLACE INTO table (...) VALUES (...) |
| UPDATE | UPDATE table SET ... WHERE pk=... using the decoded old primary key image |
| DELETE | DELETE FROM table WHERE pk=... using the decoded old primary key image |
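As a rough illustration of the UPDATE row, the Go sketch below turns a decoded after image and the old primary-key image into a parameterized statement. `buildUpdate` and `sortedKeys` are hypothetical helpers; identifier quoting and type conversion are deliberately left out, and sorting the columns is a choice made here to keep the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildUpdate returns a parameterized UPDATE statement plus its arguments.
// newValues holds the decoded after image; oldPK holds the primary-key
// columns taken from the before image.
func buildUpdate(table string, newValues, oldPK map[string]any) (string, []any) {
	var args []any
	var sets, wheres []string
	for _, c := range sortedKeys(newValues) {
		sets = append(sets, c+" = ?")
		args = append(args, newValues[c])
	}
	for _, c := range sortedKeys(oldPK) {
		wheres = append(wheres, c+" = ?")
		args = append(args, oldPK[c])
	}
	sql := fmt.Sprintf("UPDATE %s SET %s WHERE %s",
		table, strings.Join(sets, ", "), strings.Join(wheres, " AND "))
	return sql, args
}

// sortedKeys returns the map's keys in alphabetical order.
func sortedKeys(m map[string]any) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	sql, args := buildUpdate("users",
		map[string]any{"id": 1, "balance": 75},
		map[string]any{"id": 1})
	fmt.Println(sql)
	fmt.Println(args)
}
```

Taking the WHERE values from the old image matters when a statement changes the primary key itself: the replica must locate the row by its previous key.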
Example: Encoded Row for Our UPDATE
When Node 1 executes UPDATE users SET balance = 75 WHERE id = 1:
DML payload stored in cdcseg and referenced by /cdc_manifest/{txnID}:
Manifest: /cdc_manifest/0x0001A2B3C4D50001
Segment range: cdcseg/seg-000000000000000042.log offset=8192 length=148
EncodedCapturedRow (msgpack):
{
Table: "users",
Op: 2, // UPDATE
IntentKey: "users:1", // Single PK format
OldValues: {
"id": 0x01, // msgpack positive fixint: 1
"email": 0xB1 "alice@example.com", // msgpack fixstr
"name": 0xA5 "Alice", // msgpack fixstr
"balance": 0xCC 0x64 // msgpack uint8: 100
},
NewValues: {
"id": 0x01, // msgpack positive fixint: 1
"email": 0xB1 "alice@example.com",
"name": 0xA5 "Alice",
"balance": 0xCC 0x4B // msgpack uint8: 75
}
}
Generated SQL on Replicas:
-- Replicas execute this (generated from CDC, not original SQL):
UPDATE users SET
id = 1,
email = 'alice@example.com',
name = 'Alice',
balance = 75
WHERE id = 1
Example: CDC for INSERT and DELETE
INSERT: INSERT INTO users (id, email, name, balance) VALUES (3, 'carol@example.com', 'Carol', 200)
EncodedCapturedRow:
{
Op: 0, // INSERT
Table: "users",
IntentKey: "users:3",
OldValues: {}, // Empty for INSERT
NewValues: {
"id": 3, "email": "carol@example.com", "name": "Carol", "balance": 200
}
}
Generated SQL: INSERT OR REPLACE INTO users (id, email, name, balance)
VALUES (3, 'carol@example.com', 'Carol', 200)
DELETE: DELETE FROM users WHERE id = 2
EncodedCapturedRow:
{
Op: 3, // DELETE
Table: "users",
IntentKey: "users:2",
OldValues: {
"id": 2, "email": "bob@example.com", "name": "Bob", "balance": 50
},
NewValues: {} // Empty for DELETE
}
Generated SQL: DELETE FROM users WHERE id = 2
Example: Composite Primary Key
CREATE TABLE order_items (
order_id INTEGER,
item_id INTEGER,
quantity INTEGER,
PRIMARY KEY (order_id, item_id)
);
UPDATE order_items SET quantity = 5 WHERE order_id = 100 AND item_id = 42;
IntentKey Format: "order_items:c:NDI=:MTAw"
                   ^table      ^composite marker
                                 ^base64(item_id=42):base64(order_id=100)
                   (columns sorted alphabetically: item_id, order_id)
Write Intents & Conflict Detection
Write Intent Structure
Write intents are distributed locks stored in MetaStore:
Key: /intent/{tableName}/{intentKey}
Value: WriteIntentRecord {
TxnID, NodeID, StartTS, IntentType,
Statement, DataSnapshot, MarkedForCleanup
}
Index: /intent_txn/{txnID}/{tableName}/{intentKey} // For cleanup
Cuckoo Filter Fast Path
A Cuckoo filter provides O(1) conflict detection:
| Parameter | Value |
|---|---|
| Buckets | 250,000 |
| Bucket size | 4 entries |
| Fingerprint | 32-bit |
| Capacity | ~1 million |
| False positive rate | ~2.3×10⁻¹⁰ |
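The capacity and error figures follow from the first three parameters. The Go sketch below, with function names invented for illustration, reproduces them. Note that 2⁻³² ≈ 2.3×10⁻¹⁰ is the chance that one stored fingerprint collides with an unrelated key; because a lookup inspects two buckets of four slots each, the commonly cited bound for a full filter is 2b/2^f, roughly eight times larger.

```go
package main

import (
	"fmt"
	"math"
)

// capacity is the number of fingerprints the filter can hold.
func capacity(buckets, bucketSize int) int {
	return buckets * bucketSize
}

// perSlotCollision is the probability that one stored fingerprint of the
// given width matches an unrelated key: 2^-bits.
func perSlotCollision(bits int) float64 {
	return math.Ldexp(1, -bits)
}

// fprUpperBound approximates the false positive rate of a full filter:
// a lookup compares against 2 buckets of bucketSize slots each.
func fprUpperBound(bucketSize, bits int) float64 {
	return 2 * float64(bucketSize) * perSlotCollision(bits)
}

func main() {
	fmt.Println("capacity:", capacity(250_000, 4))
	fmt.Printf("per-fingerprint collision: %.2e\n", perSlotCollision(32))
	fmt.Printf("full-filter upper bound:   %.2e\n", fprUpperBound(4, 32))
}
```

Either way, a false positive only sends the check down the slow path; it never causes a wrong conflict decision.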
Conflict check flow:
WriteIntent(table, intentKey)
↓
hash = XXH64(table + ":" + intentKey)
↓
filter.Check(hash)
│
├─ MISS → Fast path: write directly to Pebble
│
└─ HIT → Slow path: lookup actual intent in Pebble
├─ No intent found → False positive, proceed
├─ Same txnID → Update in-place
└─ Different txnID → Check if overwritable:
├─ MarkedForCleanup=true → Overwrite
├─ Status=COMMITTED/ABORTED → Overwrite
├─ Heartbeat timeout → Overwrite (stale)
└─ Active transaction → Return CONFLICT
Sharded Mutexes
256 sharded mutexes prevent TOCTOU races:
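// intentLocks spreads lock contention across 256 mutexes.
var intentLocks [256]sync.Mutex
// intentLockFor maps a (table, intentKey) pair to its shard, so the
// check-then-write sequence for one key is serialized.
func intentLockFor(table, intentKey string) *sync.Mutex {
	return &intentLocks[xxhash.Sum64String(table+":"+intentKey)%256]
}
Example: Conflict Detection Scenarios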
Using our users table, here are concrete conflict scenarios:
Scenario 1: No Conflict (Different Rows)
Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
→ Creates intent for "users:1"
Time T2: Node 2 starts UPDATE users SET balance = 30 WHERE id = 2
→ Cuckoo filter check for "users:2" → MISS
→ Creates intent for "users:2"
→ Both transactions succeed (different rows)
Scenario 2: Conflict (Same Row, Different Nodes)
Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
TxnID: 0x0001A2B3C4D50001
→ Creates intent: /intent/users/users:1 → {TxnID: 0x0001A2B3C4D50001}
→ Adds to Cuckoo filter: hash("users:1")
Time T2: Node 2 starts UPDATE users SET balance = 50 WHERE id = 1
TxnID: 0x0001A2B3C4D60001
→ Cuckoo filter check for "users:1" → HIT
→ Lookup Pebble: /intent/users/users:1
→ Found intent with different TxnID (0x0001A2B3C4D50001)
→ Check if overwritable:
- MarkedForCleanup = false
- Status = PENDING (not COMMITTED/ABORTED)
- LastHeartbeat = recent (not stale)
→ Return CONFLICT to Node 2
→ Node 2 aborts and retries after backoff
Scenario 3: Stale Intent (Coordinator Crashed)
Time T1: Node 1 starts UPDATE users SET balance = 75 WHERE id = 1
→ Creates intent: /intent/users/users:1
→ Node 1 crashes before COMMIT
Time T2: (10 seconds later, heartbeat timeout)
GC on Node 2 scans pending transactions
→ Finds intent with LastHeartbeat > 10s ago
→ Marks intent for cleanup
Time T3: Node 3 starts UPDATE users SET balance = 50 WHERE id = 1
→ Cuckoo filter check → HIT
→ Lookup Pebble → Found intent
→ Check: MarkedForCleanup = true
→ Overwrite allowed → Creates new intent
→ Transaction succeeds
Scenario 4: Same Transaction Re-entry
Time T1: Node 1 PREPARE phase creates intent for "users:1"
TxnID: 0x0001A2B3C4D50001
Time T2: Network hiccup, Node 1 retries PREPARE
→ Cuckoo filter check → HIT
→ Lookup Pebble → Found intent with SAME TxnID
→ Update in-place (idempotent)
→ No conflict
Scenario 5: Multi-Row Transaction Partial Conflict
-- Node 1 executes:
BEGIN;
UPDATE users SET balance = balance - 25 WHERE id = 1;
UPDATE users SET balance = balance + 25 WHERE id = 2;
COMMIT;
Time T1: Node 1 creates intents for both rows:
→ /intent/users/users:1 → TxnID: 0x...0001
→ /intent/users/users:2 → TxnID: 0x...0001
Time T2: Node 2 tries UPDATE users SET name = 'Robert' WHERE id = 2
→ Cuckoo filter check for "users:2" → HIT
→ Conflict detected!
→ Node 2 returns CONFLICT
→ Node 2's client retries after Node 1 commits
Storage Layer (MetaStore)
PebbleDB Architecture
Each database has a separate MetaStore (CockroachDB Pebble):
data_dir/
├── mydb.db # SQLite database
├── mydb.db-wal # SQLite WAL
├── mydb_meta.pebble/ # PebbleDB MetaStore
│ ├── MANIFEST-*
│ ├── CURRENT
│ └── *.sst # LSM-tree files
├── cdcseg/ # Rolling CDC segment-log files
│ └── seg-000000000000000001.log
└── __marmot_system.db # System database
    └── __marmot_system_meta.pebble/
Key Namespace
| Prefix | Purpose |
|---|---|
| /txn/{txnID} | Transaction records (status, timestamps, statements) |
| /txn_idx/pend/{txnID} | Pending transaction index |
| /txn_idx/seq/{seqNum}/{txnID} | Sequence-ordered index (gap detection) |
| /intent/{table}/{intentKey} | Write intents |
| /intent_txn/{txnID}/{table}/{intentKey} | Intent index by transaction |
| /cdc_manifest/{txnID} | Manifest pointing to the transaction's CDC segment-log range |
| /repl/{peerID}/{dbName} | Replication state per peer |
| /schema/{dbName} | Schema version counter |
| /seq/{nodeID} | Sequence generator (pre-allocated batches) |
CDC Segment Store
The CDC segment store is the local append-only payload store for DML row changes. Pebble keeps the transaction metadata, row-lock indexes, and /cdc_manifest/{txnID} pointer; the actual encoded row payloads live in cdcseg/seg-*.log.
Each row record is:
[32-byte header][msgpack EncodedCapturedRow payload]
The header stores a magic value, format version, record type, transaction id, row sequence, payload length, and payload CRC32. Segment files roll at 64 MiB, and a single record larger than one segment is rejected before append so recovery and validation use the same size rule.
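To make the framing concrete, here is a Go sketch of one possible layout. The field widths, their order, the little-endian byte order, the `magic` constant, and the IEEE CRC32 polynomial are all assumptions chosen so the fields sum to 32 bytes; the source does not specify them.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

const (
	headerSize = 32
	magic      = 0x43444353 // hypothetical magic value
	version    = 1
)

// encodeRecord frames payload with a 32-byte header. Assumed layout:
// magic(4) version(2) type(2) txnID(8) rowSeq(8) payloadLen(4) crc32(4).
func encodeRecord(recType uint16, txnID, rowSeq uint64, payload []byte) []byte {
	buf := make([]byte, headerSize+len(payload))
	binary.LittleEndian.PutUint32(buf[0:], magic)
	binary.LittleEndian.PutUint16(buf[4:], version)
	binary.LittleEndian.PutUint16(buf[6:], recType)
	binary.LittleEndian.PutUint64(buf[8:], txnID)
	binary.LittleEndian.PutUint64(buf[16:], rowSeq)
	binary.LittleEndian.PutUint32(buf[24:], uint32(len(payload)))
	binary.LittleEndian.PutUint32(buf[28:], crc32.ChecksumIEEE(payload))
	copy(buf[headerSize:], payload)
	return buf
}

// decodeRecord validates the record at the start of buf and returns its payload.
func decodeRecord(buf []byte) ([]byte, error) {
	if len(buf) < headerSize {
		return nil, errors.New("torn header")
	}
	if binary.LittleEndian.Uint32(buf[0:]) != magic ||
		binary.LittleEndian.Uint16(buf[4:]) != version {
		return nil, errors.New("invalid magic/version")
	}
	n := int(binary.LittleEndian.Uint32(buf[24:]))
	if len(buf)-headerSize < n {
		return nil, errors.New("short payload")
	}
	payload := buf[headerSize : headerSize+n]
	if crc32.ChecksumIEEE(payload) != binary.LittleEndian.Uint32(buf[28:]) {
		return nil, errors.New("CRC mismatch")
	}
	return payload, nil
}

func main() {
	rec := encodeRecord(1, 0x0001A2B3C4D50001, 0, []byte("row-bytes"))
	_, err := decodeRecord(rec)
	fmt.Println(len(rec), err)

	rec[len(rec)-1] ^= 0xFF // flip bits in the payload
	_, err = decodeRecord(rec)
	fmt.Println(err)
}
```

A fixed-size header with an explicit length and checksum lets a reader decide, from the bytes alone, whether a record is complete.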
Atomicity Model
CDC segment publication is manifest-based:
- Encoded row records are appended to the current segment file.
- The in-memory transaction manifest tracks row count, first/last row sequence, CRC, and one or more {segmentID, offset, length} chunks.
- SealCapturedRows publishes /cdc_manifest/{txnID} in Pebble after the segment ranges are known.
- Readers never scan arbitrary segment bytes for committed DML; they read the manifest and then perform bounded ReadAt calls over the referenced segment ranges.
This gives atomic visibility: a transaction's CDC payload is visible only when its manifest is visible. A crash before manifest publication leaves unreferenced segment bytes, which are ignored and later reclaimed. A crash after manifest publication leaves a manifest that points to specific validated byte ranges.
Durability And Prepare Sync
By default Marmot keeps the hot path aligned with Pebble NoSync: CDC segment fsync is grouped asynchronously by bytes, transaction count, or a short timer. This avoids forcing a disk flush per row or per transaction.
For deployments that require a PREPARE acknowledgement to wait for local CDC segment fsync, enable strict prepare sync:
[meta_store]
strict_prepare_sync = true
or set MARMOT_CDC_PREPARE_SYNC=strict. Strict mode waits for the segment sync before acknowledging PREPARE, which strengthens local crash durability at the cost of write latency.
Recovery And Cleanup
On startup, Marmot validates each segment record in order. If it finds a torn header, short payload, invalid magic/version, oversized payload, or CRC mismatch, it truncates the active tail to the last valid record boundary.
Prepared manifests embedded in segment records can reconstruct prepared CDC state if Pebble metadata was lost after the segment record became durable. For committed transactions, /cdc_manifest/{txnID} is the retention root used by streaming replication, catch-up, and delta sync.
GC deletes segment files only when no current manifest, pending prepare, or active writer can reference them. Current and pending segments are retained; old unreferenced segments are pruned with the transaction metadata retention rules.
Transaction Record Format
type TransactionRecord struct {
TxnID uint64
NodeID uint64
SeqNum uint64 // For gap-free replication
Status TxnStatus // PENDING/COMMITTED/ABORTED
StartTSWall int64 // HLC wall time
StartTSLogical int32 // HLC logical
CommitTSWall int64
CommitTSLogical int32
LastHeartbeat int64 // Stale detection
DatabaseName string
}
Group Commit Optimization
SQLite commit batching groups committed DML transactions before applying decoded row changes. Pebble remains responsible for transaction metadata, row-lock indexes, schema state, and CDC manifests; DML row payload bytes live in rolling CDC segment-log files.
Example: MetaStore Keys After Our UPDATE
After Node 1's UPDATE users SET balance = 75 WHERE id = 1 completes:
During PREPARE phase (all 3 nodes):
/intent/users/users:1 → WriteIntentRecord{TxnID: 0x0001A2B3C4D50001, ...}
/intent_txn/0x0001A2B3C4D50001/users/users:1 → (index entry)
/cdc_manifest/0x0001A2B3C4D50001 → CDCManifest{SegmentID, Offset, Length, RowCount}
/txn/0x0001A2B3C4D50001 → TransactionRecord{Status: PENDING, ...}
/txn_idx/pend/0x0001A2B3C4D50001 → (pending index)
After COMMIT phase:
/txn/0x0001A2B3C4D50001 → TransactionRecord{
TxnID: 0x0001A2B3C4D50001,
NodeID: 1,
SeqNum: 42,
Status: COMMITTED,
StartTSWall: 1736000000000000000,
CommitTSWall: 1736000000005000000,
DatabaseName: "mydb"
}
/txn_idx/seq/42/0x0001A2B3C4D50001 → (sequence index for gap detection)
-- Deleted (cleanup):
/intent/users/users:1 → (removed)
/intent_txn/0x0001A2B3C4D50001/users/users:1 → (removed)
/txn_idx/pend/0x0001A2B3C4D50001 → (removed)
-- CDC segment range kept for anti-entropy (GC'd after retention rules allow it):
/cdc_manifest/0x0001A2B3C4D50001 → (kept for delta sync)
Replication State (per peer):
-- On Node 2, tracking what Node 1 has sent:
/repl/node1/mydb → {LastAppliedTxnID: 0x0001A2B3C4D50001, LastSeqNum: 42}
-- On Node 3:
/repl/node1/mydb → {LastAppliedTxnID: 0x0001A2B3C4D50001, LastSeqNum: 42}
Hybrid Logical Clock (HLC)
Purpose
- Causality tracking: Timestamps preserve happens-before relationship
- Transaction IDs: Unique 64-bit IDs derived from HLC
- Conflict resolution: Last-Write-Wins using HLC comparison
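Last-Write-Wins needs a total order over timestamps. The Go sketch below compares wall time first and the logical counter second; using NodeID as the final tie-breaker is an assumption added here so that two distinct timestamps never compare equal. `Compare`, `Winner`, and `order` are illustrative names.

```go
package main

import "fmt"

// Timestamp carries the HLC fields used in this document.
type Timestamp struct {
	WallTime int64 // nanoseconds
	Logical  int32
	NodeID   uint64
}

// order converts a "less than" result into -1 or +1.
func order(less bool) int {
	if less {
		return -1
	}
	return 1
}

// Compare returns -1, 0, or +1 for a < b, a == b, a > b.
func Compare(a, b Timestamp) int {
	if a.WallTime != b.WallTime {
		return order(a.WallTime < b.WallTime)
	}
	if a.Logical != b.Logical {
		return order(a.Logical < b.Logical)
	}
	if a.NodeID != b.NodeID {
		return order(a.NodeID < b.NodeID) // assumed tie-breaker
	}
	return 0
}

// Winner picks the later write under Last-Write-Wins.
func Winner(a, b Timestamp) Timestamp {
	if Compare(a, b) >= 0 {
		return a
	}
	return b
}

func main() {
	a := Timestamp{WallTime: 1736000000000000000, Logical: 1, NodeID: 1}
	b := Timestamp{WallTime: 1736000000001000000, Logical: 2, NodeID: 2}
	fmt.Println(Winner(a, b).NodeID) // 2
}
```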
Clock Structure
type Clock struct {
nodeID uint64 // 0-63 (6-bit limit)
wallTime int64 // Nanoseconds
logical int32 // 0-65535 (16-bit limit)
mu sync.Mutex
}
Transaction ID Bit Layout
|-------- 42 bits --------|-- 6 bits --|-- 16 bits --|
|      Physical (ms)      |  Node ID   |   Logical   |
- 42 bits: Milliseconds since epoch (~139 years)
- 6 bits: Node ID (max 64 nodes)
- 16 bits: Logical counter (~65k IDs/ms/node)
func (t Timestamp) ToTxnID() uint64 {
physicalMS := uint64(t.WallTime / 1_000_000)
return (physicalMS << 22) | (t.NodeID << 16) | uint64(t.Logical)
}
Clock Synchronization
On receiving remote timestamp:
- Take max of: local wall, remote wall, physical now
- If same wall time: increment logical beyond both local and remote
- If new millisecond: reset logical to 0
- Overflow protection: Spin-wait if logical exceeds 65535
Example: HLC and TxnID Generation
Our UPDATE transaction on Node 1:
Physical time: 1736000000000 ms since epoch (Jan 4, 2025 14:13:20 UTC)
Node ID: 1
Logical: 1 (first transaction this millisecond)
TxnID calculation:
Physical bits (42): 1736000000000 << 22 = 0x650C6A7400000000
NodeID bits (6): 1 << 16 = 0x0000000000010000
Logical bits (16): 1 = 0x0000000000000001
─────────────────────────────────────────────────────────────
TxnID: 0x650C6A7400010001
(The TxnID 0x0001A2B3C4D50001 shown elsewhere in this document is an illustrative placeholder, not this computed value.)
Binary layout:
|---- 42 bits ----|-- 6 --|-- 16 --|
| 1736000000000 | 1 | 1 |
Conflict resolution using HLC (Last-Write-Wins):
Scenario: Two updates to same row arrive at Node 3
Update A (from Node 1):
TxnID: 0x0001A2B3C4D50001
Wall: 1736000000000ms, Logical: 1
Update B (from Node 2):
TxnID: 0x0001A2B3C4D60002
Wall: 1736000000001ms, Logical: 2
Comparison: B.Wall > A.Wall → Update B wins
Result: Node 3 applies Update B (higher timestamp)
Clock advance on message receive:
Node 2 local clock: Wall=1736000000000, Logical=5
Receives PREPARE from Node 1: Wall=1736000000002, Logical=1
Update logic:
localWall (1736000000000) < remoteWall (1736000000002)
→ Set wall = max(local, remote, physicalNow) = 1736000000002
→ Reset logical = 0 (new millisecond)
Node 2 new clock: Wall=1736000000002, Logical=0
Anti-Entropy Background Sync
Purpose
Repairs divergence caused by:
- Network partitions healing
- Node restarts with stale data
- Missed transactions during temporary failures
Sync Strategy Decision
Lag Detection:
localMaxTxnID vs peerMaxTxnID
localTxnCount vs peerTxnCount
Decision:
IF lag < DeltaSyncThresholdTxns (default 10K txns)
AND timeLag < DeltaSyncThresholdSeconds (default 1hr)
THEN → Delta Sync (incremental)
ELSE → Snapshot Transfer (full)
Delta Sync Flow
- Query peer's replication state
- Stream transactions via StreamChanges RPC
- Gap detection: If event.SeqNum - expectedNext > threshold → snapshot fallback
- Apply each transaction via REPLAY phase (bypasses 2PC)
- Update local replication state
Gap Detection Algorithm
if event.SeqNum > expectedNextTxn {
gap := event.SeqNum - expectedNextTxn
if gap > DeltaSyncThresholdTxns {
// Transactions GC'd - fall back to snapshot
return ErrGapDetected
}
}
expectedNextTxn = event.SeqNum + 1
Snapshot Transfer
- Get snapshot info: Files, checksums, txn_id
- Stream chunks: 4MB each with MD5 verification
- Atomic restore: Replace database files
- Resume normal replication from snapshot txn_id
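The chunk-and-verify step can be sketched in Go as below. `Chunk`, `split`, and `assemble` are hypothetical names, the whole snapshot is held in memory for brevity, and the document's "4MB" is taken as 4 MiB. MD5 serves here as an integrity check only, not as a security measure.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
)

// Chunk is a hypothetical wire unit: a slice of the snapshot plus its MD5.
type Chunk struct {
	Data []byte
	Sum  [md5.Size]byte
}

// split cuts data into chunks of at most size bytes and checksums each one.
func split(data []byte, size int) []Chunk {
	var chunks []Chunk
	for off := 0; off < len(data); off += size {
		end := off + size
		if end > len(data) {
			end = len(data)
		}
		part := data[off:end]
		chunks = append(chunks, Chunk{Data: part, Sum: md5.Sum(part)})
	}
	return chunks
}

// assemble verifies every chunk and concatenates them, failing on the
// first checksum mismatch.
func assemble(chunks []Chunk) ([]byte, error) {
	var out bytes.Buffer
	for i, c := range chunks {
		if md5.Sum(c.Data) != c.Sum {
			return nil, fmt.Errorf("chunk %d: MD5 mismatch", i+1)
		}
		out.Write(c.Data)
	}
	return out.Bytes(), nil
}

func main() {
	const chunkSize = 4 << 20 // 4 MiB
	snapshot := bytes.Repeat([]byte{0xAB}, 50<<20)
	chunks := split(snapshot, chunkSize)
	restored, err := assemble(chunks)
	fmt.Println(len(chunks), bytes.Equal(snapshot, restored), err) // 13 true <nil>
}
```

A real receiver would write verified chunks to a temporary file and rename it into place, so a failed transfer never leaves a half-written database.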
Example: Anti-Entropy Scenarios
Using our 3-node cluster with the users table:
Scenario 1: Node 3 Brief Network Partition (Delta Sync)
Timeline:
T0: All nodes in sync, SeqNum=40
T1: Node 3 loses network for 5 minutes
T2: Node 1, Node 2 continue processing:
- SeqNum 41: UPDATE users SET balance=75 WHERE id=1
- SeqNum 42: INSERT INTO users VALUES (3, 'carol@...', 'Carol', 200)
- SeqNum 43: DELETE FROM users WHERE id=2
T3: Node 3 reconnects
Anti-Entropy on Node 3:
1. Query Node 1: "What's your max SeqNum?" → 43
2. Local state: LastSeqNum=40
3. Lag = 43 - 40 = 3 transactions (< 10K threshold)
4. Decision: Delta Sync
Delta Sync Flow:
Node 3 → Node 1: StreamChanges(fromSeqNum=41)
Node 1 streams:
{SeqNum: 41, TxnID: 0x...0001, CDC: [UPDATE users...]}
{SeqNum: 42, TxnID: 0x...0002, CDC: [INSERT users...]}
{SeqNum: 43, TxnID: 0x...0003, CDC: [DELETE users...]}
Node 3 applies each via REPLAY (bypasses 2PC):
- Execute CDC for SeqNum 41 → balance=75 for id=1
- Execute CDC for SeqNum 42 → insert id=3
- Execute CDC for SeqNum 43 → delete id=2
Node 3 now in sync: SeqNum=43
Scenario 2: New Node Joining (Snapshot Transfer)
Initial state:
Node 1: SeqNum=50000, users table has 10K rows
Node 2: SeqNum=50000
Node 4: New node, empty database
Node 4 joins cluster:
1. Gossip announces Node 4 as JOINING
2. Anti-entropy runs on Node 4
3. Query Node 1: "What's your max SeqNum?" → 50000
4. Local state: SeqNum=0
5. Lag = 50000 (> 10K threshold)
6. Decision: Snapshot Transfer
Snapshot Transfer Flow:
Node 4 → Node 1: GetSnapshotInfo(db="mydb")
Node 1 returns:
{
Files: ["mydb.db", "mydb.db-wal"],
TotalSize: 50MB,
Checksums: {...},
AtTxnID: 0x...C350,
AtSeqNum: 50000
}
Node 4 → Node 1: StreamSnapshot(db="mydb")
Node 1 streams 4MB chunks:
Chunk 1: bytes[0..4MB], MD5=abc123
Chunk 2: bytes[4MB..8MB], MD5=def456
...
Chunk 13: bytes[48MB..50MB], MD5=xyz789
Node 4 assembles and verifies:
- Write chunks to temp file
- Verify each chunk MD5
- Atomic rename to mydb.db
- Set replication state: SeqNum=50000
Node 4 state: JOINING → ALIVE
Scenario 3: Gap Detection (Partial GC)
Timeline:
T0: Node 3 offline for 25 hours
T1: GC runs on Node 1, Node 2:
- Transactions SeqNum 1-1000 are > 24hrs old
- Force GC deletes them (gc_max_retention=24hr)
T2: Node 3 reconnects, has SeqNum=500
Delta Sync Attempt:
Node 3 → Node 1: StreamChanges(fromSeqNum=501)
Node 1 scans /txn_idx/seq/:
- First available: SeqNum=1001
- Requested: SeqNum=501
- Gap detected: 501-1000 are missing (GC'd)
Node 1 returns: ErrGapDetected
Node 3 falls back to Snapshot Transfer
Gossip Protocol (SWIM)
Node States
| State | Description | Replication |
|---|---|---|
| ALIVE | Healthy, responding | Receives writes |
| SUSPECT | May be dead, awaiting confirmation | Receives writes |
| DEAD | Confirmed failed | Excluded |
| JOINING | Catching up | Gossip only |
| REMOVED | Admin removed | Excluded |
State Transitions
New node → JOINING → (catch-up complete) → ALIVE
ALIVE → (no response for SuspectTimeout) → SUSPECT
SUSPECT → (no response for DeadTimeout) → DEAD
SUSPECT → (received message) → ALIVE
SWIM Protocol
- Gossip interval: 1 second
- Fanout: 3 random peers per round
- Suspect timeout: 15 seconds
- Dead timeout: 30 seconds
Incarnation Numbers
Higher incarnation always wins (prevents stale state propagation):
if incoming.Incarnation > local.Incarnation {
updateLocalState(incoming)
} else if incoming.Incarnation == local.Incarnation {
// Only allow escalation: ALIVE → SUSPECT → DEAD
}
Example: Gossip and Failure Detection
Scenario: Node 2 crashes during our UPDATE transaction
Initial cluster state (all nodes ALIVE):
Node 1: {ID: 1, State: ALIVE, Incarnation: 5, Addr: "10.0.0.1:5000"}
Node 2: {ID: 2, State: ALIVE, Incarnation: 3, Addr: "10.0.0.2:5000"}
Node 3: {ID: 3, State: ALIVE, Incarnation: 7, Addr: "10.0.0.3:5000"}
T0: Node 2 crashes (process killed)
T1 (+1s): Gossip round on Node 1
- Node 1 randomly selects Node 2 for ping
- gRPC Ping("10.0.0.2:5000") → timeout (5s)
- No response, try indirect probe via Node 3
T2 (+6s): Indirect probe
- Node 1 → Node 3: "Please ping Node 2 for me"
- Node 3 → Node 2: Ping → timeout
- Node 3 → Node 1: "Node 2 didn't respond"
T3 (+7s): Node 1 marks Node 2 as SUSPECT
Gossip message: {NodeID: 2, State: SUSPECT, Incarnation: 3}
T4 (+8s): Gossip spreads to Node 3
- Node 3 receives SUSPECT for Node 2
- Updates local state: Node 2 = SUSPECT
T5 (+22s): Suspect timeout (15s) expires
- No refutation from Node 2
- Node 1 marks Node 2 as DEAD
Gossip message: {NodeID: 2, State: DEAD, Incarnation: 3}
Effect on replication:
- Node 2 is excluded as a replication target (DEAD)
- Total membership still = 3 (for split-brain safety)
- Required quorum = 2 (floor(3/2)+1)
- Node 1 + Node 3 can still achieve quorum
Scenario: False positive (Node 2 was just slow)
T0: Node 2 under heavy load (GC pause)
T1 (+5s): Node 1 marks Node 2 as SUSPECT
Gossip: {NodeID: 2, State: SUSPECT, Incarnation: 3}
T2 (+7s): Node 2 GC pause ends, receives SUSPECT message about itself
- "I'm being suspected? I'm alive!"
- Increments incarnation: 3 → 4
- Broadcasts refutation: {NodeID: 2, State: ALIVE, Incarnation: 4}
T3 (+8s): Node 1 receives refutation
- Incoming incarnation (4) > local (3)
- Updates: Node 2 = ALIVE, Incarnation = 4
- Node 2 stays in cluster
Quorum unaffected: Node 2 was never marked DEAD
DDL Replication
Cluster-Wide Locking
type DDLLock struct {
Database string
NodeID uint64
TxnID uint64
ExpiresAt time.Time // 30-second lease
}
- One DDL per database at a time
- Lock expires automatically (handles node crashes)
- Different databases can have concurrent DDL
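The lease behaviour in this list can be sketched in Go. `lockTable` is a hypothetical in-memory stand-in for the cluster-wide lock store, and `tryAcquire` takes the current time as a parameter so the expiry rule is easy to exercise.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type DDLLock struct {
	Database  string
	NodeID    uint64
	TxnID     uint64
	ExpiresAt time.Time
}

const ddlLease = 30 * time.Second

// lockTable is a hypothetical in-memory stand-in for the lock store.
type lockTable struct {
	mu    sync.Mutex
	locks map[string]DDLLock
}

// tryAcquire grants the lock when the database has no holder or the
// previous lease has expired; otherwise it reports the current holder.
func (t *lockTable) tryAcquire(db string, nodeID, txnID uint64, now time.Time) (DDLLock, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if cur, held := t.locks[db]; held && now.Before(cur.ExpiresAt) {
		return cur, false
	}
	l := DDLLock{Database: db, NodeID: nodeID, TxnID: txnID, ExpiresAt: now.Add(ddlLease)}
	t.locks[db] = l
	return l, true
}

func main() {
	t := &lockTable{locks: map[string]DDLLock{}}
	now := time.Now()
	_, ok1 := t.tryAcquire("mydb", 1, 100, now)
	holder, ok2 := t.tryAcquire("mydb", 2, 200, now.Add(5*time.Second))
	_, ok3 := t.tryAcquire("otherdb", 2, 201, now.Add(5*time.Second))
	_, ok4 := t.tryAcquire("mydb", 2, 202, now.Add(31*time.Second))
	fmt.Println(ok1, ok2, holder.NodeID, ok3, ok4) // true false 1 true true
}
```

The last call succeeds because the first lease has run out, which is how a crashed lock holder stops blocking DDL without any explicit release.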
Idempotent Rewriting
DDL automatically rewritten for safe replay:
| Original | Rewritten |
|---|---|
| CREATE TABLE foo | CREATE TABLE IF NOT EXISTS foo |
| DROP TABLE foo | DROP TABLE IF EXISTS foo |
| CREATE INDEX idx | CREATE INDEX IF NOT EXISTS idx |
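A prefix-based rewrite for the three cases in the table could look like the Go sketch below. It is an illustration only: the regular expressions and the `rewriteDDL` name are assumptions, a real implementation would more likely work on a parsed statement, and forms such as CREATE UNIQUE INDEX are not handled.

```go
package main

import (
	"fmt"
	"regexp"
)

// Each rule matches a DDL prefix with or without its guard and replaces
// it with the guarded form, so applying the rewrite twice is harmless.
var rules = []struct {
	re   *regexp.Regexp
	repl string
}{
	{regexp.MustCompile(`(?i)^\s*CREATE\s+TABLE\s+(IF\s+NOT\s+EXISTS\s+)?`), "CREATE TABLE IF NOT EXISTS "},
	{regexp.MustCompile(`(?i)^\s*DROP\s+TABLE\s+(IF\s+EXISTS\s+)?`), "DROP TABLE IF EXISTS "},
	{regexp.MustCompile(`(?i)^\s*CREATE\s+INDEX\s+(IF\s+NOT\s+EXISTS\s+)?`), "CREATE INDEX IF NOT EXISTS "},
}

// rewriteDDL returns stmt with an IF [NOT] EXISTS guard added when one of
// the rules applies; any other statement passes through unchanged.
func rewriteDDL(stmt string) string {
	for _, r := range rules {
		if r.re.MatchString(stmt) {
			return r.re.ReplaceAllString(stmt, r.repl)
		}
	}
	return stmt
}

func main() {
	fmt.Println(rewriteDDL("CREATE TABLE foo (id INTEGER)"))
	fmt.Println(rewriteDDL("drop table foo"))
	fmt.Println(rewriteDDL("CREATE INDEX idx ON foo(id)"))
}
```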
Schema Version Tracking
Each database maintains a version counter:
- Incremented on every DDL
- Exchanged via gossip
- Delta sync validates schema version before applying DML
Example: DDL Replication for ALTER TABLE
Adding a column to our users table:
-- Node 1 executes:
ALTER TABLE users ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
T0: Node 1 acquires DDL lock
Key: /ddl_lock/mydb
Value: {NodeID: 1, TxnID: 0x...0001, ExpiresAt: now()+30s}
T1: Node 1 broadcasts DDL lock acquisition via gossip
All nodes pause DML on "mydb" database
T2: Node 1 executes locally
- ALTER TABLE users ADD COLUMN ...
- Increment schema version: /schema/mydb → 2
T3: Node 1 broadcasts DDL to replicas (not CDC, raw SQL)
gRPC ReplicateDDL:
{
Database: "mydb",
Statement: "ALTER TABLE IF NOT EXISTS users ADD COLUMN IF NOT EXISTS
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
SchemaVersion: 2,
TxnID: 0x...0001
}
Note: Statement rewritten for idempotency
T4: Node 2, Node 3 execute DDL
- Parse and execute SQL directly
- Update local schema version: 2
- Return success
T5: Node 1 releases DDL lock
- Delete /ddl_lock/mydb
- DML resumes on all nodes
T6: Subsequent DML includes new column
INSERT INTO users (id, email, name, balance)
VALUES (4, 'dave@...', 'Dave', 100);
CDC captures:
{
NewValues: {
"id": 4,
"email": "dave@example.com",
"name": "Dave",
"balance": 100,
"created_at": "2025-01-05 12:30:00" ← New column included
}
}
Concurrent DDL conflict:
T0: Node 1 tries ALTER TABLE users ADD COLUMN age INT
T0: Node 2 tries ALTER TABLE users ADD COLUMN phone TEXT
Node 1:
- Acquires /ddl_lock/mydb (wins)
- Proceeds with DDL
Node 2:
- Tries to acquire /ddl_lock/mydb
- Lock exists, owned by Node 1
- Check expiry: 25 seconds remaining
- Return error: "DDL lock held by Node 1"
- Client retries after Node 1 completes
Recovery Scenarios
Network Partition
| Scenario | Behavior |
|---|---|
| Minority partition | Writes fail (cannot achieve quorum) |
| Majority partition | Writes succeed |
| Partition heals | Delta sync + LWW merges data |
Node Failure & Recovery
| Scenario | Recovery Method |
|---|---|
| Brief outage (<1hr) | Delta sync from peers |
| Extended outage | Snapshot transfer |
| New node joining | Full snapshot + delta sync |
Transaction Failure
| Failure Point | Recovery |
|---|---|
| PREPARE timeout | Abort all nodes |
| Partial PREPARE success | Abort if quorum not achieved |
| COMMIT after remote quorum | Local must succeed (prepared) |
| Node crash during PREPARE | Heartbeat timeout → intents cleaned |
| Node crash during COMMIT | Anti-entropy replays from peers |
Example: Recovery Scenarios for Our UPDATE
Scenario 1: Network Partition During PREPARE
Initial: UPDATE users SET balance = 75 WHERE id = 1 on Node 1
T0: Node 1 executes locally, creates intent
T1: Network partition isolates Node 1 from Node 2, Node 3
Node 1 (minority partition):
- Sends PREPARE to Node 2, Node 3
- Both timeout after 5s
- Required quorum: 2, achieved: 1 (self)
- Decision: ABORT
Node 1 rollback:
- Delete local intent: /intent/users/users:1
- Update txn record: Status=ABORTED
- Return error to client: "quorum not achieved"
Client retries after partition heals.
Scenario 2: Coordinator Crash After PREPARE, Before COMMIT
T0: Node 1 PREPARE succeeds on all nodes
All nodes have intent for "users:1"
T1: Node 1 crashes before sending COMMIT
T2 (+10s): Heartbeat timeout on Node 2
- GC scans /txn_idx/pend/
- Finds TxnID 0x...0001 with no recent heartbeat
- Checks txn status: PENDING
- Marks intents for cleanup
T3: Cleanup on Node 2, Node 3
- Delete intent: /intent/users/users:1
- Update txn: Status=ABORTED
- Remove from Cuckoo filter
Result: Transaction rolled back, row "users:1" unchanged
Client receives timeout, can retry
Scenario 3: Replica Crash During COMMIT
T0: Node 1 PREPARE succeeds (quorum achieved)
T1: Node 1 sends COMMIT to Node 2, Node 3
T2: Node 3 crashes while applying CDC
State:
Node 1: COMMITTED (coordinator)
Node 2: COMMITTED
Node 3: CRASHED (didn't complete commit)
T3: Node 3 restarts
T4: Anti-entropy on Node 3
- Local SeqNum = 40 (before crash)
- Query Node 1: SeqNum = 41
- Delta sync: get SeqNum 41 (our UPDATE)
- Apply CDC via REPLAY
- Node 3 now has balance=75 for id=1
Result: All nodes converged, transaction successful
Scenario 4: Partial PREPARE Success
T0: Node 1 sends PREPARE to Node 2, Node 3
T1: Responses:
Node 2: SUCCESS (intent created)
Node 3: CONFLICT (another transaction holds intent)
T2: Quorum check
Required: 2 (floor(3/2)+1)
SUCCESS responses: 1 (Node 2) + 1 (self) = 2
But Node 3 returned CONFLICT
T3: Decision depends on consistency level
If QUORUM: 2 SUCCESS ≥ 2 required → can proceed
If ALL: 3 required, only 2 → must ABORT
With QUORUM consistency:
- Send COMMIT to Node 2 (achieved remote quorum with just Node 2)
- Commit locally
- Node 3 will get update via anti-entropy later
With ALL consistency:
- Send ABORT to Node 2
- Rollback locally
- Return CONFLICT to client
Garbage Collection
Two-Phase GC
Phase 1 (Critical, always runs):
- Abort transactions without heartbeat for HeartbeatTimeout (10s)
- Clean orphaned write intents
Phase 2 (Background, skipped under load):
- Delete COMMITTED/ABORTED records older than GCMinRetention (2hr)
- Force delete after GCMaxRetention (24hr) regardless of peer state
GC Coordination
- Track per-peer replication state
- Query minimum applied txn_id across peers
- Only GC transactions all peers have applied
- Gap detection prevents data loss if GC runs while nodes offline
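Taken together, the retention windows and the peer check form a three-way decision per finished transaction. The following is a rough sketch under stated assumptions: `gcDecision` is a hypothetical function, peer progress is represented as the last applied sequence number per peer, and ages are passed in hours for readability.

```go
package main

import "fmt"

// GCDecision is the outcome of Phase 2 GC for one finished transaction.
type GCDecision string

const (
	Keep        GCDecision = "keep"
	Delete      GCDecision = "delete"
	ForceDelete GCDecision = "force-delete"
)

// gcDecision applies the Phase 2 rules to a COMMITTED or ABORTED record:
//   - older than maxRetention: delete regardless of peer state
//   - not yet older than minRetention: keep
//   - otherwise: delete only if every peer has applied seqNum
func gcDecision(ageHours float64, seqNum uint64, peerLastSeq []uint64,
	minRetention, maxRetention float64) GCDecision {
	if ageHours > maxRetention {
		return ForceDelete
	}
	if ageHours <= minRetention {
		return Keep
	}
	for _, last := range peerLastSeq {
		if last < seqNum {
			return Keep // a lagging peer may still need this record for delta sync
		}
	}
	return Delete
}

func main() {
	const minH, maxH = 2, 24

	fmt.Println(gcDecision(1, 42, []uint64{42, 42}, minH, maxH))   // keep: too recent
	fmt.Println(gcDecision(2.5, 42, []uint64{42, 42}, minH, maxH)) // delete: all peers caught up
	fmt.Println(gcDecision(3, 40, []uint64{50, 39}, minH, maxH))   // keep: one peer is at 39
	fmt.Println(gcDecision(25, 40, []uint64{50, 39}, minH, maxH))  // force-delete: past max retention
}
```

The force-delete branch is what bounds metadata growth when a peer stays offline; the cost is that such a peer can no longer catch up by delta sync and needs a snapshot transfer.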
Example: Garbage Collection Timeline
Continuing our UPDATE scenario over time:
T0: UPDATE users SET balance = 75 WHERE id = 1 committed
TxnID: 0x0001A2B3C4D50001, SeqNum: 42
All nodes: Status=COMMITTED
T0 + 5s: Phase 1 GC runs
- Scan /txn_idx/pend/ for stale transactions
- Our transaction is COMMITTED, not pending
- No action needed
T0 + 1hr: Phase 2 GC considers deletion
- Transaction age: 1 hour (< gc_min_retention=2hr)
- Skipped: too recent
T0 + 2.5hr: Phase 2 GC runs again
- Transaction age: 2.5 hours (> gc_min_retention=2hr)
- Check peer replication state:
/repl/node2/mydb → LastSeqNum: 42 ✓
/repl/node3/mydb → LastSeqNum: 42 ✓
- All peers have applied SeqNum 42
- Safe to delete
Cleanup:
DELETE /txn/0x0001A2B3C4D50001
DELETE /txn_idx/seq/42/0x0001A2B3C4D50001
DELETE /cdc_manifest/0x0001A2B3C4D50001
Reclaim unreferenced cdcseg ranges/files
T0 + 2.5hr: MetaStore state after GC
- Transaction record: GONE
- CDC manifest and reclaimed segment ranges: GONE
- SQLite data: balance=75 for id=1 (permanent)
GC blocked by lagging peer:
T0: Transactions SeqNum 40-50 committed
T1: Node 3 goes offline
T0 + 3hr: Phase 2 GC on Node 1
- Check peer states:
/repl/node2/mydb → LastSeqNum: 50 ✓
/repl/node3/mydb → LastSeqNum: 39 ✗ (offline, stale)
- Minimum across peers: 39
- Cannot GC SeqNum 40-50 (Node 3 hasn't applied them)
- Skip cleanup to preserve delta sync capability
T0 + 25hr: Force GC kicks in (> gc_max_retention=24hr)
- Transactions with SeqNum 40-50 are > 24 hours old
- Force delete regardless of peer state
- Node 3 will need snapshot transfer when it returns
Performance Optimizations
| Optimization | Mechanism |
|---|---|
| Cuckoo filter | O(1) conflict detection, FP rate ~2.3×10⁻¹⁰ |
| Sharded mutexes | 256 shards reduce lock contention |
| Grouped CDC sync | Batch CDC segment fsync by bytes, transaction count, or timer |
| Sequence pre-allocation | Batch 1000 sequence numbers |
| Object pooling | Reuse allocations in hot paths |
| Early quorum exit | Don't wait for slow nodes |
| Parallel replication | Goroutines for all replica communication |
| CDC replay | Binary values, no SQL parsing |
| NoSync cleanup | Intent cleanup is idempotent |
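As an illustration of the sharded-mutex row, the sketch below maps each intent key to one of 256 mutexes so that writers to unrelated rows rarely contend. It is a minimal example, not the project's implementation: the `ShardedMutex` type and the choice of FNV-1a as the hash are assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 256

// ShardedMutex spreads lock contention over a fixed set of mutexes.
type ShardedMutex struct {
	shards [shardCount]sync.Mutex
}

// shardIndex maps an intent key such as "users:1" to a shard number.
// FNV-1a is an arbitrary choice here; any stable hash would do.
func shardIndex(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % shardCount
}

// Lock and Unlock use pointer receivers so the mutexes are never copied.
func (m *ShardedMutex) Lock(key string)   { m.shards[shardIndex(key)].Lock() }
func (m *ShardedMutex) Unlock(key string) { m.shards[shardIndex(key)].Unlock() }

func main() {
	var locks ShardedMutex
	var wg sync.WaitGroup
	balance := 0

	// 100 goroutines update the same row; the shard lock serializes them.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			locks.Lock("users:1")
			balance++
			locks.Unlock("users:1")
		}()
	}
	wg.Wait()

	fmt.Println("shard:", shardIndex("users:1"), "balance:", balance)
}
```

Two different keys can hash to the same shard, so code that holds one key's lock while acquiring another would need a consistent ordering by shard index to avoid self-deadlock.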
Limitations
Design Constraints
- Single writer per node: SQLite WAL limitation
- 64 nodes max: 6-bit NodeID in transaction ID
- ~65k txns/ms/node: 16-bit logical counter
- Full replication only: No sharding/partitioning
- Eventual consistency: No strict serializability across nodes
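The node and throughput limits follow directly from the bit widths. The sketch below shows the arithmetic with a 64-bit ID; the field order (physical milliseconds in the high 42 bits, then the 16-bit logical counter, then the 6-bit NodeID) and the `packTxnID` / `unpackTxnID` helpers are assumptions made for illustration, since only the two widths are stated here.

```go
package main

import "fmt"

const (
	nodeBits    = 6
	logicalBits = 16
	maxNodes    = 1 << nodeBits    // 64 distinct node IDs
	maxLogical  = 1 << logicalBits // 65536 transactions per millisecond per node
)

// packTxnID builds a 64-bit ID using the ASSUMED layout
// [42-bit physical ms][16-bit logical][6-bit node].
// Physical bits above the low 42 are dropped by the shift.
func packTxnID(physicalMs uint64, logical uint16, nodeID uint8) (uint64, error) {
	if nodeID >= maxNodes {
		return 0, fmt.Errorf("node ID %d does not fit in %d bits", nodeID, nodeBits)
	}
	id := physicalMs<<(logicalBits+nodeBits) |
		uint64(logical)<<nodeBits |
		uint64(nodeID)
	return id, nil
}

// unpackTxnID reverses packTxnID under the same assumed layout.
func unpackTxnID(id uint64) (physicalMs uint64, logical uint16, nodeID uint8) {
	nodeID = uint8(id & (maxNodes - 1))
	logical = uint16((id >> nodeBits) & (maxLogical - 1))
	physicalMs = id >> (logicalBits + nodeBits)
	return
}

func main() {
	fmt.Println(maxNodes, maxLogical) // 64 65536

	id, err := packTxnID(1700000000000, 7, 5)
	if err != nil {
		panic(err)
	}
	fmt.Println(unpackTxnID(id))

	_, err = packTxnID(1700000000000, 0, 64)
	fmt.Println(err) // node 64 is rejected: only IDs 0-63 fit
}
```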
Not Supported
- Distributed deadlock detection (client retries)
- Read repair (anti-entropy only)
- Cascading rollback (best-effort abort)
- XA transactions (uses own 2PC)
- Table-level selective replication
Complete End-to-End Example
This section traces our UPDATE users SET balance = 75 WHERE id = 1 through the entire system lifecycle:
┌─────────────────────────────────────────────────────────────────────────────┐
│ COMPLETE TRANSACTION LIFECYCLE │
│ UPDATE users SET balance = 75 WHERE id = 1 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ T0: CLIENT REQUEST │
│ ══════════════════ │
│ MySQL client → Node 1 (port 3306) │
│ Query: UPDATE users SET balance = 75 WHERE id = 1 │
│ │
│ T1: LOCAL EXECUTION + CDC CAPTURE │
│ ═══════════════════════════════ │
│ Node 1 hookDB executes UPDATE │
│ ↓ │
│ Preupdate hook fires (SQLITE_UPDATE) │
│ ↓ │
│ EncodedCapturedRow created: │
│ {Op: UPDATE, Table: "users", IntentKey: "users:1", │
│ OldValues: {balance: 100}, NewValues: {balance: 75}} │
│ Stored in bounded hook memory or cdcseg spill │
│ │
│ T2: WRITE INTENT CREATION │
│ ══════════════════════════ │
│ Cuckoo filter: Add hash("users:1") │
│ Mutex: Lock shard[hash("users:1") % 256] │
│ MetaStore write: │
│ /intent/users/users:1 → {TxnID: 0x...0001, Status: PENDING} │
│ /intent_txn/0x...0001/users/users:1 → (index) │
│ /cdc_manifest/0x...0001 → {SegmentID, Offset, Length, RowCount} │
│ /txn/0x...0001 → {Status: PENDING, SeqNum: 42} │
│ │
│ T3: 2PC PREPARE PHASE (parallel to Node 2, Node 3) │
│ ═══════════════════════════════════════════════════ │
│ │
│ Node 1 ──gRPC──→ Node 2: ReplicateTransaction(PREPARE) │
│ │ ↓ │
│ │ Cuckoo check "users:1" → MISS │
│ │ Create intent, store encoded CDC row │
│ │ ↓ │
│ └──────────────← SUCCESS │
│ │
│ Node 1 ──gRPC──→ Node 3: ReplicateTransaction(PREPARE) │
│ │ ↓ │
│ │ Cuckoo check "users:1" → MISS │
│ │ Create intent, store encoded CDC row │
│ │ ↓ │
│ └──────────────← SUCCESS │
│ │
│ Quorum: 3/3 SUCCESS (need 2 for 3-node cluster) ✓ │
│ │
│ T4: 2PC COMMIT PHASE │
│ ═══════════════════ │
│ │
│ Node 1 ──gRPC──→ Node 2: ReplicateTransaction(COMMIT) │
│ │ ↓ │
│ │ Read encoded row via /cdc_manifest/0x...0001 │
│ │ Generate: UPDATE users SET ... WHERE id=1 │
│ │ Execute on SQLite │
│ │ Update /txn/0x...0001 → COMMITTED │
│ │ ↓ │
│ └──────────────← CommitTS │
│ │
│ Node 1 ──gRPC──→ Node 3: ReplicateTransaction(COMMIT) │
│ │ ↓ │
│ │ (same as Node 2) │
│ │ ↓ │
│ └──────────────← CommitTS │
│ │
│ Remote quorum achieved (2/2) ✓ │
│ │
│ T5: COORDINATOR LOCAL COMMIT │
│ ═════════════════════════════ │
│ Node 1: │
│ - Apply CDC to own SQLite │
│ - Update /txn/0x...0001 → COMMITTED │
│ - Schedule intent cleanup (background) │
│ │
│ T6: RESPONSE TO CLIENT │
│ ═══════════════════════ │
│ Node 1 → MySQL client: OK, 1 row affected │
│ │
│ T7: BACKGROUND CLEANUP (async) │
│ ═════════════════════════════ │
│ All nodes: │
│ - Delete /intent/users/users:1 │
│ - Delete /intent_txn/0x...0001/users/users:1 │
│ - Remove from Cuckoo filter │
│ - Move /txn_idx/pend/0x...0001 → /txn_idx/seq/42/0x...0001 │
│ │
│ T8: GOSSIP PROPAGATION │
│ ═══════════════════════ │
│ Node 1 gossips to random peers: │
│ {Type: TXN_COMMITTED, TxnID: 0x...0001, SeqNum: 42} │
│ Replication state updated: │
│ /repl/node1/mydb → {LastSeqNum: 42} │
│ │
│ T9: FINAL STATE (all nodes) │
│ ════════════════════════════ │
│ SQLite: users.balance = 75 WHERE id = 1 │
│ MetaStore: │
│ /txn/0x...0001 → {Status: COMMITTED, SeqNum: 42} │
│ /cdc_manifest/0x...0001 → cdcseg range kept for anti-entropy │
│ Cuckoo filter: "users:1" removed │
│ │
│ T10: GC (after 2+ hours) │
│ ═════════════════════════ │
│ Delete transaction metadata, keep only SQLite data │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Configuration Reference
[replication]
default_write_consistency = "QUORUM"
default_read_consistency = "LOCAL_ONE"
write_timeout_ms = 5000
read_timeout_ms = 2000
# Anti-Entropy
enable_anti_entropy = true
anti_entropy_interval_seconds = 30
gc_interval_seconds = 60
delta_sync_threshold_transactions = 10000
delta_sync_threshold_seconds = 3600
# Garbage Collection
gc_min_retention_hours = 2
gc_max_retention_hours = 24
[transaction]
heartbeat_timeout_seconds = 10
conflict_window_seconds = 10
lock_wait_timeout_seconds = 50
[ddl]
lock_lease_seconds = 30
enable_idempotent = true
[cluster]
gossip_interval_ms = 1000
gossip_fanout = 3
suspect_timeout_ms = 5000
dead_timeout_ms = 10000
Key Files Reference
| File | Purpose |
|---|---|
| coordinator/write_coordinator.go | 2PC protocol implementation |
| coordinator/handler.go | MySQL query routing, CDC extraction |
| db/meta_store_pebble.go | PebbleDB storage, write intents |
| db/preupdate_hook.go | CDC capture via SQLite hooks |
| db/intent_filter.go | Cuckoo filter for conflict detection |
| db/transaction.go | Transaction lifecycle, GC |
| grpc/replication_handler.go | Remote 2PC phase handling |
| grpc/anti_entropy.go | Background sync coordination |
| grpc/delta_sync.go | Incremental catch-up |
| grpc/gossip.go | SWIM protocol, membership |
| hlc/clock.go | Hybrid Logical Clock |
| publisher/ | CDC publishing system |
Next Steps
- Integrations - CDC publishing to Kafka, NATS, and external systems