Skip to content

Commit

Permalink
Support resource group watch (#5830)
Browse files Browse the repository at this point in the history
close #5794

Support resource group watch

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
HuSharp and ti-chi-bot authored Jan 19, 2023
1 parent 7ca1b9a commit c5bf5be
Show file tree
Hide file tree
Showing 15 changed files with 471 additions and 64 deletions.
61 changes: 56 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1821,18 +1821,69 @@ func trimHTTPPrefix(str string) string {
}

// LoadGlobalConfig asks PD for the global config items stored under configPath.
// It returns the items converted to GlobalConfigItem (name and value only)
// together with the revision reported in the response. On an RPC failure it
// returns a nil slice, a zero revision and the error.
func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) {
	resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath})
	if err != nil {
		return nil, 0, err
	}

	items := resp.GetItems()
	res := make([]GlobalConfigItem, len(items))
	for i, item := range items {
		res[i] = GlobalConfigItem{Name: item.GetName(), Value: item.GetValue()}
	}
	return res, resp.GetRevision(), nil
}

// StoreGlobalConfig sends the given items to PD as changes under configPath.
// Every item is converted to a pdpb.GlobalConfigItem carrying its name, value
// and event type. The error of the RPC, if any, is returned unchanged.
func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
	changes := make([]*pdpb.GlobalConfigItem, 0, len(items))
	for _, item := range items {
		changes = append(changes, &pdpb.GlobalConfigItem{
			Name:  item.Name,
			Value: item.Value,
			Kind:  item.EventType,
		})
	}
	req := &pdpb.StoreGlobalConfigRequest{Changes: changes, ConfigPath: configPath}
	_, err := c.getClient().StoreGlobalConfig(ctx, req)
	return err
}

// WatchGlobalConfig opens a WatchGlobalConfig stream for configPath starting
// at revision and forwards every received batch of changes on the returned
// channel (buffered, capacity 16).
//
// The channel is closed by the background goroutine when the stream's Recv
// returns an error, or when ctx is done while a batch is waiting to be
// delivered. If the stream cannot be opened, a nil channel and the error are
// returned.
func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) {
	// TODO: Add retry mechanism
	// register watch components there
	res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{
		ConfigPath: configPath,
		Revision:   revision,
	})
	if err != nil {
		return nil, err
	}

	// Allocate the channel only once the stream exists; nothing needs to be
	// closed on the error path above.
	globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
	go func() {
		defer func() {
			// Closing the channel is how consumers learn the watch has ended.
			close(globalConfigWatcherCh)
			if r := recover(); r != nil {
				log.Error("[pd] panic in client `WatchGlobalConfig`", zap.Any("error", r))
			}
		}()
		for {
			m, err := res.Recv()
			if err != nil {
				return
			}
			arr := make([]GlobalConfigItem, len(m.Changes))
			for j, change := range m.Changes {
				// Keyed fields keep the mapping correct even if the struct
				// layout changes.
				arr[j] = GlobalConfigItem{
					EventType: change.GetKind(),
					Name:      change.GetName(),
					Value:     change.GetValue(),
				}
			}
			select {
			case <-ctx.Done():
				return
			case globalConfigWatcherCh <- arr:
			}
		}
	}()
	return globalConfigWatcherCh, nil
}

func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
Expand Down
3 changes: 2 additions & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ module github.com/tikv/pd/client
go 1.16

require (
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
60 changes: 55 additions & 5 deletions client/resourcemanager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
Expand All @@ -28,8 +30,9 @@ import (
// actionType enumerates the kinds of action distinguished by this file.
type actionType int

const (
	// add and modify are the two actionType values.
	add    actionType = 0
	modify actionType = 1
	// groupSettingsPathPrefix is the config path passed to WatchGlobalConfig
	// when watching resource groups.
	groupSettingsPathPrefix = "resource_group/settings"
)

// ResourceManagerClient manages resource group info and token request.
Expand All @@ -39,6 +42,7 @@ type ResourceManagerClient interface {
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
}

Expand Down Expand Up @@ -124,12 +128,58 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri
return resp.GetBody(), nil
}

// WatchResourceGroup [just for TEST] watches resource group changes.
// It watches the global config under groupSettingsPathPrefix starting at
// revision and returns a channel of resource group slices.
//
// Each message on the channel corresponds to one batch received from
// WatchGlobalConfig and holds the resource groups decoded from the PUT events
// of that batch; DELETE events are skipped. The channel is closed when ctx is
// done, when the underlying config channel is closed, or when a PUT payload
// cannot be unmarshalled.
func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
	configChan, err := c.WatchGlobalConfig(ctx, groupSettingsPathPrefix, revision)
	if err != nil {
		return nil, err
	}
	resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
	go func() {
		defer func() {
			close(resourceGroupWatcherChan)
			if r := recover(); r != nil {
				log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
			}
		}()
		for {
			select {
			case <-ctx.Done():
				return
			case res, ok := <-configChan:
				if !ok {
					return
				}
				groups := make([]*rmpb.ResourceGroup, 0, len(res))
				for _, item := range res {
					switch item.EventType {
					case pdpb.EventType_PUT:
						group := &rmpb.ResourceGroup{}
						if err := proto.Unmarshal([]byte(item.Value), group); err != nil {
							// Record why the watch stops instead of ending it silently.
							log.Error("[pd] failed to unmarshal resource group in `WatchResourceGroup`",
								zap.String("name", item.Name), zap.Error(err))
							return
						}
						groups = append(groups, group)
					case pdpb.EventType_DELETE:
						continue
					}
				}
				// The output channel is unbuffered: also watch ctx so the
				// goroutine cannot block forever once the consumer is gone.
				select {
				case <-ctx.Done():
					return
				case resourceGroupWatcherChan <- groups:
				}
			}
		}
	}()
	return resourceGroupWatcherChan, nil
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
requestCtx: ctx,
clientCtx: c.ctx,
Requeset: request,
Request: request,
}
c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req
grantedTokens, err := req.Wait()
Expand All @@ -143,7 +193,7 @@ type tokenRequest struct {
clientCtx context.Context
requestCtx context.Context
done chan error
Requeset *rmpb.TokenBucketsRequest
Request *rmpb.TokenBucketsRequest
TokenBuckets []*rmpb.TokenBucketResponse
}

