Skip to content

Commit

Permalink
Enhance the fallback cache
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jul 12, 2024
1 parent ccb1e4b commit 6703c54
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 27 deletions.
94 changes: 73 additions & 21 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,24 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
gc, ok := c.loadGroupController(group.Name)
if !ok {
continue
}
// If the loaded controller belongs to the default resource group but the updated group is not the default one,
// it means the group did not exist when this client first used it and the default controller was stored as a
// temporary alias for it, so we should create a new controller for it to replace the default one.
if gc.isTemporaryDefaultGroupAlias(group.Name) {
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
continue

Check warning on line 370 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L370

Added line #L370 was not covered by tests
}
c.groupsController.Store(group.Name, gc)
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(0)
resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1)
log.Info("[resource group controller] create resource group cost controller to replace the default one",
zap.String("name", group.Name))
} else {
gc.modifyMeta(group)
}
case meta_storagepb.Event_DELETE:
Expand Down Expand Up @@ -421,52 +437,83 @@ func (c *ResourceGroupsController) Stop() error {
return nil
}

// loadGroupController is a typed wrapper around the `Load` method of `sync.Map`.
// It returns the group cost controller cached under `name` and true when an entry
// exists, or nil and false when nothing is cached under that name.
func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) {
	if cached, found := c.groupsController.Load(name); found {
		// Convert the untyped map value back to the concrete controller type.
		return cached.(*groupCostController), true
	}
	return nil, false
}

// loadOrStoreGroupController is a typed wrapper around the `LoadOrStore` method of `sync.Map`.
// It returns whatever `LoadOrStore` yields for `name` (with `gc` offered as the value to store),
// converted to a group cost controller, together with the `loaded` flag reported by `LoadOrStore`.
func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) {
	actual, alreadyPresent := c.groupsController.LoadOrStore(name, gc)
	controller := actual.(*groupCostController)
	return controller, alreadyPresent
}

// tryGetResourceGroup will try to get the resource group controller from the local cache first.
// If the local cache misses, it will then call gRPC to fetch the resource group info from the server.
// If `fallback` is true, it will use the default resource group when the resource group does not exist.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string, fallback bool) (*groupCostController, error) {
// Get from the local cache first.
if tmp, ok := c.groupsController.Load(name); ok {
return tmp.(*groupCostController), nil
gc, ok := c.loadGroupController(name)
if ok {
return gc, nil
}
// Call gRPC to fetch the resource group info.
group, err := c.provider.GetResourceGroup(ctx, name)
if err != nil {
if err != nil && !strings.Contains(err.Error(), "resource group not found") {
return nil, err
}
if group == nil {
// If the resource group does not exist and it's not the default resource group and fallback is true,
// try again with the default resource group.
if fallback && name != defaultResourceGroupName {
return c.tryGetResourceGroup(ctx, defaultResourceGroupName, false)
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName, false)
if err != nil {
return nil, err

Check warning on line 475 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L475

Added line #L475 was not covered by tests
}
// To prevent the non-existent resource group from being fetched from the server again,
// store the default resource group controller as its own temporarily.
gc, loaded := c.loadOrStoreGroupController(name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, defaultResourceGroupName).Set(1)
log.Info("[resource group controller] use default resource group cost controller temporarily",
zap.String("name", name))
}
return gc, nil
}
return nil, errors.Errorf("%s does not exists", name)
}
// Check again to prevent initializing the same resource group concurrently.
if tmp, ok := c.groupsController.Load(name); ok {
gc := tmp.(*groupCostController)
if gc, ok = c.loadGroupController(name); ok {
return gc, nil
}
// Initialize the resource group controller.
gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
return nil, err
}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
// Check again to prevent initializing the same resource group concurrently.
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
gc, loaded := c.loadOrStoreGroupController(group.Name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
}
return tmp.(*groupCostController), nil
return gc, nil
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
c.groupsController.Range(func(key, value any) bool {
resourceGroupName := key.(string)
gc := value.(*groupCostController)
// Remove the temporary default resource group controller alias.
if gc.isTemporaryDefaultGroupAlias(resourceGroupName) {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName)
return true

Check warning on line 515 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L513-L515

Added lines #L513 - L515 were not covered by tests
}
// Check for stale resource groups, which will be deleted when consumption is continuously unchanged.
gc.mu.Lock()
latestConsumption := *gc.mu.consumption
Expand Down Expand Up @@ -505,12 +552,11 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
c.run.inDegradedMode = false
for _, res := range resp {
name := res.GetResourceGroupName()
v, ok := c.groupsController.Load(name)
gc, ok := c.loadGroupController(name)
if !ok {
log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name))
continue
}
gc := v.(*groupCostController)
gc.handleTokenBucketResponse(res)
}
}
Expand Down Expand Up @@ -568,7 +614,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName, true)
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName, !c.ruConfig.isSingleGroupByKeyspace /* Do not fallback in single keyspace group */)
if err != nil {
return nil, nil, time.Duration(0), 0, err
}
Expand All @@ -579,7 +625,7 @@ func (c *ResourceGroupsController) OnRequestWait(
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
// If the resource group does not exist, use the default resource group.
if resourceGroupName != defaultResourceGroupName {
Expand All @@ -588,7 +634,7 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
return tmp.(*groupCostController).onResponse(req, resp)
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
Expand Down Expand Up @@ -784,6 +830,8 @@ func newGroupCostController(
gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
return gc, nil
}

Expand Down Expand Up @@ -1222,6 +1270,10 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
return req
}

// isTemporaryDefaultGroupAlias reports whether this controller is the default resource
// group's controller being referred to by a different name, i.e. `name` is not the default
// resource group name while the controller's own name is.
func (gc *groupCostController) isTemporaryDefaultGroupAlias(name string) bool {
	// The default group name itself can never be an alias of the default group.
	if name == defaultResourceGroupName {
		return false
	}
	return gc.name == defaultResourceGroupName
}

func (gc *groupCostController) getMeta() *rmpb.ResourceGroup {
gc.metaLock.RLock()
defer gc.metaLock.RUnlock()
Expand Down Expand Up @@ -1369,14 +1421,14 @@ func (gc *groupCostController) onResponse(
return delta, nil
}

// GetActiveResourceGroup is used to get action resource group.
// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
tmp, ok := c.groupsController.Load(resourceGroupName)
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
return nil
}
return tmp.(*groupCostController).getMeta()
return gc.getMeta()
}

