Skip to content

Commit feb62e9

Browse files
authored
Merge branch 'julien-duponchelle:main' into feat/footer-crc32
2 parents 78d55d4 + a19a5a5 commit feb62e9

File tree

11 files changed

+353
-164
lines changed

11 files changed

+353
-164
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,14 @@ Limitations
5656

5757
https://python-mysql-replication.readthedocs.org/en/latest/limitations.html
5858

59-
Featured Books
59+
Featured
6060
=============
6161

6262
[Data Pipelines Pocket Reference](https://www.oreilly.com/library/view/data-pipelines-pocket/9781492087823/) (by James Densmore, O'Reilly): Introduced and exemplified in Chapter 4: Data Ingestion: Extracting Data.
6363

64+
[Streaming Changes in a Database with Amazon Kinesis](https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/) (by Emmanuel Espina, Amazon Web Services)
65+
66+
6467
Projects using this library
6568
===========================
6669

@@ -84,6 +87,7 @@ Projects using this library
8487
* MySQL to Kafka (https://github.com/scottpersinger/mysql-to-kafka/)
8588
* Aventri MySQL Monitor (https://github.com/aventri/mysql-monitor)
8689
* BitSwanPump: A real-time stream processor (https://github.com/LibertyAces/BitSwanPump)
90+
* clickhouse-mysql-data-reader: https://github.com/Altinity/clickhouse-mysql-data-reader
8791

8892
MySQL server settings
8993
=========================

docker-compose-test.yml

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,56 @@
1-
version: '3.2'
1+
version: '3.4'
2+
3+
x-mysql: &mysql
4+
environment:
5+
MYSQL_ALLOW_EMPTY_PASSWORD: true
6+
command: >
7+
mysqld
8+
--log-bin=mysql-bin.log
9+
--server-id 1
10+
--binlog-format=row
11+
--gtid_mode=on
12+
--enforce-gtid-consistency=on
13+
14+
x-mariadb: &mariadb
15+
environment:
16+
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
17+
command: >
18+
--server-id=1
19+
--default-authentication-plugin=mysql_native_password
20+
--log-bin=master-bin
21+
--binlog-format=row
22+
223
services:
3-
percona-5.7:
4-
platform: linux/amd64
24+
percona-5.7-ctl:
25+
<<: *mysql
526
image: percona:5.7
6-
environment:
7-
MYSQL_ALLOW_EMPTY_PASSWORD: true
8-
MYSQL_DATABASE: pymysqlreplication_test
927
ports:
10-
- 3306:3306
11-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates
12-
restart: always
28+
- "3307:3306"
1329
networks:
1430
- default
1531

16-
percona-5.7-ctl:
32+
percona-5.7:
33+
<<: *mysql
1734
image: percona:5.7
18-
environment:
19-
MYSQL_ALLOW_EMPTY_PASSWORD: true
20-
MYSQL_DATABASE: pymysqlreplication_test
2135
ports:
22-
- 3307:3307
23-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
36+
- "3306:3306"
37+
networks:
38+
- default
39+
40+
mariadb-10.6:
41+
<<: *mariadb
42+
image: mariadb:10.6
43+
ports:
44+
- "3308:3306"
45+
volumes:
46+
- type: bind
47+
source: ./.mariadb
48+
target: /opt/key_file
49+
- type: bind
50+
source: ./.mariadb/my.cnf
51+
target: /etc/mysql/my.cnf
52+
networks:
53+
- default
2454

2555
pymysqlreplication:
2656
build:
@@ -30,6 +60,9 @@ services:
3060
BASE_IMAGE: python:3.11-alpine
3161
MYSQL_5_7: percona-5.7
3262
MYSQL_5_7_CTL: percona-5.7-ctl
63+
MYSQL_5_7_CTL_PORT: 3306
64+
MARIADB_10_6: mariadb-10.6
65+
MARIADB_10_6_PORT: 3306
3366

3467
command:
3568
- /bin/sh
@@ -39,7 +72,7 @@ services:
3972
4073
while :
4174
do
42-
if mysql -h percona-5.7 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --port=3307 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null; then
75+
if mysql -h percona-5.7 --user=root --execute "SELECT version();" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --user=root --execute "SELECT version();" 2>&1 >/dev/null; then
4376
break
4477
fi
4578
sleep 1
@@ -56,4 +89,5 @@ services:
5689
- percona-5.7-ctl
5790

5891
networks:
59-
default: {}
92+
default:
93+
driver: bridge

docker-compose.yml

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,47 @@
1-
version: '3.2'
1+
version: '3.4'
2+
3+
x-mysql: &mysql
4+
environment:
5+
MYSQL_ALLOW_EMPTY_PASSWORD: true
6+
command: >
7+
mysqld
8+
--log-bin=mysql-bin.log
9+
--server-id 1
10+
--binlog-format=row
11+
--gtid_mode=on
12+
--enforce-gtid-consistency=on
13+
14+
x-mariadb: &mariadb
15+
environment:
16+
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
17+
command: >
18+
--log-bin=master-bin
19+
--server-id=1
20+
--default-authentication-plugin=mysql_native_password
21+
--binlog-format=row
22+
223
services:
324
percona-5.7:
25+
<<: *mysql
426
image: percona:5.7
5-
environment:
6-
MYSQL_ALLOW_EMPTY_PASSWORD: true
727
ports:
8-
- 3306:3306
9-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates
28+
- "3306:3306"
1029

1130
percona-5.7-ctl:
31+
<<: *mysql
1232
image: percona:5.7
13-
environment:
14-
MYSQL_ALLOW_EMPTY_PASSWORD: true
1533
ports:
16-
- 3307:3307
17-
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
34+
- "3307:3306"
1835

1936
mariadb-10.6:
37+
<<: *mariadb
2038
image: mariadb:10.6
21-
environment:
22-
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
2339
ports:
2440
- "3308:3306"
25-
command: |
26-
--server-id=1
27-
--default-authentication-plugin=mysql_native_password
28-
--log-bin=master-bin
29-
--binlog-format=row
30-
--log-slave-updates=on
3141
volumes:
3242
- type: bind
3343
source: ./.mariadb
3444
target: /opt/key_file
3545
- type: bind
3646
source: ./.mariadb/my.cnf
37-
target: /etc/mysql/my.cnf
47+
target: /etc/mysql/my.cnf

examples/mariadb_gtid/read_event.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pymysql
22

33
from pymysqlreplication import BinLogStreamReader, gtid
4-
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent
4+
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent, MariadbBinLogCheckPointEvent
55
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
66

77
MARIADB_SETTINGS = {
@@ -62,6 +62,7 @@ def query_server_id(self):
6262
blocking=False,
6363
only_events=[
6464
MariadbGtidEvent,
65+
MariadbBinLogCheckPointEvent,
6566
RotateEvent,
6667
WriteRowsEvent,
6768
UpdateRowsEvent,

pymysqlreplication/binlogstream.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1414
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1515
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
16-
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent)
16+
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent,
17+
MariadbGtidListEvent, MariadbBinLogCheckPointEvent)
1718
from .exceptions import BinLogNotEnabled
1819
from .gtid import GtidSet
1920
from .packet import BinLogPacketWrapper
@@ -628,6 +629,8 @@ def _allowed_event_list(self, only_events, ignored_events,
628629
MariadbAnnotateRowsEvent,
629630
RandEvent,
630631
MariadbStartEncryptionEvent,
632+
MariadbGtidListEvent,
633+
MariadbBinLogCheckPointEvent
631634
))
632635
if ignored_events is not None:
633636
for e in ignored_events:
@@ -655,9 +658,8 @@ def __get_table_information(self, schema, table):
655658
information_schema.columns
656659
WHERE
657660
table_schema = %s AND table_name = %s
658-
ORDER BY ORDINAL_POSITION
659661
""", (schema, table))
660-
result = cur.fetchall()
662+
result = sorted(cur.fetchall(), key=lambda x: x['ORDINAL_POSITION'])
661663
cur.close()
662664

663665
return result

pymysqlreplication/event.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def _validate_event(self):
5656

5757
def dump(self):
5858
print("=== %s ===" % (self.__class__.__name__))
59-
print("Date: %s" % (datetime.datetime.fromtimestamp(self.timestamp)
59+
print("Date: %s" % (datetime.datetime.utcfromtimestamp(self.timestamp)
6060
.isoformat()))
6161
print("Log position: %d" % self.packet.log_pos)
6262
print("Event size: %d" % (self.event_size))
@@ -127,6 +127,25 @@ def _dump(self):
127127
print("Flags:", self.flags)
128128
print('GTID:', self.gtid)
129129

130+
class MariadbBinLogCheckPointEvent(BinLogEvent):
131+
"""
132+
Represents a checkpoint in a binlog event in MariaDB.
133+
134+
More details are available in the MariaDB Knowledge Base:
135+
https://mariadb.com/kb/en/binlog_checkpoint_event/
136+
137+
:ivar filename_length: int - The length of the filename.
138+
:ivar filename: str - The name of the file saved at the checkpoint.
139+
"""
140+
141+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
142+
super(MariadbBinLogCheckPointEvent, self).__init__(from_packet, event_size, table_map, ctl_connection,
143+
**kwargs)
144+
filename_length = self.packet.read_uint32()
145+
self.filename = self.packet.read(filename_length).decode()
146+
147+
def _dump(self):
148+
print('Filename:', self.filename)
130149

131150
class MariadbAnnotateRowsEvent(BinLogEvent):
132151
"""
@@ -143,7 +162,41 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
143162

144163
def _dump(self):
145164
super()._dump()
146-
print("SQL statement :", self.sql_statement)
165+
print("SQL statement :", self.sql_statement)
166+
167+
class MariadbGtidListEvent(BinLogEvent):
168+
"""
169+
GTID List event
170+
https://mariadb.com/kb/en/gtid_list_event/
171+
172+
Attributes:
173+
gtid_length: Number of GTIDs
174+
gtid_list: list of 'MariadbGtidObejct'
175+
176+
'MariadbGtidObejct' Attributes:
177+
domain_id: Replication Domain ID
178+
server_id: Server_ID
179+
gtid_seq_no: GTID sequence
180+
gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no'
181+
"""
182+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
183+
184+
super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
185+
186+
class MariadbGtidObejct(BinLogEvent):
187+
"""
188+
Information class of elements in GTID list
189+
"""
190+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
191+
super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
192+
self.domain_id = self.packet.read_uint32()
193+
self.server_id = self.packet.read_uint32()
194+
self.gtid_seq_no = self.packet.read_uint64()
195+
self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no)
196+
197+
198+
self.gtid_length = self.packet.read_uint32()
199+
self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)]
147200

148201

149202
class RotateEvent(BinLogEvent):

pymysqlreplication/packet.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,9 @@ class BinLogPacketWrapper(object):
8888
# MariaDB GTID
8989
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
9090
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
91+
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.MariadbBinLogCheckPointEvent,
9192
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
92-
constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent,
93+
constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent,
9394
constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent
9495
}
9596

0 commit comments

Comments
 (0)