
Commit

Merge branch 'master' into more-tso-tests
rleungx authored Mar 21, 2023
2 parents e191ebb + af5b019 commit ca074a0
Showing 8 changed files with 89 additions and 46 deletions.
3 changes: 1 addition & 2 deletions client/resource_group/controller/config.go
@@ -32,8 +32,6 @@ var (
 )
 
 const (
-	initialRequestUnits = 10000
-	bufferRUs           = 2000
 	// movingAvgFactor is the weight applied to a new "sample" of RU usage (with one
 	// sample per mainLoopUpdateInterval).
 	//
@@ -42,6 +40,7 @@ const (
 	// 0.5^(1 second / mainLoopUpdateInterval)
 	movingAvgFactor                = 0.5
 	notifyFraction                 = 0.1
+	tokenReserveFraction           = 0.8
 	consumptionsReportingThreshold = 100
 	extendedReportingPeriodFactor  = 4
 	// defaultGroupCleanupInterval is the interval to clean up the deleted resource groups in memory.
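
The movingAvgFactor comment above describes an exponential moving average: with one sample per interval, each old sample's weight halves every mainLoopUpdateInterval. A minimal sketch of that update rule, runnable on its own (the helper name and sample values are hypothetical; only movingAvgFactor mirrors the constant above):

package main

import (
	"fmt"
	"time"
)

const movingAvgFactor = 0.5

// updateAvg folds a new cumulative-RU reading into the running RU-per-second
// average; the previous average keeps movingAvgFactor of its weight.
func updateAvg(avg, lastRU, nowRU float64, elapsed time.Duration) float64 {
	delta := (nowRU - lastRU) / elapsed.Seconds() // RU/s since the last sample
	return movingAvgFactor*avg + (1-movingAvgFactor)*delta
}

func main() {
	avg := 0.0
	// 100 RUs consumed over one second: the new sample gets half the weight.
	avg = updateAvg(avg, 0, 100, time.Second)
	fmt.Println(avg) // 50
}
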
58 changes: 29 additions & 29 deletions client/resource_group/controller/controller.go
@@ -33,9 +33,10 @@ import (
 )
 
 const (
-	controllerConfigPath   = "resource_group/controller"
-	maxRetry               = 3
-	maxNotificationChanLen = 200
+	controllerConfigPath    = "resource_group/controller"
+	maxRetry                = 3
+	maxNotificationChanLen  = 200
+	needTokensAmplification = 1.1
 )
 
 type selectType int
@@ -188,6 +189,10 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
 	failpoint.Inject("fastCleanup", func() {
 		cleanupTicker.Stop()
 		cleanupTicker = time.NewTicker(100 * time.Millisecond)
+		// The cleanup path checks `gc.run.consumption`, which is refreshed by the
+		// state-update loop, so the stateUpdateTicker must be sped up as well.
+		stateUpdateTicker.Stop()
+		stateUpdateTicker = time.NewTicker(200 * time.Millisecond)
 	})
 
 	for {
@@ -366,8 +371,8 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, …
 		request := gc.collectRequestAndConsumption(typ)
 		if request != nil {
 			c.run.currentRequests = append(c.run.currentRequests, request)
+			gc.tokenRequestCounter.Inc()
 		}
-		gc.tokenRequestCounter.Inc()
 		return true
 	})
 	if len(c.run.currentRequests) > 0 {
@@ -380,6 +385,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, …
 	req := &rmpb.TokenBucketsRequest{
 		Requests:              requests,
 		TargetRequestPeriodMs: uint64(defaultTargetPeriod / time.Millisecond),
+		ClientUniqueId:        c.clientUniqueID,
 	}
 	if c.config.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil {
 		c.run.responseDeadline.Reset(c.config.DegradedModeWaitDuration)
@@ -556,21 +562,26 @@ func newGroupCostController(
 func (gc *groupCostController) initRunState() {
 	now := time.Now()
 	gc.run.now = now
-	gc.run.lastRequestTime = now
+	gc.run.lastRequestTime = now.Add(-defaultTargetPeriod)
 	gc.run.targetPeriod = defaultTargetPeriod
 	gc.run.consumption = &rmpb.Consumption{}
 	gc.run.lastRequestConsumption = &rmpb.Consumption{SqlLayerCpuTimeMs: getSQLProcessCPUTime(gc.mainCfg.isSingleGroupByKeyspace)}
 
+	isBurstable := true
 	cfgFunc := func(tb *rmpb.TokenBucket) tokenBucketReconfigureArgs {
+		initialToken := float64(tb.Settings.FillRate)
 		cfg := tokenBucketReconfigureArgs{
-			NewTokens: initialRequestUnits,
+			NewTokens: initialToken,
			NewBurst:  tb.Settings.BurstLimit,
 			// This is to trigger token requests as soon as the resource group starts consuming tokens.
-			NotifyThreshold: math.Max(initialRequestUnits-float64(tb.Settings.FillRate)*0.2, 1),
+			NotifyThreshold: math.Max(initialToken*tokenReserveFraction, 1),
 		}
 		if cfg.NewBurst >= 0 {
 			cfg.NewBurst = 0
 		}
+		if tb.Settings.BurstLimit >= 0 {
+			isBurstable = false
+		}
 		return cfg
 	}

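With this hunk the initial bucket is seeded from the group's fill rate instead of the removed initialRequestUnits, and the notify threshold becomes tokenReserveFraction of that seed. A worked sketch of the arithmetic under an assumed fill rate (all variable names here are hypothetical):

package main

import (
	"fmt"
	"math"
)

const tokenReserveFraction = 0.8

func main() {
	// Assume a resource group whose token bucket fill rate is 2000 RU/s.
	fillRate := 2000.0
	initialToken := fillRate
	// A fresh token request is triggered once available tokens drop below this,
	// i.e. after roughly 20% of the initial grant has been spent.
	notifyThreshold := math.Max(initialToken*tokenReserveFraction, 1)
	fmt.Println(notifyThreshold) // 1600
}
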
@@ -602,6 +613,7 @@ func (gc *groupCostController) initRunState() {
 			gc.run.resourceTokens[typ] = counter
 		}
 	}
+	gc.burstable.Store(isBurstable)
 }
 
 // applyDegradedMode is used to apply degraded mode for a resource group which is in low-process.
@@ -711,9 +723,6 @@ func (gc *groupCostController) updateAvgRUPerSec() {
 
 func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
 	deltaDuration := gc.run.now.Sub(counter.avgLastTime)
-	if deltaDuration <= 500*time.Millisecond {
-		return false
-	}
 	delta := (new - counter.avgRUPerSecLastRU) / deltaDuration.Seconds()
 	counter.avgRUPerSec = movingAvgFactor*counter.avgRUPerSec + (1-movingAvgFactor)*delta
 	counter.avgLastTime = gc.run.now
@@ -722,6 +731,9 @@ func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
 }
 
 func (gc *groupCostController) shouldReportConsumption() bool {
+	if !gc.run.initialRequestCompleted {
+		return true
+	}
 	timeSinceLastRequest := gc.run.now.Sub(gc.run.lastRequestTime)
 	if timeSinceLastRequest >= defaultTargetPeriod {
 		if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod {
@@ -748,17 +760,7 @@ func (gc *groupCostController) shouldReportConsumption() bool {
 func (gc *groupCostController) handleTokenBucketResponse(resp *rmpb.TokenBucketResponse) {
 	gc.run.requestInProgress = false
 	gc.handleRespFunc(resp)
-	if !gc.run.initialRequestCompleted {
-		gc.run.initialRequestCompleted = true
-		// This is the first successful request. Take back the initial RUs that we
-		// used to pre-fill the bucket.
-		for _, counter := range gc.run.resourceTokens {
-			counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
-		}
-		for _, counter := range gc.run.requestUnitTokens {
-			counter.limiter.RemoveTokens(gc.run.now, initialRequestUnits)
-		}
-	}
+	gc.run.initialRequestCompleted = true
 }
 
 func (gc *groupCostController) handleRawResourceTokenResponse(resp *rmpb.TokenBucketResponse) {
@@ -833,20 +835,15 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket …
 	}
 	initCounterNotify(counter)
 	counter.inDegradedMode = false
-	notifyThreshold := granted * notifyFraction
-	if notifyThreshold < bufferRUs {
-		notifyThreshold = bufferRUs
-	}
-
 	var cfg tokenBucketReconfigureArgs
 	cfg.NewBurst = bucket.GetSettings().GetBurstLimit()
 	// When trickleTimeMs equals zero, the server has enough tokens and does not need
 	// to throttle the client's consumption, so all tokens are granted immediately.
 	if trickleTimeMs == 0 {
 		cfg.NewTokens = granted
 		cfg.NewRate = float64(bucket.GetSettings().FillRate)
-		cfg.NotifyThreshold = notifyThreshold
 		counter.lastDeadline = time.Time{}
+		cfg.NotifyThreshold = math.Min((granted+counter.limiter.AvailableTokens(gc.run.now)), counter.avgRUPerSec*float64(defaultTargetPeriod)) * notifyFraction
 		// In the non-trickle case, clients can be allowed to accumulate more tokens.
 		if cfg.NewBurst >= 0 {
 			cfg.NewBurst = 0
@@ -865,7 +862,7 @@
 		counter.notify.mu.Lock()
 		counter.notify.setupNotificationTimer = time.NewTimer(timerDuration)
 		counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C
-		counter.notify.setupNotificationThreshold = notifyThreshold
+		counter.notify.setupNotificationThreshold = 1
 		counter.notify.mu.Unlock()
 		counter.lastDeadline = deadline
 		select {
@@ -958,7 +955,10 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType) …
 }
 
 func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
-	value := counter.avgRUPerSec*gc.run.targetPeriod.Seconds() + bufferRUs
+	// `needTokensAmplification` slightly amplifies the request: the tokens returned
+	// by the server determine the average consumption speed, so when a resource
+	// group's fill rate increases, the client can obtain more tokens accordingly.
+	value := counter.avgRUPerSec * gc.run.targetPeriod.Seconds() * needTokensAmplification
 	value -= counter.limiter.AvailableTokens(gc.run.now)
 	if value < 0 {
 		value = 0
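
The new calcRequest body reads as "expected consumption over the next target period, amplified by 10%, minus what is already buffered". A self-contained sketch with assumed numbers (the function below is a simplified stand-in, not the client's actual API):

package main

import (
	"fmt"
	"time"
)

const needTokensAmplification = 1.1

// calcRequest mirrors the amplified request above: anticipated usage for one
// target period, scaled up, minus the tokens still available locally.
func calcRequest(avgRUPerSec, availableTokens float64, targetPeriod time.Duration) float64 {
	value := avgRUPerSec*targetPeriod.Seconds()*needTokensAmplification - availableTokens
	if value < 0 {
		value = 0
	}
	return value
}

func main() {
	// Averaging 100 RU/s over an assumed 5s target period with 200 RUs buffered:
	// 100*5*1.1 - 200 = 350 tokens requested.
	fmt.Println(calcRequest(100, 200, 5*time.Second))
}
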
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/server.go
@@ -263,7 +263,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClientsWithMultiEndpoint(tlsConfig, []url.URL(u))
 	return err
 }
 
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
@@ -466,7 +466,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls)
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClientsWithMultiEndpoint(tlsConfig, s.backendUrls)
 	return err
 }
 
50 changes: 42 additions & 8 deletions pkg/utils/etcdutil/etcdutil.go
@@ -193,22 +193,30 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value …
 	return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
 }
 
+// CreateClientsWithMultiEndpoint creates an etcd v3 client and an HTTP client.
+func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
+	client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls)
+	if err != nil {
+		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
+	}
+	httpClient := createHTTPClient(tlsConfig)
+	return client, httpClient, nil
+}
+
 // CreateClients creates an etcd v3 client and an HTTP client.
-func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
+func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
 	client, err := createEtcdClient(tlsConfig, acUrls)
 	if err != nil {
 		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
 	}
-	httpClient := &http.Client{
-		Transport: &http.Transport{
-			DisableKeepAlives: true,
-			TLSClientConfig:   tlsConfig,
-		},
-	}
+	httpClient := createHTTPClient(tlsConfig)
 	return client, httpClient, nil
 }
 
-func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
+// createEtcdClientWithMultiEndpoint creates an etcd v3 client.
+// Note: it will be used by micro-service servers and supports multiple etcd endpoints.
+// FIXME: it cannot switch to another etcd endpoint promptly when one of the endpoints hangs on I/O.
+func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
 	if len(acUrls) == 0 {
 		return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address")
 	}
@@ -244,6 +252,32 @@ func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client …
 	return client, err
 }
 
+// createEtcdClient creates an etcd v3 client.
+// Note: it will be used by the legacy pd-server and connects to the leader endpoint only.
+func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
+	lgc := zap.NewProductionConfig()
+	lgc.Encoding = log.ZapEncodingName
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{acURL.String()},
+		DialTimeout: defaultEtcdClientTimeout,
+		TLS:         tlsConfig,
+		LogConfig:   &lgc,
+	})
+	if err == nil {
+		log.Info("create etcd v3 client", zap.String("endpoints", acURL.String()))
+	}
+	return client, err
+}
+
+func createHTTPClient(tlsConfig *tls.Config) *http.Client {
+	return &http.Client{
+		Transport: &http.Transport{
+			DisableKeepAlives: true,
+			TLSClientConfig:   tlsConfig,
+		},
+	}
+}
+
 // InitClusterID creates a cluster ID for the given key if it hasn't existed.
 // This function assumes the cluster ID has already existed and always uses a
 // cheaper read to retrieve it; if it doesn't exist, invoke the more expensive
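
The split above gives micro-service servers a multi-endpoint etcd client while the legacy pd-server keeps a single-URL client. A hedged usage sketch (the import path is inferred from this repository's layout; the URLs and the nil TLS config are placeholders):

package main

import (
	"log"
	"net/url"

	"github.com/tikv/pd/pkg/utils/etcdutil"
)

func main() {
	u1, _ := url.Parse("http://127.0.0.1:2379")
	u2, _ := url.Parse("http://127.0.0.1:2381")

	// Micro-service servers (tso, resource manager) hand over every backend URL.
	etcdClient, httpClient, err := etcdutil.CreateClientsWithMultiEndpoint(nil, []url.URL{*u1, *u2})
	if err != nil {
		log.Fatal(err)
	}

	// The legacy pd-server connects through a single URL only.
	leaderClient, _, err := etcdutil.CreateClients(nil, *u1)
	if err != nil {
		log.Fatal(err)
	}
	_, _, _ = etcdClient, httpClient, leaderClient
}
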
4 changes: 2 additions & 2 deletions pkg/utils/etcdutil/etcdutil_test.go
@@ -220,7 +220,7 @@ func TestEtcdClientSync(t *testing.T) {
 	ep1 := cfg1.LCUrls[0].String()
 	urls, err := types.NewURLs([]string{ep1})
 	re.NoError(err)
-	client1, err := createEtcdClient(nil, urls)
+	client1, err := createEtcdClientWithMultiEndpoint(nil, urls)
 	re.NoError(err)
 	<-etcd1.Server.ReadyNotify()
 
@@ -277,7 +277,7 @@ func checkEtcdWithHangLeader(t *testing.T) error {
 	// Create an etcd client with etcd1 as the endpoint.
 	urls, err := types.NewURLs([]string{proxyAddr})
 	re.NoError(err)
-	client1, err := createEtcdClient(nil, urls)
+	client1, err := createEtcdClientWithMultiEndpoint(nil, urls)
 	re.NoError(err)
 
 	// Add a new member and set the client endpoints to etcd1 and etcd2.
2 changes: 1 addition & 1 deletion server/server.go
@@ -349,7 +349,7 @@ func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) {
 	if err != nil {
 		return nil, nil, err
 	}
-	return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls)
+	return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0])
 }
 
 // AddStartCallback adds a callback in the startServer phase.
14 changes: 12 additions & 2 deletions tests/mcs/resource_manager/resource_manager_test.go
@@ -310,8 +310,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
 			rres := cas.tcs[i].makeReadResponse()
 			wres := cas.tcs[i].makeWriteResponse()
 			startTime := time.Now()
-			controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
-			controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
+			_, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
+			re.NoError(err)
+			_, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
+			re.NoError(err)
 			sum += time.Since(startTime)
 			controller.OnResponse(cas.resourceGroupName, rreq, rres)
 			controller.OnResponse(cas.resourceGroupName, wreq, wres)
@@ -706,9 +708,17 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMode() {
 		rruTokensAtATime: 0,
 		wruTokensAtATime: 10000,
 	}
+	tc2 := tokenConsumptionPerSecond{
+		rruTokensAtATime: 0,
+		wruTokensAtATime: 2,
+	}
 	controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
 	time.Sleep(time.Second * 2)
 	beginTime := time.Now()
+	// This is used to make sure the resource group stays in the low-RU state.
+	for i := 0; i < 100; i++ {
+		controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest())
+	}
 	for i := 0; i < 100; i++ {
 		controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
 	}
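
OnRequestWait now returns an error that callers are expected to check, which is why the first test above asserts re.NoError on each call. A minimal sketch of the calling pattern with a stand-in interface (the real controller lives in client/resource_group/controller; everything below is hypothetical):

package main

import (
	"context"
	"errors"
	"fmt"
)

// requestWaiter stands in for the controller's OnRequestWait: it blocks until
// the request's tokens are granted, or reports why they were not.
type requestWaiter interface {
	OnRequestWait(ctx context.Context, resourceGroupName string, ru float64) error
}

type fakeController struct{}

func (fakeController) OnRequestWait(_ context.Context, name string, _ float64) error {
	if name == "" {
		return errors.New("resource group not found")
	}
	return nil // tokens granted
}

func main() {
	var c requestWaiter = fakeController{}
	// Check the error instead of dropping it, as the updated tests do.
	if err := c.OnRequestWait(context.Background(), "modetest", 10); err != nil {
		fmt.Println("wait failed:", err)
	}
}
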