// This is used for test only.
Expand Down
3 changes: 0 additions & 3 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
func TestGroupControlBurstable(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
args := tokenBucketReconfigureArgs{
NewRate: 1000,
NewBurst: -1,
Expand All @@ -74,7 +73,6 @@ func TestGroupControlBurstable(t *testing.T) {
func TestRequestAndResponseConsumption(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
Expand Down Expand Up @@ -126,7 +124,6 @@ func TestRequestAndResponseConsumption(t *testing.T) {
func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
req := &TestRequestInfo{
isWrite: true,
writeBytes: 10000000,
Expand Down
25 changes: 22 additions & 3 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
CPUMsCost: 1,
}

controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace())
controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg)
controller.Start(suite.ctx)
defer controller.Stop()

testCases := []struct {
resourceGroupName string
Expand Down Expand Up @@ -463,9 +464,27 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
wreq := tcs.makeWriteRequest()
_, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq)
re.Error(err)
time.Sleep(time.Millisecond * 200)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate"))
controller.Stop()

group, err := controller.GetResourceGroup("fallback-group")
re.Error(err)
re.Nil(group)
// Call `OnRequestWait` once to trigger the fallback group creation.
controller.OnRequestWait(suite.ctx, "fallback-group", tokenConsumptionPerSecond{}.makeReadRequest())
// Check if the temporary controller is created.
group, err = controller.GetResourceGroup("fallback-group")
re.NoError(err)
re.Equal("default", group.GetName())
// Create the `fallback-group` and test again.
rg.Name = "fallback-group"
resp, err = cli.AddResourceGroup(suite.ctx, rg)
re.NoError(err)
re.Contains(resp, "Success!")
// Make sure the resource group is watched by the controller and is used to replace the default group.
testutil.Eventually(re, func() bool {
meta := controller.GetActiveResourceGroup("fallback-group")
return meta.GetName() == "fallback-group"
}, testutil.WithTickInterval(50*time.Millisecond))
}

// TestSwitchBurst is used to test https://github.com/tikv/pd/issues/6209
Expand Down

0 comments on commit 6703c54

Please sign in to comment.