diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go
index 4a20728ac930..00de07467d9b 100644
--- a/client/resource_group/controller/config.go
+++ b/client/resource_group/controller/config.go
@@ -32,8 +32,6 @@ var (
 )
 
 const (
-	initialRequestUnits            = 10000
-	bufferRUs                      = 2000
 	// movingAvgFactor is the weight applied to a new "sample" of RU usage (with one
 	// sample per mainLoopUpdateInterval).
 	//
@@ -42,6 +40,7 @@ const (
 	// 0.5^(1 second / mainLoopUpdateInterval)
 	movingAvgFactor                = 0.5
 	notifyFraction                 = 0.1
+	tokenReserveFraction           = 0.8
 	consumptionsReportingThreshold = 100
 	extendedReportingPeriodFactor  = 4
 	// defaultGroupCleanupInterval is the interval to clean up the deleted resource groups in memory.
diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go
index 2b721a159bbd..5acd0aab478b 100755
--- a/client/resource_group/controller/controller.go
+++ b/client/resource_group/controller/controller.go
@@ -33,9 +33,10 @@ import (
 )
 
 const (
-	controllerConfigPath   = "resource_group/controller"
-	maxRetry               = 3
-	maxNotificationChanLen = 200
+	controllerConfigPath    = "resource_group/controller"
+	maxRetry                = 3
+	maxNotificationChanLen  = 200
+	needTokensAmplification = 1.1
 )
 
 type selectType int
@@ -188,6 +189,10 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 		failpoint.Inject("fastCleanup", func() {
 			cleanupTicker.Stop()
 			cleanupTicker = time.NewTicker(100 * time.Millisecond)
+			// Because `gc.run.consumption` is checked on each cleanupTicker tick,
+			// the stateUpdateTicker has to be sped up accordingly.
+			stateUpdateTicker.Stop()
+			stateUpdateTicker = time.NewTicker(200 * time.Millisecond)
 		})
 
 		for {
@@ -366,8 +371,8 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
 		request := gc.collectRequestAndConsumption(typ)
 		if request != nil {
 			c.run.currentRequests = append(c.run.currentRequests, request)
+			gc.tokenRequestCounter.Inc()
 		}
-		gc.tokenRequestCounter.Inc()
 		return true
 	})
 	if len(c.run.currentRequests) > 0 {
@@ -380,6 +385,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
 	req := &rmpb.TokenBucketsRequest{
 		Requests:              requests,
 		TargetRequestPeriodMs: uint64(defaultTargetPeriod / time.Millisecond),
+		ClientUniqueId:        c.clientUniqueID,
 	}
 	if c.config.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil {
 		c.run.responseDeadline.Reset(c.config.DegradedModeWaitDuration)
@@ -556,21 +562,26 @@ func newGroupCostController(
 func (gc *groupCostController) initRunState() {
 	now := time.Now()
 	gc.run.now = now
-	gc.run.lastRequestTime = now
+	gc.run.lastRequestTime = now.Add(-defaultTargetPeriod)
 	gc.run.targetPeriod = defaultTargetPeriod
 	gc.run.consumption = &rmpb.Consumption{}
 	gc.run.lastRequestConsumption = &rmpb.Consumption{SqlLayerCpuTimeMs: getSQLProcessCPUTime(gc.mainCfg.isSingleGroupByKeyspace)}
 
+	isBurstable := true
 	cfgFunc := func(tb *rmpb.TokenBucket) tokenBucketReconfigureArgs {
+		initialToken := float64(tb.Settings.FillRate)
 		cfg := tokenBucketReconfigureArgs{
-			NewTokens: initialRequestUnits,
+			NewTokens: initialToken,
 			NewBurst:  tb.Settings.BurstLimit,
 			// This is to trigger token requests as soon as resource group start consuming tokens.
-			NotifyThreshold: math.Max(initialRequestUnits-float64(tb.Settings.FillRate)*0.2, 1),
+			NotifyThreshold: math.Max(initialToken*tokenReserveFraction, 1),
 		}
 		if cfg.NewBurst >= 0 {
 			cfg.NewBurst = 0
 		}
+		if tb.Settings.BurstLimit >= 0 {
+			isBurstable = false
+		}
 		return cfg
 	}
 
@@ -602,6 +613,7 @@
 			gc.run.resourceTokens[typ] = counter
 		}
 	}
+	gc.burstable.Store(isBurstable)
 }
 
 // applyDegradedMode is used to apply degraded mode for resource group which is in low-process.
@@ -711,9 +723,6 @@ func (gc *groupCostController) updateAvgRUPerSec() {
 
 func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
 	deltaDuration := gc.run.now.Sub(counter.avgLastTime)
-	if deltaDuration <= 500*time.Millisecond {
-		return false
-	}
 	delta := (new - counter.avgRUPerSecLastRU) / deltaDuration.Seconds()
 	counter.avgRUPerSec = movingAvgFactor*counter.avgRUPerSec + (1-movingAvgFactor)*delta
 	counter.avgLastTime = gc.run.now
@@ -722,6 +731,9 @@
 }
 
 func (gc *groupCostController) shouldReportConsumption() bool {
+	if !gc.run.initialRequestCompleted {
+		return true
+	}
 	timeSinceLastRequest := gc.run.now.Sub(gc.run.lastRequestTime)
 	if timeSinceLastRequest >= defaultTargetPeriod {
 		if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod {
@@ -748,17 +760,7 @@
 func (gc *groupCostController) handleTokenBucketResponse(resp *rmpb.TokenBucketResponse) {
 	gc.run.requestInProgress = false
 	gc.handleRespFunc(resp)
-	if !gc.run.initialRequestCompleted {
-		gc.run.initialRequestCompleted = true
-		// This is the first successful request. Take back the initial RUs that we
-		// used to pre-fill the bucket.
-		for _, counter := range gc.run.resourceTokens {
-			counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
-		}
-		for _, counter := range gc.run.requestUnitTokens {
-			counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
-		}
-	}
+	gc.run.initialRequestCompleted = true
 }
 
 func (gc *groupCostController) handleRawResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
@@ -833,11 +835,6 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
 	}
 	initCounterNotify(counter)
 	counter.inDegradedMode = false
-	notifyThreshold := granted * notifyFraction
-	if notifyThreshold < bufferRUs {
-		notifyThreshold = bufferRUs
-	}
-
 	var cfg tokenBucketReconfigureArgs
 	cfg.NewBurst = bucket.GetSettings().GetBurstLimit()
 	// when trickleTimeMs equals zero, server has enough tokens and does not need to
@@ -845,8 +842,8 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
 	if trickleTimeMs == 0 {
 		cfg.NewTokens = granted
 		cfg.NewRate = float64(bucket.GetSettings().FillRate)
-		cfg.NotifyThreshold = notifyThreshold
 		counter.lastDeadline = time.Time{}
+		cfg.NotifyThreshold = math.Min((granted+counter.limiter.AvailableTokens(gc.run.now)), counter.avgRUPerSec*float64(defaultTargetPeriod)) * notifyFraction
 		// In the non-trickle case, clients can be allowed to accumulate more tokens.
 		if cfg.NewBurst >= 0 {
 			cfg.NewBurst = 0
@@ -865,7 +862,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
 		counter.notify.mu.Lock()
 		counter.notify.setupNotificationTimer = time.NewTimer(timerDuration)
 		counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C
-		counter.notify.setupNotificationThreshold = notifyThreshold
+		counter.notify.setupNotificationThreshold = 1
 		counter.notify.mu.Unlock()
 		counter.lastDeadline = deadline
 		select {
@@ -958,7 +955,10 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
 }
 
 func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
-	value := counter.avgRUPerSec*gc.run.targetPeriod.Seconds() + bufferRUs
+	// `needTokensAmplification` slightly amplifies the requested amount. In the current implementation,
+	// the tokens returned from the server determine the average consumption speed, so when the fill rate
+	// of a resource group increases, the amplification lets the client obtain more tokens in time.
+	value := counter.avgRUPerSec * gc.run.targetPeriod.Seconds() * needTokensAmplification
 	value -= counter.limiter.AvailableTokens(gc.run.now)
 	if value < 0 {
 		value = 0
diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go
index 3ccff5f6be01..21d8b111d445 100644
--- a/pkg/mcs/resource_manager/server/server.go
+++ b/pkg/mcs/resource_manager/server/server.go
@@ -263,7 +263,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClientsWithMultiEndpoint(tlsConfig, []url.URL(u))
 	return err
 }
 
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 758bbdcfba15..8f3d754c53a2 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -466,7 +466,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls)
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClientsWithMultiEndpoint(tlsConfig, s.backendUrls)
 	return err
 }
 
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index 0c7081d537ec..423b4f0c42d8 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -193,22 +193,30 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
 	return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
 }
 
+// CreateClientsWithMultiEndpoint creates etcd v3 client and http client.
+func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
+	client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls)
+	if err != nil {
+		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
+	}
+	httpClient := createHTTPClient(tlsConfig)
+	return client, httpClient, nil
+}
+
 // CreateClients creates etcd v3 client and http client.
-func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
+func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
 	client, err := createEtcdClient(tlsConfig, acUrls)
 	if err != nil {
 		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
 	}
-	httpClient := &http.Client{
-		Transport: &http.Transport{
-			DisableKeepAlives: true,
-			TLSClientConfig:   tlsConfig,
-		},
-	}
+	httpClient := createHTTPClient(tlsConfig)
 	return client, httpClient, nil
 }
 
-func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
+// createEtcdClientWithMultiEndpoint creates an etcd v3 client.
+// Note: it is used by the micro service servers and supports multiple etcd endpoints.
+// FIXME: it cannot switch to another etcd endpoint promptly when one of the endpoints hangs on I/O.
+func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
 	if len(acUrls) == 0 {
 		return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address")
 	}
@@ -244,6 +252,32 @@ func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client
 	return client, err
 }
 
+// createEtcdClient creates an etcd v3 client.
+// Note: it is used by the legacy pd-server and connects to the leader endpoint only.
+func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
+	lgc := zap.NewProductionConfig()
+	lgc.Encoding = log.ZapEncodingName
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{acURL.String()},
+		DialTimeout: defaultEtcdClientTimeout,
+		TLS:         tlsConfig,
+		LogConfig:   &lgc,
+	})
+	if err == nil {
+		log.Info("create etcd v3 client", zap.String("endpoints", acURL.String()))
+	}
+	return client, err
+}
+
+func createHTTPClient(tlsConfig *tls.Config) *http.Client {
+	return &http.Client{
+		Transport: &http.Transport{
+			DisableKeepAlives: true,
+			TLSClientConfig:   tlsConfig,
+		},
+	}
+}
+
 // InitClusterID creates a cluster ID for the given key if it hasn't existed.
 // This function assumes the cluster ID has already existed and always use a
 // cheaper read to retrieve it; if it doesn't exist, invoke the more expensive
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index 06ac61e264ea..ad0b277c8f0c 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -220,7 +220,7 @@ func TestEtcdClientSync(t *testing.T) {
 	ep1 := cfg1.LCUrls[0].String()
 	urls, err := types.NewURLs([]string{ep1})
 	re.NoError(err)
-	client1, err := createEtcdClient(nil, urls)
+	client1, err := createEtcdClientWithMultiEndpoint(nil, urls)
 	re.NoError(err)
 	<-etcd1.Server.ReadyNotify()
 
@@ -277,7 +277,7 @@ func checkEtcdWithHangLeader(t *testing.T) error {
 	// Create a etcd client with etcd1 as endpoint.
 	urls, err := types.NewURLs([]string{proxyAddr})
 	re.NoError(err)
-	client1, err := createEtcdClient(nil, urls)
+	client1, err := createEtcdClientWithMultiEndpoint(nil, urls)
 	re.NoError(err)
 
 	// Add a new member and set the client endpoints to etcd1 and etcd2.
diff --git a/server/server.go b/server/server.go
index 08a6ad701ee2..aa5f026b650f 100644
--- a/server/server.go
+++ b/server/server.go
@@ -349,7 +349,7 @@ func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) {
 	if err != nil {
 		return nil, nil, err
 	}
-	return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls)
+	return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0])
 }
 
 // AddStartCallback adds a callback in the startServer phase.
diff --git a/tests/mcs/resource_manager/resource_manager_test.go b/tests/mcs/resource_manager/resource_manager_test.go
index aa0a00fbd7bb..49efdde332f7 100644
--- a/tests/mcs/resource_manager/resource_manager_test.go
+++ b/tests/mcs/resource_manager/resource_manager_test.go
@@ -310,8 +310,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
 			rres := cas.tcs[i].makeReadResponse()
 			wres := cas.tcs[i].makeWriteResponse()
 			startTime := time.Now()
-			controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
-			controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
+			_, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
+			re.NoError(err)
+			_, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
+			re.NoError(err)
 			sum += time.Since(startTime)
 			controller.OnResponse(cas.resourceGroupName, rreq, rres)
 			controller.OnResponse(cas.resourceGroupName, wreq, wres)
@@ -706,9 +708,17 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo
 		rruTokensAtATime: 0,
 		wruTokensAtATime: 10000,
 	}
+	tc2 := tokenConsumptionPerSecond{
+		rruTokensAtATime: 0,
+		wruTokensAtATime: 2,
+	}
 	controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
 	time.Sleep(time.Second * 2)
 	beginTime := time.Now()
+	// This is used to make sure the resource group is in the low-RU state.
+	for i := 0; i < 100; i++ {
+		controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest())
+	}
 	for i := 0; i < 100; i++ {
 		controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
 	}
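
Note (not part of the patch): the snippet below is a minimal, self-contained Go sketch of the client-side request sizing that the new calcRequest performs, shown only to illustrate the formula this diff introduces. The defaultTargetPeriod value and the plain float64 arguments are assumptions made for the example; in the real client they come from the controller config and the token limiter.

package main

import (
	"fmt"
	"time"
)

const (
	// Mirrors the constant added in controller.go.
	needTokensAmplification = 1.1
	// Assumed value for illustration; the real period lives in the client config.
	defaultTargetPeriod = 10 * time.Second
)

// calcRequest sketches the new formula: request roughly one target period of
// average consumption, slightly amplified, minus the tokens still held locally.
func calcRequest(avgRUPerSec, availableTokens float64) float64 {
	value := avgRUPerSec*defaultTargetPeriod.Seconds()*needTokensAmplification - availableTokens
	if value < 0 {
		value = 0
	}
	return value
}

func main() {
	// With an average usage of 500 RU/s and 1000 RU still buffered locally,
	// the client asks for 500*10*1.1 - 1000 = 4500 RU for the next period.
	fmt.Println(calcRequest(500, 1000))
}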