## How does it work?
There are multiple components in Marmot that make it work. Let's look at them one by one:
### Triggers and data capture
Marmot uses a pretty basic trick so that each process accessing the database can capture changes, which Marmot can then publish to the rest of the nodes. This is how it works internally:
- A global table `__marmot___global_change_log` captures the exact sequence of operations committed to the DB. Ideally we should be able to keep the changes inline as a JSON blob, but due to a limitation of SQLite's JSON (it cannot serialize BLOBs as a JSON property), for now we keep a separate table per source table with the changed values as columns. So each table gets a `__marmot__<table_name>_change_log` that records the changed values. Each column name in a table change log is prefixed with `val_` (i.e. `id` becomes `val_id`). These triggers are compiled via Go's built-in templating system and installed into the database at boot time (see the sketch after this list).
- `insert`, `update`, and `delete` triggers for every table fire `AFTER` the changes have been committed to the table. These triggers record the `OLD` (ON DELETE) or `NEW` (ON INSERT OR UPDATE) values into the change log table.
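To make this concrete, here is a minimal sketch of how such a trigger could be generated with Go's `text/template`. The column layout and the extra `type` column are illustrative assumptions; the actual DDL Marmot generates (including the global change log bookkeeping) is more involved.

```go
package main

import (
	"os"
	"text/template"
)

// triggerTmpl is a hypothetical sketch of an AFTER INSERT trigger template.
// Marmot generates triggers per operation (insert/update/delete) and per
// table; only the insert case is shown here, with val_-prefixed columns.
// The 'type' column is illustrative and not necessarily Marmot's real schema.
var triggerTmpl = template.Must(template.New("trigger").Parse(`
CREATE TRIGGER IF NOT EXISTS __marmot__{{.Table}}_change_log_on_insert
AFTER INSERT ON {{.Table}}
BEGIN
    INSERT INTO __marmot__{{.Table}}_change_log ({{range $i, $c := .Columns}}{{if $i}}, {{end}}val_{{$c}}{{end}}, type)
    VALUES ({{range $i, $c := .Columns}}{{if $i}}, {{end}}NEW.{{$c}}{{end}}, 'insert');
END;
`))

func main() {
	// Render the template for an example table; in Marmot the generated SQL
	// would be executed against the SQLite database at boot time.
	_ = triggerTmpl.Execute(os.Stdout, struct {
		Table   string
		Columns []string
	}{Table: "users", Columns: []string{"id", "name"}})
}
```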
### Replication
When the Marmot process is running, it watches for changes on the DB file and the WAL file. Every time there is a change, Marmot:
- Gathers all change records, and for each record calculates a consistent hash based on the table name + primary keys.
- Uses the hash to decide which JetStream and subject the change belongs to, and publishes the change to that specific JetStream (see the sketch after this list).
- Once JetStream has replicated the change log, marks the change as published.
- As soon as a change is published to JetStream, the rest of the nodes replay that log, and row changes are applied via a state machine to each node's local tables. Thanks to RAFT consensus at the stream level, every row in the database has exactly one deterministic order of changes across the cluster, even in case of race conditions.
- Once the order is determined for a change, it is applied to the table in an upsert or delete manner. So it's quite possible that a row committed locally is overwritten or replaced later, because it was not the last one in the cluster-wide commit order.
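A rough sketch of how a change could be routed to a shard, assuming an FNV-1a hash over the table name and primary-key values; the hash function and subject naming here are illustrative, not necessarily what Marmot uses internally:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor picks a JetStream shard for a change record by hashing the table
// name together with its primary-key values, so every change to the same row
// always lands on the same stream (and therefore the same ordered log).
func shardFor(table string, pkValues []string, totalShards uint64) uint64 {
	h := fnv.New64a()
	h.Write([]byte(table))
	for _, v := range pkValues {
		h.Write([]byte(v))
	}
	return (h.Sum64() % totalShards) + 1 // shards numbered 1..totalShards
}

func main() {
	shard := shardFor("users", []string{"42"}, 8)
	// A change to users(id=42) would always be published on this shard's subject.
	fmt.Printf("publish to subject marmot-changes-%d\n", shard)
}
```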
### Changelog format
The changelog is a CBOR-serialized (and compressed, if configured) payload with the following interface definition:
```typescript
interface MarmotPublishedRow {
    FromNodeId: number;
    Payload: {
        Id: number;
        Type: "insert" | "update" | "delete";
        TableName: string;
        Row: {[ColumnName: string]: any}
    };
}
```
`FromNodeId` points to the node ID that sent the changelog (configured when launching). `Payload.TableName` points to the table that changed, with the mutation type in `Payload.Type`. `Payload.Row` contains a flat map of column name to value.
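For illustration, a payload of this shape could be decoded in Go with a CBOR library such as `github.com/fxamacker/cbor/v2`; the struct below simply mirrors the interface above, and the library choice and field tags are assumptions rather than Marmot's internal types:

```go
package main

import (
	"fmt"

	"github.com/fxamacker/cbor/v2"
)

// MarmotPublishedRow mirrors the interface definition above.
type MarmotPublishedRow struct {
	FromNodeId uint64 `cbor:"FromNodeId"`
	Payload    struct {
		Id        uint64                 `cbor:"Id"`
		Type      string                 `cbor:"Type"` // "insert" | "update" | "delete"
		TableName string                 `cbor:"TableName"`
		Row       map[string]interface{} `cbor:"Row"`
	} `cbor:"Payload"`
}

func decodeChangeLog(data []byte) (*MarmotPublishedRow, error) {
	var row MarmotPublishedRow
	if err := cbor.Unmarshal(data, &row); err != nil {
		return nil, err
	}
	return &row, nil
}

func main() {
	// Round-trip an example payload to show the shape of the data.
	var sample MarmotPublishedRow
	sample.FromNodeId = 1
	sample.Payload.Id = 10
	sample.Payload.Type = "insert"
	sample.Payload.TableName = "users"
	sample.Payload.Row = map[string]interface{}{"id": 42, "name": "jane"}

	data, _ := cbor.Marshal(sample)
	decoded, _ := decodeChangeLog(data)
	fmt.Printf("%+v\n", *decoded)
}
```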
There is a lot of optimization that can be done to this payload overall; using ProtoBuf or a more optimized serialization format in the future is absolutely an open option.
### Snapshotting
In a normal distributed system it's pretty typical for nodes to go down for a really long time. In that particular case a node coming back up can be lagging so far behind that `max-log-entries` might not be sufficient to fully replay all logs and restore the state of the database. In that case the node is required to restore a snapshot and then apply the current log entries to become fully up-to-date.
As of v0.6.0+, Marmot supports taking a snapshot and fully restoring it via NATS Object Storage. In the future, Marmot plans to support more storage mechanisms like S3 (and compatible APIs, including Backblaze and MinIO), Azure Blob, and SFTP.
#### Saving Snapshot
Every time a node publishes a new change log to NATS, it saves the sequence number of the entry in JetStream. The saved sequence number will be used later when the node boots up. Let's first look at how a snapshot is saved.
In order to keep enough headroom, Marmot calculates the maximum snapshot entries by dividing `max-log-entries` by the total number of shards, e.g. a `max-log-entries` of 1024 with 8 shards results in 128 max snapshot entries. Now, any time the sequence number of shard 1 is a multiple of 128, a snapshot is taken and uploaded to NATS Object Storage. The snapshot is saved in `OBJ_<stream-prefix>-snapshot-store`. Statistically, due to the even distribution of hashes among shards, every time shard 1 hits the max snapshot entries, the rest of the shards will have almost the same number of new entries.
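A small sketch of that headroom calculation, assuming the snapshot is triggered whenever shard 1's sequence number is an exact multiple of the computed threshold:

```go
package main

import "fmt"

// shouldSnapshot reports whether a snapshot should be taken for the given
// shard-1 sequence number, using the headroom rule described above.
func shouldSnapshot(maxLogEntries, totalShards, shard1Seq uint64) bool {
	maxSnapshotEntries := maxLogEntries / totalShards // e.g. 1024 / 8 = 128
	return shard1Seq%maxSnapshotEntries == 0
}

func main() {
	fmt.Println(shouldSnapshot(1024, 8, 256)) // true: 256 is a multiple of 128
	fmt.Println(shouldSnapshot(1024, 8, 300)) // false
}
```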
Once the system decides to save a snapshot, a temporary path is created where we use an awesome SQLite feature called `VACUUM INTO <PATH>`: SQLite optimizes the DB and gives us a compacted snapshot of the database. After that, Marmot removes all the hooks and triggers from the snapshot, and re-VACUUMs the database to pack it for upload. Once done, this snapshot is uploaded to `OBJ_<stream-prefix>-snapshot-store` (`<stream-prefix>-snapshot-store` in the client API).
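A minimal sketch of that flow, assuming the `mattn/go-sqlite3` driver and the NATS object store client; the bucket name, file paths, and connection details are placeholders, and error handling is trimmed:

```go
package main

import (
	"database/sql"
	"log"
	"path/filepath"

	_ "github.com/mattn/go-sqlite3"
	"github.com/nats-io/nats.go"
)

// snapshotAndUpload compacts the live database with VACUUM INTO and uploads
// the resulting file to a NATS object store bucket.
func snapshotAndUpload(db *sql.DB, js nats.JetStreamContext, tmpDir string) error {
	snapshotPath := filepath.Join(tmpDir, "snapshot.db")

	// VACUUM INTO writes an optimized, compacted copy of the database.
	if _, err := db.Exec("VACUUM INTO ?", snapshotPath); err != nil {
		return err
	}

	// "marmot-snapshot-store" is a placeholder; the real bucket is
	// "<stream-prefix>-snapshot-store" per your configuration.
	obs, err := js.ObjectStore("marmot-snapshot-store")
	if err != nil {
		return err
	}
	_, err = obs.PutFile(snapshotPath)
	return err
}

func main() {
	db, err := sql.Open("sqlite3", "app.db") // placeholder DB path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	js, _ := nc.JetStream()

	if err := snapshotAndUpload(db, js, "/tmp"); err != nil {
		log.Fatal(err)
	}
}
```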
One important thing to keep in mind: every time Marmot sees a sequence number in a shard higher than anything it has seen before, it records it against that shard, and this whole mapping is saved to the sequence map file specified by `seq-map-path`.
#### Restoring Snapshot
Whenever a node boots up, it first verifies the DB file integrity and performs any WAL checkpoints. Then it loads the shard mapping from `seq-map-path` and compares it to the corresponding JetStream's starting sequence number. If the sequence number the node has is less than the starting sequence number, that simply means the node can't reconstruct the exact state of the database by replaying the logs it is missing. In this case the node downloads the snapshot from the NATS server as described in the section above. The snapshot is downloaded to a temporary path; Marmot then takes an exclusive transaction lock to prevent any writers from getting into the DB while it copies the files over.
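Sketched in Go with placeholder stream, bucket, and object names (and trimmed error handling), the boot-time decision could look roughly like this:

```go
package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

// restoreIfLagging downloads the latest snapshot when the locally recorded
// sequence number for a shard is older than the first sequence still
// retained by the corresponding JetStream.
func restoreIfLagging(js nats.JetStreamContext, stream string, localSeq uint64, tmpPath string) error {
	info, err := js.StreamInfo(stream)
	if err != nil {
		return err
	}

	// If our last-seen sequence predates the stream's first retained entry,
	// replaying logs alone cannot reconstruct the database.
	if localSeq >= info.State.FirstSeq {
		return nil // logs are enough, no snapshot needed
	}

	obs, err := js.ObjectStore("marmot-snapshot-store") // placeholder bucket name
	if err != nil {
		return err
	}
	// Download the snapshot into a temporary path; Marmot then swaps it in
	// under an exclusive transaction lock before replaying the pending logs.
	return obs.GetFile("snapshot.db", tmpPath) // placeholder object name
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	js, _ := nc.JetStream()

	if err := restoreIfLagging(js, "marmot-changes-1", 100, "/tmp/snapshot.db"); err != nil {
		log.Fatal(err)
	}
}
```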
Once the snapshot has been copied over, Marmot installs the same triggers and change log tables in the DB again, and starts processing logs, applying them all on top of the snapshot. This means that while Marmot is restoring the DB, it's quite possible that you have an outdated copy of the database until the logs are fully applied.