Skip to content

Commit 19622f5

Browse files
committed
Implement event validation code using crc32 4-byte
1 parent dc190cb commit 19622f5

File tree

4 files changed

+83
-6
lines changed

4 files changed

+83
-6
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ def __init__(self, connection_settings, server_id,
141141
slave_heartbeat=None,
142142
is_mariadb=False,
143143
annotate_rows_event=False,
144-
ignore_decode_errors=False):
144+
ignore_decode_errors=False,
145+
use_crc32=False,):
145146
"""
146147
Attributes:
147148
ctl_connection_settings: Connection settings for cluster holding
@@ -183,6 +184,7 @@ def __init__(self, connection_settings, server_id,
183184
used with 'is_mariadb'
184185
ignore_decode_errors: If true, any decode errors encountered
185186
when reading column data will be ignored.
187+
use_crc32: If true, validate every event against its trailing 4-byte CRC32 checksum to detect corrupted data. Ignored when binlog checksums are not in use.
186188
"""
187189

188190
self.__connection_settings = connection_settings
@@ -205,6 +207,7 @@ def __init__(self, connection_settings, server_id,
205207
only_events, ignored_events, filter_non_implemented_events)
206208
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
207209
self.__ignore_decode_errors = ignore_decode_errors
210+
self.__use_crc32 = use_crc32
208211

209212
# We can't filter on packet level TABLE_MAP and rotate event because
210213
# we need them for handling other operations
@@ -534,7 +537,8 @@ def fetchone(self):
534537
self.__ignored_schemas,
535538
self.__freeze_schema,
536539
self.__fail_on_table_metadata_unavailable,
537-
self.__ignore_decode_errors)
540+
self.__ignore_decode_errors,
541+
self.__use_crc32,)
538542

539543
if binlog_event.event_type == ROTATE_EVENT:
540544
self.log_pos = binlog_event.event.position

pymysqlreplication/event.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import binascii
44
import struct
55
import datetime
6+
import zlib
67
from pymysqlreplication.constants.STATUS_VAR_KEY import *
78
from pymysqlreplication.exceptions import StatusVariableMismatch
89

@@ -16,7 +17,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
1617
ignored_schemas=None,
1718
freeze_schema=False,
1819
fail_on_table_metadata_unavailable=False,
19-
ignore_decode_errors=False):
20+
ignore_decode_errors=False,
21+
use_crc32=False,):
2022
self.packet = from_packet
2123
self.table_map = table_map
2224
self.event_type = self.packet.event_type
@@ -26,17 +28,32 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
2628
self.mysql_version = mysql_version
2729
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
2830
self._ignore_decode_errors = ignore_decode_errors
31+
self._use_crc32 = use_crc32
32+
self._is_event_valid = None
2933
# The event have been fully processed, if processed is false
3034
# the event will be skipped
3135
self._processed = True
3236
self.complete = True
37+
self._validate_event()
3338

3439
def _read_table_id(self):
3540
# Table ID is 6 byte
3641
# pad little-endian number
3742
table_id = self.packet.read(6) + b"\x00\x00"
3843
return struct.unpack('<Q', table_id)[0]
3944

45+
def _validate_event(self):
46+
if not self._use_crc32:
47+
return
48+
49+
self.packet.rewind(1)
50+
data = self.packet.read(19 + self.event_size)
51+
footer = self.packet.read(4)
52+
byte_data = zlib.crc32(data).to_bytes(4, byteorder='little')
53+
self._is_event_valid = True if byte_data == footer else False
54+
self.packet.read_bytes -= (19 + self.event_size + 4)
55+
self.packet.rewind(20)
56+
4057
def dump(self):
4158
print("=== %s ===" % (self.__class__.__name__))
4259
print("Date: %s" % (datetime.datetime.fromtimestamp(self.timestamp)

pymysqlreplication/packet.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ def __init__(self, from_packet, table_map,
104104
ignored_schemas,
105105
freeze_schema,
106106
fail_on_table_metadata_unavailable,
107-
ignore_decode_errors):
107+
ignore_decode_errors,
108+
use_crc32,):
108109
# -1 because we ignore the ok byte
109110
self.read_bytes = 0
110111
# Used when we want to override a value in the data buffer
@@ -134,6 +135,7 @@ def __init__(self, from_packet, table_map,
134135
if use_checksum:
135136
event_size_without_header = self.event_size - 23
136137
else:
138+
use_crc32 = False
137139
event_size_without_header = self.event_size - 19
138140

139141
self.event = None
@@ -150,7 +152,8 @@ def __init__(self, from_packet, table_map,
150152
ignored_schemas=ignored_schemas,
151153
freeze_schema=freeze_schema,
152154
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable,
153-
ignore_decode_errors=ignore_decode_errors)
155+
ignore_decode_errors=ignore_decode_errors,
156+
use_crc32=use_crc32)
154157
if self.event._processed == False:
155158
self.event = None
156159

pymysqlreplication/tests/test_basic.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
from pymysqlreplication.event import *
1919
from pymysqlreplication.constants.BINLOG import *
2020
from pymysqlreplication.row_event import *
21+
from pymysqlreplication.packet import BinLogPacketWrapper
22+
from pymysql.protocol import MysqlPacket
2123

22-
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"]
24+
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings",
25+
"TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting",
26+
"TestRowsQueryLogEvents"]
2327

