Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rg/controller: use the default resource group if the requested one doesn't exist #8387

Merged
merged 3 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ type ErrClientGetResourceGroup struct {
}

func (e *ErrClientGetResourceGroup) Error() string {
return fmt.Sprintf("get resource group %v failed, %v", e.ResourceGroupName, e.Cause)
return fmt.Sprintf("get resource group %s failed, %s", e.ResourceGroupName, e.Cause)
}
90 changes: 63 additions & 27 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
)

const (
defaultResourceGroupName = "default"
controllerConfigPath = "resource_group/controller"
maxNotificationChanLen = 200
needTokensAmplification = 1.1
Expand Down Expand Up @@ -356,22 +357,32 @@
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
if gc, ok := c.loadGroupController(group.Name); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reset the tombstone flag in here if the user deletes it and creates it again?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. Fixed.

gc.modifyMeta(group)
// If the resource group is marked as tombstone before, set it as active again.
if swapped := gc.tombstone.CompareAndSwap(true, false); swapped {
resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1)
log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name))
}
}
case meta_storagepb.Event_DELETE:
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
// Do not delete the resource group immediately, just mark it as tombstone.
// For the requests that are still in progress, fallback to the default resource group.
if gc, ok := c.loadGroupController(group.Name); ok {
gc.tombstone.Store(true)
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1)
log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name))
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
log.Info("[resource group controller] previous key-value pair has been compacted",
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))

Check warning on line 385 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L384-L385

Added lines #L384 - L385 were not covered by tests
}
}
}
Expand Down Expand Up @@ -420,12 +431,32 @@
return nil
}

// loadGroupController just wraps the `Load` method of `sync.Map`.
func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) {
tmp, ok := c.groupsController.Load(name)
if !ok {
return nil, false
}
return tmp.(*groupCostController), true
}

// loadOrStoreGroupController just wraps the `LoadOrStore` method of `sync.Map`.
func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) {
tmp, loaded := c.groupsController.LoadOrStore(name, gc)
return tmp.(*groupCostController), loaded
}

// tryGetResourceGroup will try to get the resource group controller from local cache first,
// if the local cache misses, it will then call gRPC to fetch the resource group info from server.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// Get from the local cache first.
if tmp, ok := c.groupsController.Load(name); ok {
return tmp.(*groupCostController), nil
gc, ok := c.loadGroupController(name)
if ok {
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && name != defaultResourceGroupName {
return c.tryGetResourceGroup(ctx, defaultResourceGroupName)
}
return gc, nil
}
// Call gRPC to fetch the resource group info.
group, err := c.provider.GetResourceGroup(ctx, name)
Expand All @@ -436,24 +467,21 @@
return nil, errors.Errorf("%s does not exists", name)
}
// Check again to prevent initializing the same resource group concurrently.
if tmp, ok := c.groupsController.Load(name); ok {
gc := tmp.(*groupCostController)
if gc, ok = c.loadGroupController(name); ok {
return gc, nil
}
// Initialize the resource group controller.
gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
return nil, err
}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
// Check again to prevent initializing the same resource group concurrently.
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
gc, loaded := c.loadOrStoreGroupController(group.Name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
}
return tmp.(*groupCostController), nil
return gc, nil
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
Expand All @@ -465,14 +493,15 @@
latestConsumption := *gc.mu.consumption
gc.mu.Unlock()
if equalRU(latestConsumption, *gc.run.consumption) {
if gc.tombstone {
if gc.inactive || gc.tombstone.Load() {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName)
return true
}
gc.tombstone = true
gc.inactive = true
} else {
gc.tombstone = false
gc.inactive = false
}
return true
})
Expand All @@ -498,12 +527,11 @@
c.run.inDegradedMode = false
for _, res := range resp {
name := res.GetResourceGroupName()
v, ok := c.groupsController.Load(name)
gc, ok := c.loadGroupController(name)
if !ok {
log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name))
continue
}
gc := v.(*groupCostController)
gc.handleTokenBucketResponse(res)
}
}
Expand Down Expand Up @@ -572,12 +600,16 @@
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
Comment on lines 605 to 606
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to advance the log and remove return?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better not to do so since that may cause a lot of logs during the workload.

}
return tmp.(*groupCostController).onResponse(req, resp)
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName {
return c.OnResponse(defaultResourceGroupName, req, resp)
}
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
Expand All @@ -594,8 +626,7 @@
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
resourceGroupName := "default"
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName)
if err != nil {
return false
}
Expand Down Expand Up @@ -681,7 +712,10 @@
requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter
}