Expand Down Expand Up @@ -232,7 +282,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb
}

func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error {
req := t.Requeset
req := t.Request
if err := stream.Send(req); err != nil {
err = errors.WithStack(err)
t.done <- err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc=
github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand Down
75 changes: 39 additions & 36 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ const (
// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second

// global config
globalConfigPath = "/global/config/"
)

// gRPC errors
Expand Down Expand Up @@ -1888,9 +1885,13 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
value := item.GetValue()
ops[i] = clientv3.OpPut(name, value)
name := item.GetName()
switch item.GetKind() {
case pdpb.EventType_PUT:
ops[i] = clientv3.OpPut(s.GetFinalPathWithinPD(request.GetConfigPath()+name), item.GetValue())
case pdpb.EventType_DELETE:
ops[i] = clientv3.OpDelete(s.GetFinalPathWithinPD(request.GetConfigPath() + name))
}
}
res, err :=
kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
Expand All @@ -1900,61 +1901,63 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo
if !res.Succeeded {
return &pdpb.StoreGlobalConfigResponse{}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
}
return &pdpb.StoreGlobalConfigResponse{}, err
return &pdpb.StoreGlobalConfigResponse{}, nil
}

// LoadGlobalConfig loads the global config from etcd.
// It reads every key prefixed with the final in-PD path derived from the
// request's config path and returns each key/value as a PUT item, together
// with the revision from the etcd response header. If the etcd read fails, an
// empty response and the error are returned.
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
	configPath := request.GetConfigPath()
	r, err := s.client.Get(ctx, s.GetFinalPathWithinPD(configPath), clientv3.WithPrefix())
	if err != nil {
		return &pdpb.LoadGlobalConfigResponse{}, err
	}
	res := make([]*pdpb.GlobalConfigItem, len(r.Kvs))
	for i, value := range r.Kvs {
		res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Value: string(value.Value)}
	}
	return &pdpb.LoadGlobalConfigResponse{Items: res, Revision: r.Header.GetRevision()}, nil
}

// WatchGlobalConfig if the connection of WatchGlobalConfig is end
// or stoped by whatever reason
// just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
// or stopped by whatever reason, just reconnect to it.
// Watch on revision which greater than or equal to the required revision.
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
if err != nil {
return err
}
watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
revision := req.GetRevision()
// If the revision is compacted, will meet required revision has been compacted error.
// - If required revision < CompactRevision, we need to reload all configs to avoid losing data.
// - If required revision >= CompactRevision, just keep watching.
watchChan := s.client.Watch(ctx, s.GetFinalPathWithinPD(req.GetConfigPath()), clientv3.WithPrefix(), clientv3.WithRev(revision))
for {
select {
case <-ctx.Done():
return nil
case res := <-watchChan:
if revision < res.CompactRevision {
if err := server.Send(&pdpb.WatchGlobalConfigResponse{
Revision: res.CompactRevision,
Header: s.wrapErrorToHeader(pdpb.ErrorType_DATA_COMPACTED,
fmt.Sprintf("required watch revision: %d is smaller than current compact/min revision. %d", revision, res.CompactRevision)),
}); err != nil {
return err
}
}
revision = res.Header.GetRevision()

cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
for _, e := range res.Events {
if e.Type != clientv3.EventTypePut {
continue
}
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value), Kind: pdpb.EventType(e.Type)})
}
if len(cfgs) > 0 {
err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
if err != nil {
if err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs, Revision: res.Header.GetRevision()}); err != nil {
return err
}
}
}
}
}

// sendAllGlobalConfig reads every key under globalConfigPath from etcd and
// sends them to the stream as a single WatchGlobalConfigResponse. It returns
// the etcd read error or the stream send error, whichever occurs.
func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
	resp, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	// The slice is sized by the count reported in the etcd response.
	items := make([]*pdpb.GlobalConfigItem, resp.Count)
	for idx := range resp.Kvs {
		entry := resp.Kvs[idx]
		items[idx] = &pdpb.GlobalConfigItem{
			Name:  string(entry.Key),
			Value: string(entry.Value),
		}
	}
	return server.Send(&pdpb.WatchGlobalConfigResponse{Changes: items})
}

// Evict the leaders when the store is damaged. Damaged regions are emergency errors
// and requires user to manually remove the `evict-leader-scheduler` with pd-ctl
func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) {
Expand Down
5 changes: 5 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,11 @@ func (s *Server) GetHTTPClient() *http.Client {
return s.httpClient
}

// GetFinalPathWithinPD returns the etcd path for configPath: the server's
// root path and configPath joined by a single "/" separator. Neither part is
// cleaned or trimmed.
func (s *Server) GetFinalPathWithinPD(configPath string) string {
	return s.rootPath + "/" + configPath
}

// GetLeader returns the leader of PD cluster(i.e the PD leader).
func (s *Server) GetLeader() *pdpb.Member {
return s.member.GetLeader()
Expand Down
Loading

0 comments on commit c5bf5be

Please sign in to comment.