Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial version of craft binary mq message format #1621

Merged
merged 52 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
161d0a8
Initial version of craft binary mq message format
sunxiaoguang Apr 8, 2021
96ee865
Add license
sunxiaoguang Apr 8, 2021
7347d7c
Fix some lint
sunxiaoguang Apr 8, 2021
86f4612
Add more test to supress not used function lint
sunxiaoguang Apr 8, 2021
1cb0a2d
Remove not useful decodeRowChangedEvent function
sunxiaoguang Apr 8, 2021
b570de2
Use the correct size table to decode column groups
sunxiaoguang Apr 8, 2021
d1908d5
Fix more lints
sunxiaoguang Apr 8, 2021
d2bdea4
Fix lint
sunxiaoguang Apr 8, 2021
0e21331
Fix lint
sunxiaoguang Apr 8, 2021
97b9e1c
Change to little endian like what protobuf does.
sunxiaoguang Apr 12, 2021
dc77557
Remove comments and fix column group values encoding
sunxiaoguang Apr 13, 2021
b3a5ff9
Set schema and table to nil when they are empty
sunxiaoguang Apr 13, 2021
e9d080f
Merge branch 'master' into craft
sunxiaoguang Apr 14, 2021
f1c602f
Add more test cases and fix bugs about the fail tests
sunxiaoguang Apr 17, 2021
51d7980
Add protobuf vs craft benchmark related code
sunxiaoguang Apr 20, 2021
5cce40c
Fix column group & year type seralization bug
sunxiaoguang Apr 22, 2021
3f21064
Force encoding year type to varint
sunxiaoguang Apr 24, 2021
495ffb2
Fix body size table
sunxiaoguang Apr 30, 2021
051a9a7
Merge branch 'master' into craft
sunxiaoguang May 8, 2021
aca3558
Merge branch 'master' into craft
sunxiaoguang Jun 8, 2021
aec1ee4
Disable lint for not yet used utility functions
sunxiaoguang Jun 12, 2021
fd67c9e
Merge branch 'master' into craft
sunxiaoguang Jun 12, 2021
c1af304
Fix lint
sunxiaoguang Jun 12, 2021
eec4235
Merge branch 'craft' of github.com:sunxiaoguang/ticdc into craft
sunxiaoguang Jun 12, 2021
b9c3e56
Fix lint
sunxiaoguang Jun 12, 2021
8a14395
Fix lint
sunxiaoguang Jun 12, 2021
2311f10
Fix lint
sunxiaoguang Jun 12, 2021
a29ab5b
Fix lint
sunxiaoguang Jun 12, 2021
f31ddea
Remove unnecessary assignment
sunxiaoguang Jun 12, 2021
dc24ba3
Fix errdoc
sunxiaoguang Jun 12, 2021
c7f1018
Change decoder to use body size table instead of offset table
sunxiaoguang Jun 13, 2021
e5e87e9
Merge branch 'master' into craft
sunxiaoguang Jun 13, 2021
dfa4929
Fix json.Number.Int64() overflow
sunxiaoguang Jun 13, 2021
c3987fa
Convert from float64 to int64 or uint64
sunxiaoguang Jun 13, 2021
695358c
Use c.Logf instead of fmt.Printf
sunxiaoguang Jun 15, 2021
66990b8
Merge branch 'master' into craft
sunxiaoguang Jun 15, 2021
c143701
Seed random number generator in init function
sunxiaoguang Jun 15, 2021
c0ae253
Add comments to varint encoding logic
sunxiaoguang Jun 15, 2021
61bf907
Merge branch 'master' into craft
sunxiaoguang Jun 16, 2021
8cc9326
Merge branch 'master' into craft
sunxiaoguang Jun 16, 2021
b17d59c
Fix couple of typos and add some comments
sunxiaoguang Jun 18, 2021
c20427a
Merge branch 'master' into craft
ti-chi-bot Jun 21, 2021
686b55c
Remove deprecated RowID from header
sunxiaoguang Jun 21, 2021
618b773
Optimize codec size
sunxiaoguang Jun 21, 2021
282af92
Merge remote-tracking branch 'myrepo/craft' into craft
sunxiaoguang Jun 21, 2021
34a222f
Fix lints
sunxiaoguang Jun 21, 2021
7c09d32
Change size table to encode with varint chunk
sunxiaoguang Jun 21, 2021
8ff5468
Always encode terms in delta varint chunk
sunxiaoguang Jun 21, 2021
1314185
Change header to encode type with delta varint chunk
sunxiaoguang Jun 21, 2021
b96c3f5
Use uvarint chunk to encode type of headers
sunxiaoguang Jun 21, 2021
b581269
Merge branch 'master' into craft
amyangfei Jun 22, 2021
d36d5bf
Merge branch 'master' into craft
ti-chi-bot Jun 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
558 changes: 558 additions & 0 deletions cdc/sink/codec/codec_test.go

