Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sorter(ticdc): fix dml order #8598

Merged
merged 8 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
if i.RawKV.OpType == OpTypeDelete && j.RawKV.OpType != OpTypeDelete {
return true
}
// update DML
if i.RawKV.OldValue != nil && j.RawKV.OldValue == nil {
return true
}
}
return i.CRTs < j.CRTs
}
Expand Down
30 changes: 30 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,33 @@ func TestResolvedTsEqual(t *testing.T) {
t5 := ResolvedTs{Mode: BatchResolvedMode, Ts: 2, BatchID: 1}
require.False(t, t1.Equal(t5))
}

func TestComparePolymorphicEvents(t *testing.T) {
cases := []struct {
a *PolymorphicEvent
b *PolymorphicEvent
}{
{
a: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypeDelete,
}),
b: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypePut,
}),
},
{
a: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypePut,
OldValue: []byte{0},
Value: []byte{0},
}),
b: NewPolymorphicEvent(&RawKVEntry{
OpType: OpTypePut,
Value: []byte{0},
}),
},
}
for _, item := range cases {
require.True(t, ComparePolymorphicEvents(item.a, item.b))
}
}
22 changes: 19 additions & 3 deletions cdc/processor/sourcemanager/engine/pebble/encoding/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"go.uber.org/zap"
)

const (
typeDelete = iota + 1
typeUpdate
typeInsert
)

// DecodeKey decodes a key to uniqueID, tableID, startTs, CRTs.
func DecodeKey(key []byte) (uniqueID uint32, tableID uint64, startTs, CRTs uint64) {
// uniqueID, tableID, CRTs, startTs, Key, Put/Delete
Expand Down Expand Up @@ -75,7 +81,7 @@ func EncodeTsKey(uniqueID uint32, tableID uint64, CRTs uint64, startTs ...uint64
}

// EncodeKey encodes a key according to event.
// Format: uniqueID, tableID, CRTs, startTs, Put/Delete, Key.
// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key.
func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) []byte {
if event.RawKV == nil {
log.Panic("rawkv must not be nil", zap.Any("event", event))
Expand All @@ -96,9 +102,19 @@ func EncodeKey(uniqueID uint32, tableID uint64, event *model.PolymorphicEvent) [
// startTs
binary.BigEndian.PutUint64(uint64Buf[:], event.StartTs)
buf = append(buf, uint64Buf[:]...)
// Let Delete < Put
binary.BigEndian.PutUint16(uint64Buf[:], ^uint16(event.RawKV.OpType))
// Let Delete < Update < Insert
binary.BigEndian.PutUint16(uint64Buf[:], getDMLOrder(event.RawKV))
buf = append(buf, uint64Buf[:2]...)
// key
return append(buf, event.RawKV.Key...)
}

// getDMLOrder returns the order of the dml types: delete<update<insert
func getDMLOrder(rowKV *model.RawKVEntry) uint16 {
if rowKV.OpType == model.OpTypeDelete {
return typeDelete
sdojjy marked this conversation as resolved.
Show resolved Hide resolved
} else if rowKV.OldValue != nil {
return typeUpdate
}
return typeInsert
}
21 changes: 21 additions & 0 deletions cdc/processor/sourcemanager/engine/pebble/encoding/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,27 @@ func TestEncodeKey(t *testing.T) {
CRTs: 3,
}),
)

// update < insert
mustLess(
0, 0,
1, 1,
model.NewPolymorphicEvent(&model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte{2},
StartTs: 1,
CRTs: 1,
Value: []byte{2},
OldValue: []byte{2},
}),
model.NewPolymorphicEvent(&model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte{2},
StartTs: 1,
CRTs: 1,
Value: []byte{2},
}),
)
}

func TestEncodeTsKey(t *testing.T) {
Expand Down