Skip to content

Commit

Permalink
sink: Pass extra meta about the event to MQ producer (pingcap#1442)
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
  • Loading branch information
sunxiaoguang authored Mar 1, 2021
1 parent 06401d5 commit 08d535c
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 54 deletions.
4 changes: 2 additions & 2 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
type MqMessageType int

const (
// MqMessageTypeUnknow is unknown type of message key
MqMessageTypeUnknow MqMessageType = iota
// MqMessageTypeUnknown is unknown type of message key
MqMessageTypeUnknown MqMessageType = iota
// MqMessageTypeRow is row type of message key
MqMessageTypeRow
// MqMessageTypeDDL is ddl type of message key
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (a *AvroEventBatchEncoder) GetKeySchemaManager() *AvroSchemaManager {
// AppendRowChangedEvent appends a row change event to the encoder
// NOTE: the encoder can only store one RowChangedEvent!
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
mqMessage := NewMQMessage(nil, nil, e.CommitTs)
mqMessage := NewMQMessage(ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table)

if !e.IsDelete() {
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns)
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

return NewMQMessage(nil, b, e.CommitTs), nil
return newDDLMQMessage(ProtocolCanal, nil, b, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -390,7 +390,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if err != nil {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
ret := NewMQMessage(nil, value, 0)
ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa
if err != nil {
return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err)
}
return NewMQMessage(nil, value, e.CommitTs), nil
return newDDLMQMessage(ProtocolCanalJSON, nil, value, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -215,13 +215,13 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
return nil
}
ret := make([]*MQMessage, len(c.resolvedBuf))
for i := range c.resolvedBuf {
value, err := json.Marshal(c.resolvedBuf[i])
for i, msg := range c.resolvedBuf {
value, err := json.Marshal(msg)
if err != nil {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
ret[i] = NewMQMessage(nil, value, c.resolvedBuf[i].tikvTs)
ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down
37 changes: 30 additions & 7 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ package codec

import (
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -52,23 +54,44 @@ type EventBatchEncoder interface {

// MQMessage represents an MQ message to the mqSink
type MQMessage struct {
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol Protocol // protocol
}

// Length reports the expected size of the Kafka message, computed as the
// number of bytes in Key plus the number of bytes in Value.
func (m *MQMessage) Length() int {
	keyLen, valueLen := len(m.Key), len(m.Value)
	return keyLen + valueLen
}

// PhysicalTime returns the physical time part of the message's Ts as a
// time.Time. The conversion is delegated to oracle.GetTimeFromTS.
func (m *MQMessage) PhysicalTime() time.Time {
	ts := m.Ts
	return oracle.GetTimeFromTS(ts)
}

// newDDLMQMessage builds an MQMessage of type model.MqMessageTypeDDL for the
// given DDL event. The message timestamp is the event's CommitTs, and the
// Schema/Table pointers reference the fields of event.TableInfo directly
// rather than copies of them.
// NOTE(review): event.TableInfo is dereferenced without a check; this assumes
// every DDL event passed here has it populated — confirm against callers.
func newDDLMQMessage(proto Protocol, key, value []byte, event *model.DDLEvent) *MQMessage {
	schema := &event.TableInfo.Schema
	table := &event.TableInfo.Table
	return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, schema, table)
}

// newResolvedMQMessage builds an MQMessage of type
// model.MqMessageTypeResolved carrying the given timestamp. Schema and Table
// are passed as nil, so the resulting message is not tied to a table.
func newResolvedMQMessage(proto Protocol, key, value []byte, ts uint64) *MQMessage {
	msgType := model.MqMessageTypeResolved
	return NewMQMessage(proto, key, value, ts, msgType, nil, nil)
}

// NewMQMessage should be used when creating a MQMessage struct.
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(key []byte, value []byte, ts uint64) *MQMessage {
func NewMQMessage(proto Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage {
ret := &MQMessage{
Key: nil,
Value: nil,
Ts: ts,
Key: nil,
Value: nil,
Ts: ts,
Schema: schema,
Table: table,
Type: ty,
Protocol: proto,
}

if key != nil {
Expand Down
121 changes: 121 additions & 0 deletions cdc/sink/codec/interface_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"github.com/pingcap/check"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/types"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
)

// codecInterfaceSuite is the check suite holding the tests for the MQMessage
// constructors. It has no fields and therefore carries no per-suite state.
type codecInterfaceSuite struct{}

// Register the suite with the check framework.
var _ = check.Suite(new(codecInterfaceSuite))

// SetUpSuite is a no-op; its body is intentionally empty.
func (s *codecInterfaceSuite) SetUpSuite(c *check.C) {}

// TearDownSuite is a no-op; its body is intentionally empty.
func (s *codecInterfaceSuite) TearDownSuite(c *check.C) {}

// TestCreate checks that NewMQMessage, newDDLMQMessage and
// newResolvedMQMessage each return an MQMessage whose Key, Value, Ts, Type,
// Schema, Table and Protocol fields match the arguments they were given.
func (s *codecInterfaceSuite) TestCreate(c *check.C) {
	defer testleak.AfterTest(c)()

	// --- Row message built directly through NewMQMessage. ---
	handleKeyCol := &model.Column{
		Name:  "a",
		Value: 1,
		Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
	}
	plainCol := &model.Column{
		Name:  "b",
		Value: 2,
		Flag:  0,
	}
	row := &model.RowChangedEvent{
		Table:      &model.TableName{Schema: "test", Table: "t1"},
		PreColumns: []*model.Column{handleKeyCol, plainCol},
		StartTs:    1234,
		CommitTs:   5678,
	}

	rowMsg := NewMQMessage(
		ProtocolDefault,
		[]byte("key1"),
		[]byte("value1"),
		row.CommitTs,
		model.MqMessageTypeRow,
		&row.Table.Schema,
		&row.Table.Table,
	)

	c.Assert(rowMsg.Key, check.BytesEquals, []byte("key1"))
	c.Assert(rowMsg.Value, check.BytesEquals, []byte("value1"))
	c.Assert(rowMsg.Ts, check.Equals, row.CommitTs)
	c.Assert(rowMsg.Type, check.Equals, model.MqMessageTypeRow)
	c.Assert(*rowMsg.Schema, check.Equals, row.Table.Schema)
	c.Assert(*rowMsg.Table, check.Equals, row.Table.Table)
	c.Assert(rowMsg.Protocol, check.Equals, ProtocolDefault)

	// --- DDL message built through newDDLMQMessage. ---
	// newIDColumn returns a fresh "id" primary-key column on every call so the
	// job's table info and the pre-DDL table info do not share an instance.
	newIDColumn := func() *timodel.ColumnInfo {
		return &timodel.ColumnInfo{ID: 1, Name: timodel.CIStr{O: "id"}, FieldType: types.FieldType{Flag: mysql.PriKeyFlag}, State: timodel.StatePublic}
	}

	// Table definition recorded by the job: "id" plus the added column "a".
	tableAfterDDL := &timodel.TableInfo{
		ID:   49,
		Name: timodel.CIStr{O: "t1"},
		Columns: []*timodel.ColumnInfo{
			newIDColumn(),
			{ID: 2, Name: timodel.CIStr{O: "a"}, FieldType: types.FieldType{}, State: timodel.StatePublic},
		},
	}
	addColumnJob := &timodel.Job{
		ID:         1071,
		TableID:    49,
		SchemaName: "test",
		Type:       timodel.ActionAddColumn,
		StartTS:    420536581131337731,
		Query:      "alter table t1 add column a int",
		BinlogInfo: &timodel.HistoryInfo{
			TableInfo:  tableAfterDDL,
			FinishedTS: 420536581196873729,
		},
	}

	// Table definition before the DDL: only the "id" column.
	tableBeforeDDL := &model.TableInfo{
		TableName: model.TableName{
			Schema:  "test",
			Table:   "t1",
			TableID: 49,
		},
		TableInfo: &timodel.TableInfo{
			ID:      49,
			Name:    timodel.CIStr{O: "t1"},
			Columns: []*timodel.ColumnInfo{newIDColumn()},
		},
	}

	ddl := new(model.DDLEvent)
	ddl.FromJob(addColumnJob, tableBeforeDDL)

	ddlMsg := newDDLMQMessage(ProtocolMaxwell, nil, []byte("value1"), ddl)
	c.Assert(ddlMsg.Key, check.IsNil)
	c.Assert(ddlMsg.Value, check.BytesEquals, []byte("value1"))
	c.Assert(ddlMsg.Ts, check.Equals, ddl.CommitTs)
	c.Assert(ddlMsg.Type, check.Equals, model.MqMessageTypeDDL)
	c.Assert(*ddlMsg.Schema, check.Equals, ddl.TableInfo.Schema)
	c.Assert(*ddlMsg.Table, check.Equals, ddl.TableInfo.Table)
	c.Assert(ddlMsg.Protocol, check.Equals, ProtocolMaxwell)

	// --- Resolved message built through newResolvedMQMessage. ---
	resolvedMsg := newResolvedMQMessage(ProtocolCanal, []byte("key1"), nil, 1234)
	c.Assert(resolvedMsg.Key, check.BytesEquals, []byte("key1"))
	c.Assert(resolvedMsg.Value, check.IsNil)
	c.Assert(resolvedMsg.Ts, check.Equals, uint64(1234))
	c.Assert(resolvedMsg.Type, check.Equals, model.MqMessageTypeResolved)
	c.Assert(resolvedMsg.Schema, check.IsNil)
	c.Assert(resolvedMsg.Table, check.IsNil)
	c.Assert(resolvedMsg.Protocol, check.Equals, ProtocolCanal)
}
12 changes: 8 additions & 4 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er
valueBuf := new(bytes.Buffer)
valueBuf.Write(valueLenByte[:])

ret := NewMQMessage(keyBuf.Bytes(), valueBuf.Bytes(), ts)
ret := newResolvedMQMessage(ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), ts)
return ret, nil
}

Expand Down Expand Up @@ -398,7 +398,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)

d.messageBuf = append(d.messageBuf, NewMQMessage(versionHead, nil, 0))
d.messageBuf = append(d.messageBuf, NewMQMessage(ProtocolDefault, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil))
d.curBatchSize = 0
}

Expand All @@ -407,6 +407,9 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, valueLenByte[:]...)
message.Value = append(message.Value, value...)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table

if message.Length() > d.maxKafkaMessageSize {
// `len(d.messageBuf) == 1` is implied
Expand Down Expand Up @@ -454,7 +457,7 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e
valueBuf.Write(valueLenByte[:])
valueBuf.Write(value)

ret := NewMQMessage(keyBuf.Bytes(), valueBuf.Bytes(), e.CommitTs)
ret := newDDLMQMessage(ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), e)
return ret, nil
}

Expand All @@ -464,7 +467,8 @@ func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
if d.valueBuf.Len() == 0 {
return nil
}
ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0)
/* a single message may contain several types of events, so its type cannot be determined and is reported as unknown */
ret := NewMQMessage(ProtocolDefault, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil)
return []*MQMessage{ret}
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (d *MaxwellEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage
return nil, errors.Trace(err)
}

return NewMQMessage(key, value, e.CommitTs), nil
return newDDLMQMessage(ProtocolMaxwell, key, value, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -277,7 +277,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
return nil
}

ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0)
ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
d.Reset()
return []*MQMessage{ret}
}
Expand Down
18 changes: 9 additions & 9 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
if msg == nil {
return nil
}
err = k.writeToProducer(ctx, msg.Key, msg.Value, codec.EncoderNeedSyncWrite, -1)
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1)
return errors.Trace(err)
}

