Integrations

Marmot integrates with various external systems for CDC publishing, monitoring, and data pipelines.


CDC Publisher

Marmot can publish CDC events to external messaging systems for building real-time data pipelines, analytics, and event-driven architectures.

Debezium Compatibility

Events follow the Debezium specification, ensuring compatibility with:

  • Kafka Connect sink connectors
  • Apache Flink CDC
  • Apache Spark Structured Streaming
  • Materialize, ksqlDB, and other stream processors
  • Any system that consumes Debezium-format events

Supported Sinks

| Sink  | Type    | Description                              |
| ----- | ------- | ---------------------------------------- |
| Kafka | `kafka` | Apache Kafka with configurable brokers   |
| NATS  | `nats`  | NATS JetStream for lightweight messaging |

Event Format

Events conform to the Debezium envelope structure:

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "before", "type": "struct", "optional": true},
      {"field": "after", "type": "struct", "optional": true},
      {"field": "source", "type": "struct"},
      {"field": "op", "type": "string"},
      {"field": "ts_ms", "type": "int64"}
    ],
    "name": "marmot.mydb.users.Envelope"
  },
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "alice", "email": "alice@example.com"},
    "source": {
      "version": "2.0.0",
      "connector": "marmot",
      "name": "marmot",
      "ts_ms": 1702500000000,
      "db": "mydb",
      "table": "users",
      "txId": "7405448882856984577"
    },
    "op": "c",
    "ts_ms": 1702500000000
  }
}

Operation Types

Per the Debezium spec:

| Operation | `op` Code    | `before` | `after`  | Description     |
| --------- | ------------ | -------- | -------- | --------------- |
| INSERT    | `c` (create) | null     | row data | New row created |
| UPDATE    | `u` (update) | old row  | new row  | Row modified    |
| DELETE    | `d` (delete) | old row  | null     | Row removed     |
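A consumer typically dispatches on the `op` code after decoding the envelope. A minimal Python sketch, using only the field names shown in the event format above and assuming rows carry an `id` column:

```python
import json

def apply_change(event_json, store):
    """Apply one Debezium-format CDC event to a dict keyed by row id."""
    payload = json.loads(event_json)["payload"]
    op = payload["op"]
    if op == "c":    # INSERT: before is null, after holds the new row
        row = payload["after"]
        store[row["id"]] = row
    elif op == "u":  # UPDATE: replace the old row with the new one
        row = payload["after"]
        store[row["id"]] = row
    elif op == "d":  # DELETE: after is null, before holds the removed row
        store.pop(payload["before"]["id"], None)
    return store

event = json.dumps({"payload": {
    "op": "c", "before": None,
    "after": {"id": 1, "name": "alice"},
}})
print(apply_change(event, {}))  # {1: {'id': 1, 'name': 'alice'}}
```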

Topic Naming

Topics follow the pattern: {topic_prefix}.{database}.{table}

Examples:

  • marmot.cdc.myapp.users - Users table in myapp database
  • marmot.cdc.analytics.events - Events table in analytics database
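The pattern is a simple dot-joined triple, so the destination topic for any event can be computed directly (a sketch; the function name is illustrative, not part of Marmot's API):

```python
def topic_name(topic_prefix, database, table):
    """Build the destination topic per the {topic_prefix}.{database}.{table} pattern."""
    return f"{topic_prefix}.{database}.{table}"

print(topic_name("marmot.cdc", "myapp", "users"))  # marmot.cdc.myapp.users
```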

Configuration

[publisher]
enabled = true
 
[[publisher.sinks]]
name = "kafka-main"              # Unique sink identifier
type = "kafka"                   # "kafka" or "nats"
format = "debezium"              # Only "debezium" supported
brokers = ["localhost:9092"]     # Kafka broker addresses
topic_prefix = "marmot.cdc"      # Topic naming prefix
filter_tables = ["*"]            # Glob patterns (e.g., "users", "order_*")
filter_databases = ["*"]         # Glob patterns (e.g., "prod_*")
batch_size = 100                 # Events per poll cycle
poll_interval_ms = 10            # Polling interval
retry_initial_ms = 100           # Initial retry delay
retry_max_ms = 30000             # Max retry delay (30s)
retry_multiplier = 2.0           # Backoff multiplier
 
# NATS sink example
[[publisher.sinks]]
name = "nats-events"
type = "nats"
format = "debezium"
nats_url = "nats://localhost:4222"
topic_prefix = "marmot.cdc"
filter_tables = ["*"]
filter_databases = ["*"]

Filtering

Use glob patterns to control which tables/databases are published:

| Pattern   | Matches                                      |
| --------- | -------------------------------------------- |
| `*`       | All tables/databases                         |
| `users`   | Exact match                                  |
| `order_*` | Prefix match (`order_items`, `order_history`)|
| `*_log`   | Suffix match (`access_log`, `error_log`)     |
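These patterns behave like shell globs; Python's `fnmatch` reproduces the same semantics, which makes it easy to predict what a given filter will publish (a sketch, not Marmot's internal matcher):

```python
from fnmatch import fnmatch

def is_published(name, patterns):
    """Return True if the table/database name matches any configured glob."""
    return any(fnmatch(name, pat) for pat in patterns)

print(is_published("order_items", ["order_*"]))        # True
print(is_published("users", ["order_*", "*_log"]))     # False
```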

Reliability

  • Persistent Cursors: Each sink maintains a cursor in PebbleDB, surviving restarts
  • Automatic Retry: Exponential backoff with configurable limits
  • At-Least-Once Delivery: Events are retried until acknowledged
  • Multi-Sink Independence: Each sink tracks progress independently
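The retry schedule implied by the configuration keys above (`retry_initial_ms`, `retry_multiplier`, `retry_max_ms`) can be sketched as plain exponential backoff with a cap:

```python
def backoff_schedule(initial_ms=100, multiplier=2.0, max_ms=30000, attempts=10):
    """Delay grows by `multiplier` each retry, capped at max_ms."""
    delay = float(initial_ms)
    delays = []
    for _ in range(attempts):
        delays.append(int(delay))
        delay = min(delay * multiplier, max_ms)
    return delays

print(backoff_schedule())
# [100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 30000]
```

With the defaults shown in the configuration, delivery retries reach the 30-second cap on the tenth attempt and stay there until the sink acknowledges the batch.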

Use Cases

| Use Case                  | Description                                                 |
| ------------------------- | ----------------------------------------------------------- |
| Real-Time Analytics       | Stream to data warehouses (Snowflake, BigQuery, ClickHouse) |
| Event-Driven Architecture | Trigger microservices on data changes                       |
| Cache Invalidation        | Keep Redis/Memcached in sync                                |
| Search Indexing           | Update Elasticsearch/Algolia indexes                        |
| Audit Logging             | Capture all changes for compliance                          |
| Data Replication          | Replicate to other databases                                |

Kafka

Overview

Marmot publishes CDC events to Apache Kafka topics in Debezium-compatible format.

Configuration

[[publisher.sinks]]
name = "kafka-production"
type = "kafka"
format = "debezium"
brokers = ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic_prefix = "marmot.cdc"
filter_tables = ["*"]
filter_databases = ["*"]
batch_size = 100
poll_interval_ms = 10

Topic Auto-Creation

Marmot automatically creates topics if they don't exist (requires Kafka broker auto-creation enabled or appropriate ACLs).

Consumer Examples

Kafka Connect (JDBC Sink):

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics.regex": "marmot.cdc.*",
  "connection.url": "jdbc:postgresql://localhost:5432/analytics",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}

Apache Flink:

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopicPattern(Pattern.compile("marmot\\.cdc\\..*"))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

NATS

Overview

Marmot publishes CDC events to NATS JetStream subjects in Debezium-compatible format. NATS is ideal for lightweight, high-performance messaging.

Configuration

[[publisher.sinks]]
name = "nats-events"
type = "nats"
format = "debezium"
nats_url = "nats://localhost:4222"
topic_prefix = "marmot.cdc"
filter_tables = ["*"]
filter_databases = ["*"]
batch_size = 50
poll_interval_ms = 10

Subject Naming

NATS subjects follow the same pattern as Kafka topics: {topic_prefix}.{database}.{table}

Consumer Example

Node.js:

const { connect, consumerOpts, StringCodec } = require('nats');
 
const nc = await connect({ servers: 'localhost:4222' });
const js = nc.jetstream();
const sc = StringCodec();
 
// Message payloads are raw bytes; decode before parsing the JSON envelope.
const opts = consumerOpts();
opts.deliverTo('marmot-worker');
opts.callback((err, msg) => {
  const event = JSON.parse(sc.decode(msg.data));
  console.log(`${event.payload.op}: ${event.payload.source.table}`);
  msg.ack();
});
 
const sub = await js.subscribe('marmot.cdc.>', opts);

Prometheus

Marmot exposes metrics in Prometheus format on the gRPC port.

Configuration

[prometheus]
enabled = true  # Metrics served at http://localhost:8080/metrics

Scrape Configuration

scrape_configs:
  - job_name: 'marmot'
    static_configs:
      - targets: ['node1:8080', 'node2:8080', 'node3:8080']

See Prometheus Metrics for the full list of available metrics.


MySQL Clients

Marmot implements the MySQL wire protocol, enabling connections from any MySQL-compatible client.

Supported Clients

The following clients have been tested:

  • mysql CLI
  • DBeaver
  • MySQL Workbench
  • DataGrip
  • Node.js (mysql2)
  • Python (mysql-connector)
  • Go (go-sql-driver/mysql)
  • PHP (PDO_MySQL)

Connection String

mysql://root@localhost:3306/mydb

Example Connections

mysql CLI:

mysql -h localhost -P 3306 -u root mydb

Node.js:

const mysql = require('mysql2/promise');
const conn = await mysql.createConnection({
  host: 'localhost',
  port: 3306,
  user: 'root',
  database: 'mydb'
});

Python:

import mysql.connector
conn = mysql.connector.connect(
    host="localhost",
    port=3306,
    user="root",
    database="mydb"
)

Go:

import "database/sql"
import _ "github.com/go-sql-driver/mysql"
 
db, err := sql.Open("mysql", "root@tcp(localhost:3306)/mydb")