From 86417f55fa7b0e710f18b67aec30318e387f582c Mon Sep 17 00:00:00 2001 From: thsun Date: Tue, 11 Feb 2020 11:41:12 +0800 Subject: [PATCH 1/2] add previousgtidsevent --- replication/event.go | 50 +++++++++++++++++++++++++++++++++++++++++++ replication/parser.go | 2 ++ 2 files changed, 52 insertions(+) diff --git a/replication/event.go b/replication/event.go index 837055c1c..3c0a1b1f8 100644 --- a/replication/event.go +++ b/replication/event.go @@ -2,6 +2,7 @@ package replication import ( "encoding/binary" + "encoding/hex" "fmt" "io" "strconv" @@ -217,6 +218,55 @@ func (e *RotateEvent) Dump(w io.Writer) { fmt.Fprintln(w) } +type PreviousGTIDsEvent struct { + GTIDSets string +} + +func (e *PreviousGTIDsEvent) Decode(data []byte) error { + var previousGTIDSets []string + pos := 0 + uuidCount := binary.LittleEndian.Uint16(data[pos:pos+8]) + pos += 8 + + for i := uint16(0);i < uuidCount; i++ { + uuid := e.decodeUuid(data[pos:pos+16]) + pos += 16 + sliceCount := binary.LittleEndian.Uint16(data[pos:pos+8]) + pos += 8 + var intervals []string + for i := uint16(0);i < sliceCount; i++ { + start := e.decodeInterval(data[pos:pos+8]) + pos += 8 + stop := e.decodeInterval(data[pos:pos+8]) + pos += 8 + interval := "" + if stop == start+1 { + interval = fmt.Sprintf("%d",start) + }else { + interval = fmt.Sprintf("%d-%d",start,stop-1) + } + intervals = append(intervals,interval) + } + previousGTIDSets = append(previousGTIDSets,fmt.Sprintf("%s:%s",uuid,strings.Join(intervals,":"))) + } + e.GTIDSets = fmt.Sprintf("%s",strings.Join(previousGTIDSets,",")) + return nil +} + +func (e *PreviousGTIDsEvent) Dump(w io.Writer) { + fmt.Fprintf(w, "Previous GTID Event: %s\n", e.GTIDSets) + fmt.Fprintln(w) +} + +func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string { + return fmt.Sprintf("%s-%s-%s-%s-%s",hex.EncodeToString(data[0:4]),hex.EncodeToString(data[5:6]), + hex.EncodeToString(data[7:8]),hex.EncodeToString(data[8:9]),hex.EncodeToString(data[10:])) +} + +func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 { + return binary.LittleEndian.Uint64(data) +} + type XIDEvent struct { XID uint64 diff --git a/replication/parser.go b/replication/parser.go index 13f111bf7..38d909f8c 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -271,6 +271,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( ee := &MariadbGTIDEvent{} ee.GTID.ServerID = h.ServerID e = ee + case PREVIOUS_GTIDS_EVENT: + e = &PreviousGTIDsEvent{} default: e = &GenericEvent{} } From 409fd09733db14c499e10b2b10f07f720d4242a8 Mon Sep 17 00:00:00 2001 From: thsun Date: Tue, 11 Feb 2020 15:42:31 +0800 Subject: [PATCH 2/2] add previousgtidsevent --- replication/event.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replication/event.go b/replication/event.go index 3c0a1b1f8..5c44140ab 100644 --- a/replication/event.go +++ b/replication/event.go @@ -259,8 +259,8 @@ func (e *PreviousGTIDsEvent) Dump(w io.Writer) { } func (e *PreviousGTIDsEvent) decodeUuid(data []byte) string { - return fmt.Sprintf("%s-%s-%s-%s-%s",hex.EncodeToString(data[0:4]),hex.EncodeToString(data[5:6]), - hex.EncodeToString(data[7:8]),hex.EncodeToString(data[8:9]),hex.EncodeToString(data[10:])) + return fmt.Sprintf("%s-%s-%s-%s-%s",hex.EncodeToString(data[0:4]),hex.EncodeToString(data[4:6]), + hex.EncodeToString(data[6:8]),hex.EncodeToString(data[8:10]),hex.EncodeToString(data[10:])) } func (e *PreviousGTIDsEvent) decodeInterval(data []byte) uint64 {