Large diffs are not rendered by default.

260 changes: 260 additions & 0 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.orglicensesLICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"math"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink/codec/craft"
cerror "github.com/pingcap/ticdc/pkg/errors"
)

// CraftEventBatchEncoder encodes the events into the byte of a batch into craft binary format.
type CraftEventBatchEncoder struct {
rowChangedBuffer *craft.RowChangedEventBuffer
messageBuf []*MQMessage

// configs
maxMessageSize int
maxBatchSize int

allocator *craft.SliceAllocator
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
return newResolvedMQMessage(ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil
}

func (e *CraftEventBatchEncoder) flush() {
headers := e.rowChangedBuffer.GetHeaders()
ts := headers.GetTs(0)
schema := headers.GetSchema(0)
table := headers.GetTable(0)
e.messageBuf = append(e.messageBuf, NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table))
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent) (EncoderResult, error) {
rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev)
if size > e.maxMessageSize || rows >= e.maxBatchSize {
e.flush()
}
return EncoderNoOperation, nil
}

// AppendResolvedEvent is no-op
func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) {
return newDDLMQMessage(ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil
}

// Build implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Build() []*MQMessage {
if e.rowChangedBuffer.Size() > 0 {
// flush buffered data to message buffer
e.flush()
}
ret := e.messageBuf
e.messageBuf = make([]*MQMessage, 0, 2)
return ret
}

// MixedBuild implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Only JsonEncoder supports mixed build")
}

// Size implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Size() int {
return e.rowChangedBuffer.Size()
}

// Reset implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Reset() {
e.rowChangedBuffer.Reset()
}

// SetParams reads relevant parameters for craft protocol
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should have this defined somewhere, not hardcoded, i will add a todo., fix this later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will keep it as it is.

e.maxMessageSize, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
} else {
e.maxMessageSize = DefaultMaxMessageBytes
}

if e.maxMessageSize <= 0 || e.maxMessageSize > math.MaxInt32 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stupid question, why >math.MaxInt32

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question. Craft codec made some assumptions to encode data more efficiently. One of such assumption is that single encoded message will not be greater than 2GB.

return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageSize))
}

if maxBatchSize, ok := params["max-batch-size"]; ok {
e.maxBatchSize, err = strconv.Atoi(maxBatchSize)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
} else {
e.maxBatchSize = DefaultMaxBatchSize
}

if e.maxBatchSize <= 0 || e.maxBatchSize > math.MaxUint16 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another stupid question, why this one e.maxMessageSize > math.MaxInt32, not MaxUint32, but e.maxBatchSize > math.MaxUint16 not MaxInt16

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like last question, this is to limit number of encoded events in a single message to be less than 64K which should be large enough.

return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-batch-size %d", e.maxBatchSize))
}
return nil
}

