Skip to content

Commit

Permalink
sorter(cdc): add config cache-size (#9024)
Browse files Browse the repository at this point in the history
ref #8974

Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed May 30, 2023
1 parent 2b12089 commit e4070dd
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 50 deletions.
6 changes: 4 additions & 2 deletions cdc/processor/sourcemanager/engine/factory/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func createPebbleDBs(
dbs := make([]*pebble.DB, 0, cfg.Count)
writeStalls := make([]writeStall, cfg.Count)

cache := pebble.NewCache(int64(memQuotaInBytes))
defer cache.Unref()
for id := 0; id < cfg.Count; id++ {
ws := writeStalls[id]
adjust := func(opts *pebble.Options) {
Expand All @@ -57,7 +59,7 @@ func createPebbleDBs(
}
}

db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust)
db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust)
if err != nil {
log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err))
for _, db := range dbs {
Expand All @@ -67,7 +69,7 @@ func createPebbleDBs(
}
log.Info("create pebble instance success",
zap.Int("id", id+1),
zap.Uint64("cacheSize", memQuotaInBytes/uint64(cfg.Count)))
zap.Uint64("sharedCacheSize", memQuotaInBytes))
dbs = append(dbs, db)
}
return dbs, writeStalls, nil
Expand Down
8 changes: 2 additions & 6 deletions cdc/processor/sourcemanager/engine/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func iterTable(
// OpenPebble opens a pebble.
func OpenPebble(
id int, path string, cfg *config.DBConfig,
cacheSize uint64,
cache *pebble.Cache,
adjusts ...func(*pebble.Options),
) (db *pebble.DB, err error) {
dbDir := filepath.Join(path, fmt.Sprintf("%04d", id))
Expand All @@ -99,11 +99,7 @@ func OpenPebble(
}

opts := buildPebbleOption(cfg)
if cacheSize > 0 {
opts.Cache = pebble.NewCache(int64(cacheSize))
defer opts.Cache.Unref()
}

opts.Cache = cache
for _, adjust := range adjusts {
adjust(opts)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/engine/pebble/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestIteratorWithTableFilter(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(
1, dbPath, &config.DBConfig{Count: 1},
1024*1024*10,
nil,
// Disable auto compactions to make the case more stable.
func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true },
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func TestTableOperations(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand All @@ -50,7 +50,7 @@ func TestTableOperations(t *testing.T) {
// TestNoResolvedTs tests resolved timestamps shouldn't be emitted.
func TestNoResolvedTs(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand Down Expand Up @@ -80,7 +80,7 @@ func TestNoResolvedTs(t *testing.T) {
// TestEventFetch tests events can be sorted correctly.
func TestEventFetch(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand Down Expand Up @@ -161,7 +161,7 @@ func TestEventFetch(t *testing.T) {

func TestCleanData(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand Down
3 changes: 1 addition & 2 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ func (s *server) startActorSystems(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100
memInBytes := uint64(float64(totalMemory) * memPercentage)
memInBytes := conf.Sorter.CacheSizeInMB * (1 << 20)
if config.GetGlobalServerConfig().Debug.EnableDBSorter {
s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB)
} else {
Expand Down
26 changes: 0 additions & 26 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,7 @@ func (o *options) addFlags(cmd *cobra.Command) {
cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval")
_ = cmd.Flags().MarkHidden("processor-flush-interval")

// sorter related parameters, hidden them since cannot be configured by TiUP easily.
cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size")
_ = cmd.Flags().MarkHidden("sorter-num-workerpool-goroutine")

cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level")
_ = cmd.Flags().MarkHidden("sorter-num-concurrent-worker")

cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting")
_ = cmd.Flags().MarkHidden("sorter-chunk-size-limit")

// 80 is safe on most systems.
cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-percentage")
// We use 8GB as a safe default before we support local configuration file.
cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-consumption")

// sort-dir id deprecate, hidden it.
cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory")
_ = cmd.Flags().MarkHidden("sort-dir")
Expand Down Expand Up @@ -207,16 +191,6 @@ func (o *options) complete(cmd *cobra.Command) error {
cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval
case "processor-flush-interval":
cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval
case "sorter-num-workerpool-goroutine":
cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine
case "sorter-num-concurrent-worker":
cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker
case "sorter-chunk-size-limit":
cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit
case "sorter-max-memory-percentage":
cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage
case "sorter-max-memory-consumption":
cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption
case "ca":
cfg.Security.CAPath = o.serverConfig.Security.CAPath
case "cert":
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ const (
"processor-flush-interval": 50000000,
"sorter": {
"sort-dir": "/tmp/sorter",
"max-memory-percentage": 0,
"cache-size-in-mb": 128,
"max-memory-percentage": 10,
"num-concurrent-worker": 4,
"chunk-size-limit": 999,
"max-memory-consumption": 17179869184,
Expand Down
8 changes: 4 additions & 4 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ var defaultServerConfig = &ServerConfig{
OwnerFlushInterval: TomlDuration(50 * time.Millisecond),
ProcessorFlushInterval: TomlDuration(50 * time.Millisecond),
Sorter: &SorterConfig{
SortDir: DefaultSortDir,
CacheSizeInMB: 128, // By default use 128M memory as sorter cache.
MaxMemoryPercentage: 10, // Deprecated.

NumConcurrentWorker: 4,
ChunkSizeLimit: 128 * 1024 * 1024, // 128MB
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16GB
NumWorkerPoolGoroutine: 16,
// Disable block-cache by default. TiCDC only scans events instead of
// accessing them randomly, so block-cache is unnecessary.
MaxMemoryPercentage: 0,
SortDir: DefaultSortDir,
},
Security: &SecurityConfig{},
PerTableMemoryQuota: DefaultTableMemoryQuota,
Expand Down
13 changes: 11 additions & 2 deletions pkg/config/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@

package config

import "github.com/pingcap/tiflow/pkg/errors"
import (
"math"

"github.com/pingcap/tiflow/pkg/errors"
)

// SorterConfig represents sorter config for a changefeed
type SorterConfig struct {
// the directory used to store the temporary files generated by the sorter
SortDir string `toml:"sort-dir" json:"sort-dir"`
// Cache size of sorter in MB.
CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"`
// the maximum memory use percentage that allows in-memory sorting
// Deprecated: use CacheSizeInMB instead.
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`

// number of concurrent heap sorts
Expand Down Expand Up @@ -57,7 +64,9 @@ func (c *SorterConfig) ValidateAndAdjust() error {
if c.MaxMemoryPercentage < 0 || c.MaxMemoryPercentage > 80 {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
"max-memory-percentage should be a percentage and within (0, 80]")
}
if c.CacheSizeInMB < 8 || c.CacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs("cache-size-in-mb should be greater than 8(MB)")
}

return nil
}
2 changes: 0 additions & 2 deletions tests/integration_tests/_utils/run_cdc_server
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ if [[ "$restart" == "true" ]]; then
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
Expand All @@ -137,7 +136,6 @@ else
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[consistent]
level = "eventual"
storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/redo"
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["consistent_replicate_storage_file_large_value.usertable*","consistent_replicate_storage_file_large_value.t*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
threadcount=10
recordcount=100
operationcount=0
workload=core
fieldcount=1000
fieldlengthdistribution=constant
fieldlength=15000

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use `consistent_replicate_storage_file_large_value`;
set @@global.tidb_enable_exchange_partition=on;

create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3,p4*/

create table t2 (a int primary key);


Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

rm -rf "$WORK_DIR"
mkdir -p "$WORK_DIR"

stop() {
# to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase
echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_file_large_value.usertable;")
stop_tidb_cluster
}

function run() {
# we only support eventually consistent replication with MySQL sink
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR
run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

SINK_URI="mysql://normal:123456@127.0.0.1:3306/"
changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')

run_sql "CREATE DATABASE consistent_replicate_storage_file_large_value;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_file_large_value
run_sql "CREATE table consistent_replicate_storage_file_large_value.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_table_exists "consistent_replicate_storage_file_large_value.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "consistent_replicate_storage_file_large_value.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_table_exists "consistent_replicate_storage_file_large_value.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
# Then we can apply redo log to reach an eventual consistent state in downstream.
cleanup_process $CDC_BINARY
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_storage_file_large_value.usertable2 like consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_storage_file_large_value.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file_large_value.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_storage_file_large_value.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_storage_file_large_value.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_storage_file_large_value.usertable2 select * from consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# to ensure row changed events have been replicated to TiCDC
sleep 20

storage_path="file://$WORK_DIR/redo"
tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
cleanup_process $CDC_BINARY

export GO_FAILPOINTS=''

# This value is generated by:
# echo -n '123456' | base64
# MTIzNDU2
# Use this value here to test redo apply function works well
# when use base64 encoded password
ENPASSWORD="MTIzNDU2"

cdc redo apply --tmp-dir="$tmp_download_path/apply" \
--storage="$storage_path" \
--sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/"
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
}

trap stop EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit e4070dd

Please sign in to comment.