Skip to content

Commit

Permalink
move isBackground to ResourceGroupKVInterceptor
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jul 13, 2023
1 parent c6edabc commit 4340515
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 82 deletions.
47 changes: 23 additions & 24 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ const (
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
// OnResponse is used to consume tokens after receiving response
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}

// ResourceGroupProvider provides some api to interact with resource manager server.
Expand Down Expand Up @@ -456,12 +458,8 @@ func (c *ResourceGroupsController) OnRequestWait(
) (*rmpb.Consumption, *rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, nil, err
}
if gc.isBackgroundRequest(info.RequestSource()) {
return nil, nil, nil
}
return gc.onRequestWait(ctx, info)
}

Expand All @@ -474,12 +472,29 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
if tmp.(*groupCostController).isBackgroundRequest(req.RequestSource()) {
return nil, nil
}
return tmp.(*groupCostController).onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return false
}

if bg := gc.meta.BackgroundSettings; bg != nil {
if len(requestResource) == 0 || len(bg.JobTypes) == 0 {
return false
}
if idx := strings.LastIndex(requestResource, "_"); idx != -1 {
return slices.Contains(bg.JobTypes, requestResource[idx+1:])
}
}
return false
}

// GetResourceGroup returns the meta setting of the given resource group name.
func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) {
gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName)
Expand Down Expand Up @@ -1052,22 +1067,6 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
return req
}

// If the resource group has background jobs, we should not record consumption and wait for it.
func (gc *groupCostController) isBackgroundRequest(requestResource string) bool {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()

if bg := gc.meta.BackgroundSettings; bg != nil {
if len(requestResource) == 0 || len(bg.JobTypes) == 0 {
return false
}
if idx := strings.LastIndex(requestResource, "_"); idx != -1 {
return slices.Contains(bg.JobTypes, requestResource[idx+1:])
}
}
return false
}

func (gc *groupCostController) getMeta() *rmpb.ResourceGroup {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()
Expand Down
11 changes: 0 additions & 11 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,3 @@ func TestRequestAndResponseConsumption(t *testing.T) {
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}

func TestIsBackgroundRequest(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
re.False(gc.isBackgroundRequest("test"))
re.False(gc.isBackgroundRequest("unknown_default"))
re.True(gc.isBackgroundRequest("internal_lightning"))
re.False(gc.isBackgroundRequest("internal_lightning_default"))
re.False(gc.isBackgroundRequest("external_test_default"))
re.True(gc.isBackgroundRequest("external_unknown_lightning"))
}
3 changes: 1 addition & 2 deletions client/resource_group/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"os"
"time"

"github.com/cloudfoundry/gosigar"
sigar "github.com/cloudfoundry/gosigar"
"go.uber.org/zap"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
Expand All @@ -37,7 +37,6 @@ type RequestInfo interface {
WriteBytes() uint64
ReplicaNumber() int64
StoreID() uint64
RequestSource() string
}

// ResponseInfo is the interface of the response information provider. A response should be
Expand Down
17 changes: 3 additions & 14 deletions client/resource_group/controller/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import "time"

// TestRequestInfo is used to test the request info interface.
type TestRequestInfo struct {
isWrite bool
writeBytes uint64
storeID uint64
requestSource string
isWrite bool
writeBytes uint64
storeID uint64
}

// NewTestRequestInfo creates a new TestRequestInfo.
Expand Down Expand Up @@ -57,16 +56,6 @@ func (tri *TestRequestInfo) ReplicaNumber() int64 {
return 1
}

// RequestSource implements the RequestInfo interface.
func (tri *TestRequestInfo) RequestSource() string {
return tri.requestSource
}

// SetRequestSource set the request source.
func (tri *TestRequestInfo) SetRequestSource(requestSource string) {
tri.requestSource = requestSource
}

// TestResponseInfo is used to test the response info interface.
type TestResponseInfo struct {
readBytes uint64
Expand Down
38 changes: 7 additions & 31 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,15 +1044,16 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo
rruTokensAtATime: 0,
wruTokensAtATime: 2,
}
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
resourceName := "modetest"
controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest())
time.Sleep(time.Second * 2)
beginTime := time.Now()
// This is used to make sure resource group in lowRU.
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest())
controller.OnRequestWait(suite.ctx, resourceName, tc2.makeWriteRequest())
}
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest())
}
endTime := time.Now()
// we can not check `inDegradedMode` because of data race.
Expand Down Expand Up @@ -1164,36 +1165,11 @@ func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJob
c.Start(suite.ctx)

resourceGroupName := suite.initGroups[1].Name
req := controller.NewTestRequestInfo(false, 0, 1)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
consumption, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.NotNil(consumption)
re.NotNil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.NotNil(consumption)
re.NoError(err)
re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_default"))

resourceGroupName = "background_job"
// Check background job `br` will not consume tokens.
req.SetRequestSource("br")
consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Nil(consumption)
re.Nil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.Nil(consumption)
re.NoError(err)

// Check background job `lightning` will not consume tokens.
req.SetRequestSource("lightning")
consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Nil(consumption)
re.Nil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.Nil(consumption)
re.NoError(err)
re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br"))
re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning"))

c.Stop()
}

0 comments on commit 4340515

Please sign in to comment.