From 428d9ba3aeca69228cc171c7a7bc0e06bb6c1d50 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Thu, 20 Mar 2025 18:52:36 +0100 Subject: [PATCH 1/5] Give `RowsEvent` methods to inspect the underlying event type. It's currently impossible to figure out what kind of operation led to a `RowsEvent`; multiple results indicate an `UPDATE`, but there's no way to differentiate an `INSERT` from a `DELETE`. --- replication/row_event.go | 12 ++++++++++++ replication/row_event_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/replication/row_event.go b/replication/row_event.go index 5f481e18c..dc7c3c6a1 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1120,6 +1120,18 @@ func (e *RowsEvent) Decode(data []byte) error { return e.DecodeData(pos, data) } +func (e *RowsEvent) IsInsert() bool { + return e.eventType == WRITE_ROWS_EVENTv0 || e.eventType == WRITE_ROWS_EVENTv1 || e.eventType == WRITE_ROWS_EVENTv2 || e.eventType == MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1 +} + +func (e *RowsEvent) IsUpdate() bool { + return e.eventType == UPDATE_ROWS_EVENTv0 || e.eventType == UPDATE_ROWS_EVENTv1 || e.eventType == UPDATE_ROWS_EVENTv2 || e.eventType == MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1 +} + +func (e *RowsEvent) IsDelete() bool { + return e.eventType == DELETE_ROWS_EVENTv0 || e.eventType == DELETE_ROWS_EVENTv1 || e.eventType == DELETE_ROWS_EVENTv2 || e.eventType == MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1 +} + func isBitSet(bitmap []byte, i int) bool { return bitmap[i>>3]&(1<<(uint(i)&7)) > 0 } diff --git a/replication/row_event_test.go b/replication/row_event_test.go index ea3a5363e..b8e4ef51b 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -1176,6 +1176,41 @@ func TestRowsDataExtraData(t *testing.T) { } } +func TestRowsEventTypoe(t *testing.T) { + testcases := []struct { + eventType EventType + isInsert bool + isUpdate bool + isDelete bool + }{ + {WRITE_ROWS_EVENTv0, true, false, false}, + {WRITE_ROWS_EVENTv1, true, false, false}, + {WRITE_ROWS_EVENTv2, true, false, false}, + {MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, true, false, false}, + {UPDATE_ROWS_EVENTv0, false, true, false}, + {UPDATE_ROWS_EVENTv1, false, true, false}, + {UPDATE_ROWS_EVENTv2, false, true, false}, + {MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, false, true, false}, + {DELETE_ROWS_EVENTv0, false, false, true}, + {DELETE_ROWS_EVENTv1, false, false, true}, + {DELETE_ROWS_EVENTv2, false, false, true}, + {MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, false, false, true}, + + // Whoops, these are not rows events at all + {EXEC_LOAD_EVENT, false, false, false}, + {HEARTBEAT_EVENT, false, false, false}, + } + + for _, tc := range testcases { + rev := new(RowsEvent) + rev.eventType = tc.eventType + + require.Equal(t, tc.isInsert, rev.IsInsert()) + require.Equal(t, tc.isUpdate, rev.IsUpdate()) + require.Equal(t, tc.isDelete, rev.IsDelete()) + } +} + func TestTableMapHelperMaps(t *testing.T) { /* CREATE TABLE `_types` ( From 115cf55ca8f5a67706244e4de35a9f62865d583c Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 21 Mar 2025 11:34:36 +0100 Subject: [PATCH 2/5] Rework individual methods for `INSERT`/`UPDATE`/`DELETE` into a single public RowsEvent type. --- replication/row_event.go | 44 +++++++++++++++++++++++++++-------- replication/row_event_test.go | 38 ++++++++++++++---------------- 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index dc7c3c6a1..f2d5b7ede 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -950,6 +950,29 @@ type RowsEvent struct { ignoreJSONDecodeErr bool } +// EnumRowsEventType is an abridged type describing the operation which triggered the given RowsEvent. +type EnumRowsEventType byte + +const ( + EnumRowsEventTypeUnknown = EnumRowsEventType(iota) + EnumRowsEventTypeInsert + EnumRowsEventTypeUpdate + EnumRowsEventTypeDelete +) + +func (t EnumRowsEventType) String() string { + switch t { + case EnumRowsEventTypeInsert: + return "Insert" + case EnumRowsEventTypeUpdate: + return "Update" + case EnumRowsEventTypeDelete: + return "Delete" + default: + return fmt.Sprintf("unknown (%d)", t) + } +} + // EnumRowImageType is allowed types for every row in mysql binlog. // See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39 // enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI }; @@ -1120,16 +1143,17 @@ func (e *RowsEvent) Decode(data []byte) error { return e.DecodeData(pos, data) } -func (e *RowsEvent) IsInsert() bool { - return e.eventType == WRITE_ROWS_EVENTv0 || e.eventType == WRITE_ROWS_EVENTv1 || e.eventType == WRITE_ROWS_EVENTv2 || e.eventType == MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1 -} - -func (e *RowsEvent) IsUpdate() bool { - return e.eventType == UPDATE_ROWS_EVENTv0 || e.eventType == UPDATE_ROWS_EVENTv1 || e.eventType == UPDATE_ROWS_EVENTv2 || e.eventType == MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1 -} - -func (e *RowsEvent) IsDelete() bool { - return e.eventType == DELETE_ROWS_EVENTv0 || e.eventType == DELETE_ROWS_EVENTv1 || e.eventType == DELETE_ROWS_EVENTv2 || e.eventType == MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1 +func (e *RowsEvent) Type() EnumRowsEventType { + switch e.eventType { + case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: + return EnumRowsEventTypeInsert + case UPDATE_ROWS_EVENTv0, UPDATE_ROWS_EVENTv1, UPDATE_ROWS_EVENTv2, MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: + return EnumRowsEventTypeUpdate + case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: + return EnumRowsEventTypeDelete + default: + return EnumRowsEventTypeUnknown + } } func isBitSet(bitmap []byte, i int) bool { diff --git a/replication/row_event_test.go b/replication/row_event_test.go index b8e4ef51b..7b004c50a 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -1176,38 +1176,34 @@ func TestRowsDataExtraData(t *testing.T) { } } -func TestRowsEventTypoe(t *testing.T) { +func TestRowsEventType(t *testing.T) { testcases := []struct { eventType EventType - isInsert bool - isUpdate bool - isDelete bool + want EnumRowsEventType }{ - {WRITE_ROWS_EVENTv0, true, false, false}, - {WRITE_ROWS_EVENTv1, true, false, false}, - {WRITE_ROWS_EVENTv2, true, false, false}, - {MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, true, false, false}, - {UPDATE_ROWS_EVENTv0, false, true, false}, - {UPDATE_ROWS_EVENTv1, false, true, false}, - {UPDATE_ROWS_EVENTv2, false, true, false}, - {MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, false, true, false}, - {DELETE_ROWS_EVENTv0, false, false, true}, - {DELETE_ROWS_EVENTv1, false, false, true}, - {DELETE_ROWS_EVENTv2, false, false, true}, - {MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, false, false, true}, + {WRITE_ROWS_EVENTv0, EnumRowsEventTypeInsert}, + {WRITE_ROWS_EVENTv1, EnumRowsEventTypeInsert}, + {WRITE_ROWS_EVENTv2, EnumRowsEventTypeInsert}, + {MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeInsert}, + {UPDATE_ROWS_EVENTv0, EnumRowsEventTypeUpdate}, + {UPDATE_ROWS_EVENTv1, EnumRowsEventTypeUpdate}, + {UPDATE_ROWS_EVENTv2, EnumRowsEventTypeUpdate}, + {MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeUpdate}, + {DELETE_ROWS_EVENTv0, EnumRowsEventTypeDelete}, + {DELETE_ROWS_EVENTv1, EnumRowsEventTypeDelete}, + {DELETE_ROWS_EVENTv2, EnumRowsEventTypeDelete}, + {MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeDelete}, // Whoops, these are not rows events at all - {EXEC_LOAD_EVENT, false, false, false}, - {HEARTBEAT_EVENT, false, false, false}, + {EXEC_LOAD_EVENT, EnumRowsEventTypeUnknown}, + {HEARTBEAT_EVENT, EnumRowsEventTypeUnknown}, } for _, tc := range testcases { rev := new(RowsEvent) rev.eventType = tc.eventType - require.Equal(t, tc.isInsert, rev.IsInsert()) - require.Equal(t, tc.isUpdate, rev.IsUpdate()) - require.Equal(t, tc.isDelete, rev.IsDelete()) + require.Equal(t, tc.want, rev.Type()) } } From 70a112c58f34bb3a9ac361938a197313c8103643 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 21 Mar 2025 11:43:41 +0100 Subject: [PATCH 3/5] Nit: clarify difference between private an public `RowsEvent` types. --- replication/row_event.go | 1 + 1 file changed, 1 insertion(+) diff --git a/replication/row_event.go b/replication/row_event.go index f2d5b7ede..3c8e1e998 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -905,6 +905,7 @@ type RowsEvent struct { // for mariadb *_COMPRESSED_EVENT_V1 compressed bool + // raw event type associated with a RowsEvent eventType EventType Table *TableMapEvent From 66163dcb00fe448ec075325551dcf4a0970ae4d2 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 21 Mar 2025 13:19:04 +0100 Subject: [PATCH 4/5] List event type information on dumps for `RowsEvent`. --- replication/row_event.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/replication/row_event.go b/replication/row_event.go index 3c8e1e998..3883ae1dc 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -964,11 +964,11 @@ const ( func (t EnumRowsEventType) String() string { switch t { case EnumRowsEventTypeInsert: - return "Insert" + return "insert" case EnumRowsEventTypeUpdate: - return "Update" + return "update" case EnumRowsEventTypeDelete: - return "Delete" + return "delete" default: return fmt.Sprintf("unknown (%d)", t) } @@ -1854,6 +1854,7 @@ func (e *RowsEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Flags: %d\n", e.Flags) fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount) fmt.Fprintf(w, "NDB data: %s\n", e.NdbData) + fmt.Fprintf(w, "Event type: %s (%s)", e.Type(), e.eventType) fmt.Fprintf(w, "Values:\n") for _, rows := range e.Rows { From 3d48019a4500280d5e4787d2f82334cee8771d2e Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 28 Mar 2025 15:23:27 +0100 Subject: [PATCH 5/5] Verify `RowsEvent` type handling within `TransactionPayloadEvent` decoded event tests. --- replication/transaction_payload_event_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/replication/transaction_payload_event_test.go b/replication/transaction_payload_event_test.go index 829d4602a..1d517a11a 100644 --- a/replication/transaction_payload_event_test.go +++ b/replication/transaction_payload_event_test.go @@ -69,6 +69,8 @@ func TestTransactionPayloadEventDecode(t *testing.T) { } err := e.decodePayload() require.NoError(t, err) + + // Check raw events require.Len(t, e.Events, 8) require.Equal(t, QUERY_EVENT, e.Events[0].Header.EventType) require.Equal(t, TABLE_MAP_EVENT, e.Events[1].Header.EventType) @@ -78,4 +80,17 @@ func TestTransactionPayloadEventDecode(t *testing.T) { require.Equal(t, TABLE_MAP_EVENT, e.Events[5].Header.EventType) require.Equal(t, DELETE_ROWS_EVENTv2, e.Events[6].Header.EventType) require.Equal(t, XID_EVENT, e.Events[7].Header.EventType) + + // Check insert/update/delete rows events casting + ievent, ok := e.Events[2].Event.(*RowsEvent) + require.True(t, ok) + require.Equal(t, ievent.Type(), EnumRowsEventTypeInsert) + + uevent, ok := e.Events[4].Event.(*RowsEvent) + require.True(t, ok) + require.Equal(t, uevent.Type(), EnumRowsEventTypeUpdate) + + devent, ok := e.Events[6].Event.(*RowsEvent) + require.True(t, ok) + require.Equal(t, devent.Type(), EnumRowsEventTypeDelete) }