DEV Community

Apache SeaTunnel

How to Seamlessly Integrate SeaTunnel MySQL-CDC with Databend: Formats Explained & Best Practices

SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports real-time synchronization of massive data volumes. It is stable and efficient, able to synchronize hundreds of billions of records per day, and is used in production by more than 3,000 enterprises in China.

Databend is a cloud-native data platform that separates compute from storage, offering elasticity and high concurrency for modern data processing workloads.

This article will focus on analyzing the MySQL-CDC plugin in SeaTunnel and the data format output by its Sink, and further explore the feasibility and implementation path of integrating SeaTunnel with Databend in practical scenarios.

SeaTunnel as a whole is a standard data synchronization tool:

[Figure: SeaTunnel architecture]

SeaTunnel and MySQL-CDC

SeaTunnel’s MySQL CDC connector reads both snapshot data and incremental data from MySQL databases. The tests below pair it with different sinks to see whether the data output by MySQL-CDC can be used directly by Databend.

From testing, the MySQL synchronization component used by SeaTunnel appears to be debezium-mysql-connector (the same component used by Kafka Connect).

source: MySQL-CDC sink: console

The task is to synchronize the wubx.t01 table from MySQL using SeaTunnel.
Configuration file v2.mysql.streaming.conf

# v2.mysql.streaming.conf
env{
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  MySQL-CDC {
   base-url="jdbc:mysql://192.168.1.100:3306/wubx"
   username="wubx"
   password="wubxwubx"
   table-names=["wubx.t01"]
   startup.mode="initial"
  }
}

sink {
  Console {
  }
}

Start SeaTunnel

./bin/seatunnel.sh --config ./config/v2.mysql.streaming.conf -m local

Observe the logs on the terminal.

Observed full synchronization

SELECT * FROM `wubx`.`t01`

The retrieved data is as follows:

2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 1, databend
2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 3, MySQL
2025-05-07 14:28:21,914 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 4, Setunnel01

Full synchronization completed.

Insert on source side

insert into t01 values(5,'SeaTunnel');
SeaTunnel can directly capture incremental data, with the corresponding action kind=INSERT.

2025-05-07 14:35:48,520 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=4:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 5, SeaTunnel

Update on source side

update t01 set c1='MySQL-CDC' where id=5;

2025-05-07 14:36:47,455 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=5:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_BEFORE : 5, SeaTunnel
2025-05-07 14:36:47,455 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=6:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_AFTER : 5, MySQL-CDC

Delete on source side

delete from t01 where id=5;

2025-05-07 14:37:33,082 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=7:  SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=DELETE : 5, MySQL-CDC

The log format output by the console is relatively clear, which is very helpful for troubleshooting and subsequent use.
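Because the console lines follow a regular shape, they can be picked apart with a small script while troubleshooting. The following is a minimal Python sketch, not part of SeaTunnel: the regular expression is inferred only from the log lines shown above, and `parse_console_row` is a hypothetical helper name. Other SeaTunnel versions may format these lines differently.

```python
import re

# Pattern inferred from the ConsoleSinkWriter lines shown above (an assumption,
# not a documented format).
ROW_PATTERN = re.compile(
    r"rowIndex=(?P<index>\d+):\s+"
    r"SeaTunnelRow#tableId=(?P<table>\S+)\s+"
    r"SeaTunnelRow#kind=(?P<kind>\w+)\s+:\s+(?P<fields>.*)$"
)


def parse_console_row(line):
    """Return row index, table, kind and field values, or None if no match."""
    match = ROW_PATTERN.search(line)
    if match is None:
        return None
    return {
        "index": int(match.group("index")),
        "table": match.group("table"),
        "kind": match.group("kind"),
        # Naive split: a value that itself contains ", " would be split as well.
        "fields": match.group("fields").split(", "),
    }


sample = (
    "2025-05-07 14:36:47,455 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] "
    "[st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=6:  "
    "SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_AFTER : 5, MySQL-CDC"
)
print(parse_console_row(sample))
```

Filtering on the `kind` field this way makes it quick to confirm that every source-side statement produced the expected change events.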

source: MySQL-CDC sink: MySQL

Based on the above test of MySQL-CDC output to the terminal, it can be confirmed that insert, update, and delete operations can all be correctly captured and processed. Next, we test MySQL-CDC -> MySQL. The corresponding configuration file v2.mysql.streaming.m.conf is as follows:

#v2.mysql.streaming.m.conf
env{
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  MySQL-CDC {
   base-url="jdbc:mysql://192.168.1.100:3306/wubx"
   username="wubx"
   password="wubxwubx"
   table-names=["wubx.t01"]
   startup.mode="initial"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://192.168.1.100:3306/wubx?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "wubx"
    password = "wubxwubx"
    generate_sink_sql = true
    # You need to configure both database and table
    database = wubx
    table = s_t01
    primary_keys = ["id"]
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode="APPEND_DATA"
  }
}

Start SeaTunnel

./bin/seatunnel.sh --config ./config/v2.mysql.streaming.m.conf -m local

Observe the logs on the terminal.

Sync process analysis

Full synchronization statements:

2025-05-07 14:56:01,024 INFO  [e.IncrementalSourceScanFetcher] [debezium-snapshot-reader-0] - Start snapshot read task for snapshot split: SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) exactly-once: false
2025-05-07 14:56:01,026 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 2 - Snapshotting data
2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Exporting data from split 'wubx.t01:0' of table wubx.t01
2025-05-07 14:56:01,028 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - For split 'wubx.t01:0' of table wubx.t01 using select statement: 'SELECT * FROM `wubx`.`t01`'
2025-05-07 14:56:01,032 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Finished exporting 3 records for split 'wubx.t01:0', total duration '00:00:00.004'
2025-05-07 14:56:01,033 INFO  [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)
2025-05-07 14:56:01,519 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=972391330309210113, pipelineId=1, taskGroupId=2}] - Finished reading from splits [wubx.t01:0]

Sink side prepared SQL statements for writing data:

2025-05-07 14:56:01,708 INFO  [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is:
INSERT INTO `wubx`.`s_t01` (`id`, `c1`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `c1`=VALUES(`c1`)

2025-05-07 14:56:01,709 INFO  [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is:
DELETE FROM `wubx`.`s_t01` WHERE `id` = ?

From the above statements, it can be seen that the corresponding binlog events can be directly handled as follows:

  • insert and update can be handled directly by the statement: INSERT INTO wubx.s_t01 (id, c1) VALUES (?, ?) ON DUPLICATE KEY UPDATE id=VALUES(id), c1=VALUES(c1)
  • delete can be handled by the statement: DELETE FROM wubx.s_t01 WHERE id = ?
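The mapping from row kinds to these two statements can be sketched in Python against an in-memory table. This is an illustration of the idea only, not the JDBC sink's code: `apply_change` is a hypothetical name, the target table is modeled as a dict keyed by `id`, and skipping `UPDATE_BEFORE` is an assumption based on the fact that only an upsert and a delete statement were prepared.

```python
def apply_change(target, kind, row, key="id"):
    """Apply one change event to `target`, a dict keyed by primary key."""
    if kind in ("INSERT", "UPDATE_AFTER"):
        # Mirrors INSERT ... ON DUPLICATE KEY UPDATE: insert or overwrite.
        target[row[key]] = dict(row)
    elif kind == "DELETE":
        # Mirrors DELETE ... WHERE id = ?; a missing key is a no-op.
        target.pop(row[key], None)
    elif kind == "UPDATE_BEFORE":
        # Assumption: the pre-image is redundant once the post-image is upserted.
        pass
    else:
        raise ValueError(f"unknown row kind: {kind}")


# The same insert / update / delete sequence used in the console test.
events = [
    ("INSERT", {"id": 5, "c1": "SeaTunnel"}),
    ("UPDATE_BEFORE", {"id": 5, "c1": "SeaTunnel"}),
    ("UPDATE_AFTER", {"id": 5, "c1": "MySQL-CDC"}),
    ("DELETE", {"id": 5, "c1": "MySQL-CDC"}),
]
s_t01 = {1: {"id": 1, "c1": "databend"}}
for kind, row in events:
    apply_change(s_t01, kind, row)
print(s_t01)
```

Because both operations are idempotent on the primary key, replaying the same events after a restart leaves the target in the same state.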

Summary

SeaTunnel MySQL-CDC is relatively stable. The underlying data reading uses Debezium, which is a very mature and reliable tool.

source: MySQL-CDC sink: s3 format json

This section looks at the foundation for data synchronization in cloud environments, in particular how to complete synchronization at the lowest cost. In overseas projects, developers often prefer kafka-connect: they usually sink data to S3 first, then batch-process the files in S3 to obtain a complete dataset.

Use the configuration file v2.mysql.streaming.s3.conf directly:

env{
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  MySQL-CDC {
   base-url="jdbc:mysql://192.168.1.100:3306/wubx"
   username="wubx"
   password="wubxwubx"
   table-names=["wubx.t01"]
   startup.mode="initial"
  }
}

sink {
  S3File {
    bucket = "s3a://mystage"
    tmp_path = "/tmp/SeaTunnel/${table_name}"
    path="/mysql/${table_name}"
    fs.s3a.endpoint="http://192.168.1.100:9900"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "minioadmin"
    secret_key = "minioadmin"
    file_format_type="json"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode="APPEND_DATA"
  }
}

First, sink using json format.

Start SeaTunnel

./bin/seatunnel.sh --config ./config/v2.mysql.streaming.s3.conf -m local

Observe the logs on the terminal.

Observed full synchronization

2025-05-07 15:14:41,430 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-42] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_1/NON_PARTITION/T_972396021571125249_c679929b12_0_1_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_1_0.json] finish

Content of /mysql/t01/T_972396021571125249_c679929b12_0_1_0.json:

{"id":1,"c1":"databend"}
{"id":3,"c1":"MySQL"}
{"id":4,"c1":"Setunnel01"}
{"id":5,"c1":"SeaTunnel"}

Seeing this is somewhat disappointing; it seems to lack the kind and timestamp fields.

Insert on source side

Next, insert a new row on the source side:
insert into t01 values(6,'SeaTunnel01');

2025-05-07 15:18:59,380 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-16] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_130/NON_PARTITION/T_972396021571125249_c679929b12_0_130_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_130_0.json] finish

Content of T_972396021571125249_c679929b12_0_130_0.json:

{"id":6,"c1":"SeaTunnel01"}

Update on source side

Execute statement:

update t01 set c1='MySQL-CDC' where id=5;

Log info:

2025-05-07 15:20:15,386 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-9] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_168/NON_PARTITION/T_972396021571125249_c679929b12_0_168_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_168_0.json] finish

The corresponding JSON file content:

{"id":5,"c1":"SeaTunnel"}
{"id":5,"c1":"MySQL-CDC"}

One update operation recorded two data lines in the json file, but due to the lack of operation type (kind) and timestamp fields, it is difficult to accurately restore the data change process. If a timestamp field were included, the latest record could be kept.

Delete on source side

Execute statement:

delete from t01 where id=5;

Log info:

2025-05-07 15:22:53,392 INFO  [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-6] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_247/NON_PARTITION/T_972396021571125249_c679929b12_0_247_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_247_0.json] finish

Corresponding JSON file content:

{"id":5,"c1":"MySQL-CDC"}

The delete operation also lacks an operation type (kind) and only records one original data line, making it difficult for subsequent data processing and tracing.
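To make the problem concrete, here is a small Python sketch that replays the four JSON files shown above with a naive "last line wins" rule. The function name `replay_last_write_wins` and the in-memory list standing in for the S3 objects are assumptions made for illustration; the row contents are copied from the test output.

```python
import json

# Rows copied from the four S3 objects shown above, in the order they arrived.
s3_files = [
    ['{"id":1,"c1":"databend"}', '{"id":3,"c1":"MySQL"}',
     '{"id":4,"c1":"Setunnel01"}', '{"id":5,"c1":"SeaTunnel"}'],  # snapshot
    ['{"id":6,"c1":"SeaTunnel01"}'],                               # insert
    ['{"id":5,"c1":"SeaTunnel"}', '{"id":5,"c1":"MySQL-CDC"}'],    # update
    ['{"id":5,"c1":"MySQL-CDC"}'],                                 # delete
]


def replay_last_write_wins(files):
    """Rebuild table state by letting the newest line for each id win."""
    state = {}
    for lines in files:
        for line in lines:
            row = json.loads(line)
            state[row["id"]] = row
    return state


state = replay_last_write_wins(s3_files)
print(sorted(state))
print(state[5])
```

After the replay, id 5 is still present even though it was deleted at the source: without a kind field, the delete record is indistinguishable from an insert or an update.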

Summary

Therefore, using SeaTunnel’s S3File sink with JSON format for data tracing is currently not feasible. It is recommended that the S3File sink add support for the maxwell_json and debezium_json formats.

https://github.com/apache/SeaTunnel/issues/9278

Looking forward to this feature enhancement, so that SeaTunnel can sync all data to S3, allowing S3 to play the role of a message queue.

source: MySQL-CDC sink: Kafka

The open source world is very interesting; if a feature is not achievable, there is always an alternative.
Since MySQL-CDC is based on Debezium underneath, it should support the Debezium format.

https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/debezium-json
It also supports the Maxwell format:
https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/maxwell-json

This means SeaTunnel, to maintain compatibility with Debezium and Maxwell, supports choosing these two formats when sinking to Kafka.

debezium-json

{
    "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter ",
        "weight": 5.18
    },
    "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter ",
        "weight": 5.17
    },
    "source": {
        "version": "1.1.1.Final",
        "connector": "mysql",
        "name": "dbserver1",
        "ts_ms": 1589362330000,
        "snapshot": "false",
        "db": "inventory",
        "table": "products",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2090,
        "row": 0,
        "thread": 2,
        "query": null
    },
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
}

The above format’s data can be easily processed in Databend or Snowflake. You can use the fields

    "op": "u",
    "ts_ms": 1589362330904,

to merge the data into the target table via merge into + stream method.
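The role of `op` and `ts_ms` can be sketched outside the database as well. The Python below folds a list of Debezium-style events into the current rows; it is an illustration under assumptions, not Databend or SeaTunnel code. `latest_state` is a hypothetical name, the sample events are made up (trimmed to the fields used), and the op codes follow Debezium's convention of `c` (create), `r` (snapshot read), `u` (update) and `d` (delete).

```python
def latest_state(events, key="id"):
    """Fold Debezium-style change events into the current rows, keyed by `key`."""
    state = {}
    # Process in event-time order so later changes win; sorted() is stable, so
    # events sharing the same ts_ms keep their original relative order.
    for event in sorted(events, key=lambda e: e["ts_ms"]):
        op = event["op"]
        if op in ("c", "r", "u"):
            row = event["after"]
            state[row[key]] = row
        elif op == "d":
            state.pop(event["before"][key], None)
    return state


# Hypothetical events, deliberately out of order.
events = [
    {"op": "u", "ts_ms": 1589362330904,
     "before": {"id": 111, "weight": 5.18}, "after": {"id": 111, "weight": 5.17}},
    {"op": "c", "ts_ms": 1589362330000,
     "before": None, "after": {"id": 111, "weight": 5.18}},
    {"op": "c", "ts_ms": 1589362331000,
     "before": None, "after": {"id": 112, "weight": 1.0}},
    {"op": "d", "ts_ms": 1589362332000,
     "before": {"id": 112, "weight": 1.0}, "after": None},
]
print(latest_state(events))
```

This is the same outcome a merge into statement over a stream would be expected to produce: the updated row survives and the deleted row disappears.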


maxwell-json

{
    "database":"test",
    "table":"product",
    "type":"insert",
    "ts":1596684904,
    "xid":7201,
    "commit":true,
    "data":{
        "id":111,
        "name":"scooter",
        "description":"Big 2-wheel scooter ",
        "weight":5.18
    },
    "primary_key_columns":[
        "id"
    ]
}

This JSON body contains type, ts, and primary key fields, making it very convenient to use SQL for ELT processing later.

Summary

In other words, to output logs in these standard CDC formats with SeaTunnel, you need to introduce a Kafka-like component into the architecture.

After talking with community members, it turns out some people do exactly this, syncing messages from Kafka to OSS.

Example of integrating maxwell-json message body with Databend

  1. Create an update table for recording binlog message details:
create table t01_update(
  database varchar,
  table varchar,
  type varchar,
  ts bigint,
  xid bigint,
  commit boolean,
  data variant,
  primary_key_columns array(varchar)
);

This table’s data source can be obtained from S3, and loaded near real-time into t01_update using copy into.

  2. Create a target table:
create table t01(
  id int,
  name varchar,
  description varchar,
  weight double
);
  3. Create a stream on the t01_update table to capture its increments:
create stream stream_t01_update on table t01_update;
  4. Merge the data into the target table in Databend:
MERGE INTO t01 AS a
USING (
    SELECT 
        data:id AS id,
        data:name AS name,
        data:description AS description,
        data:weight AS weight,
        ts,
        type
    FROM stream_t01_update
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) = 1
) AS b
ON a.id = b.id
WHEN MATCHED AND b.type = 'update' THEN
    UPDATE SET
        a.name = b.name,
        a.description = b.description,
        a.weight = b.weight
WHEN MATCHED AND b.type = 'delete' THEN
    DELETE
WHEN NOT MATCHED AND b.type != 'delete' THEN
    INSERT (id, name, description, weight)
    VALUES (b.id, b.name, b.description, b.weight);

This SQL uses window deduplication to merge raw binlog data into the target table.
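The deduplicate-then-merge logic can be followed step by step in a short Python sketch. It is a simplified model, not a translation of the SQL: `merge_maxwell` is a hypothetical name, the target table is a dict keyed by `id`, the sample messages are made up, and both insert and update are treated as an upsert. Note that the Maxwell `ts` field in the sample above is in seconds, so several changes to the same row within one second tie; this sketch keeps the first one seen, which is an arbitrary choice.

```python
def merge_maxwell(target, messages):
    """Apply the newest maxwell-json message per id to `target` (dict by id)."""
    latest = {}
    for msg in messages:
        row_id = msg["data"]["id"]
        # Keep only the message with the highest ts per id, like
        # ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) = 1.
        if row_id not in latest or msg["ts"] > latest[row_id]["ts"]:
            latest[row_id] = msg
    for row_id, msg in latest.items():
        if msg["type"] == "delete":
            target.pop(row_id, None)
        else:  # "insert" or "update": simplified to an upsert
            target[row_id] = msg["data"]
    return target


# Hypothetical messages, trimmed to the fields the merge uses.
messages = [
    {"type": "insert", "ts": 1596684904, "data": {"id": 111, "weight": 5.18}},
    {"type": "update", "ts": 1596684910, "data": {"id": 111, "weight": 5.17}},
    {"type": "insert", "ts": 1596684905, "data": {"id": 112, "weight": 1.0}},
    {"type": "delete", "ts": 1596684920, "data": {"id": 112, "weight": 1.0}},
]
t01 = merge_maxwell({}, messages)
print(t01)
```

If ties matter in practice, a finer-grained ordering column would be needed alongside `ts`; which one is available depends on the message producer.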


SeaTunnel and Databend integration approaches

Based on analyzing MySQL-CDC output forms, there are three ways to integrate SeaTunnel with Databend:

  • First approach: directly develop a SeaTunnel connector for Databend supporting both sink and source. This is simpler to implement.
  • Second approach: add support for debezium-json and maxwell-json formats in the S3File sink, which is more elegant. Incremental data can then be consumed through Databend Stream, making it easy to bring in external data sources.
  • Third approach: introduce Kafka as the SeaTunnel sink to use debezium-json and maxwell-json messages directly, so that downstream systems can subscribe to incremental data with data governance.

By testing SeaTunnel’s output formats and behaviors across several sinks, we gained a preliminary understanding of SeaTunnel MySQL-CDC capabilities, laying the groundwork for integration with Databend. Combined with the Spark and Flink ecosystems, SeaTunnel can already handle large CDC tasks. If you have related practices, feel free to share them in the comments!