Expand All @@ -255,7 +255,7 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return nil
}
log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs))
err = k.writeToProducer(ctx, msg.Key, msg.Value, codec.EncoderNeedSyncWrite, -1)
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1)
return errors.Trace(err)
}

Expand Down Expand Up @@ -299,7 +299,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
}

for _, msg := range messages {
err := k.writeToProducer(ctx, msg.Key, msg.Value, codec.EncoderNeedAsyncWrite, partition)
err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -363,27 +363,27 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
}
}

func (k *mqSink) writeToProducer(ctx context.Context, key []byte, value []byte, op codec.EncoderResult, partition int32) error {
func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, op codec.EncoderResult, partition int32) error {
switch op {
case codec.EncoderNeedAsyncWrite:
if partition >= 0 {
return k.mqProducer.SendMessage(ctx, key, value, partition)
return k.mqProducer.SendMessage(ctx, message, partition)
}
return cerror.ErrAsyncBroadcaseNotSupport.GenWithStackByArgs()
case codec.EncoderNeedSyncWrite:
if partition >= 0 {
err := k.mqProducer.SendMessage(ctx, key, value, partition)
err := k.mqProducer.SendMessage(ctx, message, partition)
if err != nil {
return err
}
return k.mqProducer.Flush(ctx)
}
return k.mqProducer.SyncBroadcastMessage(ctx, key, value)
return k.mqProducer.SyncBroadcastMessage(ctx, message)
}

log.Warn("writeToProducer called with no-op",
zap.ByteString("key", key),
zap.ByteString("value", value),
zap.ByteString("key", message.Key),
zap.ByteString("value", message.Value),
zap.Int32("partition", partition))
return nil
}
Expand Down
Loading

0 comments on commit 08d535c

Please sign in to comment.