From 2958b029c237a76fc40e07dbdcb80ba6cdd46957 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 24 Nov 2020 10:58:28 +0800 Subject: [PATCH] sink: Kafka message length and batch size limit for Open Protocol (#1079) --- cdc/sink/codec/avro.go | 6 + cdc/sink/codec/canal.go | 6 + cdc/sink/codec/canal_flat.go | 6 + cdc/sink/codec/interface.go | 8 ++ cdc/sink/codec/json.go | 95 ++++++++++--- cdc/sink/codec/json_test.go | 75 +++++++++- cdc/sink/codec/maxwell.go | 5 + cdc/sink/mq.go | 31 +++++ cdc/sink/producer/kafka/kafka.go | 4 +- .../framework/avro/kafka_single_table.go | 2 +- .../framework/canal/kafka_single_table.go | 2 +- integration/framework/task.go | 2 +- kafka_consumer/main.go | 49 ++++++- tests/kafka_messages/conf/diff_config.toml | 27 ++++ tests/kafka_messages/conf/workload | 13 ++ tests/kafka_messages/run.sh | 129 ++++++++++++++++++ 16 files changed, 429 insertions(+), 31 deletions(-) create mode 100644 tests/kafka_messages/conf/diff_config.toml create mode 100644 tests/kafka_messages/conf/workload create mode 100755 tests/kafka_messages/run.sh diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index 699087b6886..87250dc031b 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -163,6 +163,12 @@ func (a *AvroEventBatchEncoder) Size() int { return sum } +// SetParams is no-op for now +func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error { + // no op + return nil +} + func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column) (*avroEncodeResult, error) { schemaGen := func() (string, error) { schema, err := ColumnInfoToAvroSchema(table.Table, cols) diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index f1c11365fa3..9bd2a88c29f 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -416,6 +416,12 @@ func (d *CanalEventBatchEncoder) Reset() { panic("Reset only used for JsonEncoder") } +// SetParams is no-op for now +func (d *CanalEventBatchEncoder) SetParams(params map[string]string) error { + // no op + return nil +} + // refreshPacketBody() marshals the messages to the packet body func (d *CanalEventBatchEncoder) refreshPacketBody() error { oldSize := len(d.packet.Body) diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index f3e8d31c56e..7945a4781b7 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -241,3 +241,9 @@ func (c *CanalFlatEventBatchEncoder) Size() int { func (c *CanalFlatEventBatchEncoder) Reset() { panic("not supported") } + +// SetParams is no-op for now +func (c *CanalFlatEventBatchEncoder) SetParams(params map[string]string) error { + // no op + return nil +} diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index c4b02643ade..584fd322076 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -42,9 +42,12 @@ type EventBatchEncoder interface { // TODO decouple it out MixedBuild(withVersion bool) []byte // Size returns the size of the batch(bytes) + // Deprecated: Size is deprecated Size() int // Reset reset the kv buffer Reset() + // SetParams provides the encoder with more info on the sink + SetParams(params map[string]string) error } // MQMessage represents an MQ message to the mqSink @@ -54,6 +57,11 @@ type MQMessage struct { Ts uint64 // reserved for possible output sorting } +// Length returns the expected size of the Kafka message +func (m *MQMessage) Length() int { + return len(m.Key) + len(m.Value) +} + // NewMQMessage should be used when 
creating a MQMessage struct. // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. func NewMQMessage(key []byte, value []byte, ts uint64) *MQMessage { diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 89078f6d182..000438a03cc 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -35,6 +35,8 @@ import ( const ( // BatchVersion1 represents the version of batch format BatchVersion1 uint64 = 1 + // DefaultMaxMessageBytes sets the default value for max-message-bytes + DefaultMaxMessageBytes int = 64 * 1024 * 1024 // 64M ) type column struct { @@ -297,9 +299,16 @@ func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent // JSONEventBatchEncoder encodes the events into the byte of a batch into. type JSONEventBatchEncoder struct { - keyBuf *bytes.Buffer - valueBuf *bytes.Buffer - supportMixedBuild bool // TODO decouple this out + // TODO remove deprecated fields + keyBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now + valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now + supportMixedBuild bool // TODO decouple this out + + messageBuf []*MQMessage + curBatchSize int + // configs + maxKafkaMessageSize int + maxBatchSize int } // SetMixedBuildSupport is used by CDC Log @@ -363,11 +372,37 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) var valueLenByte [8]byte binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - d.keyBuf.Write(keyLenByte[:]) - d.keyBuf.Write(key) + if d.supportMixedBuild { + d.keyBuf.Write(keyLenByte[:]) + d.keyBuf.Write(key) + + d.valueBuf.Write(valueLenByte[:]) + d.valueBuf.Write(value) + } else { + if len(d.messageBuf) == 0 || + d.curBatchSize >= d.maxBatchSize || + d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize { - d.valueBuf.Write(valueLenByte[:]) - d.valueBuf.Write(value) + versionHead := make([]byte, 8) + binary.BigEndian.PutUint64(versionHead, BatchVersion1) + + d.messageBuf = append(d.messageBuf, NewMQMessage(versionHead, nil, 0)) + d.curBatchSize = 0 + } + + message := d.messageBuf[len(d.messageBuf)-1] + message.Key = append(message.Key, keyLenByte[:]...) + message.Key = append(message.Key, key...) + message.Value = append(message.Value, valueLenByte[:]...) + message.Value = append(message.Value, value...) + + if message.Length() > d.maxKafkaMessageSize { + // `len(d.messageBuf) == 1` is implied + log.Warn("Event does not fit into max-message-bytes. 
Adjust relevant configurations to avoid service interruptions.", + zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize)) + } + d.curBatchSize++ + } return EncoderNoOperation, nil } @@ -413,20 +448,17 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e // Build implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { - if d.valueBuf.Len() == 0 { - return nil + if d.supportMixedBuild { + if d.valueBuf.Len() == 0 { + return nil + } + ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0) + return []*MQMessage{ret} } - ret := NewMQMessage(d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0) - - if !d.supportMixedBuild { - d.keyBuf.Reset() - d.valueBuf.Reset() - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - d.keyBuf.Write(versionByte[:]) - } - return []*MQMessage{ret} + ret := d.messageBuf + d.messageBuf = make([]*MQMessage, 0) + return ret } // MixedBuild implements the EventBatchEncoder interface @@ -481,6 +513,31 @@ func (d *JSONEventBatchEncoder) Reset() { d.valueBuf.Reset() } +// SetParams reads relevant parameters for Open Protocol +func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error { + var err error + if maxMessageBytes, ok := params["max-message-bytes"]; ok { + d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes) + if err != nil { + // TODO add error code + return errors.Trace(err) + } + } else { + d.maxKafkaMessageSize = DefaultMaxMessageBytes + } + + if maxBatchSize, ok := params["max-batch-size"]; ok { + d.maxBatchSize, err = strconv.Atoi(maxBatchSize) + if err != nil { + // TODO add error code + return errors.Trace(err) + } + } else { + d.maxBatchSize = 4096 + } + return nil +} + // NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder. 
func NewJSONEventBatchEncoder() EventBatchEncoder { batch := &JSONEventBatchEncoder{ diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index ba2b3d2f7f1..b91ae48eedf 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -133,6 +133,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.rowCases { encoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + mixedEncoder := newEncoder() mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for _, row := range cs { @@ -151,10 +154,8 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco checkRowDecoder(mixedDecoder, cs) // test normal decode if len(cs) > 0 { - size := encoder.Size() res := encoder.Build() c.Assert(res, check.HasLen, 1) - c.Assert(len(res[0].Key)+len(res[0].Value), check.Equals, size) decoder, err := newDecoder(res[0].Key, res[0].Value) c.Assert(err, check.IsNil) checkRowDecoder(decoder, cs) @@ -164,6 +165,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.ddlCases { encoder := newEncoder() mixedEncoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for i, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) @@ -189,6 +193,9 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco for _, cs := range s.resolvedTsCases { encoder := newEncoder() mixedEncoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true) for i, ts := range cs { msg, err := encoder.EncodeCheckpointEvent(ts) @@ -212,6 +219,70 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco } } +func (s *batchSuite) TestMaxMessageBytes(c *check.C) { + encoder := NewJSONEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "256"}) + c.Check(err, check.IsNil) + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + for _, msg := range messages { + c.Assert(msg.Length(), check.LessEqual, 256) + } +} + +func (s *batchSuite) TestMaxBatchSize(c *check.C) { + encoder := NewJSONEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) + c.Check(err, check.IsNil) + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + sum := 0 + for _, msg := range messages { + decoder, err := NewJSONEventBatchDecoder(msg.Key, msg.Value) + c.Check(err, check.IsNil) + count := 0 + for { + t, hasNext, err := decoder.HasNext() + c.Check(err, 
check.IsNil)
+			if !hasNext {
+				break
+			}
+
+			c.Check(t, check.Equals, model.MqMessageTypeRow)
+			_, err = decoder.NextRowChangedEvent()
+			c.Check(err, check.IsNil)
+			count++
+		}
+		c.Check(count, check.LessEqual, 64)
+		sum += count
+	}
+	c.Check(sum, check.Equals, 10000)
+}
+
 func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) {
 	defer testleak.AfterTest(c)()
 	s.testBatchCodec(c, func() EventBatchEncoder {
diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go
index 9f2fe7a4942..55de3c398f6 100644
--- a/cdc/sink/codec/maxwell.go
+++ b/cdc/sink/codec/maxwell.go
@@ -148,6 +148,11 @@ func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEven
 	return EncoderNoOperation, nil
 }
 
+// SetParams is no-op for Maxwell for now
+func (d *MaxwellEventBatchEncoder) SetParams(params map[string]string) error {
+	return nil
+}
+
 // Column represents a column in maxwell
 type Column struct {
 	Type string `json:"type"`
diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index 4797489f511..fcaa3aa69e2 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -111,6 +111,21 @@ func newMqSink(
 		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled"))
 	}
 
+	// pre-flight verification of encoder parameters
+	if err := newEncoder().SetParams(opts); err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
+	}
+
+	newEncoder1 := newEncoder
+	newEncoder = func() codec.EventBatchEncoder {
+		ret := newEncoder1()
+		err := ret.SetParams(opts)
+		if err != nil {
+			log.Fatal("MQ Encoder could not parse parameters", zap.Error(err))
+		}
+		return ret
+	}
+
 	k := &mqSink{
 		mqProducer: mqProducer,
 		dispatcher: d,
@@ -404,6 +419,12 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
 		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 	}
 	config.MaxMessageBytes = c
+		opts["max-message-bytes"] = s
+	}
+
+	s = sinkURI.Query().Get("max-batch-size")
+	if s != "" {
+		opts["max-batch-size"] = s
 	}
 
 	s = sinkURI.Query().Get("compression")
@@ -456,6 +477,16 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
 	if s != "" {
 		replicaConfig.Sink.Protocol = s
 	}
+	// These two options are not used by the Pulsar producer itself, but by the encoders
+	s = sinkURI.Query().Get("max-message-bytes")
+	if s != "" {
+		opts["max-message-bytes"] = s
+	}
+
+	s = sinkURI.Query().Get("max-batch-size")
+	if s != "" {
+		opts["max-batch-size"] = s
+	}
 	// For now, it's a place holder. Avro format have to make connection to Schema Registery,
 	// and it may needs credential.
 	credential := &security.Credential{}
diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go
index 3c91fb81d00..872b51a2276 100644
--- a/cdc/sink/producer/kafka/kafka.go
+++ b/cdc/sink/producer/kafka/kafka.go
@@ -306,7 +306,9 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
 			NumPartitions:     partitionNum,
 			ReplicationFactor: config.ReplicationFactor,
 		}, false)
-		if err != nil {
+
+		// TODO identify the cause of "Topic with this name already exists"
+		if err != nil && !strings.Contains(err.Error(), "already exists") {
 			return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
 		}
 	}
diff --git a/integration/framework/avro/kafka_single_table.go b/integration/framework/avro/kafka_single_table.go
index f9ba1c35032..bad70fe0c18 100644
--- a/integration/framework/avro/kafka_single_table.go
+++ b/integration/framework/avro/kafka_single_table.go
@@ -43,7 +43,7 @@ func (a *SingleTableTask) Name() string {
 func (a *SingleTableTask) GetCDCProfile() *framework.CDCProfile {
 	return &framework.CDCProfile{
 		PDUri:   "http://upstream-pd:2379",
-		SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?protocol=avro",
+		SinkURI: "kafka://kafka:9092/testdb_" + a.TableName + "?kafka-version=2.6.0&protocol=avro",
 		Opts:    map[string]string{"registry": "http://schema-registry:8081"},
 	}
 }
diff --git a/integration/framework/canal/kafka_single_table.go b/integration/framework/canal/kafka_single_table.go
index f59344627d8..ad15240af15 100644
--- a/integration/framework/canal/kafka_single_table.go
+++ b/integration/framework/canal/kafka_single_table.go
@@ -47,7 +47,7 @@ func (c *SingleTableTask) GetCDCProfile() *framework.CDCProfile {
 	}
 	return &framework.CDCProfile{
 		PDUri:      "http://upstream-pd:2379",
-		SinkURI:    "kafka://kafka:9092/" + testDbName + "?protocol=" + protocol,
+		SinkURI:    "kafka://kafka:9092/" + testDbName + "?kafka-version=2.6.0&protocol=" + protocol,
 		Opts:       map[string]string{"force-handle-key-pkey": "true", "support-txn": "true"},
 		ConfigFile: "/config/canal-test-config.toml",
 	}
diff --git a/integration/framework/task.go b/integration/framework/task.go
index f5f4701cf10..c122af35e04 100644
--- a/integration/framework/task.go
+++ b/integration/framework/task.go
@@ -92,7 +92,7 @@ func (p *CDCProfile) String() string {
 		log.Fatal("SinkURI cannot be empty!")
 	}
 
-	builder.WriteString("--sink-uri=" + p.SinkURI + " ")
+	builder.WriteString("--sink-uri=\"" + p.SinkURI + "\" ")
 
 	if p.ConfigFile != "" {
 		builder.WriteString("--config=" + p.ConfigFile + " ")
diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go
index 1e67f2f3ad0..b0377838d48 100644
--- a/kafka_consumer/main.go
+++ b/kafka_consumer/main.go
@@ -46,11 +46,13 @@ import (
 
 // Sarama configuration options
 var (
-	kafkaAddrs        []string
-	kafkaTopic        string
-	kafkaPartitionNum int32
-	kafkaGroupID      = fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String())
-	kafkaVersion      = "2.4.0"
+	kafkaAddrs           []string
+	kafkaTopic           string
+	kafkaPartitionNum    int32
+	kafkaGroupID         = fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String())
+	kafkaVersion         = "2.4.0"
+	kafkaMaxMessageBytes = math.MaxInt64
+	kafkaMaxBatchSize    = math.MaxInt64
 
 	downstreamURIStr string
 
@@ -71,7 +73,6 @@ func init() {
 	flag.StringVar(&ca, "ca", "", "CA certificate path for Kafka SSL connection")
 	flag.StringVar(&cert, "cert", "", "Certificate path for Kafka SSL connection")
 	flag.StringVar(&key, "key", "", "Private key path for Kafka SSL connection")
-
 	flag.Parse()
 
 	err := logutil.InitLogger(&logutil.Config{
@@ -122,6 +123,27 @@ func init() {
 		}
 		kafkaPartitionNum = int32(c)
 	}
+
+	s = upstreamURI.Query().Get("max-message-bytes")
+	if s != "" {
+		c, err := strconv.Atoi(s)
+		if err != nil {
+			log.Fatal("invalid max-message-bytes of upstream-uri")
+		}
+		log.Info("Setting max-message-bytes", zap.Int("max-message-bytes", c))
+		kafkaMaxMessageBytes = c
+	}
+
+	s = upstreamURI.Query().Get("max-batch-size")
+	if s != "" {
+		c, err := strconv.Atoi(s)
+		if err != nil {
+			log.Fatal("invalid max-batch-size of upstream-uri")
+		}
+		log.Info("Setting max-batch-size", zap.Int("max-batch-size", c))
+		kafkaMaxBatchSize = c
+	}
+
 }
 
 func getPartitionNum(address []string, topic string, cfg *sarama.Config) (int32, error) {
@@ -370,6 +392,8 @@ ClaimMessages:
 		if err != nil {
 			return errors.Trace(err)
 		}
+
+		counter := 0
 		for {
 			tp, hasNext, err := batchDecoder.HasNext()
 			if err != nil {
@@ -378,6 +402,14 @@ ClaimMessages:
 			if !hasNext {
 				break
 			}
+
+			counter++
+			// If a message contains only one event and that event exceeds the length limit, CDC allows it and issues a warning instead.
+			if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 {
+				log.Fatal("kafka max-message-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
+					zap.Int("received-bytes", len(message.Key)+len(message.Value)))
+			}
+
 			switch tp {
 			case model.MqMessageTypeDDL:
 				ddl, err := batchDecoder.NextDDLEvent()
@@ -426,6 +458,11 @@ ClaimMessages:
 			}
 			session.MarkMessage(message, "")
 		}
+
+		if counter > kafkaMaxBatchSize {
+			log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
+				zap.Int("actual-batch-size", counter))
+		}
 	}
 
 	return nil
diff --git a/tests/kafka_messages/conf/diff_config.toml b/tests/kafka_messages/conf/diff_config.toml
new file mode 100644
index 00000000000..fce21a177b7
--- /dev/null
+++ b/tests/kafka_messages/conf/diff_config.toml
@@ -0,0 +1,27 @@
+# diff Configuration.
+
+log-level = "info"
+chunk-size = 10
+check-thread-count = 4
+sample-percent = 100
+use-rowid = false
+use-checksum = true
+fix-sql-file = "fix.sql"
+
+# tables to check.
+[[check-tables]]
+    schema = "kafka_message"
+    tables = ["~usertable.*"]
+
+[[source-db]]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+    instance-id = "source-1"
+
+[target-db]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/kafka_messages/conf/workload b/tests/kafka_messages/conf/workload
new file mode 100644
index 00000000000..1763faf2338
--- /dev/null
+++ b/tests/kafka_messages/conf/workload
@@ -0,0 +1,13 @@
+threadcount=10
+recordcount=15000
+operationcount=0
+workload=core
+
+readallfields=true
+
+readproportion=0
+updateproportion=0
+scanproportion=0
+insertproportion=0
+
+requestdistribution=uniform
diff --git a/tests/kafka_messages/run.sh b/tests/kafka_messages/run.sh
new file mode 100755
index 00000000000..548331a495d
--- /dev/null
+++ b/tests/kafka_messages/run.sh
@@ -0,0 +1,129 @@
+#!/bin/bash
+
+set -e
+
+CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+CDC_COUNT=3
+DB_COUNT=4
+
+function run_length_limit() {
+    # test the kafka sink only in this case
+    if [ "$SINK_TYPE" == "mysql" ]; then
+        return
+    fi
+
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+    start_tidb_cluster --workdir $WORK_DIR
+
+    cd $WORK_DIR
+
+    run_sql "DROP DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+    start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+    run_sql "CREATE DATABASE kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"
+
+    TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
+    SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096"
+    run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir"
+    if [ "$SINK_TYPE" == "kafka" ]; then
+        run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=4096"
+    fi
+
+    # Add a check table to reduce check time, or if we check data with sync diff
+    # directly, there may be a lot of diff data at first because of the incremental scan
+    run_sql "CREATE table kafka_message.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "kafka_message.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    run_sql "truncate table kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+    run_sql "CREATE table kafka_message.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
+    run_sql "CREATE table kafka_message.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    run_sql "create table kafka_message.USERTABLE2 like kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "insert into kafka_message.USERTABLE2 select * from kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "create table kafka_message.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "kafka_message.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    cleanup_process $CDC_BINARY
+    stop_tidb_cluster
+}
+
+function run_batch_size_limit() {
+    # test the kafka sink only in this case
+    if [ "$SINK_TYPE" == "mysql" ]; then
+        return
+    fi
+
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+    start_tidb_cluster --workdir $WORK_DIR
+
+    cd $WORK_DIR
+
+    run_sql "DROP DATABASE if exists kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+    start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+    run_sql "CREATE DATABASE kafka_message;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"
+
+    TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
+    SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3"
+    run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-dir="$sort_dir"
+    if [ "$SINK_TYPE" == "kafka" ]; then
+        run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-batch-size=3"
+    fi
+
+    # Add a check table to reduce check time, or if we check data with sync diff
+    # directly, there may be a lot of diff data at first because of the incremental scan
+    run_sql "CREATE table kafka_message.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "kafka_message.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    run_sql "truncate table kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+    run_sql "CREATE table kafka_message.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=kafka_message
+    run_sql "CREATE table kafka_message.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    run_sql "create table kafka_message.USERTABLE2 like kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "insert into kafka_message.USERTABLE2 select * from kafka_message.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "create table kafka_message.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "kafka_message.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "kafka_message.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90
+
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    cleanup_process $CDC_BINARY
+    stop_tidb_cluster
+}
+
+trap stop_tidb_cluster EXIT
+run_length_limit $*
+run_batch_size_limit $*
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
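
A minimal, self-contained sketch of how the two new Open Protocol knobs behave end to end. This is not part of the patch: it mirrors the TestMaxMessageBytes / TestMaxBatchSize unit tests above, and the github.com/pingcap/ticdc import paths are assumed from the repository layout shown in this diff.

	package main

	import (
		"fmt"

		"github.com/pingcap/ticdc/cdc/model"
		"github.com/pingcap/ticdc/cdc/sink/codec"
	)

	func main() {
		// The MQ sink forwards these options from the sink URI query string
		// (see newKafkaSaramaSink / newMqSink above).
		encoder := codec.NewJSONEventBatchEncoder()
		if err := encoder.SetParams(map[string]string{
			"max-message-bytes": "4096",
			"max-batch-size":    "3",
		}); err != nil {
			panic(err)
		}

		event := &model.RowChangedEvent{
			CommitTs: 1,
			Table:    &model.TableName{Schema: "a", Table: "b"},
			Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
		}
		for i := 0; i < 10; i++ {
			if _, err := encoder.AppendRowChangedEvent(event); err != nil {
				panic(err)
			}
		}

		// Build returns one MQMessage per batch: each batch holds at most
		// max-batch-size events, and a message only exceeds max-message-bytes
		// when a single event is already over the limit (which the encoder
		// merely warns about).
		for i, msg := range encoder.Build() {
			fmt.Printf("message %d: %d bytes\n", i, msg.Length())
		}
	}

With max-batch-size=3 and ten appended rows, Build() should return four messages (3+3+3+1 events). On a changefeed, the same parameters ride on the sink URI query string, as exercised by tests/kafka_messages/run.sh above.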