2428

2529
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
@@ -522,6 +526,55 @@ def test_end_log_pos(self):
522526
self.assertEqual(last_log_pos, 888)
523527
self.assertEqual(last_event_type, TABLE_MAP_EVENT)
524528

529+
def test_event_validation(self):
    """CRC32 validation accepts an intact event and rejects a corrupted one.

    Two raw packets are built by hand, one intact and one with a single
    corrupted header byte, and each is wrapped in a BinLogPacketWrapper
    configured like the stream under test. The wrapped event's
    ``_is_event_valid`` flag is then checked.
    """
    def create_binlog_packet_wrapper(pkt):
        # The reader's private settings are only reachable through their
        # name-mangled form (_BinLogStreamReader__<name>). The CRC32 flag
        # is stored by the reader as ``__use_crc32``.
        return BinLogPacketWrapper(pkt, self.stream.table_map,
                                   self.stream._ctl_connection, self.stream.mysql_version,
                                   self.stream._BinLogStreamReader__use_checksum,
                                   self.stream._BinLogStreamReader__allowed_events_in_packet,
                                   self.stream._BinLogStreamReader__only_tables,
                                   self.stream._BinLogStreamReader__ignored_tables,
                                   self.stream._BinLogStreamReader__only_schemas,
                                   self.stream._BinLogStreamReader__ignored_schemas,
                                   self.stream._BinLogStreamReader__freeze_schema,
                                   self.stream._BinLogStreamReader__fail_on_table_metadata_unavailable,
                                   self.stream._BinLogStreamReader__ignore_decode_errors,
                                   self.stream._BinLogStreamReader__use_crc32)

    self.stream.close()
    self.stream = BinLogStreamReader(
        self.database,
        server_id=1024,
        blocking=False,
        use_crc32=True
    )
    # For event data, refer to the official document example data of mariaDB.
    # https://mariadb.com/kb/en/query_event/#example-with-crc32
    correct_event_data = (
        # OK value
        b"\x00"
        # Header
        b"q\x17(Z\x02\x8c'\x00\x00U\x00\x00\x00\x01\t\x00\x00\x00\x00"
        # Content
        b"f\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1a\x00"
        b"\x00\x00\x00\x00\x00\x01\x00\x00\x00P\x00\x00"
        b"\x00\x00\x06\x03std\x04\x08\x00\x08\x00\x08\x00\x00"
        b"TRUNCATE TABLE test.t4"
        # CRC 32, 4 Bytes
        b"Ji\x9e\xed"
    )
    # Simulate corruption during transmission: the first header byte
    # 'q' (0b01110001) arrives as 'U' (0b01010101).
    modified_byte = b"U"
    wrong_event_data = correct_event_data[:1] + modified_byte + correct_event_data[2:]

    packet = MysqlPacket(correct_event_data, 0)
    wrong_packet = MysqlPacket(wrong_event_data, 0)
    self.stream.fetchone()  # for '_ctl_connection' parameter
    binlog_event = create_binlog_packet_wrapper(packet)
    wrong_event = create_binlog_packet_wrapper(wrong_packet)
    self.assertTrue(binlog_event.event._is_event_valid)
    self.assertFalse(wrong_event.event._is_event_valid)
525578

526579
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
527580
def ignoredEvents(self):

0 commit comments

Comments
 (0)