Skip to content

Commit

Permalink
resource_control: supports dynamically change the controller config
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Sep 4, 2023
1 parent 91648e5 commit bea0900
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 31 deletions.
1 change: 1 addition & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/client
go 1.21

require (
github.com/BurntSushi/toml v0.3.1
github.com/cloudfoundry/gosigar v1.3.6
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
Expand Down
1 change: 1 addition & 0 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
14 changes: 10 additions & 4 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
// According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15.
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
defaultMaxWaitDuration = time.Second
defaultMaxWaitDuration = 30 * time.Second
)

const (
Expand Down Expand Up @@ -82,6 +82,9 @@ type Config struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
Expand All @@ -91,6 +94,7 @@ type Config struct {
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
}
}
Expand Down Expand Up @@ -143,8 +147,10 @@ type RUConfig struct {
WriteBytesCost RequestUnit
CPUMsCost RequestUnit
// The CPU statistics need to distinguish between different environments.
isSingleGroupByKeyspace bool
maxWaitDuration time.Duration
isSingleGroupByKeyspace bool

// some config for client
LTBMaxWaitDuration time.Duration
DegradedModeWaitDuration time.Duration
}

Expand All @@ -165,7 +171,7 @@ func GenerateRUConfig(config *Config) *RUConfig {
WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
maxWaitDuration: defaultMaxWaitDuration,
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
}
duration, err := time.ParseDuration(config.DegradedModeWaitDuration)
if err != nil {
Expand Down
100 changes: 75 additions & 25 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption {
// WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.
func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
return func(controller *ResourceGroupsController) {
controller.ruConfig.maxWaitDuration = d
controller.ruConfig.LTBMaxWaitDuration = d

Check warning on line 92 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L92

Added line #L92 was not covered by tests
}
}

Expand Down Expand Up @@ -122,6 +122,8 @@ type ResourceGroupsController struct {
// Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`.
currentRequests []*rmpb.TokenBucketRequest
}

opts []ResourceControlCreateOption
}

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
Expand All @@ -148,6 +150,7 @@ func NewResourceGroupController(
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
}
for _, opt := range opts {
opt(controller)
Expand Down Expand Up @@ -213,22 +216,61 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
})

_, revision, err := c.provider.LoadResourceGroups(ctx)
_, metaRevision, err := c.provider.LoadResourceGroups(ctx)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))

Check warning on line 221 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L221

Added line #L221 was not covered by tests
}
_, cfgRevision, err := c.provider.LoadGlobalConfig(ctx, nil, controllerConfigPath)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
var watchChannel chan []*meta_storagepb.Event
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))

Check warning on line 231 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L231

Added line #L231 was not covered by tests
}
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
if err == nil || c.ruConfig.isSingleGroupByKeyspace {
watchRetryTimer.Stop()
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))

Check warning on line 236 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L236

Added line #L236 was not covered by tests
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
defer watchRetryTimer.Stop()

for {
select {
/* tickers */
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)

Check warning on line 257 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L256-L257

Added lines #L256 - L257 were not covered by tests
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})

Check warning on line 260 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L259-L260

Added lines #L259 - L260 were not covered by tests
}
}
if watchConfigChannel == nil {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)

Check warning on line 267 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L264-L267

Added lines #L264 - L267 were not covered by tests
}
}

case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
/* channels */
case <-c.loopCtx.Done():
resourceGroupStatusGauge.Reset()
return
Expand All @@ -242,14 +284,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.handleTokenBucketResponse(resp)
}
c.run.currentRequests = nil
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
Expand All @@ -259,24 +293,22 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
case resp, ok := <-watchChannel:
case resp, ok := <-watchMetaChannel:
failpoint.Inject("disableWatch", func() {
if c.ruConfig.isSingleGroupByKeyspace {
panic("disableWatch")
}
})
if !ok {
watchChannel = nil
watchMetaChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
revision = item.Kv.ModRevision
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
Expand All @@ -293,14 +325,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
}
}
case <-watchRetryTimer.C:
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
if err != nil {
case resp, ok := <-watchConfigChannel:
if !ok {
watchConfigChannel = nil

Check warning on line 330 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L328-L330

Added lines #L328 - L330 were not covered by tests
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue

Check warning on line 335 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L335

Added line #L335 was not covered by tests
}
for _, item := range resp {
cfgRevision = item.Kv.ModRevision
if !strings.HasPrefix(string(item.Kv.Key), controllerConfigPath) {
continue

Check warning on line 340 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L338-L340

Added lines #L338 - L340 were not covered by tests
}
config := &Config{}
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
continue

Check warning on line 344 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L342-L344

Added lines #L342 - L344 were not covered by tests
}
c.ruConfig = GenerateRUConfig(config)

Check warning on line 346 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L346

Added line #L346 was not covered by tests
// Stay compatible with serverless
for _, opt := range c.opts {
opt(c)

Check warning on line 349 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L349

Added line #L349 was not covered by tests
}
log.Info("load resource controller config after config changed", zap.Reflect("config", config))

Check warning on line 351 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L351

Added line #L351 was not covered by tests
}

