DFS is a data synchronisation framework that captures changes by streaming reads of MySQL binlogs and replays them into MySQL or other databases. It consists of a millisecond-level incremental change data capture module (CDC), a persistent queue built on persist-queue and backed by a SQLite database, an automatic relationship processing module (rel_mgr), and a data transformation module (DPU) for complex business logic driven through dfs-entrypoint.py; the modules can be called in any combination. On a clustered database with an average daily load of 75,000 events, incremental data synchronisation completes within seconds (average latency under 1 second) while maintaining data integrity and consistency.
- **Used for Data Synchronisation** - when the table structures of the source and target databases are identical, DFS synchronises the data directly.
- **Used for Data Pipeline** - when the table structures of the source and target databases differ, DPU custom table-structure mapping and data processing turn DFS into a data pipeline.
- **Used for Data Backup** - with no target database configured, the built-in DFS log database provides incremental data event backup.
```mermaid
flowchart
    subgraph "Cloud"
        SDB[("Source Database")]
        BL[["fa:fa-file Binlog Stream"]]
        SDB -.- BL
    end
    subgraph "fa:fa-cubes DFS Server"
        subgraph "fa:fa-cube CDC Unit"
            BLS{{"fa:fa-code binlog_stream_reader"}}
            FM1{{"fa:fa-code cdc_field_mapper"}}
            BL ==> BLS
            BLS ==> |"Event"| FM1
        end
        Q[("fa:fa-database Persist Queue")]
        LDB[("fa:fa-database DFS Database")]
        FM1 ==> |"PUT"| Q
        subgraph "fa:fa-cube DPUs"
            RM{{"fa:fa-code relationship_manager"}}
            FM2{{"fa:fa-code dpu_field_mapper"}}
            SG{{"fa:fa-code statement_generator"}}
            FM2 ==> |"Event"| SG
            RM ==> |"Event"| FM2
        end
        Q ==> |"GET"| RM
        subgraph "fa:fa-cube DFS Monitor"
            RTM["fa:fa-window-maximize WEB UI"]
            API{{"fa:fa-code DFS_fastapi_router"}}
        end
        LDB -.-> |"DFS INFO"| API -.-> |"Websocket"| RTM
        BLS -.-> |"CDC Receive Log"| LDB
        RM <==> |"DPU Relationship Creation AND Query"| LDB
        FM2 -.-> |"DPU Processing Log"| LDB
        SG -.-> |"DML Execution Log"| LDB
    end
    subgraph "Cloud"
        TDB[("fa:fa-database Target Database")]
        SG ==> |"DML EXECUTE"| TDB
    end
```
*DFS workflow*
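The `PUT` and `GET` edges in the diagram represent the SQLite-backed persistent queue that decouples the CDC unit from the DPUs, so captured events survive a restart on either side. The snippet below is a minimal sketch of that handoff using the persist-queue library; the queue path and the event fields are illustrative placeholders, not DFS's actual schema.

```python
import persistqueue

# SQLite-backed FIFO queue; items are persisted to disk and survive process restarts.
queue = persistqueue.SQLiteQueue("./dfs_queue", auto_commit=True)

# Producer side (CDC unit): enqueue a captured change event.
queue.put({"schema": "demo", "table": "orders", "type": "WriteRowsEvent",
           "row": {"values": {"id": 1, "amount": 9.99}}})

# Consumer side (DPU): get() blocks until an event is available.
event = queue.get()
print(event["table"], event["type"])
```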
- Source database: MySQL 5.7.x (with binlog turned on and `binlog-format = ROW`)
- Target database: MySQL 8.0.x
- Python version: 3.12+
- Python libraries: `requirements.txt`
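Before starting DFS it is worth confirming that the source database actually meets the binlog requirements above. The check below is a minimal sketch using PyMySQL, which the mysql-replication library builds on; the connection parameters are placeholders.

```python
import pymysql

# Placeholder connection settings for the source database (adjust to your environment).
conn = pymysql.connect(host="127.0.0.1", port=3306, user="repl", password="secret")

try:
    with conn.cursor() as cur:
        # log_bin must be ON and binlog_format must be ROW for row-based CDC.
        cur.execute("SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")
        settings = dict(cur.fetchall())
    assert settings.get("log_bin") == "ON", "binlog is not enabled on the source database"
    assert settings.get("binlog_format") == "ROW", "binlog-format must be ROW"
    print("Source database binlog settings look OK:", settings)
finally:
    conn.close()
```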
- Modify `config.example.yaml` as necessary and rename it to `config.yaml`. If you want to specify the binlog file and binlog position, set `binlog_file` and `binlog_pos` in `config.yaml`.
- Build and start the container:

```bash
docker build -t mysql-dataflowsync:latest .
docker compose up -d
```
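For context, `binlog_file` and `binlog_pos` correspond to the binlog coordinates reported by `SHOW MASTER STATUS` on the source server; in the underlying python-mysql-replication library, the equivalent `BinLogStreamReader` arguments are `log_file` and `log_pos`. The sketch below illustrates resuming a stream from an explicit position at the library level. The connection settings, `server_id`, and coordinate values are placeholder assumptions, and DFS itself is configured through `config.yaml` rather than invoked like this directly.

```python
from pymysqlreplication import BinLogStreamReader

# Placeholder connection settings; DFS reads the equivalent values from config.yaml.
MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "repl", "passwd": "secret"}

# Resume streaming from an explicit binlog coordinate instead of the latest position.
stream = BinLogStreamReader(
    connection_settings=MYSQL_SETTINGS,
    server_id=100,                # must be unique among replicas of the source server
    log_file="mysql-bin.000042",  # hypothetical value of binlog_file
    log_pos=4,                    # hypothetical value of binlog_pos
    resume_stream=True,
    blocking=False,
)

for event in stream:
    event.dump()  # print a human-readable summary of each replicated event
stream.close()
```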
Configuration: 2022 M2 MacBook Pro (16 GB) running Python 3.12; source database MySQL 5.7.x (PolarDB), target database MySQL 8.0.x (PolarDB).
A change data capture unit implemented with the Python `mysql-replication` and `persist-queue` libraries.

CDC processing speed: a maximum of 59 records per second (max 59 rps) on a single node, with an average of 17 ms per record.
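As a rough illustration of how these two libraries fit together in a CDC unit, the sketch below subscribes to row events with `BinLogStreamReader` and pushes each decoded change onto a SQLite-backed persist-queue. The queue path, connection settings, and the layout of the queued dictionary are illustrative assumptions rather than DFS's actual internals.

```python
import persistqueue
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "repl", "passwd": "secret"}

# SQLite-backed persistent queue; queued events survive process restarts.
queue = persistqueue.SQLiteQueue("./cdc_events", auto_commit=True)

stream = BinLogStreamReader(
    connection_settings=MYSQL_SETTINGS,
    server_id=100,
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    blocking=True,       # keep the stream open and wait for new binlog events
    resume_stream=True,
)

for binlogevent in stream:
    for row in binlogevent.rows:
        # Normalise each row change into a plain dict before queueing it.
        queue.put({
            "timestamp": binlogevent.timestamp,
            "schema": binlogevent.schema,
            "table": binlogevent.table,
            "type": type(binlogevent).__name__,  # WriteRowsEvent / UpdateRowsEvent / DeleteRowsEvent
            "row": row,                          # 'values' or 'before_values'/'after_values'
        })
stream.close()
```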
Custom Data Processing Unit for unidirectional data synchronisation between two databases.
DPU processing speed: a maximum of 18 records per second (max 18 rps) on a single node, with an average of 55 ms per record.
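In simplified form, a DPU takes events off the persistent queue, maps source columns to the target schema, and generates the corresponding DML against the target database. The sketch below is a hypothetical analogue of the `dpu_field_mapper` and `statement_generator` stages; the field mapping, table name, and connection settings are assumptions, and only insert events are handled.

```python
import persistqueue
import pymysql

# Hypothetical column mapping from the source table to the target table.
FIELD_MAP = {"id": "order_id", "amount": "total_amount"}
TARGET_TABLE = "orders_copy"  # hypothetical target table name

TARGET_SETTINGS = {
    "host": "127.0.0.1", "port": 3306,
    "user": "dfs", "password": "secret", "database": "target_db",
}

queue = persistqueue.SQLiteQueue("./cdc_events", auto_commit=True)
target = pymysql.connect(**TARGET_SETTINGS, autocommit=True)

while True:
    event = queue.get()  # blocks until the CDC unit queues the next event
    if event["type"] != "WriteRowsEvent":
        continue  # a real DPU would also handle UpdateRowsEvent / DeleteRowsEvent

    # Map source field names to target field names (dpu_field_mapper analogue).
    mapped = {FIELD_MAP[k]: v for k, v in event["row"]["values"].items() if k in FIELD_MAP}

    # Generate and execute a parameterised INSERT (statement_generator analogue).
    columns = ", ".join(mapped)
    placeholders = ", ".join(["%s"] * len(mapped))
    sql = f"INSERT INTO {TARGET_TABLE} ({columns}) VALUES ({placeholders})"
    with target.cursor() as cur:
        cur.execute(sql, list(mapped.values()))
```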
This project is licensed under the MIT License.
- python-mysql-replication - Apache License, Version 2.0
- persist-queue - BSD-3-Clause license