diff --git a/cdc/processor/sourcemanager/engine/factory/pebble.go b/cdc/processor/sourcemanager/engine/factory/pebble.go
index 6b8bdf39b9f..9bf29663bdb 100644
--- a/cdc/processor/sourcemanager/engine/factory/pebble.go
+++ b/cdc/processor/sourcemanager/engine/factory/pebble.go
@@ -34,6 +34,8 @@ func createPebbleDBs(
 	dbs := make([]*pebble.DB, 0, cfg.Count)
 	writeStalls := make([]writeStall, cfg.Count)
 
+	cache := pebble.NewCache(int64(memQuotaInBytes))
+	defer cache.Unref()
 	for id := 0; id < cfg.Count; id++ {
 		ws := writeStalls[id]
 		adjust := func(opts *pebble.Options) {
@@ -57,7 +59,7 @@ func createPebbleDBs(
 			}
 		}
 
-		db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust)
+		db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust)
 		if err != nil {
 			log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err))
 			for _, db := range dbs {
@@ -67,7 +69,7 @@ func createPebbleDBs(
 		}
 		log.Info("create pebble instance success",
 			zap.Int("id", id+1),
-			zap.Uint64("cacheSize", memQuotaInBytes/uint64(cfg.Count)))
+			zap.Uint64("sharedCacheSize", memQuotaInBytes))
 		dbs = append(dbs, db)
 	}
 	return dbs, writeStalls, nil
diff --git a/cdc/processor/sourcemanager/engine/pebble/db.go b/cdc/processor/sourcemanager/engine/pebble/db.go
index 3c48ff1aa07..695fc210df6 100644
--- a/cdc/processor/sourcemanager/engine/pebble/db.go
+++ b/cdc/processor/sourcemanager/engine/pebble/db.go
@@ -89,7 +89,7 @@ func iterTable(
 // OpenPebble opens a pebble.
 func OpenPebble(
 	id int, path string, cfg *config.DBConfig,
-	cacheSize uint64,
+	cache *pebble.Cache,
 	adjusts ...func(*pebble.Options),
) (db *pebble.DB, err error) {
 	dbDir := filepath.Join(path, fmt.Sprintf("%04d", id))
@@ -99,11 +99,7 @@ func OpenPebble(
 	}
 
 	opts := buildPebbleOption(cfg)
-	if cacheSize > 0 {
-		opts.Cache = pebble.NewCache(int64(cacheSize))
-		defer opts.Cache.Unref()
-	}
-
+	opts.Cache = cache
 	for _, adjust := range adjusts {
 		adjust(opts)
 	}
@@ -136,10 +132,14 @@ func buildPebbleOption(cfg *config.DBConfig) (opts *pebble.Options) {
 		l.IndexBlockSize = 256 << 10 // 256 KB
 		l.FilterPolicy = bloom.FilterPolicy(10)
 		l.FilterType = pebble.TableFilter
-		if i == 0 {
-			l.TargetFileSize = 8 << 20 // 8 MB
-		} else if i < 4 {
-			l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2
+		// 8MB is large enough because generally the sorter won't carry too much data.
+		// Avoiding large target files helps reduce write amplification.
+		l.TargetFileSize = 8 << 20 // 8 MB
+		switch cfg.Compression {
+		case "none":
+			l.Compression = pebble.NoCompression
+		case "snappy":
+			l.Compression = pebble.SnappyCompression
 		}
 		l.EnsureDefaults()
 	}
diff --git a/cdc/processor/sourcemanager/engine/pebble/db_test.go b/cdc/processor/sourcemanager/engine/pebble/db_test.go
index 16fbcc938a1..af65f2d331d 100644
--- a/cdc/processor/sourcemanager/engine/pebble/db_test.go
+++ b/cdc/processor/sourcemanager/engine/pebble/db_test.go
@@ -30,7 +30,7 @@ func TestIteratorWithTableFilter(t *testing.T) {
 	dbPath := filepath.Join(t.TempDir(), t.Name())
 	db, err := OpenPebble(
 		1, dbPath, &config.DBConfig{Count: 1},
-		1024*1024*10,
+		nil,
 		// Disable auto compactions to make the case more stable.
func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true }, ) diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go index c199303a734..a50abd6bd98 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go @@ -28,7 +28,7 @@ import ( func TestTableOperations(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -50,7 +50,7 @@ func TestTableOperations(t *testing.T) { // TestNoResolvedTs tests resolved timestamps shouldn't be emitted. func TestNoResolvedTs(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -80,7 +80,7 @@ func TestNoResolvedTs(t *testing.T) { // TestEventFetch tests events can be sorted correctly. func TestEventFetch(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -161,7 +161,7 @@ func TestEventFetch(t *testing.T) { func TestCleanData(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() diff --git a/cdc/server/server.go b/cdc/server/server.go index d26fb0db923..ee8ffe22f99 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -255,12 +255,7 @@ func (s *server) startActorSystems(ctx context.Context) error { sortDir := config.GetGlobalServerConfig().Sorter.SortDir if s.useEventSortEngine { - totalMemory, err := util.GetMemoryLimit() - if err != nil { - return errors.Trace(err) - } - memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100 - memInBytes := uint64(float64(totalMemory) * memPercentage) + memInBytes := conf.Sorter.CacheSizeInMB * (1 << 20) if config.GetGlobalServerConfig().Debug.EnableDBSorter { s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB) } else { diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index ca020feb946..d0b8aa26775 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -80,23 +80,7 @@ func (o *options) addFlags(cmd *cobra.Command) { cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval") _ = cmd.Flags().MarkHidden("processor-flush-interval") - // sorter related parameters, hidden them since cannot be configured by TiUP easily. 
-	cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size")
-	_ = cmd.Flags().MarkHidden("sorter-num-workerpool-goroutine")
-
-	cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level")
-	_ = cmd.Flags().MarkHidden("sorter-num-concurrent-worker")
-
-	cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting")
-	_ = cmd.Flags().MarkHidden("sorter-chunk-size-limit")
-	// 80 is safe on most systems.
-	cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort")
-	_ = cmd.Flags().MarkHidden("sorter-max-memory-percentage")
-	// We use 8GB as a safe default before we support local configuration file.
-	cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort")
-	_ = cmd.Flags().MarkHidden("sorter-max-memory-consumption")
-
-	// sort-dir id deprecate, hidden it.
 	cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory")
 	_ = cmd.Flags().MarkHidden("sort-dir")
@@ -207,16 +191,6 @@ func (o *options) complete(cmd *cobra.Command) error {
 			cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval
 		case "processor-flush-interval":
 			cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval
-		case "sorter-num-workerpool-goroutine":
-			cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine
-		case "sorter-num-concurrent-worker":
-			cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker
-		case "sorter-chunk-size-limit":
-			cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit
-		case "sorter-max-memory-percentage":
-			cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage
-		case "sorter-max-memory-consumption":
-			cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption
 		case "ca":
 			cfg.Security.CAPath = o.serverConfig.Security.CAPath
 		case "cert":
diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go
index 2b4448ac207..a3fec87175d 100644
--- a/pkg/cmd/server/server_test.go
+++ b/pkg/cmd/server/server_test.go
@@ -124,11 +124,6 @@ func TestParseCfg(t *testing.T) {
 		"--cert", "bb",
 		"--key", "cc",
 		"--cert-allowed-cn", "dd,ee",
-		"--sorter-chunk-size-limit", "50000000",
-		"--sorter-max-memory-consumption", "60000",
-		"--sorter-max-memory-percentage", "70",
-		"--sorter-num-concurrent-worker", "80",
-		"--sorter-num-workerpool-goroutine", "90",
 		"--sort-dir", "/tmp/just_a_test",
 	}))
 
@@ -156,12 +151,14 @@ func TestParseCfg(t *testing.T) {
 		OwnerFlushInterval:     config.TomlDuration(150 * time.Millisecond),
 		ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
 		Sorter: &config.SorterConfig{
-			NumConcurrentWorker:    80,
-			ChunkSizeLimit:         50000000,
-			MaxMemoryPercentage:    70,
-			MaxMemoryConsumption:   60000,
-			NumWorkerPoolGoroutine: 90,
-			SortDir:                config.DefaultSortDir,
+			SortDir:             config.DefaultSortDir,
+			CacheSizeInMB:       128,
+			MaxMemoryPercentage: 10,
+
+			NumConcurrentWorker:    4,
+			ChunkSizeLimit:         128 * 1024 * 1024,
+			MaxMemoryConsumption:   16 * 1024 * 1024 * 1024,
+			NumWorkerPoolGoroutine: 16,
 		},
 		Security: &config.SecurityConfig{
 			CertPath: "bb",
@@ -248,11 +245,6 @@ max-days = 1
 max-backups = 1
 
 [sorter]
-chunk-size-limit = 10000000
-max-memory-consumption = 2000000
-max-memory-percentage = 3
-num-concurrent-worker = 4
-num-workerpool-goroutine = 5
 sort-dir = "/tmp/just_a_test"
 
 [kv-client]
@@ -324,12 +316,14 @@ check-balance-interval = "10s"
 		OwnerFlushInterval:     config.TomlDuration(600 * time.Millisecond),
 		ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond),
 		Sorter: &config.SorterConfig{
+			SortDir:             config.DefaultSortDir,
+			CacheSizeInMB:       128,
+			MaxMemoryPercentage: 10,
+
 			NumConcurrentWorker:    4,
-			ChunkSizeLimit:         10000000,
-			MaxMemoryPercentage:    3,
-			MaxMemoryConsumption:   2000000,
-			NumWorkerPoolGoroutine: 5,
-			SortDir:                config.DefaultSortDir,
+			ChunkSizeLimit:         128 * 1024 * 1024,
+			MaxMemoryConsumption:   16 * 1024 * 1024 * 1024,
+			NumWorkerPoolGoroutine: 16,
 		},
 		Security:            &config.SecurityConfig{},
 		PerTableMemoryQuota: config.DefaultTableMemoryQuota,
@@ -411,11 +405,6 @@ max-days = 1
 max-backups = 1
 
 [sorter]
-chunk-size-limit = 10000000
-max-memory-consumption = 2000000
-max-memory-percentage = 3
-num-concurrent-worker = 4
-num-workerpool-goroutine = 5
 sort-dir = "/tmp/just_a_test"
 
 [security]
@@ -441,10 +430,6 @@ cert-allowed-cn = ["dd","ee"]
 		"--owner-flush-interval", "150ms",
 		"--processor-flush-interval", "150ms",
 		"--ca", "",
-		"--sorter-chunk-size-limit", "50000000",
-		"--sorter-max-memory-consumption", "60000000",
-		"--sorter-max-memory-percentage", "70",
-		"--sorter-num-concurrent-worker", "3",
 		"--config", configPath,
 	}))
 
@@ -472,12 +457,14 @@ cert-allowed-cn = ["dd","ee"]
 		OwnerFlushInterval:     config.TomlDuration(150 * time.Millisecond),
 		ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
 		Sorter: &config.SorterConfig{
-			NumConcurrentWorker:    3,
-			ChunkSizeLimit:         50000000,
-			MaxMemoryPercentage:    70,
-			MaxMemoryConsumption:   60000000,
-			NumWorkerPoolGoroutine: 5,
-			SortDir:                config.DefaultSortDir,
+			SortDir:             config.DefaultSortDir,
+			CacheSizeInMB:       128,
+			MaxMemoryPercentage: 10,
+
+			NumConcurrentWorker:    4,
+			ChunkSizeLimit:         128 * 1024 * 1024,
+			MaxMemoryConsumption:   16 * 1024 * 1024 * 1024,
+			NumWorkerPoolGoroutine: 16,
 		},
 		Security: &config.SecurityConfig{
 			CertPath: "bb",
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index 07aa9093824..029a6984279 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -84,12 +84,13 @@ const (
 		"owner-flush-interval": 50000000,
 		"processor-flush-interval": 50000000,
 		"sorter": {
+			"sort-dir": "/tmp/sorter",
+			"cache-size-in-mb": 128,
+			"max-memory-percentage": 10,
 			"num-concurrent-worker": 4,
 			"chunk-size-limit": 999,
-			"max-memory-percentage": 10,
 			"max-memory-consumption": 17179869184,
-			"num-workerpool-goroutine": 16,
-			"sort-dir": "/tmp/sorter"
+			"num-workerpool-goroutine": 16
 		},
 		"security": {
 			"ca-path": "",
diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go
index 18b1ebf9d5a..2d327103032 100644
--- a/pkg/config/server_config.go
+++ b/pkg/config/server_config.go
@@ -111,12 +111,14 @@ var defaultServerConfig = &ServerConfig{
 	OwnerFlushInterval:     TomlDuration(50 * time.Millisecond),
 	ProcessorFlushInterval: TomlDuration(50 * time.Millisecond),
 	Sorter: &SorterConfig{
+		SortDir:             DefaultSortDir,
+		CacheSizeInMB:       128, // By default, use 128MB as the sorter cache.
+		MaxMemoryPercentage: 10,  // Only for unified sorter.
+
 		NumConcurrentWorker:    4,
 		ChunkSizeLimit:         128 * 1024 * 1024, // 128MB
-		MaxMemoryPercentage:    10,                // 10% is safe on machines with memory capacity <= 16GB
 		MaxMemoryConsumption:   16 * 1024 * 1024 * 1024, // 16GB
 		NumWorkerPoolGoroutine: 16,
-		SortDir:                DefaultSortDir,
 	},
 	Security:            &SecurityConfig{},
 	PerTableMemoryQuota: DefaultTableMemoryQuota,
diff --git a/pkg/config/sorter.go b/pkg/config/sorter.go
index 8eafdaef19c..58e47ea2bb2 100644
--- a/pkg/config/sorter.go
+++ b/pkg/config/sorter.go
@@ -13,22 +13,30 @@
 package config
 
-import "github.com/pingcap/tiflow/pkg/errors"
+import (
+	"math"
+
+	"github.com/pingcap/tiflow/pkg/errors"
+)
 
 // SorterConfig represents sorter config for a changefeed
 type SorterConfig struct {
+	// the directory used to store the temporary files generated by the sorter
+	SortDir string `toml:"sort-dir" json:"sort-dir"`
+	// Cache size of sorter in MB.
+	CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"`
+	// the maximum memory use percentage that allows in-memory sorting
+	// Deprecated: use CacheSizeInMB instead.
+	MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`
+
 	// number of concurrent heap sorts
 	NumConcurrentWorker int `toml:"num-concurrent-worker" json:"num-concurrent-worker"`
 	// maximum size for a heap
 	ChunkSizeLimit uint64 `toml:"chunk-size-limit" json:"chunk-size-limit"`
-	// the maximum memory use percentage that allows in-memory sorting
-	MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`
 	// the maximum memory consumption allowed for in-memory sorting
 	MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"`
 	// the size of workerpool
 	NumWorkerPoolGoroutine int `toml:"num-workerpool-goroutine" json:"num-workerpool-goroutine"`
-	// the directory used to store the temporary files generated by the sorter
-	SortDir string `toml:"sort-dir" json:"sort-dir"`
 }
 
 // ValidateAndAdjust validates and adjusts the sorter configuration
@@ -53,10 +61,12 @@ func (c *SorterConfig) ValidateAndAdjust() error {
 		return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
 			"num-workerpool-goroutine should be at least 1, larger than 8 is recommended")
 	}
-	if c.MaxMemoryPercentage <= 0 || c.MaxMemoryPercentage > 80 {
+	if c.MaxMemoryPercentage < 0 || c.MaxMemoryPercentage > 80 {
 		return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
 			"max-memory-percentage should be a percentage and within (0, 80]")
 	}
-
+	if c.CacheSizeInMB < 8 || c.CacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) {
+		return errors.ErrIllegalSorterParameter.GenWithStackByArgs("cache-size-in-mb should be at least 8(MB)")
+	}
 	return nil
 }
diff --git a/tests/integration_tests/_utils/run_cdc_server b/tests/integration_tests/_utils/run_cdc_server
index d0ea0f3b82c..6b450f08175 100755
--- a/tests/integration_tests/_utils/run_cdc_server
+++ b/tests/integration_tests/_utils/run_cdc_server
@@ -120,7 +120,6 @@ if [[ "$restart" == "true" ]]; then
 	GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
 		--log-file $workdir/cdc$logsuffix.log \
 		--log-level $log_level \
-		--sorter-num-workerpool-goroutine 4 \
 		--data-dir "$data_dir" \
 		--cluster-id "$cluster_id" \
 		$config_path \
@@ -137,7 +136,6 @@ else
 	GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
 		--log-file $workdir/cdc$logsuffix.log \
 		--log-level $log_level \
-		--sorter-num-workerpool-goroutine 4 \
 		--data-dir "$data_dir" \
 		--cluster-id "$cluster_id" \
 		$config_path \
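Reviewer note: the factory change above replaces per-instance caches (each sized memQuotaInBytes/cfg.Count) with a single pebble.Cache shared by all sorter DBs. For anyone unfamiliar with the lifetime rules this relies on, here is a minimal sketch of the sharing pattern, outside this patch and with hypothetical names (openWithSharedCache, baseDir). It leans only on pebble's documented reference counting: NewCache returns a cache holding the creator's reference, each Open via Options.Cache takes its own reference, and the memory is released only after the creator's Unref and the last db.Close().

package sketch

import (
	"fmt"
	"path/filepath"

	"github.com/cockroachdb/pebble"
)

// openWithSharedCache opens `count` pebble instances that all draw blocks
// from one cache of quotaInBytes, mirroring createPebbleDBs above.
func openWithSharedCache(baseDir string, count int, quotaInBytes int64) ([]*pebble.DB, error) {
	cache := pebble.NewCache(quotaInBytes)
	// Dropping the creator's reference here is safe: every successfully
	// opened DB holds its own reference until it is closed.
	defer cache.Unref()

	dbs := make([]*pebble.DB, 0, count)
	for id := 0; id < count; id++ {
		db, err := pebble.Open(
			filepath.Join(baseDir, fmt.Sprintf("%04d", id)),
			&pebble.Options{Cache: cache},
		)
		if err != nil {
			// Roll back the instances opened so far, as the factory does.
			for _, opened := range dbs {
				_ = opened.Close()
			}
			return nil, err
		}
		dbs = append(dbs, db)
	}
	return dbs, nil
}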
diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml new file mode 100644 index 00000000000..14e7fd80c09 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml @@ -0,0 +1,3 @@ +[consistent] +level = "eventual" +storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/redo" diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml new file mode 100644 index 00000000000..1fab13aa043 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["consistent_replicate_storage_file_large_value.usertable*","consistent_replicate_storage_file_large_value.t*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload new file mode 100644 index 00000000000..144a0e04a53 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload @@ -0,0 +1,16 @@ +threadcount=10 +recordcount=100 +operationcount=0 +workload=core +fieldcount=1000 +fieldlengthdistribution=constant +fieldlength=15000 + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql b/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql new file mode 100644 index 00000000000..93dc7cd7a1c --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql @@ -0,0 +1,13 @@ +use `consistent_replicate_storage_file_large_value`; +set @@global.tidb_enable_exchange_partition=on; + +create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21)); +insert into t1 values (1),(2),(3),(4),(5),(6); +insert into t1 values (7),(8),(9); +insert into t1 values (11),(12),(20); +alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40)); +insert into t1 values (25),(29),(35); /*these values in p3,p4*/ + +create table t2 (a int primary key); + + diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh b/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh new file mode 100644 index 00000000000..591db8bfb38 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh @@ -0,0 +1,84 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare 
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+rm -rf "$WORK_DIR"
+mkdir -p "$WORK_DIR"
+
+stop() {
+	# to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase
+	echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_file_large_value.usertable;")
+	stop_tidb_cluster
+}
+
+function run() {
+	# we only support eventually consistent replication with MySQL sink
+	if [ "$SINK_TYPE" != "mysql" ]; then
+		return
+	fi
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+	run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	SINK_URI="mysql://normal:123456@127.0.0.1:3306/"
+	changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+
+	run_sql "CREATE DATABASE consistent_replicate_storage_file_large_value;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_file_large_value
+	run_sql "CREATE table consistent_replicate_storage_file_large_value.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	check_table_exists "consistent_replicate_storage_file_large_value.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists "consistent_replicate_storage_file_large_value.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+	check_table_exists "consistent_replicate_storage_file_large_value.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	# Inject the failpoint to prevent sink execution, while the global resolved ts can still advance.
+	# Then we can apply the redo log to reach an eventually consistent state in the downstream.
+	cleanup_process $CDC_BINARY
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+	run_sql "create table consistent_replicate_storage_file_large_value.usertable2 like consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "ALTER TABLE consistent_replicate_storage_file_large_value.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file_large_value.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.usertable2 select * from consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# to ensure row changed events have been replicated to TiCDC
+	sleep 20
+
+	storage_path="file://$WORK_DIR/redo"
+	tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
+	current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+	ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
+	cleanup_process $CDC_BINARY
+
+	export GO_FAILPOINTS=''
+
+	# This value was generated by:
+	#   echo -n '123456' | base64
+	#   MTIzNDU2
+	# Use it here to test that redo apply works well
+	# when a base64-encoded password is used.
+	ENPASSWORD="MTIzNDU2"
+
+	cdc redo apply --tmp-dir="$tmp_download_path/apply" \
+		--storage="$storage_path" \
+		--sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/"
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+}
+
+trap stop EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
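Note on the new validation bound: cache-size-in-mb is converted from MB to bytes at startup (memInBytes := conf.Sorter.CacheSizeInMB * (1 << 20)) and the byte count is ultimately handed to pebble.NewCache, which takes an int64; that is why ValidateAndAdjust rejects values whose byte count would exceed math.MaxInt64. A hypothetical standalone sketch of the same check follows (cacheBytes is not part of this patch; unlike the in-patch comparison it also shifts instead of multiplying, so the check itself cannot overflow uint64):

package sketch

import (
	"fmt"
	"math"
)

// cacheBytes converts a cache-size-in-mb value into the byte count that
// would be passed to pebble.NewCache, enforcing the same
// [8 MB, MaxInt64 bytes] window as SorterConfig.ValidateAndAdjust.
func cacheBytes(cacheSizeInMB uint64) (int64, error) {
	if cacheSizeInMB < 8 {
		return 0, fmt.Errorf("cache-size-in-mb should be at least 8, got %d", cacheSizeInMB)
	}
	// Compare against MaxInt64>>20 rather than multiplying first, so an
	// absurdly large input cannot wrap around before the comparison.
	if cacheSizeInMB > uint64(math.MaxInt64)>>20 {
		return 0, fmt.Errorf("cache-size-in-mb %d does not fit in an int64 byte count", cacheSizeInMB)
	}
	return int64(cacheSizeInMB) << 20, nil
}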