From 4e5ace9edd4e666b986b78366bcda563c846ba53 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 9 Nov 2020 16:03:33 +0800 Subject: [PATCH 01/20] WIP --- cdc/sink/codec/avro.go | 5 +++++ cdc/sink/codec/canal.go | 5 +++++ cdc/sink/codec/canal_flat.go | 5 +++++ cdc/sink/codec/interface.go | 2 ++ cdc/sink/codec/json.go | 23 +++++++++++++++++------ 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index 699087b6886..4d5e1c2b8f6 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -163,6 +163,11 @@ func (a *AvroEventBatchEncoder) Size() int { return sum } +// SetMaxMessageBytes is no-op for now +func (a *AvroEventBatchEncoder) SetMaxMessageBytes(size int) { + // no op +} + func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column) (*avroEncodeResult, error) { schemaGen := func() (string, error) { schema, err := ColumnInfoToAvroSchema(table.Table, cols) diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index f1c11365fa3..4b1a4676f42 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -416,6 +416,11 @@ func (d *CanalEventBatchEncoder) Reset() { panic("Reset only used for JsonEncoder") } +// SetMaxMessageBytes is no-op for now +func (d *CanalEventBatchEncoder) SetMaxMessageBytes(size int) { + // no op +} + // refreshPacketBody() marshals the messages to the packet body func (d *CanalEventBatchEncoder) refreshPacketBody() error { oldSize := len(d.packet.Body) diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index f3e8d31c56e..928bce97a3f 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -241,3 +241,8 @@ func (c *CanalFlatEventBatchEncoder) Size() int { func (c *CanalFlatEventBatchEncoder) Reset() { panic("not supported") } + +// SetMaxMessageBytes is no-op for now +func (c *CanalFlatEventBatchEncoder) SetMaxMessageBytes(size int) { + // no op +} diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index c4b02643ade..16d3c78a55a 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -45,6 +45,8 @@ type EventBatchEncoder interface { Size() int // Reset reset the kv buffer Reset() + // SetMaxMessageBytes instructs the encoder not to generate any single message larger than `size`. + SetMaxMessageBytes(size int) } // MQMessage represents an MQ message to the mqSink diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 89078f6d182..085e371390e 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -297,9 +297,12 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent // JSONEventBatchEncoder encodes the events into the byte of a batch into. 
type JSONEventBatchEncoder struct { - keyBuf *bytes.Buffer - valueBuf *bytes.Buffer + // TODO remove deprecated fields + keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now + valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now supportMixedBuild bool // TODO decouple this out + + messageBuf []*MQMessage } // SetMixedBuildSupport is used by CDC Log @@ -363,11 +366,13 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) var valueLenByte [8]byte binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - d.keyBuf.Write(keyLenByte[:]) - d.keyBuf.Write(key) + if d.supportMixedBuild { + d.keyBuf.Write(keyLenByte[:]) + d.keyBuf.Write(key) - d.valueBuf.Write(valueLenByte[:]) - d.valueBuf.Write(value) + d.valueBuf.Write(valueLenByte[:]) + d.valueBuf.Write(value) + } return EncoderNoOperation, nil } @@ -481,6 +486,12 @@ func (d *JSONEventBatchEncoder) Reset() { d.valueBuf.Reset() } +// SetMaxMessageBytes is no-op for now +func (d *JSONEventBatchEncoder) SetMaxMessageBytes(size int) { + // no op +} + + // NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder. func NewJSONEventBatchEncoder() EventBatchEncoder { batch := &JSONEventBatchEncoder{ From dfa304f9353aa1066ac719a90a0e82f78a2ad74d Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 14:58:17 +0800 Subject: [PATCH 02/20] WIP --- cdc/sink/codec/interface.go | 5 +++++ cdc/sink/codec/json.go | 30 +++++++++++++++++++++++++----- cdc/sink/codec/maxwell.go | 6 ++++++ 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 16d3c78a55a..bf6c55342dc 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -56,6 +56,11 @@ type MQMessage struct { Ts uint64 // reserved for possible output sorting } +// Length returns the expected size of the Kafka message +func (m *MQMessage) Length() int { + return len(m.Key) + len(m.Value) +} + // NewMQMessage should be used when creating a MQMessage struct. // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. func NewMQMessage(key []byte, value []byte, ts uint64) *MQMessage { diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 085e371390e..bc076f98359 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -298,11 +298,15 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent // JSONEventBatchEncoder encodes the events into the byte of a batch into. 
type JSONEventBatchEncoder struct { // TODO remove deprecated fields - keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now - valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now - supportMixedBuild bool // TODO decouple this out + keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now + valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now + supportMixedBuild bool // TODO decouple this out - messageBuf []*MQMessage + messageBuf []*MQMessage + curBatchSize int + // configs + maxKafkaMessageSize int + maxBatchSize int } // SetMixedBuildSupport is used by CDC Log @@ -372,6 +376,23 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) d.valueBuf.Write(valueLenByte[:]) d.valueBuf.Write(value) + } else { + if len(d.messageBuf) == 0 || + d.curBatchSize >= d.maxBatchSize || + d.messageBuf[len(d.messageBuf) - 1].Length() + len(key) + len(value) + 16 > d.maxKafkaMessageSize { + + var versionByte []byte + binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) + + d.messageBuf = append(d.messageBuf, NewMQMessage(versionByte, nil, 0)) + d.curBatchSize = 0 + } + + message := d.messageBuf[len(d.messageBuf) - 1] + message.Key = append(message.Key, keyLenByte[:]...) + message.Key = append(message.Key, key...) + message.Value = append(message.Value, valueLenByte[:]...) + message.Value = append(message.Value, value...) } return EncoderNoOperation, nil } @@ -491,7 +512,6 @@ func (d *JSONEventBatchEncoder) SetMaxMessageBytes(size int) { // no op } - // NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder. func NewJSONEventBatchEncoder() EventBatchEncoder { batch := &JSONEventBatchEncoder{ diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 9f2fe7a4942..08276e65fda 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -32,6 +32,7 @@ type MaxwellEventBatchEncoder struct { batchSize int } + type maxwellMessage struct { Database string `json:"database"` Table string `json:"table"` @@ -148,6 +149,11 @@ func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEven return EncoderNoOperation, nil } +// SetMaxMessageBytes is no-op for Maxwell for now +func (d *MaxwellEventBatchEncoder) SetMaxMessageBytes(size int) { + panic("implement me") +} + // Column represents a column in maxwell type Column struct { Type string `json:"type"` From 5571804241fb8d82927c6efb5c741441ae67a222 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 15:30:24 +0800 Subject: [PATCH 03/20] WIP --- cdc/sink/codec/avro.go | 2 +- cdc/sink/codec/canal.go | 2 +- cdc/sink/codec/canal_flat.go | 2 +- cdc/sink/codec/interface.go | 2 +- cdc/sink/codec/json.go | 2 +- cdc/sink/codec/maxwell.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index 4d5e1c2b8f6..ed813ed38e3 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -164,7 +164,7 @@ func (a *AvroEventBatchEncoder) Size() int { } // SetMaxMessageBytes is no-op for now -func (a *AvroEventBatchEncoder) SetMaxMessageBytes(size int) { +func (a *AvroEventBatchEncoder) SetParams(params map[string]interface{}) { // no op } diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 4b1a4676f42..38f7eb5e73b 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -417,7 +417,7 @@ func (d *CanalEventBatchEncoder) Reset() { } // SetMaxMessageBytes is no-op for now -func (d *CanalEventBatchEncoder) SetMaxMessageBytes(size int) { 
+func (d *CanalEventBatchEncoder) SetParams(params map[string]interface{}) { // no op } diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 928bce97a3f..d3765f9ac3d 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -243,6 +243,6 @@ func (c *CanalFlatEventBatchEncoder) Reset() { } // SetMaxMessageBytes is no-op for now -func (c *CanalFlatEventBatchEncoder) SetMaxMessageBytes(size int) { +func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]interface{}) { // no op } diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index bf6c55342dc..10262a7203a 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -46,7 +46,7 @@ type EventBatchEncoder interface { // Reset reset the kv buffer Reset() // SetMaxMessageBytes instructs the encoder not to generate any single message larger than `size`. - SetMaxMessageBytes(size int) + SetParams(params map[string]interface{}) } // MQMessage represents an MQ message to the mqSink diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index bc076f98359..4761181711d 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -508,7 +508,7 @@ func (d *JSONEventBatchEncoder) Reset() { } // SetMaxMessageBytes is no-op for now -func (d *JSONEventBatchEncoder) SetMaxMessageBytes(size int) { +func (d *JSONEventBatchEncoder) SetParams(params map[string]interface{}) { // no op } diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 08276e65fda..45a69f3a471 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -150,7 +150,7 @@ func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEven } // SetMaxMessageBytes is no-op for Maxwell for now -func (d *MaxwellEventBatchEncoder) SetMaxMessageBytes(size int) { +func (d *MaxwellEventBatchEncoder) SetParams(params map[string]interface{}) { panic("implement me") } From 3b2fa36b3fd519ee0ad2275098bd680c431ace66 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 16:35:01 +0800 Subject: [PATCH 04/20] WIP --- cdc/sink/codec/avro.go | 3 ++- cdc/sink/codec/canal.go | 3 ++- cdc/sink/codec/canal_flat.go | 3 ++- cdc/sink/codec/interface.go | 4 ++-- cdc/sink/codec/json.go | 26 +++++++++++++++++++++++--- cdc/sink/codec/maxwell.go | 3 ++- cdc/sink/mq.go | 2 ++ 7 files changed, 35 insertions(+), 9 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index ed813ed38e3..89cadccab65 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -164,8 +164,9 @@ func (a *AvroEventBatchEncoder) Size() int { } // SetMaxMessageBytes is no-op for now -func (a *AvroEventBatchEncoder) SetParams(params map[string]interface{}) { +func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error { // no op + return nil } func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column) (*avroEncodeResult, error) { diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 38f7eb5e73b..c179624ff93 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -417,8 +417,9 @@ func (d *CanalEventBatchEncoder) Reset() { } // SetMaxMessageBytes is no-op for now -func (d *CanalEventBatchEncoder) SetParams(params map[string]interface{}) { +func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error { // no op + return nil } // refreshPacketBody() marshals the messages to the packet body diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go 
index d3765f9ac3d..3aa0359e087 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -243,6 +243,7 @@ func (c *CanalFlatEventBatchEncoder) Reset() { } // SetMaxMessageBytes is no-op for now -func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]interface{}) { +func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error { // no op + return nil } diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 10262a7203a..a16bfd29767 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -45,8 +45,8 @@ type EventBatchEncoder interface { Size() int // Reset reset the kv buffer Reset() - // SetMaxMessageBytes instructs the encoder not to generate any single message larger than `size`. - SetParams(params map[string]interface{}) + // SetParams provides the encoder with more info on the sink + SetParams(params map[string]string) error } // MQMessage represents an MQ message to the mqSink diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 4761181711d..7e6f997b8f6 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -507,9 +507,29 @@ func (d *JSONEventBatchEncoder) Reset() { d.valueBuf.Reset() } -// SetMaxMessageBytes is no-op for now -func (d *JSONEventBatchEncoder) SetParams(params map[string]interface{}) { - // no op +// SetParams is no-op for now +func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error { + var err error + if maxMessageBytes, ok := params["max-message-bytes"]; ok { + d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes) + if err != nil { + // TODO add error code + return errors.Trace(err) + } + } else { + d.maxKafkaMessageSize = 64 * 1024 * 1024 // 64M + } + + if maxBatchSize, ok := params["max-batch-size"]; ok { + d.maxBatchSize, err = strconv.Atoi(maxBatchSize) + if err != nil { + // TODO add error code + return errors.Trace(err) + } + } else { + d.maxBatchSize = 4096 + } + return nil } // NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder. 
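Note: `SetParams` above consumes the sink's string options and parses the two limits with `strconv.Atoi`, falling back to defaults when a key is absent. A minimal sketch of the intended call site, assuming the codec package context; the helper name and the literal values are hypothetical examples, not part of this series:

```go
// Hypothetical helper for illustration only; not part of this patch series.
func newBoundedEncoder() (EventBatchEncoder, error) {
	encoder := NewJSONEventBatchEncoder()
	err := encoder.SetParams(map[string]string{
		"max-message-bytes": "4096", // largest single message the encoder should emit
		"max-batch-size":    "64",   // most events packed into one message
	})
	if err != nil {
		// non-integer values surface here as traced strconv errors
		return nil, err
	}
	return encoder, nil
}
```

A later patch in this series wires this kind of pre-flight `SetParams` call into the mq sink, so malformed URI parameters fail at changefeed creation rather than at encode time.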
diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 45a69f3a471..337c402a2b6 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -150,8 +150,9 @@ func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEven } // SetMaxMessageBytes is no-op for Maxwell for now -func (d *MaxwellEventBatchEncoder) SetParams(params map[string]interface{}) { +func (d *MaxwellEventBatchEncoder) SetParams(params map[string]string) error { panic("implement me") + return nil } // Column represents a column in maxwell diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 4797489f511..8896ab7ff38 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -111,6 +111,7 @@ func newMqSink( return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled")) } + k := &mqSink{ mqProducer: mqProducer, dispatcher: d, @@ -404,6 +405,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } config.MaxMessageBytes = c + opts["max-message-bytes"] = strconv.Itoa(c) } s = sinkURI.Query().Get("compression") From e4a597c28431ed3fc4ee8b093eacc543c69e308f Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 16 Nov 2020 12:19:24 +0800 Subject: [PATCH 05/20] WIP --- cdc/sink/codec/interface.go | 1 + cdc/sink/codec/json.go | 27 ++++++++++++--------------- cdc/sink/mq.go | 16 ++++++++++++++++ 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index a16bfd29767..584fd322076 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -42,6 +42,7 @@ type EventBatchEncoder interface { // TODO decouple it out MixedBuild(withVersion bool) []byte // Size returns the size of the batch(bytes) + // Deprecated: Size is deprecated Size() int // Reset reset the kv buffer Reset() diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 7e6f997b8f6..dddc744df42 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -381,10 +381,10 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) d.curBatchSize >= d.maxBatchSize || d.messageBuf[len(d.messageBuf) - 1].Length() + len(key) + len(value) + 16 > d.maxKafkaMessageSize { - var versionByte []byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) + versionHead := make([]byte, 8) + binary.BigEndian.PutUint64(versionHead, BatchVersion1) - d.messageBuf = append(d.messageBuf, NewMQMessage(versionByte, nil, 0)) + d.messageBuf = append(d.messageBuf, NewMQMessage(versionHead, nil, 0)) d.curBatchSize = 0 } @@ -439,20 +439,17 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e // Build implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { - if d.valueBuf.Len() == 0 { - return nil + if d.supportMixedBuild { + if d.valueBuf.Len() == 0 { + return nil + } + ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0) + return []*MQMessage{ret} } - ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0) - - if !d.supportMixedBuild { - d.keyBuf.Reset() - d.valueBuf.Reset() - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - d.keyBuf.Write(versionByte[:]) - } - return []*MQMessage{ret} + ret := d.messageBuf + d.messageBuf = make([]*MQMessage, 0) + return ret } // MixedBuild implements the EventBatchEncoder interface diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go 
index 8896ab7ff38..83be7a831a6 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -111,6 +111,22 @@ func newMqSink( return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled")) } + // pre-flight verification of encoder parameters + if err := newEncoder().SetParams(opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + + newEncoder1 := newEncoder + newEncoder = func() codec.EventBatchEncoder { + ret := newEncoder1() + err := ret.SetParams(opts) + if err != nil { + log.Fatal("MQ Encoder could not parse parameters", zap.Error(err)) + } + return ret + } + k := &mqSink{ mqProducer: mqProducer, From 49d2396fe9711d7c219da4419b6a96d38dfc5211 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 16 Nov 2020 13:32:28 +0800 Subject: [PATCH 06/20] fix --- cdc/sink/codec/avro.go | 2 +- cdc/sink/codec/canal.go | 2 +- cdc/sink/codec/canal_flat.go | 2 +- cdc/sink/codec/json.go | 9 +-- cdc/sink/codec/json_test.go | 75 +++++++++++++++++++++- cdc/sink/codec/maxwell.go | 4 +- cdc/sink/mq.go | 2 - kafka_consumer/main.go | 25 ++++++-- tests/kafka_messages/conf/diff_config.toml | 27 ++++++++ tests/kafka_messages/conf/workload | 13 ++++ tests/kafka_messages/run.sh | 69 ++++++++++++++++++++ 11 files changed, 211 insertions(+), 19 deletions(-) create mode 100644 tests/kafka_messages/conf/diff_config.toml create mode 100644 tests/kafka_messages/conf/workload create mode 100755 tests/kafka_messages/run.sh diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index 89cadccab65..87250dc031b 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -163,7 +163,7 @@ func (a *AvroEventBatchEncoder) Size() int { return sum } -// SetMaxMessageBytes is no-op for now +// SetParams is no-op for now func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error { // no op return nil diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index c179624ff93..9bd2a88c29f 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -416,7 +416,7 @@ func (d *CanalEventBatchEncoder) Reset() { panic("Reset only used for JsonEncoder") } -// SetMaxMessageBytes is no-op for now +// SetParams is no-op for now func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error { // no op return nil diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 3aa0359e087..7945a4781b7 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -242,7 +242,7 @@ func (c *CanalFlatEventBatchEncoder) Reset() { panic("not supported") } -// SetMaxMessageBytes is no-op for now +// SetParams is no-op for now func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error { // no op return nil diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index dddc744df42..d9eda92b819 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -379,7 +379,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) } else { if len(d.messageBuf) == 0 || d.curBatchSize >= d.maxBatchSize || - d.messageBuf[len(d.messageBuf) - 1].Length() + len(key) + len(value) + 16 > d.maxKafkaMessageSize { + d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize { versionHead := make([]byte, 8) binary.BigEndian.PutUint64(versionHead, BatchVersion1) @@ -388,11 +388,12 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) d.curBatchSize = 0 } - message := d.messageBuf[len(d.messageBuf) 
- 1] + message := d.messageBuf[len(d.messageBuf)-1] message.Key = append(message.Key, keyLenByte[:]...) message.Key = append(message.Key, key...) message.Value = append(message.Value, valueLenByte[:]...) message.Value = append(message.Value, value...) + d.curBatchSize++ } return EncoderNoOperation, nil } @@ -504,7 +505,7 @@ func (d *JSONEventBatchEncoder) Reset() { d.valueBuf.Reset() } -// SetParams is no-op for now +// SetParams reads relevant parameters for Open Protocol func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error { var err error if maxMessageBytes, ok := params["max-message-bytes"]; ok { @@ -514,7 +515,7 @@ func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error { return errors.Trace(err) } } else { - d.maxKafkaMessageSize = 64 * 1024 * 1024 // 64M + d.maxKafkaMessageSize = 64 * 1024 * 1024 // 64M } if maxBatchSize, ok := params["max-batch-size"]; ok { diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 7f5c0ff0572..3ea0ef5f6e1 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -132,6 +132,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.rowCases { encoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + mixedEncoder := newEncoder() mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for _, row := range cs { @@ -150,10 +153,8 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco checkRowDecoder(mixedDecoder, cs) // test normal decode if len(cs) > 0 { - size := encoder.Size() res := encoder.Build() c.Assert(res, check.HasLen, 1) - c.Assert(len(res[0].Key)+len(res[0].Value), check.Equals, size) decoder, err := newDecoder(res[0].Key, res[0].Value) c.Assert(err, check.IsNil) checkRowDecoder(decoder, cs) @@ -163,6 +164,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.ddlCases { encoder := newEncoder() mixedEncoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for i, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) @@ -188,6 +192,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.resolvedTsCases { encoder := newEncoder() mixedEncoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for i, ts := range cs { msg, err := encoder.EncodeCheckpointEvent(ts) @@ -211,6 +218,70 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco } } +func (s *batchSuite) TestMaxMessageBytes(c *check.C) { + encoder := NewJSONEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "256"}) + c.Check(err, check.IsNil) + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + for _, msg := range messages { + 
c.Assert(msg.Length(), check.LessEqual, 256) + } +} + +func (s *batchSuite) TestMaxBatchSize(c *check.C) { + encoder := NewJSONEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) + c.Check(err, check.IsNil) + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + sum := 0 + for _, msg := range messages { + decoder, err := NewJSONEventBatchDecoder(msg.Key, msg.Value) + c.Check(err, check.IsNil) + count := 0 + for { + t, hasNext, err := decoder.HasNext() + c.Check(err, check.IsNil) + if !hasNext { + break + } + + c.Check(t, check.Equals, model.MqMessageTypeRow) + _, err = decoder.NextRowChangedEvent() + c.Check(err, check.IsNil) + count++ + } + c.Check(count, check.LessEqual, 64) + sum += count + } + c.Check(sum, check.Equals, 10000) +} + func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) { s.testBatchCodec(c, func() EventBatchEncoder { encoder := NewJSONEventBatchEncoder() diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 337c402a2b6..55de3c398f6 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -32,7 +32,6 @@ type MaxwellEventBatchEncoder struct { batchSize int } - type maxwellMessage struct { Database string `json:"database"` Table string `json:"table"` @@ -149,9 +148,8 @@ func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEven return EncoderNoOperation, nil } -// SetMaxMessageBytes is no-op for Maxwell for now +// SetParams is no-op for Maxwell for now func (d *MaxwellEventBatchEncoder) SetParams(params map[string]string) error { - panic("implement me") return nil } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 83be7a831a6..b72888ae674 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -116,7 +116,6 @@ func newMqSink( return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - newEncoder1 := newEncoder newEncoder = func() codec.EventBatchEncoder { ret := newEncoder1() @@ -127,7 +126,6 @@ func newMqSink( return ret } - k := &mqSink{ mqProducer: mqProducer, dispatcher: d, diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go index 60d4c538471..b62da14cfa7 100644 --- a/kafka_consumer/main.go +++ b/kafka_consumer/main.go @@ -46,11 +46,12 @@ import ( // Sarama configuration options var ( - kafkaAddrs []string - kafkaTopic string - kafkaPartitionNum int32 - kafkaGroupID = fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String()) - kafkaVersion = "2.4.0" + kafkaAddrs []string + kafkaTopic string + kafkaPartitionNum int32 + kafkaGroupID = fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String()) + kafkaVersion = "2.4.0" + kafkaMaxMessageBytes = math.MaxInt64 downstreamURIStr string @@ -122,6 +123,16 @@ func init() { } kafkaPartitionNum = int32(c) } + + s = upstreamURI.Query().Get("max-message-bytes") + if s != "" { + c, err := strconv.Atoi(s) + if err != nil { + log.Fatal("invalid max-message-bytes of upstream-uri") + } + log.Info("Setting max-message-bytes", zap.Int("max-message-bytes", c)) + kafkaMaxMessageBytes = c + } } func getPartitionNum(address []string, topic string, cfg *sarama.Config) (int32, error) { @@ -370,6 +381,10 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram ClaimMessages: for message 
:= range claim.Messages() {
 		log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
+		if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes {
+			log.Fatal("kafka max-message-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
+				zap.Int("received-bytes", len(message.Key)+len(message.Value)))
+		}
 		batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
 		if err != nil {
 			return errors.Trace(err)
 		}
diff --git a/tests/kafka_messages/conf/diff_config.toml b/tests/kafka_messages/conf/diff_config.toml
new file mode 100644
index 00000000000..5124e9d3bd5
--- /dev/null
+++ b/tests/kafka_messages/conf/diff_config.toml
@@ -0,0 +1,27 @@
+# diff Configuration.
+
+log-level = "info"
+chunk-size = 10
+check-thread-count = 4
+sample-percent = 100
+use-rowid = false
+use-checksum = true
+fix-sql-file = "fix.sql"
+
+# tables need to check.
+[[check-tables]]
+    schema = "file_sort"
+    tables = ["~usertable.*"]
+
+[[source-db]]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+    instance-id = "source-1"
+
+[target-db]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/kafka_messages/conf/workload b/tests/kafka_messages/conf/workload
new file mode 100644
index 00000000000..1763faf2338
--- /dev/null
+++ b/tests/kafka_messages/conf/workload
@@ -0,0 +1,13 @@
+threadcount=10
+recordcount=15000
+operationcount=0
+workload=core
+
+readallfields=true
+
+readproportion=0
+updateproportion=0
+scanproportion=0
+insertproportion=0
+
+requestdistribution=uniform
diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh
new file mode 100755
index 00000000000..2f2a1c07c75
--- /dev/null
+++ b/tests/kafka_messages/run.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+
+set -e
+
+CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+CDC_COUNT=3
+DB_COUNT=4
+
+function run() {
+    # test kafka sink only in this case
+    if [ "$SINK_TYPE" == "mysql" ]; then
+        return
+    fi
+
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+    start_tidb_cluster --workdir $WORK_DIR
+
+    cd $WORK_DIR
+
+    start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+    run_sql "CREATE DATABASE kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"
+
+    TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
+    SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096"
+    run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir"
+    if [ "$SINK_TYPE" == "kafka" ]; then
+        run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096"
+    fi
+
+    # Add a check table to reduce check time, or if we check data with sync diff
+    # directly, there may be a lot of diff data at first because of the incremental scan
+    run_sql "CREATE table kafka_message.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "kafka_message.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    run_sql "truncate table kafka_message.USERTABLE"
${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + run_sql "CREATE table kafka_message.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "kafka_message.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message + run_sql "CREATE table kafka_message.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "kafka_message.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + run_sql "create table kafka_message.USERTABLE2 like kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into kafka_message.USERTABLE2 select * from kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "create table kafka_message.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "kafka_message.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From d36bd8f63b277d69e963c7a40c2f801162bdac59 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 16 Nov 2020 13:47:18 +0800 Subject: [PATCH 07/20] fix integration test --- tests/kafka_messages/conf/diff_config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafka_messages/conf/diff_config.toml b/tests/kafka_messages/conf/diff_config.toml index 5124e9d3bd5..fce21a177b7 100644 --- a/tests/kafka_messages/conf/diff_config.toml +++ b/tests/kafka_messages/conf/diff_config.toml @@ -10,7 +10,7 @@ fix-sql-file = "fix.sql" # tables need to check. 
[[check-tables]] - schema = "file_sort" + schema = "kafka_message" tables = ["~usertable.*"] [[source-db]] From 6544d706519671c24b402afb68f442fcca31983c Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 16 Nov 2020 14:16:17 +0800 Subject: [PATCH 08/20] Update Kafka Version in integration test framework --- integration/framework/avro/kafka_single_table.go | 2 +- integration/framework/canal/kafka_single_table.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/framework/avro/kafka_single_table.go b/integration/framework/avro/kafka_single_table.go index f9ba1c35032..bad70fe0c18 100644 --- a/integration/framework/avro/kafka_single_table.go +++ b/integration/framework/avro/kafka_single_table.go @@ -43,7 +43,7 @@ func (a *SingleTableTask) Name() string { func (a *SingleTableTask) GetCDCProfile() *framework.CDCProfile { return &framework.CDCProfile{ PDUri: "http://upstream-pd:2379", - SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?protocol=avro", + SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?kafka-version=2.6.0&protocol=avro", Opts: map[string]string{"registry": "http://schema-registry:8081"}, } } diff --git a/integration/framework/canal/kafka_single_table.go b/integration/framework/canal/kafka_single_table.go index f59344627d8..ad15240af15 100644 --- a/integration/framework/canal/kafka_single_table.go +++ b/integration/framework/canal/kafka_single_table.go @@ -47,7 +47,7 @@ func (c *SingleTableTask) GetCDCProfile() *framework.CDCProfile { } return &framework.CDCProfile{ PDUri: "http://upstream-pd:2379", - SinkURI: "kafka://kafka:9092/" + testDbName + "?protocol=" + protocol, + SinkURI: "kafka://kafka:9092/" + testDbName + "?kafka-version=2.6.0&protocol=" + protocol, Opts: map[string]string{"force-handle-key-pkey": "true", "support-txn": "true"}, ConfigFile: "/config/canal-test-config.toml", } From 297628389675fcc4ec3ab9e387e7f0a9dadec41b Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 16 Nov 2020 14:31:04 +0800 Subject: [PATCH 09/20] fix integration framework --- integration/framework/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/framework/task.go b/integration/framework/task.go index f5f4701cf10..c122af35e04 100644 --- a/integration/framework/task.go +++ b/integration/framework/task.go @@ -92,7 +92,7 @@ func (p *CDCProfile) String() string { log.Fatal("SinkURI cannot be empty!") } - builder.WriteString("--sink-uri=" + p.SinkURI + " ") + builder.WriteString("--sink-uri=\"" + p.SinkURI + "\" ") if p.ConfigFile != "" { builder.WriteString("--config=" + p.ConfigFile + " ") From ecde6041828cc33a969c3e5e7c963abd43c9d944 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 19 Nov 2020 12:35:52 +0800 Subject: [PATCH 10/20] add const; add warning --- cdc/sink/codec/json.go | 10 +++++++++- kafka_consumer/main.go | 13 +++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index d9eda92b819..000438a03cc 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -35,6 +35,8 @@ import ( const ( // BatchVersion1 represents the version of batch format BatchVersion1 uint64 = 1 + // DefaultMaxMessageBytes sets the default value for max-message-bytes + DefaultMaxMessageBytes int = 64 * 1024 * 1024 // 64M ) type column struct { @@ -393,6 +395,12 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) message.Key = append(message.Key, key...) message.Value = append(message.Value, valueLenByte[:]...) 
message.Value = append(message.Value, value...)
+
+		if message.Length() > d.maxKafkaMessageSize {
+			// `len(d.messageBuf) == 1` is implied
+			log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
+				zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
+		}
 		d.curBatchSize++
 	}
 	return EncoderNoOperation, nil
 }
@@ -515,7 +523,7 @@ func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
 			return errors.Trace(err)
 		}
 	} else {
-		d.maxKafkaMessageSize = 64 * 1024 * 1024 // 64M
+		d.maxKafkaMessageSize = DefaultMaxMessageBytes
 	}
 
 	if maxBatchSize, ok := params["max-batch-size"]; ok {
diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go
index b62da14cfa7..a8dabdc1a7f 100644
--- a/kafka_consumer/main.go
+++ b/kafka_consumer/main.go
@@ -381,14 +381,12 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 ClaimMessages:
 	for message := range claim.Messages() {
 		log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
-		if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes {
-			log.Fatal("kafka max-message-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
-				zap.Int("received-bytes", len(message.Key)+len(message.Value)))
-		}
 		batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
 		if err != nil {
 			return errors.Trace(err)
 		}
+
+		counter := 0
 		for {
 			tp, hasNext, err := batchDecoder.HasNext()
 			if err != nil {
@@ -397,6 +395,13 @@ ClaimMessages:
 			if !hasNext {
 				break
 			}
+
+			counter++
+			// If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning.
+			if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 {
+				log.Fatal("kafka max-message-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
+					zap.Int("received-bytes", len(message.Key)+len(message.Value)))
+			}
 			switch tp {
 			case model.MqMessageTypeDDL:
 				ddl, err := batchDecoder.NextDDLEvent()

From 8c0e06f02b0070b84973fdc43123fb5e47748316 Mon Sep 17 00:00:00 2001
From: Zixiong Liu
Date: Thu, 19 Nov 2020 13:21:21 +0800
Subject: [PATCH 11/20] add maxBatchSize in kafka_consumer

---
 kafka_consumer/main.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go
index a8dabdc1a7f..50daebb7a38 100644
--- a/kafka_consumer/main.go
+++ b/kafka_consumer/main.go
@@ -53,6 +53,7 @@ var (
 	kafkaVersion         = "2.4.0"
 	kafkaMaxMessageBytes = math.MaxInt64
 
+	maxBatchSize int
 	downstreamURIStr string
 
 	logPath string
@@ -72,6 +73,7 @@ func init() {
 	flag.StringVar(&ca, "ca", "", "CA certificate path for Kafka SSL connection")
 	flag.StringVar(&cert, "cert", "", "Certificate path for Kafka SSL connection")
 	flag.StringVar(&key, "key", "", "Private key path for Kafka SSL connection")
+	flag.IntVar(&maxBatchSize, "max-batch-size", 4096,"The limit of acceptable batch size in Open Protocol")
 
 	flag.Parse()
@@ -402,6 +404,7 @@ ClaimMessages:
 				log.Fatal("kafka max-message-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
 					zap.Int("received-bytes", len(message.Key)+len(message.Value)))
 			}
+
 			switch tp {
 			case model.MqMessageTypeDDL:
 				ddl, err := batchDecoder.NextDDLEvent()
@@ -450,6 +453,11 @@ ClaimMessages:
 			}
 			session.MarkMessage(message, "")
 		}
+
+		if counter > maxBatchSize {
+			log.Fatal("Open Protocl max-batch-size exceeded", zap.Int("max-batch-size", maxBatchSize),
+				zap.Int("actual-batch-size", counter))
+		}
 	}
 
 	return nil

From b98121b95b41d7b40a6e9904d792c7df145ae6e7 Mon Sep 17 00:00:00 2001
From: Zixiong Liu
Date: Thu, 19 Nov 2020 13:31:21 +0800
Subject: [PATCH 12/20] fix format

---
 kafka_consumer/main.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go
index 50daebb7a38..1431078eb55 100644
--- a/kafka_consumer/main.go
+++ b/kafka_consumer/main.go
@@ -53,7 +53,7 @@ var (
 	kafkaVersion         = "2.4.0"
 	kafkaMaxMessageBytes = math.MaxInt64
 
-	maxBatchSize int
+	maxBatchSize         int
 	downstreamURIStr string
 
 	logPath string
@@ -73,7 +73,7 @@ func init() {
 	flag.StringVar(&ca, "ca", "", "CA certificate path for Kafka SSL connection")
 	flag.StringVar(&cert, "cert", "", "Certificate path for Kafka SSL connection")
 	flag.StringVar(&key, "key", "", "Private key path for Kafka SSL connection")
-	flag.IntVar(&maxBatchSize, "max-batch-size", 4096,"The limit of acceptable batch size in Open Protocol")
+	flag.IntVar(&maxBatchSize, "max-batch-size", 4096, "The limit of acceptable batch size in Open Protocol")
 
 	flag.Parse()

From 3bb533d9f12ecb8c67d20e8a50fbae001bbec1de Mon Sep 17 00:00:00 2001
From: Zixiong Liu
Date: Thu, 19 Nov 2020 14:15:34 +0800
Subject: [PATCH 13/20] add batch size limit integration test

---
 tests/kafka_messages/run.sh | 59 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 58 insertions(+), 1 deletion(-)

diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh
index 2f2a1c07c75..ae81e8cb484 100755
--- a/tests/kafka_messages/run.sh
+++ b/tests/kafka_messages/run.sh
@@ -11,7 +11,7 @@ SINK_TYPE=$1
 CDC_COUNT=3
 DB_COUNT=4
 
-function run() {
+function run_length_limit() {
     # test kafka sink only in this case
     if [ "$SINK_TYPE" == "mysql" ]; then
        return
@@ -23,6
+23,8 @@ function run() {
 
     cd $WORK_DIR
 
+    run_sql "CREATE DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
     start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
     run_sql "CREATE DATABASE kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
     go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
@@ -64,6 +66,61 @@ function run() {
     cleanup_process $CDC_BINARY
 }
 
+function run_batch_size_limit() {
+    # test kafka sink only in this case
+    if [ "$SINK_TYPE" == "mysql" ]; then
+        return
+    fi
+
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+    start_tidb_cluster --workdir $WORK_DIR
+
+    cd $WORK_DIR
+
+    run_sql "CREATE DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+    start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+    run_sql "CREATE DATABASE kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"
+
+    TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
+    SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
+    run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir" --opts="max-batch-size=3"
+    if [ "$SINK_TYPE" == "kafka" ]; then
+        run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3"
+    fi
+
+    # Add a check table to reduce check time, or if we check data with sync diff
+    # directly, there may be a lot of diff data at first because of the incremental scan
+    run_sql "CREATE table kafka_message.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "kafka_message.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    run_sql "truncate table kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+    run_sql "CREATE table kafka_message.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
+    run_sql "CREATE table kafka_message.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    run_sql "create table kafka_message.USERTABLE2 like kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "insert into kafka_message.USERTABLE2 select * from kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "create table kafka_message.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "kafka_message.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    cleanup_process $CDC_BINARY
+}
+
 trap stop_tidb_cluster EXIT
 run $*
 echo "[$(date)] <<<<<< run test case $TEST_NAME success!
>>>>>>" From 1ab2e35c0ffa49e9788165f03228bd13909d0524 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 19 Nov 2020 16:56:29 +0800 Subject: [PATCH 14/20] fix integration test --- tests/kafka_messages/run.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh index ae81e8cb484..38c3e9855c1 100755 --- a/tests/kafka_messages/run.sh +++ b/tests/kafka_messages/run.sh @@ -64,6 +64,7 @@ function run_length_limit() { check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml cleanup_process $CDC_BINARY + stop_tidb_cluster } function run_batch_size_limit() { @@ -119,8 +120,10 @@ function run_batch_size_limit() { check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml cleanup_process $CDC_BINARY + stop_tidb_cluster } trap stop_tidb_cluster EXIT -run $* +run_length_limit $* +run_batch_size_limit $* echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From b514306c8d9b5e72bfa89f8acb6f31752a7a0d6f Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 19 Nov 2020 17:31:03 +0800 Subject: [PATCH 15/20] fix integration test --- tests/kafka_messages/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh index 38c3e9855c1..26288c0ca03 100755 --- a/tests/kafka_messages/run.sh +++ b/tests/kafka_messages/run.sh @@ -23,7 +23,7 @@ function run_length_limit() { cd $WORK_DIR - run_sql "CREATE DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "DROP DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) run_sql "CREATE DATABASE kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} @@ -79,7 +79,7 @@ function run_batch_size_limit() { cd $WORK_DIR - run_sql "CREATE DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "DROP DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) run_sql "CREATE DATABASE kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} From e62cf6605dbc38eea6b6ec54ddebfc8fa745665c Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 19 Nov 2020 17:40:39 +0800 Subject: [PATCH 16/20] ignore topic already exists error --- cdc/sink/producer/kafka/kafka.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 3c91fb81d00..70b10cf3932 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -306,7 +306,9 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c NumPartitions: partitionNum, ReplicationFactor: config.ReplicationFactor, }, false) - if err != nil { + + // TODO idenfity the cause of "Topic with this name already exists" + if err != nil && !strings.Contains(err.Error(), "already exists"){ return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } } From cc4a9b60fbc8da0ffa6a973072205862c2b85e66 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 19 Nov 2020 18:01:49 +0800 Subject: [PATCH 17/20] fix format --- cdc/sink/producer/kafka/kafka.go | 2 +- tests/_utils/run_cdc_server | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 70b10cf3932..872b51a2276 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -308,7 +308,7 @@ func NewKafkaSaramaProducer(ctx 
context.Context, address string, topic string, c
 		}, false)
 
 		// TODO identify the cause of "Topic with this name already exists"
-		if err != nil && !strings.Contains(err.Error(), "already exists"){
+		if err != nil && !strings.Contains(err.Error(), "already exists") {
 			return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
 		}
 	}
diff --git a/tests/_utils/run_cdc_server b/tests/_utils/run_cdc_server
index ad55d087757..0ef79eef620 100755
--- a/tests/_utils/run_cdc_server
+++ b/tests/_utils/run_cdc_server
@@ -86,6 +86,7 @@ if [[ "$restart" == "true" ]]; then
     GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
         --log-file $workdir/cdc$logsuffix.log \
         --log-level $log_level \
+        --sorter-max-memory-consumption 0 \
         $tls \
         $certcn \
         $addr \
@@ -99,6 +100,7 @@ else
     GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
         --log-file $workdir/cdc$logsuffix.log \
         --log-level $log_level \
+        --sorter-max-memory-consumption 0 \
         $tls \
         $certcn \
         $addr \

From 7fad6916c3572eef437eab708de47d8a9f419f2c Mon Sep 17 00:00:00 2001
From: Zixiong Liu
Date: Mon, 23 Nov 2020 13:32:35 +0800
Subject: [PATCH 18/20] fix sink uri params

---
 cdc/sink/mq.go              | 17 ++++++++++++++++-
 kafka_consumer/main.go      | 19 ++++++++++++++-----
 tests/kafka_messages/run.sh |  4 ++--
 3 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index b72888ae674..fcaa3aa69e2 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -419,7 +419,12 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
 			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 		}
 		config.MaxMessageBytes = c
-		opts["max-message-bytes"] = strconv.Itoa(c)
+		opts["max-message-bytes"] = s
+	}
+
+	s = sinkURI.Query().Get("max-batch-size")
+	if s != "" {
+		opts["max-batch-size"] = s
 	}
 
 	s = sinkURI.Query().Get("compression")
@@ -472,6 +477,16 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
 	if s != "" {
 		replicaConfig.Sink.Protocol = s
 	}
+	// These two options are not used by the Pulsar producer itself, but by the encoders
+	s = sinkURI.Query().Get("max-message-bytes")
+	if s != "" {
+		opts["max-message-bytes"] = s
+	}
+
+	s = sinkURI.Query().Get("max-batch-size")
+	if s != "" {
+		opts["max-batch-size"] = s
+	}
 	// For now, it's a place holder. Avro format have to make connection to Schema Registery,
 	// and it may needs credential.
credential := &security.Credential{} diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go index 1431078eb55..443d1bad4be 100644 --- a/kafka_consumer/main.go +++ b/kafka_consumer/main.go @@ -52,8 +52,8 @@ var ( kafkaGroupID = fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String()) kafkaVersion = "2.4.0" kafkaMaxMessageBytes = math.MaxInt64 + kafkaMaxBatchSize = math.MaxInt64 - maxBatchSize int downstreamURIStr string logPath string @@ -73,8 +73,6 @@ func init() { flag.StringVar(&ca, "ca", "", "CA certificate path for Kafka SSL connection") flag.StringVar(&cert, "cert", "", "Certificate path for Kafka SSL connection") flag.StringVar(&key, "key", "", "Private key path for Kafka SSL connection") - flag.IntVar(&maxBatchSize, "max-batch-size", 4096, "The limit of acceptable batch size in Open Protocol") - flag.Parse() err := logutil.InitLogger(&logutil.Config{ @@ -135,6 +133,17 @@ func init() { log.Info("Setting max-message-bytes", zap.Int("max-message-bytes", c)) kafkaMaxMessageBytes = c } + + s = upstreamURI.Query().Get("max-batch-size") + if s != "" { + c, err := strconv.Atoi(s) + if err != nil { + log.Fatal("invalid max-batch-size of upstream-uri") + } + log.Info("Setting max-batch-size", zap.Int("max-batch-size", c)) + kafkaMaxBatchSize = c + } + } func getPartitionNum(address []string, topic string, cfg *sarama.Config) (int32, error) { @@ -454,8 +463,8 @@ ClaimMessages: session.MarkMessage(message, "") } - if counter > maxBatchSize { - log.Fatal("Open Protocl max-batch-size exceeded", zap.Int("max-batch-size", maxBatchSize), + if counter > kafkaMaxBatchSize { + log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize), zap.Int("actual-batch-size", counter)) } } diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh index 26288c0ca03..548331a495d 100755 --- a/tests/kafka_messages/run.sh +++ b/tests/kafka_messages/run.sh @@ -87,8 +87,8 @@ function run_batch_size_limit() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info" TOPIC_NAME="ticdc-kafka-message-test-$RANDOM" - SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir" --opts="max-batch-size=3" + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir" if [ "$SINK_TYPE" == "kafka" ]; then run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3" fi From 615811e06ffa089ca5df76921fecf0a7fc8d31b2 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 14:13:43 +0800 Subject: [PATCH 19/20] fix integration test script --- tests/_utils/run_cdc_server | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/_utils/run_cdc_server b/tests/_utils/run_cdc_server index 0ef79eef620..ad55d087757 100755 --- a/tests/_utils/run_cdc_server +++ b/tests/_utils/run_cdc_server @@ -86,7 +86,6 @@ if [[ "$restart" == "true" ]]; then GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \ --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ - --sorter-max-memory-consumption 0 \ $tls \ $certcn \ $addr \ @@ -100,7 +99,6 @@ else GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \ --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ - --sorter-max-memory-consumption 0 \ $tls \ $certcn \ $addr \ From 
e2c599d6a0b2f5dd4f1b7ef9e661bb5ad35066d3 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 16:20:11 +0800 Subject: [PATCH 20/20] Merge remote-tracking branch 'origin/master' into zixiong-fix-kafka-message-length
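Note: taken together, patches 02, 05, and 06 leave the Open Protocol encoder with a simple size-bounded batching rule: open a new MQ message when the buffer is empty, when `max-batch-size` events have already been packed, or when the next key/value pair (plus its two 8-byte length prefixes) would push the current message past `max-message-bytes`. A self-contained sketch of that rule, using simplified standalone types rather than the actual codec package:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const batchVersion1 uint64 = 1

// message mirrors the role of MQMessage in the series: a key/value pair
// whose combined length must stay under the message-size limit.
type message struct {
	key, value []byte
}

func (m *message) length() int { return len(m.key) + len(m.value) }

// appendEvent reproduces the splitting rule: open a new message when the
// buffer is empty, when the event count reaches maxBatchSize, or when the
// pending key/value (plus two 8-byte length prefixes) would push the
// current message past maxMessageBytes.
func appendEvent(buf []*message, curBatch *int, key, value []byte, maxMessageBytes, maxBatchSize int) []*message {
	if len(buf) == 0 || *curBatch >= maxBatchSize ||
		buf[len(buf)-1].length()+len(key)+len(value)+16 > maxMessageBytes {
		versionHead := make([]byte, 8)
		binary.BigEndian.PutUint64(versionHead, batchVersion1)
		buf = append(buf, &message{key: versionHead})
		*curBatch = 0
	}
	last := buf[len(buf)-1]
	var lenByte [8]byte
	binary.BigEndian.PutUint64(lenByte[:], uint64(len(key)))
	last.key = append(last.key, lenByte[:]...)
	last.key = append(last.key, key...)
	binary.BigEndian.PutUint64(lenByte[:], uint64(len(value)))
	last.value = append(last.value, lenByte[:]...)
	last.value = append(last.value, value...)
	*curBatch++
	return buf
}

func main() {
	var buf []*message
	cur := 0
	for i := 0; i < 10; i++ {
		buf = appendEvent(buf, &cur, []byte("key"), []byte("value"), 64, 3)
	}
	// Each event adds 24 bytes and a fresh message starts at 8 bytes, so
	// the 64-byte cap splits after two events: ten events -> five messages.
	fmt.Println("messages:", len(buf))
}
```

As in the series itself, a single event larger than the limit still produces one oversized message; the patches handle that case by logging a warning on the encoder side and tolerating a one-event oversized message on the consumer side (the `counter > 1` check).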