tombstone bool
// tombstone is set to true when the resource group is deleted.
tombstone atomic.Bool
// inactive is set to true when the resource group has not been updated for a long time.
inactive bool
}

type groupMetricsCollection struct {
Expand Down Expand Up @@ -774,6 +808,8 @@
gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
return gc, nil
}

Expand Down Expand Up @@ -1359,14 +1395,14 @@
return delta, nil
}

// GetActiveResourceGroup is used to get action resource group.
// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
gc, ok := c.loadGroupController(resourceGroupName)
if !ok || gc.tombstone.Load() {
return nil
}
return tmp.(*groupCostController).getMeta()
return gc.getMeta()
}

// This is used for test only.
Expand Down
83 changes: 67 additions & 16 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
func TestGroupControlBurstable(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
args := tokenBucketReconfigureArgs{
NewRate: 1000,
NewBurst: -1,
Expand All @@ -74,7 +73,6 @@ func TestGroupControlBurstable(t *testing.T) {
func TestRequestAndResponseConsumption(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
Expand Down Expand Up @@ -126,7 +124,6 @@ func TestRequestAndResponseConsumption(t *testing.T) {
func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
req := &TestRequestInfo{
isWrite: true,
writeBytes: 10000000,
Expand All @@ -142,6 +139,14 @@ type MockResourceGroupProvider struct {
mock.Mock
}

func newMockResourceGroupProvider() *MockResourceGroupProvider {
mockProvider := &MockResourceGroupProvider{}
mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil)
mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil)
return mockProvider
}

func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
args := m.Called(ctx, resourceGroupName, opts)
return args.Get(0).(*rmpb.ResourceGroup), args.Error(1)
Expand Down Expand Up @@ -191,28 +196,22 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockProvider := new(MockResourceGroupProvider)

mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
// LoadResourceGroups
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil)
// Watch
mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default")))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", defaultResourceGroupName)))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport")
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group")))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")

mockProvider := newMockResourceGroupProvider()
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)

c1, err := controller.tryGetResourceGroup(ctx, "default")
c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
re.NoError(err)
re.Equal(defaultResourceGroup, c1.meta)

Expand All @@ -226,11 +225,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
request := args.Get(1).(*rmpb.TokenBucketsRequest)
var responses []*rmpb.TokenBucketResponse
for _, req := range request.Requests {
if req.ResourceGroupName == "default" {
if req.ResourceGroupName == defaultResourceGroupName {
// no response the default group request, that's mean `len(c.run.currentRequests) != 0` always.
time.Sleep(100 * time.Second)
responses = append(responses, &rmpb.TokenBucketResponse{
ResourceGroupName: "default",
ResourceGroupName: defaultResourceGroupName,
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
{
GrantedTokens: &rmpb.TokenBucket{
Expand Down Expand Up @@ -271,3 +270,55 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
re.Fail("timeout")
}
}

func TestGetController(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockProvider := newMockResourceGroupProvider()
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil)

c, err := controller.GetResourceGroup("test-group-non-existent")
re.Error(err)
re.Nil(c)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
c, err = controller.GetResourceGroup("test-group")
re.NoError(err)
re.Equal(testResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
re.NoError(err)
// Mark the tombstone manually to test the fallback case.
gc, err := controller.tryGetResourceGroup(ctx, "test-group")
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup("test-group")
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{})
re.NoError(err)
_, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{})
re.NoError(err)
// Mark the default group tombstone manually to test the fallback case.
gc, err = controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
_, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, &TestRequestInfo{})
re.NoError(err)
_, err = controller.OnResponse(defaultResourceGroupName, &TestRequestInfo{}, &TestResponseInfo{})
re.NoError(err)
}
Loading