Support resource group watch #5830

Merged · 12 commits · Jan 19, 2023
61 changes: 56 additions & 5 deletions client/client.go
@@ -1821,18 +1821,69 @@ func trimHTTPPrefix(str string) string {
}

func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) {
-    // TODO: complete this function with new implementation.
-    return nil, 0, nil
+    resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath})
+    if err != nil {
+        return nil, 0, err
+    }
+
+    res := make([]GlobalConfigItem, len(resp.GetItems()))
+    for i, item := range resp.GetItems() {
+        cfg := GlobalConfigItem{Name: item.GetName()}
+        cfg.Value = item.GetValue()
+        res[i] = cfg
+    }
+    return res, resp.GetRevision(), nil
}

func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
-    // TODO: complete this function with new implementation.
+    resArr := make([]*pdpb.GlobalConfigItem, len(items))
+    for i, it := range items {
+        resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType}
+    }
+    _, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath})
+    if err != nil {
+        return err
+    }
     return nil
}

func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) {
-    // TODO: complete this function with new implementation.
-    return nil, nil
+    // register watch components
+    globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
Contributor: Do we need to return the max revision of each []GlobalConfigItem so the caller can do incremental retry?

Member (author): Since the resource client is just for testing, I will add a retry mechanism after I solve PD client/TiDB/CDC compatibility...

+    res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{
+        ConfigPath: configPath,
+        Revision:   revision,
+    })
+    if err != nil {
+        close(globalConfigWatcherCh)
+        return nil, err
+    }
+    go func() {
+        defer func() {
+            if r := recover(); r != nil {
+                log.Error("[pd] panic in client `WatchGlobalConfig`", zap.Any("error", r))
+                return
+            }
+        }()
+        for {
+            select {
+            case <-ctx.Done():
+                close(globalConfigWatcherCh)
+                return
+            default:
+                m, err := res.Recv()
+                if err != nil {
+                    return
+                }
+                arr := make([]GlobalConfigItem, len(m.Changes))
+                for j, i := range m.Changes {
+                    arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue()}
+                }
+                globalConfigWatcherCh <- arr
+            }
+        }
+    }()
+    return globalConfigWatcherCh, err
}
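Following up on the review thread above: the watcher as merged does not expose per-batch revisions, so a caller that needs to survive a broken stream or a compacted revision has to reload a snapshot and re-watch. A minimal caller-side sketch of that pattern (hypothetical code, not part of this PR; applySnapshot and applyEvents are assumed helpers, and it assumes the watch channel is closed once the stream ends):

func watchWithReload(ctx context.Context, c Client, path string) error {
    for {
        // Load a full snapshot plus the revision it corresponds to.
        items, revision, err := c.LoadGlobalConfig(ctx, path)
        if err != nil {
            return err
        }
        applySnapshot(items) // hypothetical helper
        // Resume watching from the snapshot's revision.
        ch, err := c.WatchGlobalConfig(ctx, path, revision)
        if err != nil {
            return err
        }
        for events := range ch {
            applyEvents(events) // hypothetical helper
        }
        // Watch channel closed; reload unless the context itself was canceled.
        if ctx.Err() != nil {
            return ctx.Err()
        }
    }
}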

func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
1 change: 1 addition & 0 deletions client/go.mod
@@ -3,6 +3,7 @@ module github.com/tikv/pd/client
go 1.16

require (
+    github.com/gogo/protobuf v1.3.2
    github.com/opentracing/opentracing-go v1.2.0
    github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
    github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
58 changes: 53 additions & 5 deletions client/resourcemanager_client.go
@@ -18,7 +18,9 @@ import (
    "context"
    "time"

+    "github.com/gogo/protobuf/proto"
    "github.com/pingcap/errors"
+    "github.com/pingcap/kvproto/pkg/pdpb"
    rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
    "github.com/pingcap/log"
    "go.uber.org/zap"
@@ -28,8 +30,9 @@
type actionType int

const (
-    add    actionType = 0
-    modify actionType = 1
+    add                     actionType = 0
+    modify                  actionType = 1
+    groupSettingsPathPrefix            = "resource_group/settings"
)

// ResourceManagerClient manages resource group info and token request.
@@ -39,6 +42,7 @@ type ResourceManagerClient interface {
    AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
    ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
    DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
+    WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
    AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
}

@@ -124,12 +128,56 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
    return resp.GetBody(), nil
}

+// WatchResourceGroup [just for TEST] watches resource group changes.
+// It returns a stream of slices of resource groups.
+// The first message in the stream contains all current resource groups;
+// all subsequent messages contain new events [PUT/DELETE] for all resource groups.
+func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
+    configChan, err := c.WatchGlobalConfig(ctx, groupSettingsPathPrefix, revision)
+    resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
+    go func() {
+        defer func() {
+            if r := recover(); r != nil {
+                log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
+                return
+            }
+        }()
+        for {
+            select {
+            case <-ctx.Done():
+                close(resourceGroupWatcherChan)
+                return
+            case res, ok := <-configChan:
+                if !ok {
+                    close(resourceGroupWatcherChan)
+                    return
+                }
+                groups := make([]*rmpb.ResourceGroup, 0, len(res))
+                for _, item := range res {
+                    switch item.EventType {
+                    case pdpb.EventType_PUT:
+                        group := &rmpb.ResourceGroup{}
+                        if err := proto.Unmarshal([]byte(item.Value), group); err != nil {
+                            return
+                        }
+                        groups = append(groups, group)
+                    case pdpb.EventType_DELETE:
+                        continue
+                    }
+                }
+                resourceGroupWatcherChan <- groups
+            }
+        }
+    }()
+    return resourceGroupWatcherChan, err
+}
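A short usage sketch for the new watcher (hypothetical caller, not part of this diff; pd is assumed to be this client package, and revision would normally come from an earlier snapshot load). Note the goroutine above only unmarshals PUT events, so deletions never reach the output slices:

func consumeGroupEvents(ctx context.Context, c pd.ResourceManagerClient, revision int64) error {
    ch, err := c.WatchResourceGroup(ctx, revision)
    if err != nil {
        return err
    }
    for groups := range ch {
        for _, g := range groups {
            log.Info("resource group put", zap.String("name", g.GetName()))
        }
    }
    return nil
}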

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
    req := &tokenRequest{
        done:       make(chan error, 1),
        requestCtx: ctx,
        clientCtx:  c.ctx,
-        Requeset:   request,
+        Request:    request,
    }
    c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req
    grantedTokens, err := req.Wait()
@@ -143,7 +191,7 @@ type tokenRequest struct {
    clientCtx    context.Context
    requestCtx   context.Context
    done         chan error
-    Requeset     *rmpb.TokenBucketsRequest
+    Request      *rmpb.TokenBucketsRequest
    TokenBuckets []*rmpb.TokenBucketResponse
}

@@ -232,7 +280,7 @@ func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error {
}

func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error {
-    req := t.Requeset
+    req := t.Request
    if err := stream.Send(req); err != nil {
        err = errors.WithStack(err)
        t.done <- err
77 changes: 41 additions & 36 deletions server/grpc_service.go
@@ -51,9 +51,6 @@ const (
    // tso
    maxMergeTSORequests    = 10000
    defaultTSOProxyTimeout = 3 * time.Second
-
-    // global config
-    globalConfigPath = "/global/config/"
)

// gRPC errors
@@ -1888,9 +1885,13 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
    ops := make([]clientv3.Op, len(request.Changes))
    for i, item := range request.Changes {
-        name := globalConfigPath + item.GetName()
-        value := item.GetValue()
-        ops[i] = clientv3.OpPut(name, value)
+        name := item.GetName()
+        switch item.GetKind() {
+        case pdpb.EventType_PUT:
+            ops[i] = clientv3.OpPut(s.GetFinalPathWithinPD(request.GetConfigPath()+name), item.GetValue())
+        case pdpb.EventType_DELETE:
+            ops[i] = clientv3.OpDelete(s.GetFinalPathWithinPD(request.GetConfigPath() + name))
+        }
    }
    res, err :=
        kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
Expand All @@ -1900,61 +1901,65 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo
    if !res.Succeeded {
        return &pdpb.StoreGlobalConfigResponse{}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
    }
-    return &pdpb.StoreGlobalConfigResponse{}, err
+    return &pdpb.StoreGlobalConfigResponse{}, nil
}
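For illustration, a delete travels through the same RPC: the client marks an item with EventType_DELETE and the server turns it into a clientv3.OpDelete (hypothetical client-side call, not part of this diff; the group name is made up):

// Hypothetical: remove the settings entry for resource group "group1".
items := []pd.GlobalConfigItem{{Name: "group1", EventType: pdpb.EventType_DELETE}}
if err := pdClient.StoreGlobalConfig(ctx, "resource_group/settings", items); err != nil {
    log.Error("store global config failed", zap.Error(err))
}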

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
-    // TODO: complete this function with new implementation
-    return &pdpb.LoadGlobalConfigResponse{}, nil
+    configPath := request.GetConfigPath()
+    r, err := s.client.Get(ctx, s.GetFinalPathWithinPD(configPath), clientv3.WithPrefix())
+    if err != nil {
+        return &pdpb.LoadGlobalConfigResponse{}, err
+    }
+    res := make([]*pdpb.GlobalConfigItem, len(r.Kvs))
+    for i, value := range r.Kvs {
+        res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Value: string(value.Value)}
+    }
+    return &pdpb.LoadGlobalConfigResponse{Items: res, Revision: r.Header.GetRevision()}, nil
}

// WatchGlobalConfig if the connection of WatchGlobalConfig is end
-// or stoped by whatever reason
-// just reconnect to it.
-func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
+// or stopped by whatever reason, just reconnect to it.
+func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
    ctx, cancel := context.WithCancel(s.Context())
    defer cancel()
-    err := s.sendAllGlobalConfig(ctx, server)
-    if err != nil {
-        return err
-    }
-    watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
+    revision := req.GetRevision()
+    // If the requested revision has been compacted, the watch returns a "required revision has been compacted" error:
+    // - if the required revision < CompactRevision, the caller needs to reload all configs to avoid losing data;
+    // - if the required revision >= CompactRevision, it can just keep watching.
+    watchChan := s.client.Watch(ctx, s.GetFinalPathWithinPD(req.GetConfigPath()), clientv3.WithPrefix(), clientv3.WithRev(revision))
    for {
        select {
        case <-ctx.Done():
            return nil
        case res := <-watchChan:
+            // Update the revision from every response.
+            if revision > res.Header.GetRevision() {
+                revision = res.Header.GetRevision()
+            }
+            if res.CompactRevision != 0 && revision < res.CompactRevision {
+                if err := server.Send(&pdpb.WatchGlobalConfigResponse{
+                    Revision: res.CompactRevision,
+                    Header: s.wrapErrorToHeader(pdpb.ErrorType_DATA_COMPACTED,
+                        fmt.Sprintf("required watch revision: %d is smaller than current compact/min revision %d", revision, res.CompactRevision)),
+                }); err != nil {
+                    return err
+                }
+            }
+
            cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
            for _, e := range res.Events {
-                if e.Type != clientv3.EventTypePut {
-                    continue
-                }
-                cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
+                cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value), Kind: pdpb.EventType(e.Type)})
            }
            if len(cfgs) > 0 {
-                err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
-                if err != nil {
+                if err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs, Revision: res.Header.GetRevision()}); err != nil {
                    return err
                }
            }
        }
    }
}
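On the receiving side, compaction therefore shows up as a response whose header error is ErrorType_DATA_COMPACTED and whose Revision field carries the compact revision; a watcher can check for it like this (hypothetical caller-side sketch, not part of this diff):

// m is a *pdpb.WatchGlobalConfigResponse received from the watch stream.
if m.GetHeader().GetError().GetType() == pdpb.ErrorType_DATA_COMPACTED {
    // The requested revision was compacted away: reload a full snapshot via
    // LoadGlobalConfig, then re-watch from the revision it returns.
}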

-func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
-    configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
-    if err != nil {
-        return err
-    }
-    ls := make([]*pdpb.GlobalConfigItem, configList.Count)
-    for i, kv := range configList.Kvs {
-        ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)}
-    }
-    err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls})
-    return err
-}

// Evict the leaders when the store is damaged. Damaged regions are emergency errors
// and requires user to manually remove the `evict-leader-scheduler` with pd-ctl
func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) {
5 changes: 5 additions & 0 deletions server/server.go
@@ -730,6 +730,11 @@ func (s *Server) GetHTTPClient() *http.Client {
    return s.httpClient
}

+// GetFinalPathWithinPD returns the full etcd key path, formed by joining the server's root path with configPath.
+func (s *Server) GetFinalPathWithinPD(configPath string) string {
+    return strings.Join([]string{s.rootPath, configPath}, "/")
+}
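For reference (illustrative values, not from this diff): PD's etcd root path typically has the form /pd/<cluster-id>, so a configPath of resource_group/settings resolves to /pd/<cluster-id>/resource_group/settings, the same prefix the resource manager client watches via groupSettingsPathPrefix.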

// GetLeader returns the leader of PD cluster(i.e the PD leader).
func (s *Server) GetLeader() *pdpb.Member {
return s.member.GetLeader()