From c37854d6515e320b6aae0100c1b0e3882d699d7a Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 24 Jun 2021 15:48:02 +0800 Subject: [PATCH] kv/client: add incremental scan region count limit (#1899) Note #1926 has picked part of #1899, this PR picks the remaining change --- cdc/kv/client.go | 7 +++---- cdc/processor/pipeline/puller.go | 1 + cmd/server_test.go | 3 +++ pkg/config/config.go | 11 +++++++++++ pkg/config/config_test.go | 4 ++-- pkg/config/kvclient.go | 6 +++++- 6 files changed, 25 insertions(+), 7 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 1030a75cfb4..a84bbeffe98 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/regionspan" "github.com/pingcap/ticdc/pkg/retry" @@ -68,9 +69,6 @@ const ( // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we // don't need to force reload region any more. regionScheduleReload = false - - // defines the scan region limit for each table - regionScanLimitPerTable = 6 ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -559,13 +557,14 @@ func newEventFeedSession( eventCh chan<- *model.RegionFeedEvent, ) *eventFeedSession { id := strconv.FormatUint(allocID(), 10) + kvClientCfg := config.GetGlobalServerConfig().KVClient return &eventFeedSession{ client: client, regionCache: regionCache, kvStorage: kvStorage, totalSpan: totalSpan, eventCh: eventCh, - regionRouter: NewSizedRegionRouter(ctx, regionScanLimitPerTable), + regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit), regionCh: make(chan singleRegionInfo, 16), errCh: make(chan regionErrorInfo, 16), requestRangeCh: make(chan rangeRequestTask, 16), diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index db5d6a9b419..58713947867 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -68,6 +68,7 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error { config := ctx.ChangefeedVars().Info.Config ctxC, cancel := context.WithCancel(ctx) ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName) + ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID) plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage, n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue) n.wg.Go(func() error { diff --git a/cmd/server_test.go b/cmd/server_test.go index db8c09720c8..e4cd7ea2371 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -128,6 +128,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { KVClient: &config.KVClientConfig{ WorkerConcurrent: 8, WorkerPoolSize: 0, + RegionScanLimit: 6, }, }) @@ -198,6 +199,7 @@ sort-dir = "/tmp/just_a_test" KVClient: &config.KVClientConfig{ WorkerConcurrent: 8, WorkerPoolSize: 0, + RegionScanLimit: 6, }, }) @@ -263,6 +265,7 @@ cert-allowed-cn = ["dd","ee"] KVClient: &config.KVClientConfig{ WorkerConcurrent: 8, WorkerPoolSize: 0, + RegionScanLimit: 6, }, }) } diff --git a/pkg/config/config.go b/pkg/config/config.go index 473dd8910fa..3d04ef4bb84 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -188,6 +188,7 @@ var defaultServerConfig = &ServerConfig{ KVClient: &KVClientConfig{ WorkerConcurrent: 8, WorkerPoolSize: 0, // 0 will use NumCPU() * 2 + RegionScanLimit: 6, }, } @@ -324,6 +325,16 @@ func (c *ServerConfig) ValidateAndAdjust() error { return cerror.ErrInvalidServerOption.GenWithStackByArgs("per-table-memory-quota should be at least 6MB") } + if c.KVClient == nil { + c.KVClient = defaultServerConfig.KVClient + } + if c.KVClient.WorkerConcurrent <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1") + } + if c.KVClient.RegionScanLimit <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1") + } + return nil } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index bf441f3f370..ddc8fe0e028 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) { conf.Sorter.ChunkSizeLimit = 999 b, err := conf.Marshal() c.Assert(err, check.IsNil) - c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`) + c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":6}}`) conf2 := new(ServerConfig) - err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`)) + err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":6}}`)) c.Assert(err, check.IsNil) c.Assert(conf2, check.DeepEquals, conf) } diff --git a/pkg/config/kvclient.go b/pkg/config/kvclient.go index 9a3a6b6ff0b..0df0261c367 100644 --- a/pkg/config/kvclient.go +++ b/pkg/config/kvclient.go @@ -15,6 +15,10 @@ package config // KVClientConfig represents config for kv client type KVClientConfig struct { + // how many workers will be used for a single region worker WorkerConcurrent int `toml:"worker-concurrent" json:"worker-concurrent"` - WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"` + // background workerpool size, the workrpool is shared by all goroutines in cdc server + WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"` + // region incremental scan limit for one table in a single store + RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"` }