Skip to content

Commit 4b93dd5

Browse files
committed
Feat: Implement PreviousGtidEvent
1 parent 45f823f commit 4b93dd5

File tree

5 files changed

+48
-10
lines changed

5 files changed

+48
-10
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
1515
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1616
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
17-
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
17+
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
18+
PreviousGtidEvent)
1819
from .exceptions import BinLogNotEnabled
1920
from .row_event import (
2021
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
@@ -600,7 +601,8 @@ def _allowed_event_list(self, only_events, ignored_events,
600601
TableMapEvent,
601602
HeartbeatLogEvent,
602603
NotImplementedEvent,
603-
MariadbGtidEvent
604+
MariadbGtidEvent,
605+
PreviousGtidEvent
604606
))
605607
if ignored_events is not None:
606608
for e in ignored_events:

pymysqlreplication/event.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,40 @@ def __repr__(self):
9090
return '<GtidEvent "%s">' % self.gtid
9191

9292

93+
class PreviousGtidEvent(BinLogEvent):
94+
"""
95+
PreviousGtidEvent is contains the Gtids executed in the last binary log file.
96+
Attributes:
97+
n_sid: which size is the gtid_set
98+
sid: 16bytes UUID as a binary
99+
n_intervals: how many intervals are sent
100+
Eg: [4c9e3dfc-9d25-11e9-8d2e-0242ac1cfd7e:1-100, 4c9e3dfc-9d25-11e9-8d2e-0242ac1cfd7e:1-10:20-30]
101+
"""
102+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
103+
super(PreviousGtidEvent, self).__init__(from_packet, event_size, table_map,
104+
ctl_connection, **kwargs)
105+
106+
self._n_sid = self.packet.read_int64()
107+
self._gtids = []
108+
109+
for _ in range(self._n_sid):
110+
sid = self.packet.read(16)
111+
n_intervals = self.packet.read_uint64()
112+
intervals = [f"{self.packet.read_int64()}-{self.packet.read_uint64()}" for _ in range(n_intervals)]
113+
nibbles = binascii.hexlify(sid).decode('ascii')
114+
gtid = '%s-%s-%s-%s-%s:%s' % (
115+
nibbles[:8], nibbles[8:12], nibbles[12:16], nibbles[16:20], nibbles[20:], ':'.join(intervals))
116+
self._gtids.append(gtid)
117+
118+
self._previous_gtids = ','.join(self._gtids)
119+
120+
def _dump(self):
121+
print("previous_gtids: %s" % self._previous_gtids)
122+
123+
def __repr__(self):
124+
return '<PreviousGtidEvent "%s">' % self._previous_gtids
125+
126+
93127
class MariadbGtidEvent(BinLogEvent):
94128
"""
95129
GTID change in binlog event in MariaDB
@@ -289,7 +323,7 @@ def _read_status_vars_value_for_key(self, key):
289323
elif key == Q_TIME_ZONE_CODE: # 0x05
290324
time_zone_len = self.packet.read_uint8()
291325
if time_zone_len:
292-
self.time_zone = self.packet.read(time_zone_len)
326+
self.time_zone = self.packet.read(time_zone_len)
293327
elif key == Q_CATALOG_NZ_CODE: # 0x06
294328
catalog_len = self.packet.read_uint8()
295329
if catalog_len:

pymysqlreplication/packet.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class BinLogPacketWrapper(object):
6565
constants.XID_EVENT: event.XidEvent,
6666
constants.INTVAR_EVENT: event.IntvarEvent,
6767
constants.GTID_LOG_EVENT: event.GtidEvent,
68+
constants.PREVIOUS_GTIDS_LOG_EVENT: event.PreviousGtidEvent,
6869
constants.STOP_EVENT: event.StopEvent,
6970
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
7071
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
@@ -81,7 +82,6 @@ class BinLogPacketWrapper(object):
8182

8283
#5.6 GTID enabled replication events
8384
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
84-
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
8585
# MariaDB GTID
8686
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent,
8787
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,

pymysqlreplication/tests/test_basic.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222

2323
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
2424
def ignoredEvents(self):
25-
return [GtidEvent]
25+
return [GtidEvent, PreviousGtidEvent]
2626

2727
def test_allowed_event_list(self):
28-
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 16)
29-
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 15)
30-
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 15)
28+
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17)
29+
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16)
30+
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16)
3131
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
3232

3333
def test_read_query_event(self):
@@ -522,7 +522,7 @@ def test_end_log_pos(self):
522522

523523
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
524524
def ignoredEvents(self):
525-
return [GtidEvent]
525+
return [GtidEvent, PreviousGtidEvent]
526526

527527
def test_insert_multiple_row_event(self):
528528
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
@@ -844,6 +844,7 @@ def test_read_query_event(self):
844844
query = "COMMIT;"
845845
self.execute(query)
846846

847+
self.assertIsInstance(self.stream.fetchone(), PreviousGtidEvent)
847848
firstevent = self.stream.fetchone()
848849
self.assertIsInstance(firstevent, GtidEvent)
849850

@@ -893,6 +894,7 @@ def test_position_gtid(self):
893894

894895
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
895896
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
897+
self.assertIsInstance(self.stream.fetchone(), PreviousGtidEvent)
896898
self.assertIsInstance(self.stream.fetchone(), GtidEvent)
897899
event = self.stream.fetchone()
898900

pymysqlreplication/tests/test_data_type.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def encode_value(v):
3232

3333
class TestDataType(base.PyMySQLReplicationTestCase):
3434
def ignoredEvents(self):
35-
return [GtidEvent]
35+
return [GtidEvent, PreviousGtidEvent]
3636

3737
def create_and_insert_value(self, create_query, insert_query):
3838
self.execute(create_query)

0 commit comments

Comments
 (0)