# Integrations
Marmot integrates with various external systems for CDC publishing, monitoring, and data pipelines.
## CDC Publisher
Marmot can publish CDC events to external messaging systems for building real-time data pipelines, analytics, and event-driven architectures.
### Debezium Compatibility
Events follow the Debezium specification, ensuring compatibility with:
- Kafka Connect sink connectors
- Apache Flink CDC
- Apache Spark Structured Streaming
- Materialize, ksqlDB, and other stream processors
- Any system that consumes Debezium-format events
### Supported Sinks
| Sink | Type | Description |
|---|---|---|
| Kafka | `kafka` | Apache Kafka with configurable brokers |
| NATS | `nats` | NATS JetStream for lightweight messaging |
### Event Format

Events conform to the Debezium envelope structure:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "before", "type": "struct", "optional": true},
      {"field": "after", "type": "struct", "optional": true},
      {"field": "source", "type": "struct"},
      {"field": "op", "type": "string"},
      {"field": "ts_ms", "type": "int64"}
    ],
    "name": "marmot.mydb.users.Envelope"
  },
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "alice", "email": "alice@example.com"},
    "source": {
      "version": "2.0.0",
      "connector": "marmot",
      "name": "marmot",
      "ts_ms": 1702500000000,
      "db": "mydb",
      "table": "users",
      "txId": "7405448882856984577"
    },
    "op": "c",
    "ts_ms": 1702500000000
  }
}
```

### Operation Types
Per the Debezium specification:
| Operation | `op` Code | `before` | `after` | Description |
|---|---|---|---|---|
| INSERT | `c` (create) | `null` | row data | New row created |
| UPDATE | `u` (update) | old row | new row | Row modified |
| DELETE | `d` (delete) | old row | `null` | Row removed |
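For illustration, a consumer typically branches on the `op` code to decide how to apply a change. A minimal Python sketch, assuming `event` is a parsed envelope like the one above (the handler itself is hypothetical, not part of Marmot):

```python
def handle_event(event: dict) -> None:
    """Dispatch a Debezium-format CDC event on its operation code."""
    payload = event["payload"]
    op, before, after = payload["op"], payload["before"], payload["after"]

    if op == "c":      # INSERT: before is null, after holds the new row
        print("created:", after)
    elif op == "u":    # UPDATE: both old and new row images are present
        print("updated:", before, "->", after)
    elif op == "d":    # DELETE: after is null, before holds the removed row
        print("deleted:", before)
```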
### Topic Naming

Topics follow the pattern `{topic_prefix}.{database}.{table}`.

Examples:

- `marmot.cdc.myapp.users`: the `users` table in the `myapp` database
- `marmot.cdc.analytics.events`: the `events` table in the `analytics` database
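The same rule as a one-liner in Python (the function name is illustrative, not part of Marmot):

```python
def topic_for(topic_prefix: str, database: str, table: str) -> str:
    """Compose the topic a table's events are published to."""
    return f"{topic_prefix}.{database}.{table}"

assert topic_for("marmot.cdc", "myapp", "users") == "marmot.cdc.myapp.users"
```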
### Configuration

```toml
[publisher]
enabled = true
[[publisher.sinks]]
name = "kafka-main" # Unique sink identifier
type = "kafka" # "kafka" or "nats"
format = "debezium" # Only "debezium" supported
brokers = ["localhost:9092"] # Kafka broker addresses
topic_prefix = "marmot.cdc" # Topic naming prefix
filter_tables = ["*"] # Glob patterns (e.g., "users", "order_*")
filter_databases = ["*"] # Glob patterns (e.g., "prod_*")
batch_size = 100 # Events per poll cycle
poll_interval_ms = 10 # Polling interval
retry_initial_ms = 100 # Initial retry delay
retry_max_ms = 30000 # Max retry delay (30s)
retry_multiplier = 2.0 # Backoff multiplier
# NATS sink example
[[publisher.sinks]]
name = "nats-events"
type = "nats"
format = "debezium"
nats_url = "nats://localhost:4222"
topic_prefix = "marmot.cdc"
filter_tables = ["*"]
filter_databases = ["*"]
```

### Filtering
Use glob patterns to control which tables/databases are published:
| Pattern | Matches |
|---|---|
| `*` | All tables/databases |
| `users` | Exact match |
| `order_*` | Prefix match (`order_items`, `order_history`) |
| `*_log` | Suffix match (`access_log`, `error_log`) |
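These behave like shell-style globs. A quick sanity check using Python's `fnmatch`, which follows the same matching rules (Marmot's internal matcher may differ in edge cases):

```python
from fnmatch import fnmatch

assert fnmatch("order_items", "order_*")  # prefix match
assert fnmatch("access_log", "*_log")     # suffix match
assert fnmatch("users", "users")          # exact match
assert fnmatch("anything", "*")           # wildcard matches everything
```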
### Reliability

- Persistent Cursors: Each sink maintains a cursor in PebbleDB that survives restarts
- Automatic Retry: Exponential backoff with configurable limits (see the sketch after this list)
- At-Least-Once Delivery: Events are retried until acknowledged
- Multi-Sink Independence: Each sink tracks its progress independently
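The retry settings above imply a delay schedule of `retry_initial_ms * retry_multiplier^n`, capped at `retry_max_ms`. A sketch of that schedule in Python (illustrative, not Marmot's actual code):

```python
def retry_delays_ms(initial=100, maximum=30_000, multiplier=2.0, attempts=10):
    """Yield the delay before each retry attempt, capped at the maximum."""
    delay = float(initial)
    for _ in range(attempts):
        yield int(min(delay, maximum))
        delay *= multiplier

# 100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 30000
print(list(retry_delays_ms()))
```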
### Use Cases
| Use Case | Description |
|---|---|
| Real-Time Analytics | Stream to data warehouses (Snowflake, BigQuery, ClickHouse) |
| Event-Driven Architecture | Trigger microservices on data changes |
| Cache Invalidation | Keep Redis/Memcached in sync |
| Search Indexing | Update Elasticsearch/Algolia indexes |
| Audit Logging | Capture all changes for compliance |
| Data Replication | Replicate to other databases |
## Kafka

### Overview
Marmot publishes CDC events to Apache Kafka topics in Debezium-compatible format.
### Configuration

```toml
[[publisher.sinks]]
name = "kafka-production"
type = "kafka"
format = "debezium"
brokers = ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic_prefix = "marmot.cdc"
filter_tables = ["*"]
filter_databases = ["*"]
batch_size = 100
poll_interval_ms = 10
```

### Topic Auto-Creation
Marmot automatically creates topics that don't exist. This requires the broker to allow topic auto-creation, or ACLs that permit Marmot to create topics.
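If auto-creation is disabled on your cluster, topics can be created ahead of time. A hedged sketch using the kafka-python admin client (the package choice, broker address, and partition settings are assumptions, not Marmot requirements):

```python
from kafka.admin import KafkaAdminClient, NewTopic  # pip install kafka-python

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    # One topic per table, following the {topic_prefix}.{database}.{table} pattern
    NewTopic(name="marmot.cdc.mydb.users", num_partitions=3, replication_factor=1),
])
admin.close()
```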
### Consumer Examples

Kafka Connect (JDBC Sink):

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics.regex": "marmot.cdc.*",
  "connection.url": "jdbc:postgresql://localhost:5432/analytics",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
```

Apache Flink:

```java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopicPattern(Pattern.compile("marmot\\.cdc\\..*"))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
```
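Python: plain Kafka clients work as well. A minimal consumer sketch using the kafka-python package (the package choice and broker address are assumptions):

```python
import json
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
consumer.subscribe(pattern=r"^marmot\.cdc\..*")  # all Marmot CDC topics

for record in consumer:
    payload = record.value["payload"]
    print(f"{payload['op']}: {payload['source']['table']}")
```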
## NATS

### Overview
Marmot publishes CDC events to NATS JetStream subjects in Debezium-compatible format. NATS is ideal for lightweight, high-performance messaging.
### Configuration

```toml
[[publisher.sinks]]
name = "nats-events"
type = "nats"
format = "debezium"
nats_url = "nats://localhost:4222"
topic_prefix = "marmot.cdc"
filter_tables = ["*"]
filter_databases = ["*"]
batch_size = 50
poll_interval_ms = 10
```

### Subject Naming

NATS subjects follow the same pattern as Kafka topics: `{topic_prefix}.{database}.{table}`
### Consumer Example

Node.js:

```javascript
const { connect, consumerOpts, StringCodec } = require('nats');

const nc = await connect({ servers: 'localhost:4222' });
const js = nc.jetstream();
const sc = StringCodec();

// Build consumer options: manual acks plus a message callback.
const opts = consumerOpts();
opts.manualAck();
opts.callback((err, msg) => {
  if (err) throw err;
  const event = JSON.parse(sc.decode(msg.data));
  console.log(`${event.payload.op}: ${event.payload.source.table}`);
  msg.ack();
});

const sub = await js.subscribe('marmot.cdc.>', opts);
```

## Prometheus
Marmot exposes metrics in Prometheus format on the gRPC port.
### Configuration

```toml
[prometheus]
enabled = true  # Metrics served at http://localhost:8080/metrics
```

### Scrape Configuration
```yaml
scrape_configs:
  - job_name: 'marmot'
    static_configs:
      - targets: ['node1:8080', 'node2:8080', 'node3:8080']
```

See Prometheus Metrics for the full list of available metrics.
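To sanity-check that a node is exposing metrics, a small Python sketch (assumes a node listening on localhost:8080):

```python
from urllib.request import urlopen

# Fetch the Prometheus exposition text and print the first few lines.
with urlopen("http://localhost:8080/metrics") as resp:
    for line in resp.read().decode("utf-8").splitlines()[:10]:
        print(line)
```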
## MySQL Clients

Marmot implements the MySQL wire protocol, enabling connections from any MySQL-compatible client.

### Supported Clients
| Client | Tested |
|---|---|
| `mysql` CLI | ✅ |
| DBeaver | ✅ |
| MySQL Workbench | ✅ |
| DataGrip | ✅ |
| Node.js `mysql2` | ✅ |
| Python `mysql-connector` | ✅ |
| Go `go-sql-driver/mysql` | ✅ |
| PHP `PDO_MySQL` | ✅ |
### Connection String

```
mysql://root@localhost:3306/mydb
```

### Example Connections

mysql CLI:

```bash
mysql -h localhost -P 3306 -u root mydb
```

Node.js:

```javascript
const mysql = require('mysql2/promise');

const conn = await mysql.createConnection({
  host: 'localhost',
  port: 3306,
  user: 'root',
  database: 'mydb'
});
```

Python:

```python
import mysql.connector

conn = mysql.connector.connect(
    host="localhost",
    port=3306,
    user="root",
    database="mydb"
)
```

Go:

```go
import "database/sql"
import _ "github.com/go-sql-driver/mysql"
db, err := sql.Open("mysql", "root@tcp(localhost:3306)/mydb")
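Note that `sql.Open` only validates its arguments; the connection is established lazily on first use, so call `db.Ping()` if you want to verify connectivity up front.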