sink: Kafka message length and batch size limit for Open Protocol #1079

Merged · 21 commits · Nov 24, 2020 · Changes from all commits
6 changes: 6 additions & 0 deletions cdc/sink/codec/avro.go
@@ -163,6 +163,12 @@ func (a *AvroEventBatchEncoder) Size() int {
return sum
}

// SetParams is a no-op for now
func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error {
// no op
return nil
}

func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column) (*avroEncodeResult, error) {
schemaGen := func() (string, error) {
schema, err := ColumnInfoToAvroSchema(table.Table, cols)
6 changes: 6 additions & 0 deletions cdc/sink/codec/canal.go
@@ -416,6 +416,12 @@ func (d *CanalEventBatchEncoder) Reset() {
panic("Reset only used for JsonEncoder")
}

// SetParams is a no-op for now
func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error {
// no op
return nil
}

// refreshPacketBody() marshals the messages to the packet body
func (d *CanalEventBatchEncoder) refreshPacketBody() error {
oldSize := len(d.packet.Body)
6 changes: 6 additions & 0 deletions cdc/sink/codec/canal_flat.go
@@ -241,3 +241,9 @@ func (c *CanalFlatEventBatchEncoder) Size() int {
func (c *CanalFlatEventBatchEncoder) Reset() {
panic("not supported")
}

// SetParams is a no-op for now
func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error {
// no op
return nil
}
8 changes: 8 additions & 0 deletions cdc/sink/codec/interface.go
@@ -42,9 +42,12 @@ type EventBatchEncoder interface {
// TODO decouple it out
MixedBuild(withVersion bool) []byte
// Size returns the size of the batch (in bytes)
// Deprecated: Size is deprecated
Size() int
// Reset resets the kv buffer
Reset()
// SetParams provides the encoder with more info on the sink
SetParams(params map[string]string) error
}

// MQMessage represents an MQ message to the mqSink
@@ -54,6 +57,11 @@ type MQMessage struct {
Ts uint64 // reserved for possible output sorting
}

// Length returns the expected size of the Kafka message
func (m *MQMessage) Length() int {
return len(m.Key) + len(m.Value)
}

// NewMQMessage should be used when creating a MQMessage struct.
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(key []byte, value []byte, ts uint64) *MQMessage {
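The interface change above fixes the encoder lifecycle: configure an encoder once via SetParams, append events, then drain complete batches with Build(). A minimal usage sketch follows (not part of this diff; the import path and the parameter values are assumptions for illustration):

package main

import (
	"fmt"

	"github.com/pingcap/ticdc/cdc/sink/codec" // assumed module path
)

func main() {
	enc := codec.NewJSONEventBatchEncoder()
	// Limits must be set before any events are appended.
	if err := enc.SetParams(map[string]string{
		"max-message-bytes": "1048576", // cap each Kafka message at 1 MiB
		"max-batch-size":    "4096",    // cap each message at 4096 events
	}); err != nil {
		panic(err)
	}
	// ... call enc.AppendRowChangedEvent(ev) for each row change ...
	for _, msg := range enc.Build() {
		fmt.Println(msg.Length()) // key bytes + value bytes, per the new Length()
	}
}

Note that Length() counts only key and value bytes; Kafka's message.max.bytes also counts record framing overhead, so the configured limit should leave some headroom.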
95 changes: 76 additions & 19 deletions cdc/sink/codec/json.go
Expand Up @@ -35,6 +35,8 @@ import (
const (
// BatchVersion1 represents the version of batch format
BatchVersion1 uint64 = 1
// DefaultMaxMessageBytes sets the default value for max-message-bytes
DefaultMaxMessageBytes int = 64 * 1024 * 1024 // 64M
)

type column struct {
@@ -297,9 +299,16 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent

// JSONEventBatchEncoder encodes events into a batch of bytes.
type JSONEventBatchEncoder struct {
-keyBuf *bytes.Buffer
-valueBuf *bytes.Buffer
// TODO remove deprecated fields
keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
supportMixedBuild bool // TODO decouple this out

messageBuf []*MQMessage
curBatchSize int
// configs
maxKafkaMessageSize int
maxBatchSize int
}

// SetMixedBuildSupport is used by CDC Log
@@ -363,11 +372,37 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
var valueLenByte [8]byte
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

-d.keyBuf.Write(keyLenByte[:])
-d.keyBuf.Write(key)
-
-d.valueBuf.Write(valueLenByte[:])
-d.valueBuf.Write(value)
if d.supportMixedBuild {
d.keyBuf.Write(keyLenByte[:])
d.keyBuf.Write(key)

d.valueBuf.Write(valueLenByte[:])
d.valueBuf.Write(value)
} else {
if len(d.messageBuf) == 0 ||
d.curBatchSize >= d.maxBatchSize ||
d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {

versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)

d.messageBuf = append(d.messageBuf, NewMQMessage(versionHead, nil, 0))
d.curBatchSize = 0
}

message := d.messageBuf[len(d.messageBuf)-1]
message.Key = append(message.Key, keyLenByte[:]...)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, valueLenByte[:]...)
message.Value = append(message.Value, value...)

if message.Length() > d.maxKafkaMessageSize {
// `len(d.messageBuf) == 1` is implied
log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
}
d.curBatchSize++
}
return EncoderNoOperation, nil
}
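The constant 16 in the overflow check above accounts for the two 8-byte big-endian length headers that frame each event's key and value in the Open Protocol batch. A standalone sketch of that accounting, using eventFits as a hypothetical helper (illustrative only, not code from this diff):

package main

import "fmt"

// eventFits mirrors the check in AppendRowChangedEvent: an event costs
// len(key) + len(value) plus two 8-byte length prefixes, and must not
// push the current message past maxBytes.
func eventFits(curLen, keyLen, valueLen, maxBytes int) bool {
	const lenHeader = 8 // one uint64 prefix each for key and value
	return curLen+keyLen+valueLen+2*lenHeader <= maxBytes
}

func main() {
	fmt.Println(eventFits(0, 100, 100, 256))   // true: 216 bytes fit into 256
	fmt.Println(eventFits(216, 100, 100, 256)) // false: would reach 432, so a new message starts
}

One deliberate limitation is visible in the warning branch: a single event larger than max-message-bytes still becomes its own oversized message; the encoder logs a warning instead of returning an error.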

@@ -413,20 +448,17 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) {

// Build implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
-if d.valueBuf.Len() == 0 {
-return nil
-}
-
-ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0)
-
-if !d.supportMixedBuild {
-d.keyBuf.Reset()
-d.valueBuf.Reset()
-var versionByte [8]byte
-binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
-d.keyBuf.Write(versionByte[:])
-}
-return []*MQMessage{ret}
if d.supportMixedBuild {
if d.valueBuf.Len() == 0 {
return nil
}
ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0)
return []*MQMessage{ret}
}

ret := d.messageBuf
d.messageBuf = make([]*MQMessage, 0)
return ret
}

// MixedBuild implements the EventBatchEncoder interface
@@ -481,6 +513,31 @@ func (d *JSONEventBatchEncoder) Reset() {
d.valueBuf.Reset()
}

// SetParams reads relevant parameters for Open Protocol
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
if err != nil {
// TODO add error code
return errors.Trace(err)
}
} else {
d.maxKafkaMessageSize = DefaultMaxMessageBytes
}

if maxBatchSize, ok := params["max-batch-size"]; ok {
d.maxBatchSize, err = strconv.Atoi(maxBatchSize)
if err != nil {
// TODO add error code
return errors.Trace(err)
}
} else {
d.maxBatchSize = 4096
}
return nil
}
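If neither parameter is supplied, the encoder falls back to DefaultMaxMessageBytes (64 MiB) and a hard-coded maximum of 4096 events per message.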

// NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder.
func NewJSONEventBatchEncoder() EventBatchEncoder {
batch := &JSONEventBatchEncoder{
75 changes: 73 additions & 2 deletions cdc/sink/codec/json_test.go
@@ -133,6 +133,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder

for _, cs := range s.rowCases {
encoder := newEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder := newEncoder()
mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for _, row := range cs {
@@ -151,10 +154,8 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
checkRowDecoder(mixedDecoder, cs)
// test normal decode
if len(cs) > 0 {
-size := encoder.Size()
res := encoder.Build()
c.Assert(res, check.HasLen, 1)
-c.Assert(len(res[0].Key)+len(res[0].Value), check.Equals, size)
decoder, err := newDecoder(res[0].Key, res[0].Value)
c.Assert(err, check.IsNil)
checkRowDecoder(decoder, cs)
@@ -164,6 +165,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
for _, cs := range s.ddlCases {
encoder := newEncoder()
mixedEncoder := newEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for i, ddl := range cs {
msg, err := encoder.EncodeDDLEvent(ddl)
@@ -189,6 +193,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
for _, cs := range s.resolvedTsCases {
encoder := newEncoder()
mixedEncoder := newEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"})
c.Assert(err, check.IsNil)

mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for i, ts := range cs {
msg, err := encoder.EncodeCheckpointEvent(ts)
@@ -212,6 +219,70 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder
}
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages := encoder.Build()
for _, msg := range messages {
c.Assert(msg.Length(), check.LessEqual, 256)
}
}

func (s *batchSuite) TestMaxBatchSize(c *check.C) {
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages := encoder.Build()
sum := 0
for _, msg := range messages {
decoder, err := NewJSONEventBatchDecoder(msg.Key, msg.Value)
c.Check(err, check.IsNil)
count := 0
for {
t, hasNext, err := decoder.HasNext()
c.Check(err, check.IsNil)
if !hasNext {
break
}

c.Check(t, check.Equals, model.MqMessageTypeRow)
_, err = decoder.NextRowChangedEvent()
c.Check(err, check.IsNil)
count++
}
c.Check(count, check.LessEqual, 64)
sum += count
}
c.Check(sum, check.Equals, 10000)
}

func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) {
defer testleak.AfterTest(c)()
s.testBatchCodec(c, func() EventBatchEncoder {
5 changes: 5 additions & 0 deletions cdc/sink/codec/maxwell.go
@@ -148,6 +148,11 @@ func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
return EncoderNoOperation, nil
}

// SetParams is a no-op for Maxwell for now
func (d *MaxwellEventBatchEncoder) SetParams(params map[string]string) error {
return nil
}

// Column represents a column in maxwell
type Column struct {
Type string `json:"type"`
31 changes: 31 additions & 0 deletions cdc/sink/mq.go
@@ -111,6 +111,21 @@ func newMqSink(
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled"))
}

// pre-flight verification of encoder parameters
if err := newEncoder().SetParams(opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

newEncoder1 := newEncoder
newEncoder = func() codec.EventBatchEncoder {
ret := newEncoder1()
err := ret.SetParams(opts)
if err != nil {
log.Fatal("MQ Encoder could not parse parameters", zap.Error(err))
}
return ret
}
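The pre-flight SetParams call lets invalid parameters fail changefeed creation with ErrKafkaInvalidConfig; the wrapped factory then re-applies the same, already-validated options to every encoder it constructs, which is why a failure inside the wrapper is treated as fatal.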

k := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
@@ -404,6 +419,12 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
config.MaxMessageBytes = c
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("compression")
@@ -456,6 +477,16 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
replicaConfig.Sink.Protocol = s
}
// These two options are not used by the Pulsar producer itself, but by the encoders
s = sinkURI.Query().Get("max-message-bytes")
if s != "" {
opts["max-message-bytes"] = s
}

s = sinkURI.Query().Get("max-batch-size")
if s != "" {
opts["max-message-bytes"] = s
}
// For now, it's a placeholder. The Avro format has to connect to the Schema Registry, and it may need credentials.
// and it may needs credential.
credential := &security.Credential{}
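With these additions, both limits can be supplied directly on the changefeed sink URI and reach the encoder through opts, for example (hypothetical endpoint): kafka://127.0.0.1:9092/my-topic?max-message-bytes=1048576&max-batch-size=4096.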
4 changes: 3 additions & 1 deletion cdc/sink/producer/kafka/kafka.go
@@ -306,7 +306,9 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
NumPartitions: partitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
-if err != nil {

// TODO identify the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
}
2 changes: 1 addition & 1 deletion integration/framework/avro/kafka_single_table.go
@@ -43,7 +43,7 @@ func (a *SingleTableTask) Name() string {
func (a *SingleTableTask) GetCDCProfile() *framework.CDCProfile {
return &framework.CDCProfile{
PDUri: "http://upstream-pd:2379",
SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?protocol=avro",
SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?kafka-version=2.6.0&protocol=avro",
Opts: map[string]string{"registry": "http://schema-registry:8081"},
}
}
2 changes: 1 addition & 1 deletion integration/framework/canal/kafka_single_table.go
@@ -47,7 +47,7 @@ func (c *SingleTableTask) GetCDCProfile() *framework.CDCProfile {
}
return &framework.CDCProfile{
PDUri: "http://upstream-pd:2379",
SinkURI: "kafka://kafka:9092/" + testDbName + "?protocol=" + protocol,
SinkURI: "kafka://kafka:9092/" + testDbName + "?kafka-version=2.6.0&protocol=" + protocol,
Opts: map[string]string{"force-handle-key-pkey": "true", "support-txn": "true"},
ConfigFile: "/config/canal-test-config.toml",
}