Skip to content

Commit

Permalink
client, resource_manager: resource manager client (#5810)
Browse files Browse the repository at this point in the history
* impl token request

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* impl resouce manager client

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* impl token request and fix update bug

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* add test

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

impl token request and fix update bug

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* merge master

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* address comment

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* merge master

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

merge master

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

* add test

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CabinfeverB and ti-chi-bot authored Jan 11, 2023
1 parent a1317a7 commit 59a6203
Show file tree
Hide file tree
Showing 10 changed files with 1,275 additions and 36 deletions.
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ type Client interface {

// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
// Close closes the client.
Close()
}
Expand Down Expand Up @@ -390,6 +392,8 @@ type client struct {
// dc-location -> *lastTSO
lastTSMap sync.Map // Same as map[string]*lastTSO

tokenDispatcher *tokenDispatcher

// For internal usage.
checkTSDeadlineCh chan struct{}
leaderNetworkFailure int32
Expand Down Expand Up @@ -417,6 +421,7 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi
}
// Start the daemons.
c.updateTSODispatcher()
c.createTokenispatcher()
c.wg.Add(3)
go c.tsLoop()
go c.tsCancelLoop()
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172 h1:FYgKV9znRQmzVrrJDZ0gUfMIvKLAMU1tu1UKJib8bEQ=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1 h1:jw4NjEiCleRJPPpHM7K6l8OKzOjnZAj62eKteCAY6ro=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
278 changes: 278 additions & 0 deletions client/resourcemanager_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type actionType int

const (
add actionType = 0
modify actionType = 1
)

// ResourceManagerClient manages resource group info and token request.
type ResourceManagerClient interface {
ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error)
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
}

// resourceManagerClient gets the ResourceManager client of current PD leader.
func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return rmpb.NewResourceManagerClient(cc.(*grpc.ClientConn))
}
return nil
}

// ListResourceGroups loads and returns all metadata of resource groups.
func (c *client) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
req := &rmpb.ListResourceGroupsRequest{}
resp, err := c.resourceManagerClient().ListResourceGroups(ctx, req)
if err != nil {
return nil, err
}
resErr := resp.GetError()
if resErr != nil {
return nil, errors.Errorf("[resource_manager]" + resErr.Message)
}
return resp.GetGroups(), nil
}

func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) {
req := &rmpb.GetResourceGroupRequest{
ResourceGroupName: resourceGroupName,
}
resp, err := c.resourceManagerClient().GetResourceGroup(ctx, req)
if err != nil {
return nil, err
}
resErr := resp.GetError()
if resErr != nil {
return nil, errors.Errorf("[resource_manager]" + resErr.Message)
}
return resp.GetGroup(), nil
}

func (c *client) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
return c.putResourceGroup(ctx, metaGroup, add)
}

func (c *client) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
return c.putResourceGroup(ctx, metaGroup, modify)
}

func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup, typ actionType) (str string, err error) {
req := &rmpb.PutResourceGroupRequest{
Group: metaGroup,
}
var resp *rmpb.PutResourceGroupResponse
switch typ {
case add:
resp, err = c.resourceManagerClient().AddResourceGroup(ctx, req)
case modify:
resp, err = c.resourceManagerClient().ModifyResourceGroup(ctx, req)
}
if err != nil {
return str, err
}
resErr := resp.GetError()
if resErr != nil {
return str, errors.Errorf("[resource_manager]" + resErr.Message)
}
str = resp.GetBody()
return
}

func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
req := &rmpb.DeleteResourceGroupRequest{
ResourceGroupName: resourceGroupName,
}
resp, err := c.resourceManagerClient().DeleteResourceGroup(ctx, req)
if err != nil {
return "", err
}
resErr := resp.GetError()
if resErr != nil {
return "", errors.Errorf("[resource_manager]" + resErr.Message)
}
return resp.GetBody(), nil
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
requestCtx: ctx,
clientCtx: c.ctx,
Requeset: request,
}
c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req
grantedTokens, err := req.Wait()
if err != nil {
return nil, err
}
return grantedTokens, err
}

