Commit 80b27e5

Merge branch 'release-5.1' into cherry-pick-3062-to-release-5.1

ti-chi-bot authored Oct 22, 2021
2 parents 122b1ad + af3f009 commit 80b27e5
Showing 7 changed files with 44 additions and 42 deletions.
11 changes: 10 additions & 1 deletion cdc/sink/codec/json.go
@@ -36,7 +36,7 @@ const (
 	// BatchVersion1 represents the version of batch format
 	BatchVersion1 uint64 = 1
 	// DefaultMaxMessageBytes sets the default value for max-message-bytes
-	DefaultMaxMessageBytes int = 64 * 1024 * 1024 // 64M
+	DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
 	// DefaultMaxBatchSize sets the default value for max-batch-size
 	DefaultMaxBatchSize int = 16
 )
@@ -405,6 +405,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
 		d.valueBuf.Write(valueLenByte[:])
 		d.valueBuf.Write(value)
 	} else {
+		// do not send a single message longer than max-message-size.
+		// 16 is the total length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
+		length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
+		if length > d.maxKafkaMessageSize {
+			log.Warn("Single message too large",
+				zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
+			return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
+		}
+
 		if len(d.messageBuf) == 0 ||
 			d.curBatchSize >= d.maxBatchSize ||
 			d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
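
To make the new guard concrete, here is a minimal, self-contained Go sketch of the size check. It assumes maximumRecordOverhead follows sarama's 36-byte per-record estimate, and checkRowSize is a hypothetical stand-in for the logic inside AppendRowChangedEvent, not the encoder's real API:

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Assumed to match sarama's per-record overhead estimate: five varint32
// fields, one varint64 field, and one attributes byte (36 bytes in total).
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

// checkRowSize mirrors the guard above: 16 bytes for the key/value length
// prefixes plus 8 bytes for the version header.
func checkRowSize(key, value []byte, maxKafkaMessageSize int) error {
	length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
	if length > maxKafkaMessageSize {
		return errors.New("json codec single row too large")
	}
	return nil
}

func main() {
	key := []byte(`{"ts":1,"scm":"a","tbl":"b","t":1}`)
	value := []byte(`{"u":{"col1":{"t":1,"v":"aa"}}}`)
	limit := len(key) + len(value) + maximumRecordOverhead + 16 + 8
	fmt.Println(checkRowSize(key, value, limit))   // <nil>: exactly fits
	fmt.Println(checkRowSize(key, value, limit-1)) // error: one byte over
}

Note the oversized row is rejected rather than truncated: the caller sees ErrJSONCodecRowTooLarge instead of a message the broker would refuse.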
37 changes: 20 additions & 17 deletions cdc/sink/codec/json_test.go
@@ -226,8 +226,8 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
 	encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
 	err := encoder.SetParams(map[string]string{})
 	c.Assert(err, check.IsNil)
-	c.Assert(encoder.maxBatchSize, check.Equals, 16)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, 64*1024*1024)
+	c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
+	c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)

 	err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
 	c.Assert(err, check.ErrorMatches, ".*invalid.*")
@@ -237,12 +237,12 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {

 	err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
 	c.Assert(err, check.IsNil)
-	c.Assert(encoder.maxBatchSize, check.Equals, 16)
+	c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
 	c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)

 	err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
 	c.Assert(err, check.IsNil)
-	c.Assert(encoder.maxBatchSize, check.Equals, 16)
+	c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
 	c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)

 	err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
@@ -254,36 +254,39 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
 	err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
 	c.Assert(err, check.IsNil)
 	c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, 64*1024*1024)
+	c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)

 	err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
 	c.Assert(err, check.IsNil)
 	c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
-	c.Assert(encoder.maxKafkaMessageSize, check.Equals, 64*1024*1024)
+	c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
 }

 func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
 	defer testleak.AfterTest(c)()
 	encoder := NewJSONEventBatchEncoder()

+	// the size of `testEvent` is 87
 	testEvent := &model.RowChangedEvent{
 		CommitTs: 1,
 		Table:    &model.TableName{Schema: "a", Table: "b"},
 		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
 	}

-	// make producer's `max-message-bytes` must less than event, but we should still send it as possible.
-	err := encoder.SetParams(map[string]string{"max-message-bytes": "1"})
+	// for a single message, the overhead is 36 (maximumRecordOverhead) + 8 (versionHead) = 44, which can just hold it.
+	a := strconv.Itoa(87 + 44)
+	err := encoder.SetParams(map[string]string{"max-message-bytes": a})
 	c.Check(err, check.IsNil)
-	for i := 0; i < 100; i++ {
-		r, err := encoder.AppendRowChangedEvent(testEvent)
-		c.Check(r, check.Equals, EncoderNoOperation)
-		c.Check(err, check.IsNil)
-	}
+	r, err := encoder.AppendRowChangedEvent(testEvent)
+	c.Check(err, check.IsNil)
+	c.Check(r, check.Equals, EncoderNoOperation)

-	// one message per batch, and can be build, which means the producer will try to send it.
-	messages := encoder.Build()
-	c.Assert(len(messages), check.Equals, 100)
+	a = strconv.Itoa(87 + 43)
+	err = encoder.SetParams(map[string]string{"max-message-bytes": a})
+	c.Assert(err, check.IsNil)
+	r, err = encoder.AppendRowChangedEvent(testEvent)
+	c.Check(err, check.NotNil)
+	c.Check(r, check.Equals, EncoderNoOperation)

 	// make sure each batch's `Length` not greater than `max-message-bytes`
 	err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
@@ -295,7 +298,7 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
 		c.Check(err, check.IsNil)
 	}

-	messages = encoder.Build()
+	messages := encoder.Build()
 	for _, msg := range messages {
 		c.Assert(msg.Length(), check.LessEqual, 256)
 	}
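
Spelling out the boundary arithmetic the test relies on (assuming, consistent with the guard in json.go, that the 87-byte figure already includes the 16 bytes of key/value length prefixes):

package main

import "fmt"

func main() {
	const (
		event          = 87 // key + value + 16 length-prefix bytes, per the test comment
		recordOverhead = 36 // maximumRecordOverhead
		versionHead    = 8
	)
	// 87 + 44 = 131 is the smallest max-message-bytes that accepts testEvent;
	// 87 + 43 = 130 must be rejected with ErrJSONCodecRowTooLarge.
	fmt.Println(event+recordOverhead+versionHead <= 87+44) // true
	fmt.Println(event+recordOverhead+versionHead <= 87+43) // false
}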
11 changes: 4 additions & 7 deletions cdc/sink/producer/kafka/kafka.go
@@ -57,8 +57,9 @@ type Config struct {
 // NewConfig returns a default Kafka configuration
 func NewConfig() *Config {
 	return &Config{
-		Version:           "2.4.0",
-		MaxMessageBytes:   512 * 1024 * 1024, // 512M
+		Version: "2.4.0",
+		// MaxMessageBytes will be used to initialize the producer; we set the default value (1M) identical to the Kafka broker's.
+		MaxMessageBytes:   1 * 1024 * 1024,
 		ReplicationFactor: 1,
 		Compression:       "none",
 		Credential:        &security.Credential{},
@@ -98,11 +99,7 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig
 		if err != nil {
 			return err
 		}
-		// `MaxMessageBytes` is set to `512 mb` by default, but it's still possible that a larger value expected.
-		// TiCDC should send the message at best.
-		if a > c.MaxMessageBytes {
-			c.MaxMessageBytes = a
-		}
+		c.MaxMessageBytes = a
 		opts["max-message-bytes"] = s
 	}

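
The behavioral change in Initialize is easy to miss: previously a max-message-bytes value below the default was silently ignored, now the URI value always wins. A minimal sketch of the new behavior (this config type and initialize helper are simplified stand-ins, not the real kafka package API):

package main

import (
	"fmt"
	"net/url"
	"strconv"
)

type config struct {
	maxMessageBytes int
}

// initialize applies max-message-bytes from the sink URI unconditionally,
// mirroring the patch; the old code kept the default unless the URI value
// was larger.
func (c *config) initialize(sinkURI *url.URL) error {
	if s := sinkURI.Query().Get("max-message-bytes"); s != "" {
		a, err := strconv.Atoi(s)
		if err != nil {
			return err
		}
		c.maxMessageBytes = a
	}
	return nil
}

func main() {
	u, _ := url.Parse("kafka://127.0.0.1:9092/t?max-message-bytes=4096")
	c := &config{maxMessageBytes: 1 * 1024 * 1024} // the new 1M default
	if err := c.initialize(u); err != nil {
		panic(err)
	}
	fmt.Println(c.maxMessageBytes) // 4096: below the default, but now honored
}

This is exactly what TestInitializeConfig below pins down: 4096 from the URI replaces the 1M default outright.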
17 changes: 2 additions & 15 deletions cdc/sink/producer/kafka/kafka_test.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"fmt"
 	"net/url"
-	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -73,7 +72,7 @@ func (s *kafkaSuite) TestInitializeConfig(c *check.C) {
 	uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" +
 		"&max-message-bytes=%s&partition-num=1&replication-factor=3" +
 		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip"
-	maxMessageSize := "4194304"
+	maxMessageSize := "4096" // 4KB
 	uri := fmt.Sprintf(uriTemplate, maxMessageSize)

 	sinkURI, err := url.Parse(uri)
@@ -88,7 +87,7 @@
 	c.Assert(cfg.PartitionNum, check.Equals, int32(1))
 	c.Assert(cfg.ReplicationFactor, check.Equals, int16(3))
 	c.Assert(cfg.Version, check.Equals, "2.6.0")
-	c.Assert(cfg.MaxMessageBytes, check.Equals, 512*1024*1024)
+	c.Assert(cfg.MaxMessageBytes, check.Equals, 4096)

 	expectedOpts := map[string]string{
 		"max-message-bytes": maxMessageSize,
@@ -97,18 +96,6 @@
 	for k, v := range opts {
 		c.Assert(v, check.Equals, expectedOpts[k])
 	}
-
-	a := 512*1024*1024 + 1
-	maxMessageSize = strconv.Itoa(a)
-	uri = fmt.Sprintf(uriTemplate, maxMessageSize)
-
-	sinkURI, err = url.Parse(uri)
-	c.Assert(err, check.IsNil)
-
-	err = cfg.Initialize(sinkURI, replicaConfig, opts)
-	c.Assert(err, check.IsNil)
-
-	c.Assert(cfg.MaxMessageBytes, check.Equals, a)
 }

 func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
5 changes: 5 additions & 0 deletions errors.toml
@@ -336,6 +336,11 @@ error = '''
 json codec invalid data
 '''

+["CDC:ErrJSONCodecRowTooLarge"]
+error = '''
+json codec single row too large
+'''
+
 ["CDC:ErrKVStorageBackoffFailed"]
 error = '''
 backoff failed
1 change: 1 addition & 0 deletions pkg/errors/errors.go
@@ -104,6 +104,7 @@ var (
 	ErrMaxwellDecodeFailed  = errors.Normalize("maxwell decode failed", errors.RFCCodeText("CDC:ErrMaxwellDecodeFailed"))
 	ErrMaxwellInvalidData   = errors.Normalize("maxwell invalid data", errors.RFCCodeText("CDC:ErrMaxwellInvalidData"))
 	ErrJSONCodecInvalidData = errors.Normalize("json codec invalid data", errors.RFCCodeText("CDC:ErrJSONCodecInvalidData"))
+	ErrJSONCodecRowTooLarge = errors.Normalize("json codec single row too large", errors.RFCCodeText("CDC:ErrJSONCodecRowTooLarge"))
 	ErrCanalDecodeFailed    = errors.Normalize("canal decode failed", errors.RFCCodeText("CDC:ErrCanalDecodeFailed"))
 	ErrCanalEncodeFailed    = errors.Normalize("canal encode failed", errors.RFCCodeText("CDC:ErrCanalEncodeFailed"))
 	ErrOldValueNotEnabled   = errors.Normalize("old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled"))
4 changes: 2 additions & 2 deletions tests/dailytest/case.go
@@ -157,9 +157,9 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {
 	if err != nil {
 		log.S().Fatal(err)
 	}
-	// insert 5 * 1M
+	// insert 5 * 512KB
 	// note limitation of TiDB: https://github.com/pingcap/docs/blob/733a5b0284e70c5b4d22b93a818210a3f6fbb5a0/FAQ.md#the-error-message-transaction-too-large-is-displayed
-	data := make([]byte, 1<<20)
+	data := make([]byte, 1<<19)
 	for i := 0; i < 5; i++ {
 		_, err = tx.Query("INSERT INTO binlog_big(id, data) VALUES(?, ?);", i, data)
 		if err != nil {
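
A plausible reading of the size reduction (the diff itself only cites TiDB's transaction-size FAQ): with the default max-message-bytes now 1M, a 1M row can no longer fit in a single Kafka message once encoding overhead is added, while 512KB rows leave headroom. A quick check, ignoring JSON/base64 inflation of the payload, which only widens the gap:

package main

import "fmt"

func main() {
	const (
		maxMessageBytes = 1 * 1024 * 1024 // the new default
		overhead        = 36 + 16 + 8     // record overhead + length prefixes + version head
	)
	for _, rowSize := range []int{1 << 20, 1 << 19} {
		fmt.Printf("%7d-byte row fits in one message: %v\n",
			rowSize, rowSize+overhead <= maxMessageBytes)
	}
	// Output:
	// 1048576-byte row fits in one message: false
	//  524288-byte row fits in one message: true
}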
