Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sorter(cdc): add config cache-size (#9024) #9030

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cdc/processor/sourcemanager/engine/factory/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func createPebbleDBs(
dbs := make([]*pebble.DB, 0, cfg.Count)
writeStalls := make([]writeStall, cfg.Count)

cache := pebble.NewCache(int64(memQuotaInBytes))
defer cache.Unref()
for id := 0; id < cfg.Count; id++ {
ws := writeStalls[id]
adjust := func(opts *pebble.Options) {
Expand All @@ -57,7 +59,7 @@ func createPebbleDBs(
}
}

db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust)
db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust)
if err != nil {
log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err))
for _, db := range dbs {
Expand All @@ -67,7 +69,7 @@ func createPebbleDBs(
}
log.Info("create pebble instance success",
zap.Int("id", id+1),
zap.Uint64("cacheSize", memQuotaInBytes/uint64(cfg.Count)))
zap.Uint64("sharedCacheSize", memQuotaInBytes))
dbs = append(dbs, db)
}
return dbs, writeStalls, nil
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/sourcemanager/engine/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func iterTable(
// OpenPebble opens a pebble.
func OpenPebble(
id int, path string, cfg *config.DBConfig,
cacheSize uint64,
cache *pebble.Cache,
adjusts ...func(*pebble.Options),
) (db *pebble.DB, err error) {
dbDir := filepath.Join(path, fmt.Sprintf("%04d", id))
Expand All @@ -99,11 +99,7 @@ func OpenPebble(
}

opts := buildPebbleOption(cfg)
if cacheSize > 0 {
opts.Cache = pebble.NewCache(int64(cacheSize))
defer opts.Cache.Unref()
}

opts.Cache = cache
for _, adjust := range adjusts {
adjust(opts)
}
Expand Down Expand Up @@ -136,10 +132,14 @@ func buildPebbleOption(cfg *config.DBConfig) (opts *pebble.Options) {
l.IndexBlockSize = 256 << 10 // 256 KB
l.FilterPolicy = bloom.FilterPolicy(10)
l.FilterType = pebble.TableFilter
if i == 0 {
l.TargetFileSize = 8 << 20 // 8 MB
} else if i < 4 {
l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2
// 8M is large enough because generally Sorter won't carry too much data.
// Avoiding large target files is helpful to reduce write-amplification.
l.TargetFileSize = 8 << 20 // 8 MB
switch cfg.Compression {
case "none":
l.Compression = pebble.NoCompression
case "snappy":
l.Compression = pebble.SnappyCompression
}
l.EnsureDefaults()
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/engine/pebble/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestIteratorWithTableFilter(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(
1, dbPath, &config.DBConfig{Count: 1},
1024*1024*10,
nil,
// Disable auto compactions to make the case more stable.
func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true },
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func TestTableOperations(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand All @@ -50,7 +50,7 @@ func TestTableOperations(t *testing.T) {
// TestNoResolvedTs tests resolved timestamps shouldn't be emitted.
func TestNoResolvedTs(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand Down Expand Up @@ -80,7 +80,7 @@ func TestNoResolvedTs(t *testing.T) {
// TestEventFetch tests events can be sorted correctly.
func TestEventFetch(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand Down Expand Up @@ -161,7 +161,7 @@ func TestEventFetch(t *testing.T) {

func TestCleanData(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

Expand Down
7 changes: 1 addition & 6 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,7 @@ func (s *server) startActorSystems(ctx context.Context) error {
sortDir := config.GetGlobalServerConfig().Sorter.SortDir

if s.useEventSortEngine {
totalMemory, err := util.GetMemoryLimit()
if err != nil {
return errors.Trace(err)
}
memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100
memInBytes := uint64(float64(totalMemory) * memPercentage)
memInBytes := conf.Sorter.CacheSizeInMB * (1 << 20)
if config.GetGlobalServerConfig().Debug.EnableDBSorter {
s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB)
} else {
Expand Down
26 changes: 0 additions & 26 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,7 @@ func (o *options) addFlags(cmd *cobra.Command) {
cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval")
_ = cmd.Flags().MarkHidden("processor-flush-interval")

// sorter related parameters, hidden them since cannot be configured by TiUP easily.
cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size")
_ = cmd.Flags().MarkHidden("sorter-num-workerpool-goroutine")

cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level")
_ = cmd.Flags().MarkHidden("sorter-num-concurrent-worker")

cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting")
_ = cmd.Flags().MarkHidden("sorter-chunk-size-limit")

// 80 is safe on most systems.
cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-percentage")
// We use 8GB as a safe default before we support local configuration file.
cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-consumption")

// sort-dir id deprecate, hidden it.
cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory")
_ = cmd.Flags().MarkHidden("sort-dir")
Expand Down Expand Up @@ -207,16 +191,6 @@ func (o *options) complete(cmd *cobra.Command) error {
cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval
case "processor-flush-interval":
cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval
case "sorter-num-workerpool-goroutine":
cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine
case "sorter-num-concurrent-worker":
cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker
case "sorter-chunk-size-limit":
cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit
case "sorter-max-memory-percentage":
cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage
case "sorter-max-memory-consumption":
cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption
case "ca":
cfg.Security.CAPath = o.serverConfig.Security.CAPath
case "cert":
Expand Down
59 changes: 23 additions & 36 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ func TestParseCfg(t *testing.T) {
"--cert", "bb",
"--key", "cc",
"--cert-allowed-cn", "dd,ee",
"--sorter-chunk-size-limit", "50000000",
"--sorter-max-memory-consumption", "60000",
"--sorter-max-memory-percentage", "70",
"--sorter-num-concurrent-worker", "80",
"--sorter-num-workerpool-goroutine", "90",
"--sort-dir", "/tmp/just_a_test",
}))

Expand Down Expand Up @@ -156,12 +151,14 @@ func TestParseCfg(t *testing.T) {
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
NumConcurrentWorker: 80,
ChunkSizeLimit: 50000000,
MaxMemoryPercentage: 70,
MaxMemoryConsumption: 60000,
NumWorkerPoolGoroutine: 90,
SortDir: config.DefaultSortDir,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 128,
MaxMemoryPercentage: 10,

NumConcurrentWorker: 4,
ChunkSizeLimit: 128 * 1024 * 1024,
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024,
NumWorkerPoolGoroutine: 16,
},
Security: &config.SecurityConfig{
CertPath: "bb",
Expand Down Expand Up @@ -248,11 +245,6 @@ max-days = 1
max-backups = 1

[sorter]
chunk-size-limit = 10000000
max-memory-consumption = 2000000
max-memory-percentage = 3
num-concurrent-worker = 4
num-workerpool-goroutine = 5
sort-dir = "/tmp/just_a_test"

[kv-client]
Expand Down Expand Up @@ -324,12 +316,14 @@ check-balance-interval = "10s"
OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond),
Sorter: &config.SorterConfig{
SortDir: config.DefaultSortDir,
CacheSizeInMB: 128,
MaxMemoryPercentage: 10,

NumConcurrentWorker: 4,
ChunkSizeLimit: 10000000,
MaxMemoryPercentage: 3,
MaxMemoryConsumption: 2000000,
NumWorkerPoolGoroutine: 5,
SortDir: config.DefaultSortDir,
ChunkSizeLimit: 128 * 1024 * 1024,
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024,
NumWorkerPoolGoroutine: 16,
},
Security: &config.SecurityConfig{},
PerTableMemoryQuota: config.DefaultTableMemoryQuota,
Expand Down Expand Up @@ -411,11 +405,6 @@ max-days = 1
max-backups = 1

[sorter]
chunk-size-limit = 10000000
max-memory-consumption = 2000000
max-memory-percentage = 3
num-concurrent-worker = 4
num-workerpool-goroutine = 5
sort-dir = "/tmp/just_a_test"

[security]
Expand All @@ -441,10 +430,6 @@ cert-allowed-cn = ["dd","ee"]
"--owner-flush-interval", "150ms",
"--processor-flush-interval", "150ms",
"--ca", "",
"--sorter-chunk-size-limit", "50000000",
"--sorter-max-memory-consumption", "60000000",
"--sorter-max-memory-percentage", "70",
"--sorter-num-concurrent-worker", "3",
"--config", configPath,
}))

Expand Down Expand Up @@ -472,12 +457,14 @@ cert-allowed-cn = ["dd","ee"]
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
NumConcurrentWorker: 3,
ChunkSizeLimit: 50000000,
MaxMemoryPercentage: 70,
MaxMemoryConsumption: 60000000,
NumWorkerPoolGoroutine: 5,
SortDir: config.DefaultSortDir,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 128,
MaxMemoryPercentage: 10,

NumConcurrentWorker: 4,
ChunkSizeLimit: 128 * 1024 * 1024,
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024,
NumWorkerPoolGoroutine: 16,
},
Security: &config.SecurityConfig{
CertPath: "bb",
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ const (
"owner-flush-interval": 50000000,
"processor-flush-interval": 50000000,
"sorter": {
"sort-dir": "/tmp/sorter",
"cache-size-in-mb": 128,
"max-memory-percentage": 10,
"num-concurrent-worker": 4,
"chunk-size-limit": 999,
"max-memory-percentage": 10,
"max-memory-consumption": 17179869184,
"num-workerpool-goroutine": 16,
"sort-dir": "/tmp/sorter"
"num-workerpool-goroutine": 16
},
"security": {
"ca-path": "",
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ var defaultServerConfig = &ServerConfig{
OwnerFlushInterval: TomlDuration(50 * time.Millisecond),
ProcessorFlushInterval: TomlDuration(50 * time.Millisecond),
Sorter: &SorterConfig{
SortDir: DefaultSortDir,
CacheSizeInMB: 128, // By default use 128M memory as sorter cache.
MaxMemoryPercentage: 10, // Only for unified sorter.

NumConcurrentWorker: 4,
ChunkSizeLimit: 128 * 1024 * 1024, // 128MB
MaxMemoryPercentage: 10, // 10% is safe on machines with memory capacity <= 16GB
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16GB
NumWorkerPoolGoroutine: 16,
SortDir: DefaultSortDir,
},
Security: &SecurityConfig{},
PerTableMemoryQuota: DefaultTableMemoryQuota,
Expand Down
24 changes: 17 additions & 7 deletions pkg/config/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,30 @@

package config

import "github.com/pingcap/tiflow/pkg/errors"
import (
"math"

"github.com/pingcap/tiflow/pkg/errors"
)

// SorterConfig represents sorter config for a changefeed
type SorterConfig struct {
// the directory used to store the temporary files generated by the sorter
SortDir string `toml:"sort-dir" json:"sort-dir"`
// Cache size of sorter in MB.
CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"`
// the maximum memory use percentage that allows in-memory sorting
// Deprecated: use CacheSizeInMB instead.
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`

// number of concurrent heap sorts
NumConcurrentWorker int `toml:"num-concurrent-worker" json:"num-concurrent-worker"`
// maximum size for a heap
ChunkSizeLimit uint64 `toml:"chunk-size-limit" json:"chunk-size-limit"`
// the maximum memory use percentage that allows in-memory sorting
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`
// the maximum memory consumption allowed for in-memory sorting
MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"`
// the size of workerpool
NumWorkerPoolGoroutine int `toml:"num-workerpool-goroutine" json:"num-workerpool-goroutine"`
// the directory used to store the temporary files generated by the sorter
SortDir string `toml:"sort-dir" json:"sort-dir"`
}

// ValidateAndAdjust validates and adjusts the sorter configuration
Expand All @@ -53,10 +61,12 @@ func (c *SorterConfig) ValidateAndAdjust() error {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
"num-workerpool-goroutine should be at least 1, larger than 8 is recommended")
}
if c.MaxMemoryPercentage <= 0 || c.MaxMemoryPercentage > 80 {
if c.MaxMemoryPercentage < 0 || c.MaxMemoryPercentage > 80 {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
"max-memory-percentage should be a percentage and within (0, 80]")
}

if c.CacheSizeInMB < 8 || c.CacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs("cache-size-in-mb should be greater than 8(MB)")
}
return nil
}
2 changes: 0 additions & 2 deletions tests/integration_tests/_utils/run_cdc_server
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ if [[ "$restart" == "true" ]]; then
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
Expand All @@ -137,7 +136,6 @@ else
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[consistent]
level = "eventual"
storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/redo"
Loading