sink: Kafka message length and batch size limit for Open Protocol (#1079)
liuzix authored Nov 24, 2020
1 parent 0138edc commit 2958b02
Showing 16 changed files with 429 additions and 31 deletions.
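The change reads two new options from the sink URI, max-message-bytes and max-batch-size, and forwards them to every encoder through a new SetParams hook, so the Open Protocol (JSON) encoder can split its output into multiple Kafka messages that respect both limits. The sketch below is not part of the diff; it only illustrates how a caller might drive the new hook, and the import path and parameter values are assumptions.

package main

import (
	"log"

	"github.com/pingcap/ticdc/cdc/sink/codec" // assumed import path for this repository
)

func main() {
	// SetParams is the hook added in this commit; the JSON encoder reads
	// "max-message-bytes" and "max-batch-size" from it (other encoders no-op).
	encoder := codec.NewJSONEventBatchEncoder()
	err := encoder.SetParams(map[string]string{
		"max-message-bytes": "1048576", // illustrative value: ~1 MiB per Kafka message
		"max-batch-size":    "4096",    // illustrative value: matches the encoder's default
	})
	if err != nil {
		log.Fatalf("invalid encoder parameters: %v", err)
	}
	_ = encoder
}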
6 changes: 6 additions & 0 deletions cdc/sink/codec/avro.go
@@ -163,6 +163,12 @@ func (a *AvroEventBatchEncoder) Size() int {
return sum
}

// SetParams is no-op for now
func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error {
// no op
return nil
}

func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column) (*avroEncodeResult, error) {
schemaGen := func() (string, error) {
schema, err := ColumnInfoToAvroSchema(table.Table, cols)
6 changes: 6 additions & 0 deletions cdc/sink/codec/canal.go
@@ -416,6 +416,12 @@ func (d *CanalEventBatchEncoder) Reset() {
panic("Reset only used for JsonEncoder")
}

// SetParams is no-op for now
func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error {
// no op
return nil
}

// refreshPacketBody() marshals the messages to the packet body
func (d *CanalEventBatchEncoder) refreshPacketBody() error {
oldSize := len(d.packet.Body)
6 changes: 6 additions & 0 deletions cdc/sink/codec/canal_flat.go
@@ -241,3 +241,9 @@ func (c *CanalFlatEventBatchEncoder) Size() int {
func (c *CanalFlatEventBatchEncoder) Reset() {
panic("not supported")
}

// SetParams is no-op for now
func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error {
// no op
return nil
}
8 changes: 8 additions & 0 deletions cdc/sink/codec/interface.go
@@ -42,9 +42,12 @@ type EventBatchEncoder interface {
// TODO decouple it out
MixedBuild(withVersion bool) []byte
// Size returns the size of the batch(bytes)
// Deprecated: Size is deprecated
Size() int
// Reset reset the kv buffer
Reset()
// SetParams provides the encoder with more info on the sink
SetParams(params map[string]string) error
}

// MQMessage represents an MQ message to the mqSink
@@ -54,6 +57,11 @@ type MQMessage struct {
Ts uint64 // reserved for possible output sorting
}

// Length returns the expected size of the Kafka message
func (m *MQMessage) Length() int {
return len(m.Key) + len(m.Value)
}

// NewMQMessage should be used when creating a MQMessage struct.
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(key []byte, value []byte, ts uint64) *MQMessage {
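The new Length helper counts the key and value bytes of a message; the JSON encoder below compares it against max-message-bytes when deciding whether to open a new Kafka message. A minimal sketch of the per-event accounting it assumes (mine, not from the diff):

// openProtocolEventSize mirrors the size check in AppendRowChangedEvent below:
// each event contributes its encoded key, its encoded value, and two 8-byte
// big-endian length prefixes, which is where the "+16" overhead comes from.
func openProtocolEventSize(key, value []byte) int {
	const lengthPrefixes = 2 * 8 // uint64 key length + uint64 value length
	return len(key) + len(value) + lengthPrefixes
}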
95 changes: 76 additions & 19 deletions cdc/sink/codec/json.go
@@ -35,6 +35,8 @@ import (
const (
// BatchVersion1 represents the version of batch format
BatchVersion1 uint64 = 1
// DefaultMaxMessageBytes sets the default value for max-message-bytes
DefaultMaxMessageBytes int = 64 * 1024 * 1024 // 64M
)

type column struct {
@@ -297,9 +299,16 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent

// JSONEventBatchEncoder encodes events into batches of bytes.
type JSONEventBatchEncoder struct {
	// TODO remove deprecated fields
	keyBuf            *bytes.Buffer // Deprecated: only used for MixedBuild for now
	valueBuf          *bytes.Buffer // Deprecated: only used for MixedBuild for now
	supportMixedBuild bool          // TODO decouple this out

	messageBuf   []*MQMessage
	curBatchSize int
	// configs
	maxKafkaMessageSize int
	maxBatchSize        int
}

// SetMixedBuildSupport is used by CDC Log
@@ -363,11 +372,37 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

	if d.supportMixedBuild {
		d.keyBuf.Write(keyLenByte[:])
		d.keyBuf.Write(key)

		d.valueBuf.Write(valueLenByte[:])
		d.valueBuf.Write(value)
	} else {
		if len(d.messageBuf) == 0 ||
			d.curBatchSize >= d.maxBatchSize ||
			d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {

			versionHead := make([]byte, 8)
			binary.BigEndian.PutUint64(versionHead, BatchVersion1)

			d.messageBuf = append(d.messageBuf, NewMQMessage(versionHead, nil, 0))
			d.curBatchSize = 0
		}

		message := d.messageBuf[len(d.messageBuf)-1]
		message.Key = append(message.Key, keyLenByte[:]...)
		message.Key = append(message.Key, key...)
		message.Value = append(message.Value, valueLenByte[:]...)
		message.Value = append(message.Value, value...)

		if message.Length() > d.maxKafkaMessageSize {
			// `len(d.messageBuf) == 1` is implied
			log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
				zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
		}
		d.curBatchSize++
	}
return EncoderNoOperation, nil
}

Expand Down Expand Up @@ -413,20 +448,17 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e

// Build implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
	if d.supportMixedBuild {
		if d.valueBuf.Len() == 0 {
			return nil
		}
		ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0)
		return []*MQMessage{ret}
	}

	ret := d.messageBuf
	d.messageBuf = make([]*MQMessage, 0)
	return ret
}

// MixedBuild implements the EventBatchEncoder interface
@@ -481,6 +513,31 @@ func (d *JSONEventBatchEncoder) Reset() {
d.valueBuf.Reset()
}

// SetParams reads relevant parameters for Open Protocol
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
if err != nil {
// TODO add error code
return errors.Trace(err)
}
} else {
d.maxKafkaMessageSize = DefaultMaxMessageBytes
}

if maxBatchSize, ok := params["max-batch-size"]; ok {
d.maxBatchSize, err = strconv.Atoi(maxBatchSize)
if err != nil {
// TODO add error code
return errors.Trace(err)
}
} else {
d.maxBatchSize = 4096
}
return nil
}

// NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder.
func NewJSONEventBatchEncoder() EventBatchEncoder {
batch := &JSONEventBatchEncoder{
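In the non-mixed path the encoder now accumulates finished messages in messageBuf and hands the whole slice back from Build. Below is a hedged sketch of the append-then-build cycle that the tests in the next file exercise; flushEvents and sendToKafka are hypothetical names, while the codec and model types are the ones from this repository.

// flushEvents appends a slice of row events and drains the encoder.
func flushEvents(encoder codec.EventBatchEncoder, events []*model.RowChangedEvent,
	sendToKafka func(*codec.MQMessage) error) error {
	for _, event := range events {
		if _, err := encoder.AppendRowChangedEvent(event); err != nil {
			return err
		}
	}
	// Each built message respects max-batch-size and stays under
	// max-message-bytes unless a single event exceeds the limit on its own,
	// in which case the encoder only logs a warning.
	for _, msg := range encoder.Build() {
		if err := sendToKafka(msg); err != nil {
			return err
		}
	}
	return nil
}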
75 changes: 73 additions & 2 deletions cdc/sink/codec/json_test.go
@@ -133,6 +133,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder

for _, cs := range s.rowCases {
encoder := newEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder := newEncoder()
mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for _, row := range cs {
@@ -151,10 +154,8 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
checkRowDecoder(mixedDecoder, cs)
// test normal decode
if len(cs) > 0 {
res := encoder.Build()
c.Assert(res, check.HasLen, 1)
decoder, err := newDecoder(res[0].Key, res[0].Value)
c.Assert(err, check.IsNil)
checkRowDecoder(decoder, cs)
@@ -164,6 +165,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
for _, cs := range s.ddlCases {
encoder := newEncoder()
mixedEncoder := newEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for i, ddl := range cs {
msg, err := encoder.EncodeDDLEvent(ddl)
@@ -189,6 +193,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
for _, cs := range s.resolvedTsCases {
encoder := newEncoder()
mixedEncoder := newEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for i, ts := range cs {
msg, err := encoder.EncodeCheckpointEvent(ts)
@@ -212,6 +219,70 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
}
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages := encoder.Build()
for _, msg := range messages {
c.Assert(msg.Length(), check.LessEqual, 256)
}
}

func (s *batchSuite) TestMaxBatchSize(c *check.C) {
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages := encoder.Build()
sum := 0
for _, msg := range messages {
decoder, err := NewJSONEventBatchDecoder(msg.Key, msg.Value)
c.Check(err, check.IsNil)
count := 0
for {
t, hasNext, err := decoder.HasNext()
c.Check(err, check.IsNil)
if !hasNext {
break
}

c.Check(t, check.Equals, model.MqMessageTypeRow)
_, err = decoder.NextRowChangedEvent()
c.Check(err, check.IsNil)
count++
}
c.Check(count, check.LessEqual, 64)
sum += count
}
c.Check(sum, check.Equals, 10000)
}

func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) {
defer testleak.AfterTest(c)()
s.testBatchCodec(c, func() EventBatchEncoder {
5 changes: 5 additions & 0 deletions cdc/sink/codec/maxwell.go
@@ -148,6 +148,11 @@ func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
return EncoderNoOperation, nil
}

// SetParams is no-op for Maxwell for now
func (d *MaxwellEventBatchEncoder) SetParams(params map[string]string) error {
return nil
}

// Column represents a column in maxwell
type Column struct {
Type string `json:"type"`
31 changes: 31 additions & 0 deletions cdc/sink/mq.go
@@ -111,6 +111,21 @@ func newMqSink(
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled"))
}

// pre-flight verification of encoder parameters
if err := newEncoder().SetParams(opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

newEncoder1 := newEncoder
newEncoder = func() codec.EventBatchEncoder {
ret := newEncoder1()
err := ret.SetParams(opts)
if err != nil {
log.Fatal("MQ Encoder could not parse parameters", zap.Error(err))
}
return ret
}

k := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
@@ -404,6 +419,12 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.MaxMessageBytes = c
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("compression")
@@ -456,6 +477,16 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
replicaConfig.Sink.Protocol = s
}
// These two options are not used by the Pulsar producer itself, but by the encoders
s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-message-bytes"] = s
}
// For now, it's a placeholder. The Avro format has to connect to the Schema Registry,
// and it may need credentials.
credential := &security.Credential{}
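Both sink constructors copy the query parameters from the sink URI into opts, and newMqSink passes opts to every encoder via SetParams. A hypothetical sink URI carrying the new options might look like the constant below (host, topic, and values are placeholders, not taken from this commit):

// exampleSinkURI is illustrative only; the query parameters are the ones read
// by newKafkaSaramaSink and newPulsarSink above.
const exampleSinkURI = "kafka://127.0.0.1:9092/cdc-test?" +
	"kafka-version=2.6.0&protocol=default&" +
	"max-message-bytes=1048576&max-batch-size=4096"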
4 changes: 3 additions & 1 deletion cdc/sink/producer/kafka/kafka.go
@@ -306,7 +306,9 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
NumPartitions: partitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)

// TODO identify the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
}
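The TODO above could probably be resolved by checking Sarama's typed error instead of matching the message text. This is a suggestion of mine, assuming the error comes from sarama's ClusterAdmin.CreateTopic; it is not code from the commit:

// Sketch: tolerate only the "topic already exists" case, propagate everything else.
// errors.As here refers to the standard library errors package.
var topicErr *sarama.TopicError
if err != nil {
	if errors.As(err, &topicErr) && topicErr.Err == sarama.ErrTopicAlreadyExists {
		// The topic was already created by another capture or an operator; continue.
	} else {
		return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
	}
}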
2 changes: 1 addition & 1 deletion integration/framework/avro/kafka_single_table.go
@@ -43,7 +43,7 @@ func (a *SingleTableTask) Name() string {
func (a *SingleTableTask) GetCDCProfile() *framework.CDCProfile {
return &framework.CDCProfile{
PDUri: "http://upstream-pd:2379",
SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?protocol=avro",
SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?kafka-version=2.6.0&protocol=avro",
Opts: map[string]string{"registry": "http://schema-registry:8081"},
}
}
2 changes: 1 addition & 1 deletion integration/framework/canal/kafka_single_table.go
@@ -47,7 +47,7 @@ func (c *SingleTableTask) GetCDCProfile() *framework.CDCProfile {
}
return &framework.CDCProfile{
PDUri: "http://upstream-pd:2379",
SinkURI: "kafka://kafka:9092/" + testDbName + "?protocol=" + protocol,
SinkURI: "kafka://kafka:9092/" + testDbName + "?kafka-version=2.6.0&protocol=" + protocol,
Opts: map[string]string{"force-handle-key-pkey": "true", "support-txn": "true"},
ConfigFile: "/config/canal-test-config.toml",
}