case gc := <-c.tokenBucketUpdateChan:
now := gc.run.now
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
Expand Down Expand Up @@ -1127,7 +1177,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))

Check warning on line 1180 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L1180

Added line #L1180 was not covered by tests
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand All @@ -1137,7 +1187,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand Down
68 changes: 68 additions & 0 deletions client/resource_group/controller/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,g
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"fmt"
"strconv"
"time"

"github.com/pingcap/errors"
)

// Duration is a wrapper of time.Duration for TOML and JSON.
type Duration struct {
time.Duration
}

// NewDuration creates a Duration from time.Duration.
func NewDuration(duration time.Duration) Duration {
return Duration{Duration: duration}
}

// MarshalJSON returns the duration as a JSON string.
func (d *Duration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d.String())), nil
}

// UnmarshalJSON parses a JSON string into the duration.
func (d *Duration) UnmarshalJSON(text []byte) error {
s, err := strconv.Unquote(string(text))
if err != nil {
return errors.WithStack(err)

Check warning on line 48 in client/resource_group/controller/util.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/util.go#L48

Added line #L48 was not covered by tests
}
duration, err := time.ParseDuration(s)
if err != nil {
return errors.WithStack(err)

Check warning on line 52 in client/resource_group/controller/util.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/util.go#L52

Added line #L52 was not covered by tests
}
d.Duration = duration
return nil
}

// UnmarshalText parses a TOML string into the duration.
func (d *Duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return errors.WithStack(err)

Check warning on line 62 in client/resource_group/controller/util.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/util.go#L60-L62

Added lines #L60 - L62 were not covered by tests
}

// MarshalText returns the duration as a JSON string.
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d.String()), nil

Check warning on line 67 in client/resource_group/controller/util.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/util.go#L67

Added line #L67 was not covered by tests
}
51 changes: 51 additions & 0 deletions client/resource_group/controller/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"encoding/json"
"testing"

"github.com/BurntSushi/toml"
"github.com/stretchr/testify/require"
)

type example struct {
Interval Duration `json:"interval" toml:"interval"`
}

func TestDurationJSON(t *testing.T) {
t.Parallel()
re := require.New(t)
example := &example{}

text := []byte(`{"interval":"1h1m1s"}`)
re.NoError(json.Unmarshal(text, example))
re.Equal(float64(60*60+60+1), example.Interval.Seconds())

b, err := json.Marshal(example)
re.NoError(err)
re.Equal(string(text), string(b))
}

func TestDurationTOML(t *testing.T) {
t.Parallel()
re := require.New(t)
example := &example{}

text := []byte(`interval = "1h1m1s"`)
re.Nil(toml.Unmarshal(text, example))
re.Equal(float64(60*60+60+1), example.Interval.Seconds())
}
4 changes: 4 additions & 0 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
controllerPathPrefix = "resource_group/controller"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
// errNotLeader is returned when the requested server is not pd leader.
Expand All @@ -43,6 +44,9 @@ const (
// GroupSettingsPathPrefixBytes is used to watch or get resource groups.
var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix)

// ControllerPathPrefixBytes is used to watch or get controller config.
var ControllerPathPrefixBytes = []byte(controllerPathPrefix)

// ResourceManagerClient manages resource group info and token request.
type ResourceManagerClient interface {
ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
Expand Down
Loading

0 comments on commit bea0900

Please sign in to comment.