// NewCraftEventBatchEncoder creates a new CraftEventBatchEncoder.
func NewCraftEventBatchEncoder() EventBatchEncoder {
// 64 is a magic number that come up with these assumptions and manual benchmark.
// 1. Most table will not have more than 64 columns
// 2. It only worth allocating slices in batch for slices that's small enough
return NewCraftEventBatchEncoderWithAllocator(craft.NewSliceAllocator(64))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could u add some comments why 64, thx

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this is a magic number that based on codec design that can cover most small allocation size that worth using allocation buffer. Let me put a comment here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added

}

// NewCraftEventBatchEncoderWithAllocator creates a new CraftEventBatchEncoder with given allocator.
func NewCraftEventBatchEncoderWithAllocator(allocator *craft.SliceAllocator) EventBatchEncoder {
return &CraftEventBatchEncoder{
allocator: allocator,
messageBuf: make([]*MQMessage, 0, 2),
rowChangedBuffer: craft.NewRowChangedEventBuffer(allocator),
}
}

// CraftEventBatchDecoder decodes the byte of a batch into the original messages.
type CraftEventBatchDecoder struct {
headers *craft.Headers
decoder *craft.MessageDecoder
index int

allocator *craft.SliceAllocator
}

// HasNext implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) {
if b.index >= b.headers.Count() {
return model.MqMessageTypeUnknown, false, nil
}
return b.headers.GetType(b.index), true, nil
}

// NextResolvedEvent implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) NextResolvedEvent() (uint64, error) {
ty, hasNext, err := b.HasNext()
if err != nil {
return 0, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeResolved {
return 0, cerror.ErrCraftCodecInvalidData.GenWithStack("not found resolved event message")
}
ts := b.headers.GetTs(b.index)
b.index++
return ts, nil
}

// NextRowChangedEvent implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
ty, hasNext, err := b.HasNext()
if err != nil {
return nil, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeRow {
return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message")
}
oldValue, newValue, err := b.decoder.RowChangedEvent(b.index)
if err != nil {
return nil, errors.Trace(err)
}
ev := &model.RowChangedEvent{}
if oldValue != nil {
if ev.PreColumns, err = oldValue.ToModel(); err != nil {
return nil, errors.Trace(err)
}
}
if newValue != nil {
if ev.Columns, err = newValue.ToModel(); err != nil {
return nil, errors.Trace(err)
}
}
ev.CommitTs = b.headers.GetTs(b.index)
ev.Table = &model.TableName{
Schema: b.headers.GetSchema(b.index),
Table: b.headers.GetTable(b.index),
}
partition := b.headers.GetPartition(b.index)
if partition >= 0 {
ev.Table.TableID = partition
ev.Table.IsPartition = true
}
b.index++
return ev, nil
}

// NextDDLEvent implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
ty, hasNext, err := b.HasNext()
if err != nil {
return nil, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeDDL {
return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found ddl event message")
}
ddlType, query, err := b.decoder.DDLEvent(b.index)
if err != nil {
return nil, errors.Trace(err)
}
event := &model.DDLEvent{
CommitTs: b.headers.GetTs(b.index),
Query: query,
Type: ddlType,
TableInfo: &model.SimpleTableInfo{
Schema: b.headers.GetSchema(b.index),
Table: b.headers.GetTable(b.index),
},
}
b.index++
return event, nil
}

// NewCraftEventBatchDecoder creates a new CraftEventBatchDecoder.
func NewCraftEventBatchDecoder(bits []byte) (EventBatchDecoder, error) {
return NewCraftEventBatchDecoderWithAllocator(bits, craft.NewSliceAllocator(64))
}

// NewCraftEventBatchDecoderWithAllocator creates a new CraftEventBatchDecoder with given allocator.
func NewCraftEventBatchDecoderWithAllocator(bits []byte, allocator *craft.SliceAllocator) (EventBatchDecoder, error) {
decoder, err := craft.NewMessageDecoder(bits, allocator)
if err != nil {
return nil, errors.Trace(err)
}
headers, err := decoder.Headers()
if err != nil {
return nil, errors.Trace(err)
}

return &CraftEventBatchDecoder{
headers: headers,
decoder: decoder,
allocator: allocator,
}, nil
}
Loading