From 4a1a13a80287fc1ffcf6f8e0e120a5808468e1f2 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 23 May 2023 19:23:38 +0800 Subject: [PATCH] sorter(cdc): add config cache-size (#9024) ref pingcap/tiflow#8974 Signed-off-by: qupeng --- .../sourcemanager/engine/factory/pebble.go | 6 +- .../sourcemanager/engine/pebble/db.go | 8 +- .../sourcemanager/engine/pebble/db_test.go | 2 +- .../engine/pebble/event_sorter_test.go | 8 +- cdc/server/server.go | 7 +- pkg/cmd/server/server.go | 26 ------ pkg/config/config_test_data.go | 3 +- pkg/config/server_config.go | 8 +- pkg/config/sorter.go | 13 ++- tests/integration_tests/_utils/run_cdc_server | 2 - .../conf/changefeed.toml | 3 + .../conf/diff_config.toml | 29 +++++++ .../conf/workload | 16 ++++ .../data/prepare.sql | 13 +++ .../run.sh | 84 +++++++++++++++++++ 15 files changed, 174 insertions(+), 54 deletions(-) create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh diff --git a/cdc/processor/sourcemanager/engine/factory/pebble.go b/cdc/processor/sourcemanager/engine/factory/pebble.go index 6b8bdf39b9f..9bf29663bdb 100644 --- a/cdc/processor/sourcemanager/engine/factory/pebble.go +++ b/cdc/processor/sourcemanager/engine/factory/pebble.go @@ -34,6 +34,8 @@ func createPebbleDBs( dbs := make([]*pebble.DB, 0, cfg.Count) writeStalls := make([]writeStall, cfg.Count) + cache := pebble.NewCache(int64(memQuotaInBytes)) + defer cache.Unref() for id := 0; id < cfg.Count; id++ { ws := writeStalls[id] adjust := func(opts *pebble.Options) { @@ -57,7 +59,7 @@ func createPebbleDBs( } } - db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust) + db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust) if err != nil { log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err)) for _, db := range dbs { @@ -67,7 +69,7 @@ func createPebbleDBs( } log.Info("create pebble instance success", zap.Int("id", id+1), - zap.Uint64("cacheSize", memQuotaInBytes/uint64(cfg.Count))) + zap.Uint64("sharedCacheSize", memQuotaInBytes)) dbs = append(dbs, db) } return dbs, writeStalls, nil diff --git a/cdc/processor/sourcemanager/engine/pebble/db.go b/cdc/processor/sourcemanager/engine/pebble/db.go index b299ecbd8e0..695fc210df6 100644 --- a/cdc/processor/sourcemanager/engine/pebble/db.go +++ b/cdc/processor/sourcemanager/engine/pebble/db.go @@ -89,7 +89,7 @@ func iterTable( // OpenPebble opens a pebble. 
func OpenPebble( id int, path string, cfg *config.DBConfig, - cacheSize uint64, + cache *pebble.Cache, adjusts ...func(*pebble.Options), ) (db *pebble.DB, err error) { dbDir := filepath.Join(path, fmt.Sprintf("%04d", id)) @@ -99,11 +99,7 @@ func OpenPebble( } opts := buildPebbleOption(cfg) - if cacheSize > 0 { - opts.Cache = pebble.NewCache(int64(cacheSize)) - defer opts.Cache.Unref() - } - + opts.Cache = cache for _, adjust := range adjusts { adjust(opts) } diff --git a/cdc/processor/sourcemanager/engine/pebble/db_test.go b/cdc/processor/sourcemanager/engine/pebble/db_test.go index 16fbcc938a1..af65f2d331d 100644 --- a/cdc/processor/sourcemanager/engine/pebble/db_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/db_test.go @@ -30,7 +30,7 @@ func TestIteratorWithTableFilter(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) db, err := OpenPebble( 1, dbPath, &config.DBConfig{Count: 1}, - 1024*1024*10, + nil, // Disable auto compactions to make the case more stable. func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true }, ) diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go index c199303a734..a50abd6bd98 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go @@ -28,7 +28,7 @@ import ( func TestTableOperations(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -50,7 +50,7 @@ func TestTableOperations(t *testing.T) { // TestNoResolvedTs tests resolved timestamps shouldn't be emitted. func TestNoResolvedTs(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -80,7 +80,7 @@ func TestNoResolvedTs(t *testing.T) { // TestEventFetch tests events can be sorted correctly. 
func TestEventFetch(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -161,7 +161,7 @@ func TestEventFetch(t *testing.T) { func TestCleanData(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() diff --git a/cdc/server/server.go b/cdc/server/server.go index d26fb0db923..ee8ffe22f99 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -255,12 +255,7 @@ func (s *server) startActorSystems(ctx context.Context) error { sortDir := config.GetGlobalServerConfig().Sorter.SortDir if s.useEventSortEngine { - totalMemory, err := util.GetMemoryLimit() - if err != nil { - return errors.Trace(err) - } - memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100 - memInBytes := uint64(float64(totalMemory) * memPercentage) + memInBytes := conf.Sorter.CacheSizeInMB * (1 << 20) if config.GetGlobalServerConfig().Debug.EnableDBSorter { s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB) } else { diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index ca020feb946..d0b8aa26775 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -80,23 +80,7 @@ func (o *options) addFlags(cmd *cobra.Command) { cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval") _ = cmd.Flags().MarkHidden("processor-flush-interval") - // sorter related parameters, hidden them since cannot be configured by TiUP easily. - cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size") - _ = cmd.Flags().MarkHidden("sorter-num-workerpool-goroutine") - - cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level") - _ = cmd.Flags().MarkHidden("sorter-num-concurrent-worker") - - cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting") - _ = cmd.Flags().MarkHidden("sorter-chunk-size-limit") - // 80 is safe on most systems. - cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort") - _ = cmd.Flags().MarkHidden("sorter-max-memory-percentage") - // We use 8GB as a safe default before we support local configuration file. - cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort") - _ = cmd.Flags().MarkHidden("sorter-max-memory-consumption") - // sort-dir id deprecate, hidden it. 
cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory") _ = cmd.Flags().MarkHidden("sort-dir") @@ -207,16 +191,6 @@ func (o *options) complete(cmd *cobra.Command) error { cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval case "processor-flush-interval": cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval - case "sorter-num-workerpool-goroutine": - cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine - case "sorter-num-concurrent-worker": - cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker - case "sorter-chunk-size-limit": - cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit - case "sorter-max-memory-percentage": - cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage - case "sorter-max-memory-consumption": - cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption case "ca": cfg.Security.CAPath = o.serverConfig.Security.CAPath case "cert": diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 2dfa019fe76..029a6984279 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -85,7 +85,8 @@ const ( "processor-flush-interval": 50000000, "sorter": { "sort-dir": "/tmp/sorter", - "max-memory-percentage": 0, + "cache-size-in-mb": 128, + "max-memory-percentage": 10, "num-concurrent-worker": 4, "chunk-size-limit": 999, "max-memory-consumption": 17179869184, diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 01fcdda0e05..2d327103032 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -111,14 +111,14 @@ var defaultServerConfig = &ServerConfig{ OwnerFlushInterval: TomlDuration(50 * time.Millisecond), ProcessorFlushInterval: TomlDuration(50 * time.Millisecond), Sorter: &SorterConfig{ + SortDir: DefaultSortDir, + CacheSizeInMB: 128, // By default use 128M memory as sorter cache. + MaxMemoryPercentage: 10, // Only for unified sorter. + NumConcurrentWorker: 4, ChunkSizeLimit: 128 * 1024 * 1024, // 128MB MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16GB NumWorkerPoolGoroutine: 16, - // Disable block-cache by default. TiCDC only scans events instead of - // accessing them randomly, so block-cache is unnecessary. - MaxMemoryPercentage: 0, - SortDir: DefaultSortDir, }, Security: &SecurityConfig{}, PerTableMemoryQuota: DefaultTableMemoryQuota, diff --git a/pkg/config/sorter.go b/pkg/config/sorter.go index 2c39b0359ff..58e47ea2bb2 100644 --- a/pkg/config/sorter.go +++ b/pkg/config/sorter.go @@ -13,13 +13,20 @@ package config -import "github.com/pingcap/tiflow/pkg/errors" +import ( + "math" + + "github.com/pingcap/tiflow/pkg/errors" +) // SorterConfig represents sorter config for a changefeed type SorterConfig struct { // the directory used to store the temporary files generated by the sorter SortDir string `toml:"sort-dir" json:"sort-dir"` + // Cache size of sorter in MB. + CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"` // the maximum memory use percentage that allows in-memory sorting + // Deprecated: use CacheSizeInMB instead. 
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"` // number of concurrent heap sorts @@ -58,6 +65,8 @@ func (c *SorterConfig) ValidateAndAdjust() error { return errors.ErrIllegalSorterParameter.GenWithStackByArgs( "max-memory-percentage should be a percentage and within (0, 80]") } - + if c.CacheSizeInMB < 8 || c.CacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) { + return errors.ErrIllegalSorterParameter.GenWithStackByArgs("cache-size-in-mb should be greater than 8(MB)") + } return nil } diff --git a/tests/integration_tests/_utils/run_cdc_server b/tests/integration_tests/_utils/run_cdc_server index d0ea0f3b82c..6b450f08175 100755 --- a/tests/integration_tests/_utils/run_cdc_server +++ b/tests/integration_tests/_utils/run_cdc_server @@ -120,7 +120,6 @@ if [[ "$restart" == "true" ]]; then GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \ --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ - --sorter-num-workerpool-goroutine 4 \ --data-dir "$data_dir" \ --cluster-id "$cluster_id" \ $config_path \ @@ -137,7 +136,6 @@ else GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \ --log-file $workdir/cdc$logsuffix.log \ --log-level $log_level \ - --sorter-num-workerpool-goroutine 4 \ --data-dir "$data_dir" \ --cluster-id "$cluster_id" \ $config_path \ diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml new file mode 100644 index 00000000000..14e7fd80c09 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml @@ -0,0 +1,3 @@ +[consistent] +level = "eventual" +storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/redo" diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml new file mode 100644 index 00000000000..1fab13aa043 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+	output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/sync_diff/output"
+
+	source-instances = ["mysql1"]
+
+	target-instance = "tidb0"
+
+	target-check-tables = ["consistent_replicate_storage_file_large_value.usertable*","consistent_replicate_storage_file_large_value.t*"]
+
+[data-sources]
+[data-sources.mysql1]
+	host = "127.0.0.1"
+	port = 4000
+	user = "root"
+	password = ""
+
+[data-sources.tidb0]
+	host = "127.0.0.1"
+	port = 3306
+	user = "root"
+	password = ""
diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload
new file mode 100644
index 00000000000..144a0e04a53
--- /dev/null
+++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload
@@ -0,0 +1,16 @@
+threadcount=10
+recordcount=100
+operationcount=0
+workload=core
+fieldcount=1000
+fieldlengthdistribution=constant
+fieldlength=15000
+
+readallfields=true
+
+readproportion=0
+updateproportion=0
+scanproportion=0
+insertproportion=0
+
+requestdistribution=uniform
diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql b/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql
new file mode 100644
index 00000000000..93dc7cd7a1c
--- /dev/null
+++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql
@@ -0,0 +1,13 @@
+use `consistent_replicate_storage_file_large_value`;
+set @@global.tidb_enable_exchange_partition=on;
+
+create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
+insert into t1 values (1),(2),(3),(4),(5),(6);
+insert into t1 values (7),(8),(9);
+insert into t1 values (11),(12),(20);
+alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
+insert into t1 values (25),(29),(35); /*these values in p3,p4*/
+
+create table t2 (a int primary key);
+
+
diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh b/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh
new file mode 100644
index 00000000000..591db8bfb38
--- /dev/null
+++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh
@@ -0,0 +1,84 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+rm -rf "$WORK_DIR"
+mkdir -p "$WORK_DIR"
+
+stop() {
+	# to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase
+	echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_file_large_value.usertable;")
+	stop_tidb_cluster
+}
+
+function run() {
+	# we only support eventually consistent replication with MySQL sink
+	if [ "$SINK_TYPE" != "mysql" ]; then
+		return
+	fi
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+	run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	SINK_URI="mysql://normal:123456@127.0.0.1:3306/"
+	changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+
+	run_sql "CREATE DATABASE consistent_replicate_storage_file_large_value;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_file_large_value
+	run_sql "CREATE table consistent_replicate_storage_file_large_value.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	check_table_exists "consistent_replicate_storage_file_large_value.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists "consistent_replicate_storage_file_large_value.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+	check_table_exists "consistent_replicate_storage_file_large_value.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	# Inject the failpoint to prevent sink execution, while still allowing the global resolved ts to advance.
+	# Then we can apply the redo log to reach an eventually consistent state in the downstream.
+	cleanup_process $CDC_BINARY
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+	run_sql "create table consistent_replicate_storage_file_large_value.usertable2 like consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "ALTER TABLE consistent_replicate_storage_file_large_value.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file_large_value.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.usertable2 select * from consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# to ensure row changed events have been replicated to TiCDC
+	sleep 20
+
+	storage_path="file://$WORK_DIR/redo"
+	tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
+	current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+	ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
+	cleanup_process $CDC_BINARY
+
+	export GO_FAILPOINTS=''
+
+	# This value is generated by:
+	# echo -n '123456' | base64
+	# MTIzNDU2
+	# Use this value here to test that redo apply works well
+	# when a base64-encoded password is used.
+	ENPASSWORD="MTIzNDU2"
+
+	cdc redo apply --tmp-dir="$tmp_download_path/apply" \
+		--storage="$storage_path" \
+		--sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/"
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
}
+
+trap stop EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
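
A note on the factory change above: instead of giving each pebble instance a private block cache of memQuotaInBytes/Count bytes, the factory now creates a single shared pebble.Cache sized to the whole quota and releases its own reference once every instance is open; each opened DB holds its own reference, so the cache stays alive until the last DB closes. Below is a minimal sketch of that pattern against cockroachdb/pebble's public API; openShared, the dirs slice, and the quota argument are illustrative names, not tiflow code.

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
)

// openShared opens one pebble instance per directory, all sharing a
// single block cache sized to the full memory quota.
func openShared(dirs []string, quotaInBytes int64) ([]*pebble.DB, error) {
	cache := pebble.NewCache(quotaInBytes)
	// Every pebble.Open below takes its own reference to the cache,
	// so this function can drop its reference when it returns.
	defer cache.Unref()

	dbs := make([]*pebble.DB, 0, len(dirs))
	for _, dir := range dirs {
		db, err := pebble.Open(dir, &pebble.Options{Cache: cache})
		if err != nil {
			// Mirror the patch's cleanup: close whatever opened already.
			for _, opened := range dbs {
				_ = opened.Close()
			}
			return nil, err
		}
		dbs = append(dbs, db)
	}
	fmt.Printf("opened %d pebble instances sharing a %d-byte cache\n", len(dbs), quotaInBytes)
	return dbs, nil
}

func main() {
	// Hypothetical directories; 128<<20 matches the new 128 MiB default.
	if dbs, err := openShared([]string{"/tmp/pebble-demo-0", "/tmp/pebble-demo-1"}, 128<<20); err == nil {
		for _, db := range dbs {
			_ = db.Close()
		}
	}
}

The practical effect: with cfg.Count = 8 and a 128 MiB quota, the old scheme pinned each instance to 16 MiB, while the shared cache lets a single hot instance use most of the 128 MiB.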
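On the new knob itself: cdc/server/server.go now derives the sort engine's memory budget directly from configuration (CacheSizeInMB * 2^20 bytes) rather than from a percentage of total system memory, and SorterConfig.ValidateAndAdjust rejects values below 8 MB as well as values whose byte count would not fit in an int64 (pebble.NewCache takes an int64). A self-contained sketch of that bounds check follows; validateCacheSize is a hypothetical helper name, and the upper bound is written as a shift on math.MaxInt64 to sidestep the uint64 wrap-around a plain multiplication could hit.

package main

import (
	"fmt"
	"math"
)

// validateCacheSize mirrors the bounds the patch enforces on
// cache-size-in-mb and converts the accepted value to bytes.
func validateCacheSize(cacheSizeInMB uint64) (int64, error) {
	// At least 8 MiB, and small enough that the byte count fits in int64.
	if cacheSizeInMB < 8 || cacheSizeInMB > uint64(math.MaxInt64)>>20 {
		return 0, fmt.Errorf("cache-size-in-mb must be in [8, %d]", uint64(math.MaxInt64)>>20)
	}
	return int64(cacheSizeInMB) << 20, nil
}

func main() {
	bytes, err := validateCacheSize(128) // the new default
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes) // 134217728, i.e. 128 MiB
}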
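Finally, the redo-apply step in run.sh deliberately passes the sink password base64-encoded; MTIzNDU2 is simply base64("123456"), as the script's comment derives with `echo -n '123456' | base64`. The same computation in Go, should anyone want to regenerate the fixture:

package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// Matches `echo -n '123456' | base64` from the test script.
	fmt.Println(base64.StdEncoding.EncodeToString([]byte("123456"))) // MTIzNDU2
}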