From af3f0096b6b1d896146059462b2241b256f0bd4a Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Fri, 22 Oct 2021 09:44:46 +0800
Subject: [PATCH] sink: set max-message-bytes to initialize producer. (#3080) (#3108)

---
 cdc/sink/codec/json.go                | 11 +++++++-
 cdc/sink/codec/json_test.go           | 37 +++++++++++++++------------
 cdc/sink/producer/kafka/kafka.go      | 11 +++-----
 cdc/sink/producer/kafka/kafka_test.go | 17 ++----------
 errors.toml                           |  5 ++++
 pkg/errors/errors.go                  |  1 +
 tests/dailytest/case.go               |  4 +--
 7 files changed, 44 insertions(+), 42 deletions(-)

diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go
index 16a36da13a8..ddfb3bb9b9b 100644
--- a/cdc/sink/codec/json.go
+++ b/cdc/sink/codec/json.go
@@ -36,7 +36,7 @@ const (
     // BatchVersion1 represents the version of batch format
     BatchVersion1 uint64 = 1
     // DefaultMaxMessageBytes sets the default value for max-message-bytes
-    DefaultMaxMessageBytes int = 64 * 1024 * 1024 // 64M
+    DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
     // DefaultMaxBatchSize sets the default value for max-batch-size
     DefaultMaxBatchSize int = 16
 )
@@ -405,6 +405,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
         d.valueBuf.Write(valueLenByte[:])
         d.valueBuf.Write(value)
     } else {
+        // For a single message that is longer than max-message-bytes, do not send it.
+        // 16 is the combined length of `keyLenByte` and `valueLenByte`; 8 is the length of `versionHead`.
+        length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
+        if length > d.maxKafkaMessageSize {
+            log.Warn("Single message too large",
+                zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
+            return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
+        }
+
         if len(d.messageBuf) == 0 ||
             d.curBatchSize >= d.maxBatchSize ||
             d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go
index 95b6bbb561b..5241fa0515b 100644
--- a/cdc/sink/codec/json_test.go
+++ b/cdc/sink/codec/json_test.go
@@ -226,8 +226,8 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
     encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
     err := encoder.SetParams(map[string]string{})
     c.Assert(err, check.IsNil)
-    c.Assert(encoder.maxBatchSize, check.Equals, 16)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, 64*1024*1024)
+    c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
+    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
 
     err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
     c.Assert(err, check.ErrorMatches, ".*invalid.*")
@@ -237,12 +237,12 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
 
     err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
     c.Assert(err, check.IsNil)
-    c.Assert(encoder.maxBatchSize, check.Equals, 16)
+    c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
     c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)
 
     err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
     c.Assert(err, check.IsNil)
-    c.Assert(encoder.maxBatchSize, check.Equals, 16)
+    c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
     c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)
 
     err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
@@ -254,36 +254,39 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
     err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, 64*1024*1024)
+    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
 
     err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, 64*1024*1024)
+    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
 }
 
 func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
     defer testleak.AfterTest(c)()
     encoder := NewJSONEventBatchEncoder()
 
+    // the size of `testEvent` is 87 bytes
     testEvent := &model.RowChangedEvent{
         CommitTs: 1,
         Table:    &model.TableName{Schema: "a", Table: "b"},
         Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
     }
 
-    // make producer's `max-message-bytes` must less than event, but we should still send it as possible.
-    err := encoder.SetParams(map[string]string{"max-message-bytes": "1"})
+    // For a single message, the overhead is 36 (maximumRecordOverhead) + 8 (versionHead) = 44, so this limit can just hold it.
+    a := strconv.Itoa(87 + 44)
+    err := encoder.SetParams(map[string]string{"max-message-bytes": a})
     c.Check(err, check.IsNil)
-    for i := 0; i < 100; i++ {
-        r, err := encoder.AppendRowChangedEvent(testEvent)
-        c.Check(r, check.Equals, EncoderNoOperation)
-        c.Check(err, check.IsNil)
-    }
+    r, err := encoder.AppendRowChangedEvent(testEvent)
+    c.Check(err, check.IsNil)
+    c.Check(r, check.Equals, EncoderNoOperation)
 
-    // one message per batch, and can be build, which means the producer will try to send it.
-    messages := encoder.Build()
-    c.Assert(len(messages), check.Equals, 100)
+    a = strconv.Itoa(87 + 43)
+    err = encoder.SetParams(map[string]string{"max-message-bytes": a})
+    c.Assert(err, check.IsNil)
+    r, err = encoder.AppendRowChangedEvent(testEvent)
+    c.Check(err, check.NotNil)
+    c.Check(r, check.Equals, EncoderNoOperation)
 
     // make sure each batch's `Length` not greater than `max-message-bytes`
     err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
@@ -295,7 +298,7 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
         c.Check(err, check.IsNil)
     }
 
-    messages = encoder.Build()
+    messages := encoder.Build()
     for _, msg := range messages {
         c.Assert(msg.Length(), check.LessEqual, 256)
     }
diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go
index c7f6edff355..57efe3c30a2 100644
--- a/cdc/sink/producer/kafka/kafka.go
+++ b/cdc/sink/producer/kafka/kafka.go
@@ -57,8 +57,9 @@ type Config struct {
 // NewConfig returns a default Kafka configuration
 func NewConfig() *Config {
     return &Config{
-        Version:           "2.4.0",
-        MaxMessageBytes:   512 * 1024 * 1024, // 512M
+        Version: "2.4.0",
+        // MaxMessageBytes is used to initialize the producer; the default value (1M) is identical to the Kafka broker's default.
+        MaxMessageBytes:   1 * 1024 * 1024,
         ReplicationFactor: 1,
         Compression:       "none",
         Credential:        &security.Credential{},
@@ -98,11 +99,7 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfi
         if err != nil {
             return err
         }
-        // `MaxMessageBytes` is set to `512 mb` by default, but it's still possible that a larger value expected.
-        // TiCDC should send the message at best.
-        if a > c.MaxMessageBytes {
-            c.MaxMessageBytes = a
-        }
+        c.MaxMessageBytes = a
         opts["max-message-bytes"] = s
     }
diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go
index feae407ba28..b4f72df36bd 100644
--- a/cdc/sink/producer/kafka/kafka_test.go
+++ b/cdc/sink/producer/kafka/kafka_test.go
@@ -17,7 +17,6 @@ import (
     "context"
     "fmt"
     "net/url"
-    "strconv"
     "sync"
     "testing"
     "time"
@@ -73,7 +72,7 @@ func (s *kafkaSuite) TestInitializeConfig(c *check.C) {
     uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" +
         "&max-message-bytes=%s&partition-num=1&replication-factor=3" +
         "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip"
-    maxMessageSize := "4194304"
+    maxMessageSize := "4096" // 4 KB
     uri := fmt.Sprintf(uriTemplate, maxMessageSize)
 
     sinkURI, err := url.Parse(uri)
@@ -88,7 +87,7 @@ func (s *kafkaSuite) TestInitializeConfig(c *check.C) {
     c.Assert(cfg.PartitionNum, check.Equals, int32(1))
     c.Assert(cfg.ReplicationFactor, check.Equals, int16(3))
     c.Assert(cfg.Version, check.Equals, "2.6.0")
-    c.Assert(cfg.MaxMessageBytes, check.Equals, 512*1024*1024)
+    c.Assert(cfg.MaxMessageBytes, check.Equals, 4096)
 
     expectedOpts := map[string]string{
         "max-message-bytes": maxMessageSize,
@@ -97,18 +96,6 @@ func (s *kafkaSuite) TestInitializeConfig(c *check.C) {
     for k, v := range opts {
         c.Assert(v, check.Equals, expectedOpts[k])
     }
-
-    a := 512*1024*1024 + 1
-    maxMessageSize = strconv.Itoa(a)
-    uri = fmt.Sprintf(uriTemplate, maxMessageSize)
-
-    sinkURI, err = url.Parse(uri)
-    c.Assert(err, check.IsNil)
-
-    err = cfg.Initialize(sinkURI, replicaConfig, opts)
-    c.Assert(err, check.IsNil)
-
-    c.Assert(cfg.MaxMessageBytes, check.Equals, a)
 }
 
 func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
diff --git a/errors.toml b/errors.toml
index 3ea3497c4f1..ea1eebce9e3 100755
--- a/errors.toml
+++ b/errors.toml
@@ -336,6 +336,11 @@ error = '''
 json codec invalid data
 '''
 
+["CDC:ErrJSONCodecRowTooLarge"]
+error = '''
+json codec single row too large
+'''
+
 ["CDC:ErrKVStorageBackoffFailed"]
 error = '''
 backoff failed
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index 6edb74e863e..843df682b20 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -104,6 +104,7 @@ var (
     ErrMaxwellDecodeFailed  = errors.Normalize("maxwell decode failed", errors.RFCCodeText("CDC:ErrMaxwellDecodeFailed"))
     ErrMaxwellInvalidData   = errors.Normalize("maxwell invalid data", errors.RFCCodeText("CDC:ErrMaxwellInvalidData"))
     ErrJSONCodecInvalidData = errors.Normalize("json codec invalid data", errors.RFCCodeText("CDC:ErrJSONCodecInvalidData"))
+    ErrJSONCodecRowTooLarge = errors.Normalize("json codec single row too large", errors.RFCCodeText("CDC:ErrJSONCodecRowTooLarge"))
     ErrCanalDecodeFailed    = errors.Normalize("canal decode failed", errors.RFCCodeText("CDC:ErrCanalDecodeFailed"))
     ErrCanalEncodeFailed    = errors.Normalize("canal encode failed", errors.RFCCodeText("CDC:ErrCanalEncodeFailed"))
     ErrOldValueNotEnabled   = errors.Normalize("old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled"))
diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go
index 3dd44770ad7..d9b476b722a 100644
--- a/tests/dailytest/case.go
+++ b/tests/dailytest/case.go
@@ -157,9 +157,9 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {
         if err != nil {
             log.S().Fatal(err)
         }
-        // insert 5 * 1M
+        // insert 5 * 512KB
         // note limitation of TiDB: https://github.com/pingcap/docs/blob/733a5b0284e70c5b4d22b93a818210a3f6fbb5a0/FAQ.md#the-error-message-transaction-too-large-is-displayed
-        data := make([]byte, 1<<20)
+        data := make([]byte, 1<<19)
         for i := 0; i < 5; i++ {
             _, err = tx.Query("INSERT INTO binlog_big(id, data) VALUES(?, ?);", i, data)
             if err != nil {
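The change above replaces the old "raise the producer limit to fit the message" behavior with a hard rejection of rows that cannot fit in any Kafka message. As a companion illustration, not code from the patch, the following standalone Go sketch restates that guard from cdc/sink/codec/json.go: the names checkRowSize, lengthFieldsSize, versionHeadSize, and errRowTooLarge are hypothetical, and the 36-byte maximumRecordOverhead figure is the per-record overhead estimate cited in the test comment above.

package main

import (
	"errors"
	"fmt"
)

// Sizes mirroring the patch's length calculation in json.go.
const (
	maximumRecordOverhead = 36 // producer's per-record protocol overhead estimate (from the test comment)
	lengthFieldsSize      = 16 // 8-byte key-length prefix + 8-byte value-length prefix (keyLenByte/valueLenByte)
	versionHeadSize       = 8  // 8-byte batch version header (versionHead)
)

// errRowTooLarge stands in for cerror.ErrJSONCodecRowTooLarge.
var errRowTooLarge = errors.New("json codec single row too large")

// checkRowSize rejects a row whose encoded key+value plus all framing
// overhead exceeds max-message-bytes: such a row can never fit in a
// Kafka message, so it is refused up front instead of failing on the broker.
func checkRowSize(key, value []byte, maxMessageBytes int) error {
	length := len(key) + len(value) + maximumRecordOverhead + lengthFieldsSize + versionHeadSize
	if length > maxMessageBytes {
		return fmt.Errorf("%w: encoded length %d exceeds max-message-bytes %d",
			errRowTooLarge, length, maxMessageBytes)
	}
	return nil
}

func main() {
	key, value := make([]byte, 20), make([]byte, 100)
	// 20 + 100 + 36 + 16 + 8 = 180 bytes in total.
	fmt.Println(checkRowSize(key, value, 180)) // <nil>: exactly fits
	fmt.Println(checkRowSize(key, value, 179)) // error: one byte over the limit
}

In the real encoder the same condition returns EncoderNoOperation together with ErrJSONCodecRowTooLarge, so the changefeed surfaces the oversized row instead of silently dropping it or sending a message the broker would reject.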