type tokenRequest struct {
clientCtx context.Context
requestCtx context.Context
done chan error
Requeset *rmpb.TokenBucketsRequest
TokenBuckets []*rmpb.TokenBucketResponse
}

func (req *tokenRequest) Wait() (tokenBuckets []*rmpb.TokenBucketResponse, err error) {
select {
case err = <-req.done:
err = errors.WithStack(err)
if err != nil {
return nil, err
}
tokenBuckets = req.TokenBuckets
return
case <-req.requestCtx.Done():
return nil, errors.WithStack(req.requestCtx.Err())
case <-req.clientCtx.Done():
return nil, errors.WithStack(req.clientCtx.Err())
}
}

type tokenBatchController struct {
tokenRequestCh chan *tokenRequest
}

func newTokenBatchController(tokenRequestCh chan *tokenRequest) *tokenBatchController {
return &tokenBatchController{
tokenRequestCh: tokenRequestCh,
}
}

type tokenDispatcher struct {
dispatcherCancel context.CancelFunc
tokenBatchController *tokenBatchController
}

type resourceManagerConnectionContext struct {
stream rmpb.ResourceManager_AcquireTokenBucketsClient
ctx context.Context
cancel context.CancelFunc
}

func (c *client) createTokenispatcher() {
dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx)
dispatcher := &tokenDispatcher{
dispatcherCancel: dispatcherCancel,
tokenBatchController: newTokenBatchController(
make(chan *tokenRequest, 1)),
}
go c.handleResourceTokenDispatcher(dispatcherCtx, dispatcher.tokenBatchController)
c.tokenDispatcher = dispatcher
}

func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) {
var connection resourceManagerConnectionContext
if err := c.tryResourceManagerConnect(dispatcherCtx, &connection); err != nil {
log.Warn("get stream error", zap.Error(err))
}

for {
var firstRequest *tokenRequest
select {
case <-dispatcherCtx.Done():
return
case firstRequest = <-tbc.tokenRequestCh:
}
stream, streamCtx, cancel := connection.stream, connection.ctx, connection.cancel
if stream == nil {
c.tryResourceManagerConnect(dispatcherCtx, &connection)
firstRequest.done <- errors.Errorf("no stream")
continue
}
select {
case <-streamCtx.Done():
log.Info("[resource_manager] resource manager stream is canceled")
cancel()
connection.stream = nil
continue
default:
}
err := c.processTokenRequests(stream, firstRequest)
if err != nil {
log.Info("processTokenRequests error", zap.Error(err))
cancel()
connection.stream = nil
}
}
}

func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error {
req := t.Requeset
if err := stream.Send(req); err != nil {
err = errors.WithStack(err)
t.done <- err
return err
}
resp, err := stream.Recv()
if err != nil {
err = errors.WithStack(err)
t.done <- err
return err
}
if resp.GetError() != nil {
return errors.Errorf("[resource_manager]" + resp.GetError().Message)
}
tokenBuckets := resp.GetResponses()
t.TokenBuckets = tokenBuckets
t.done <- nil
return nil
}

func (c *client) tryResourceManagerConnect(ctx context.Context, connection *resourceManagerConnectionContext) error {
var (
err error
stream rmpb.ResourceManager_AcquireTokenBucketsClient
)
for i := 0; i < maxRetryTimes; i++ {
cctx, cancel := context.WithCancel(ctx)
stream, err = c.resourceManagerClient().AcquireTokenBuckets(cctx)
if err == nil && stream != nil {
connection.cancel = cancel
connection.ctx = cctx
connection.stream = stream
return nil
}
cancel()
select {
case <-ctx.Done():
return err
case <-time.After(retryInterval):
}
}
return err
}
1 change: 0 additions & 1 deletion pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
if neededTokens <= 0 {
return &res, 0
}

// If the current tokens can directly meet the requirement, returns the need token
if t.Tokens >= neededTokens {
t.Tokens -= neededTokens
Expand Down
1 change: 0 additions & 1 deletion tests/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1 h1:jw4NjEiCleRJPPpHM7K6l8OKzOjnZAj62eKteCAY6ro=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
Loading

0 comments on commit 59a6203

Please sign in to comment.