From 38beb08b5c0963875f22fa03782c0c8b05561901 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 20 Mar 2023 16:03:45 +0800 Subject: [PATCH 1/5] fix dml order --- .../engine/pebble/encoding/key.go | 25 ++++++++++++++++--- .../engine/pebble/encoding/key_test.go | 21 ++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go index 711ebc5389a..64ea9e7b968 100644 --- a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go +++ b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go @@ -21,6 +21,13 @@ import ( "go.uber.org/zap" ) +const ( + typeUnknown = iota + typeDelete + typeUpdate + typeInsert +) + // DecodeKey decodes a key to uniqueID, tableID, startTs, CRTs. func DecodeKey(key []byte) (uniqueID uint32, tableID uint64, startTs, CRTs uint64) { // uniqueID, tableID, CRTs, startTs, Key, Put/Delete @@ -75,7 +82,7 @@ func EncodeTsKey(uniqueID uint32, tableID uint64, CRTs uint64, startTs ...uint64 } // EncodeKey encodes a key according to event. -// Format: uniqueID, tableID, CRTs, startTs, Put/Delete, Key. +// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key. func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) []byte { if event.RawKV == nil { log.Panic("rawkv must not be nil", zap.Any("event", event)) @@ -96,9 +103,21 @@ func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) [ // startTs binary.BigEndian.PutUint64(uint64Buf[:], event.StartTs) buf = append(buf, uint64Buf[:]...) - // Let Delete < Put - binary.BigEndian.PutUint16(uint64Buf[:], ^uint16(event.RawKV.OpType)) + // Let Delete < Update < Insert + binary.BigEndian.PutUint16(uint64Buf[:], getDMLOrder(event.RawKV)) buf = append(buf, uint64Buf[:2]...) // key return append(buf, event.RawKV.Key...) } + +// getDMLOrder returns the order of the dml types: delete Date: Mon, 20 Mar 2023 16:40:32 +0800 Subject: [PATCH 2/5] fix dml order --- cdc/processor/sourcemanager/engine/pebble/encoding/key.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go index 64ea9e7b968..4395fcc8209 100644 --- a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go +++ b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go @@ -22,8 +22,7 @@ import ( ) const ( - typeUnknown = iota - typeDelete + typeDelete = iota + 1 typeUpdate typeInsert ) From e592e4257827d203a7b54cc2c3b82b7fd081f38b Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 20 Mar 2023 17:02:23 +0800 Subject: [PATCH 3/5] fix dml order --- cdc/processor/sourcemanager/engine/pebble/encoding/key.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go index 4395fcc8209..b0fa4f4e4e1 100644 --- a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go +++ b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go @@ -111,11 +111,9 @@ func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) [ // getDMLOrder returns the order of the dml types: delete 0 { return typeUpdate } return typeInsert From bf7ac1cc923915725148e41900125afa8f1c7649 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Mon, 20 Mar 2023 22:19:11 +0800 Subject: [PATCH 4/5] fix dml order --- cdc/model/mounter.go | 4 +++ cdc/model/mounter_test.go | 30 +++++++++++++++++++ .../engine/pebble/encoding/key.go | 2 +- 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 69be1f6f620..d38d6c7491c 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -131,6 +131,10 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { if i.RawKV.OpType == OpTypeDelete && j.RawKV.OpType != OpTypeDelete { return true } + // update DML + if i.RawKV.OldValue != nil && j.RawKV.OldValue == nil { + return true + } } return i.CRTs < j.CRTs } diff --git a/cdc/model/mounter_test.go b/cdc/model/mounter_test.go index b92b4c213dc..dd5e29a3f24 100644 --- a/cdc/model/mounter_test.go +++ b/cdc/model/mounter_test.go @@ -98,3 +98,33 @@ func TestResolvedTsEqual(t *testing.T) { t5 := ResolvedTs{Mode: BatchResolvedMode, Ts: 2, BatchID: 1} require.False(t, t1.Equal(t5)) } + +func TestComparePolymorphicEvents(t *testing.T) { + cases := []struct { + a *PolymorphicEvent + b *PolymorphicEvent + }{ + { + a: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypeDelete, + }), + b: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypePut, + }), + }, + { + a: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypePut, + OldValue: []byte{0}, + Value: []byte{0}, + }), + b: NewPolymorphicEvent(&RawKVEntry{ + OpType: OpTypePut, + Value: []byte{0}, + }), + }, + } + for _, item := range cases { + require.True(t, ComparePolymorphicEvents(item.a, item.b)) + } +} diff --git a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go index b0fa4f4e4e1..fe7c99d6a75 100644 --- a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go +++ b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go @@ -113,7 +113,7 @@ func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) [ func getDMLOrder(rowKV *model.RawKVEntry) uint16 { if rowKV.OpType == model.OpTypeDelete { return typeDelete - } else if len(rowKV.OldValue) > 0 { + } else if rowKV.OldValue == nil { return typeUpdate } return typeInsert From ee3317aba3e7f386b10805f2db270f7f4f01531e Mon Sep 17 00:00:00 2001 From: sdojjy Date: Tue, 21 Mar 2023 09:38:34 +0800 Subject: [PATCH 5/5] fix dml order --- cdc/processor/sourcemanager/engine/pebble/encoding/key.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go index fe7c99d6a75..6a38a4909b5 100644 --- a/cdc/processor/sourcemanager/engine/pebble/encoding/key.go +++ b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go @@ -113,7 +113,7 @@ func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) [ func getDMLOrder(rowKV *model.RawKVEntry) uint16 { if rowKV.OpType == model.OpTypeDelete { return typeDelete - } else if rowKV.OldValue == nil { + } else if rowKV.OldValue != nil { return typeUpdate } return typeInsert