diff --git a/client/client.go b/client/client.go index 05e6669ffbb..466e5620017 100644 --- a/client/client.go +++ b/client/client.go @@ -1821,18 +1821,69 @@ func trimHTTPPrefix(str string) string { } func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) { - // TODO: complete this function with new implementation. - return nil, 0, nil + resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath}) + if err != nil { + return nil, 0, err + } + + res := make([]GlobalConfigItem, len(resp.GetItems())) + for i, item := range resp.GetItems() { + cfg := GlobalConfigItem{Name: item.GetName()} + cfg.Value = item.GetValue() + res[i] = cfg + } + return res, resp.GetRevision(), nil } func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error { - // TODO: complete this function with new implementation. + resArr := make([]*pdpb.GlobalConfigItem, len(items)) + for i, it := range items { + resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType} + } + _, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath}) + if err != nil { + return err + } return nil } func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) { - // TODO: complete this function with new implementation. - return nil, nil + // TODO: Add retry mechanism + // register watch components there + globalConfigWatcherCh := make(chan []GlobalConfigItem, 16) + res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{ + ConfigPath: configPath, + Revision: revision, + }) + if err != nil { + close(globalConfigWatcherCh) + return nil, err + } + go func() { + defer func() { + close(globalConfigWatcherCh) + if r := recover(); r != nil { + log.Error("[pd] panic in client `WatchGlobalConfig`", zap.Any("error", r)) + return + } + }() + for { + m, err := res.Recv() + if err != nil { + return + } + arr := make([]GlobalConfigItem, len(m.Changes)) + for j, i := range m.Changes { + arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue()} + } + select { + case <-ctx.Done(): + return + case globalConfigWatcherCh <- arr: + } + } + }() + return globalConfigWatcherCh, err } func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { diff --git a/client/go.mod b/client/go.mod index bd72354f277..320c8cad9dd 100644 --- a/client/go.mod +++ b/client/go.mod @@ -3,10 +3,11 @@ module github.com/tikv/pd/client go 1.16 require ( + github.com/gogo/protobuf v1.3.2 github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 + github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.0 github.com/stretchr/testify v1.8.1 diff --git a/client/go.sum b/client/go.sum index 7c16601b20b..0156bfe1b21 100644 --- a/client/go.sum +++ b/client/go.sum @@ -561,8 +561,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/resourcemanager_client.go b/client/resourcemanager_client.go index 6a8f99e909a..59bf605abaa 100644 --- a/client/resourcemanager_client.go +++ b/client/resourcemanager_client.go @@ -18,7 +18,9 @@ import ( "context" "time" + "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" "go.uber.org/zap" @@ -28,8 +30,9 @@ import ( type actionType int const ( - add actionType = 0 - modify actionType = 1 + add actionType = 0 + modify actionType = 1 + groupSettingsPathPrefix = "resource_group/settings" ) // ResourceManagerClient manages resource group info and token request. @@ -39,6 +42,7 @@ type ResourceManagerClient interface { AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) + WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) } @@ -124,12 +128,58 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri return resp.GetBody(), nil } +// WatchResourceGroup [just for TEST] watches resource groups changes. +// It returns a stream of slices of resource groups. +// The first message in stream contains all current resource groups, +// all subsequent messages contains new events[PUT/DELETE] for all resource groups. +func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) { + configChan, err := c.WatchGlobalConfig(ctx, groupSettingsPathPrefix, revision) + if err != nil { + return nil, err + } + resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup) + go func() { + defer func() { + close(resourceGroupWatcherChan) + if r := recover(); r != nil { + log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r)) + return + } + }() + for { + select { + case <-ctx.Done(): + return + case res, ok := <-configChan: + if !ok { + return + } + groups := make([]*rmpb.ResourceGroup, 0, len(res)) + for _, item := range res { + switch item.EventType { + case pdpb.EventType_PUT: + group := &rmpb.ResourceGroup{} + if err := proto.Unmarshal([]byte(item.Value), group); err != nil { + return + } + groups = append(groups, group) + case pdpb.EventType_DELETE: + continue + } + } + resourceGroupWatcherChan <- groups + } + } + }() + return resourceGroupWatcherChan, err +} + func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { req := &tokenRequest{ done: make(chan error, 1), requestCtx: ctx, clientCtx: c.ctx, - Requeset: request, + Request: request, } c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req grantedTokens, err := req.Wait() @@ -143,7 +193,7 @@ type tokenRequest struct { clientCtx context.Context requestCtx context.Context done chan error - Requeset *rmpb.TokenBucketsRequest + Request *rmpb.TokenBucketsRequest TokenBuckets []*rmpb.TokenBucketResponse } @@ -232,7 +282,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb } func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error { - req := t.Requeset + req := t.Request if err := stream.Send(req); err != nil { err = errors.WithStack(err) t.done <- err diff --git a/go.mod b/go.mod index b10f0b4e770..78fb3575a20 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 + github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e diff --git a/go.sum b/go.sum index bad8e2a53f4..fd220038863 100644 --- a/go.sum +++ b/go.sum @@ -368,8 +368,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30= github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/server/grpc_service.go b/server/grpc_service.go index c818016bb0a..d3f9cb1de4c 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -51,9 +51,6 @@ const ( // tso maxMergeTSORequests = 10000 defaultTSOProxyTimeout = 3 * time.Second - - // global config - globalConfigPath = "/global/config/" ) // gRPC errors @@ -1888,9 +1885,13 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { ops := make([]clientv3.Op, len(request.Changes)) for i, item := range request.Changes { - name := globalConfigPath + item.GetName() - value := item.GetValue() - ops[i] = clientv3.OpPut(name, value) + name := item.GetName() + switch item.GetKind() { + case pdpb.EventType_PUT: + ops[i] = clientv3.OpPut(s.GetFinalPathWithinPD(request.GetConfigPath()+name), item.GetValue()) + case pdpb.EventType_DELETE: + ops[i] = clientv3.OpDelete(s.GetFinalPathWithinPD(request.GetConfigPath() + name)) + } } res, err := kv.NewSlowLogTxn(s.client).Then(ops...).Commit() @@ -1900,41 +1901,56 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo if !res.Succeeded { return &pdpb.StoreGlobalConfigResponse{}, errors.Errorf("failed to execute StoreGlobalConfig transaction") } - return &pdpb.StoreGlobalConfigResponse{}, err + return &pdpb.StoreGlobalConfigResponse{}, nil } // LoadGlobalConfig load global config from etcd func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { - // TODO: complete this function with new implementation - return &pdpb.LoadGlobalConfigResponse{}, nil + configPath := request.GetConfigPath() + r, err := s.client.Get(ctx, s.GetFinalPathWithinPD(configPath), clientv3.WithPrefix()) + if err != nil { + return &pdpb.LoadGlobalConfigResponse{}, err + } + res := make([]*pdpb.GlobalConfigItem, len(r.Kvs)) + for i, value := range r.Kvs { + res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Value: string(value.Value)} + } + return &pdpb.LoadGlobalConfigResponse{Items: res, Revision: r.Header.GetRevision()}, nil } // WatchGlobalConfig if the connection of WatchGlobalConfig is end -// or stoped by whatever reason -// just reconnect to it. -func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { +// or stopped by whatever reason, just reconnect to it. +// Watch on revision which greater than or equal to the required revision. +func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { ctx, cancel := context.WithCancel(s.Context()) defer cancel() - err := s.sendAllGlobalConfig(ctx, server) - if err != nil { - return err - } - watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix()) + revision := req.GetRevision() + // If the revision is compacted, will meet required revision has been compacted error. + // - If required revision < CompactRevision, we need to reload all configs to avoid losing data. + // - If required revision >= CompactRevision, just keep watching. + watchChan := s.client.Watch(ctx, s.GetFinalPathWithinPD(req.GetConfigPath()), clientv3.WithPrefix(), clientv3.WithRev(revision)) for { select { case <-ctx.Done(): return nil case res := <-watchChan: + if revision < res.CompactRevision { + if err := server.Send(&pdpb.WatchGlobalConfigResponse{ + Revision: res.CompactRevision, + Header: s.wrapErrorToHeader(pdpb.ErrorType_DATA_COMPACTED, + fmt.Sprintf("required watch revision: %d is smaller than current compact/min revision. %d", revision, res.CompactRevision)), + }); err != nil { + return err + } + } + revision = res.Header.GetRevision() + cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events)) for _, e := range res.Events { - if e.Type != clientv3.EventTypePut { - continue - } - cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)}) + cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value), Kind: pdpb.EventType(e.Type)}) } if len(cfgs) > 0 { - err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs}) - if err != nil { + if err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs, Revision: res.Header.GetRevision()}); err != nil { return err } } @@ -1942,19 +1958,6 @@ func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server } } -func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error { - configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix()) - if err != nil { - return err - } - ls := make([]*pdpb.GlobalConfigItem, configList.Count) - for i, kv := range configList.Kvs { - ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)} - } - err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls}) - return err -} - // Evict the leaders when the store is damaged. Damaged regions are emergency errors // and requires user to manually remove the `evict-leader-scheduler` with pd-ctl func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) { diff --git a/server/server.go b/server/server.go index d658115f526..4def1dd6bea 100644 --- a/server/server.go +++ b/server/server.go @@ -730,6 +730,11 @@ func (s *Server) GetHTTPClient() *http.Client { return s.httpClient } +// GetFinalPathWithinPD returns the etcd path. +func (s *Server) GetFinalPathWithinPD(configPath string) string { + return strings.Join([]string{s.rootPath, configPath}, "/") +} + // GetLeader returns the leader of PD cluster(i.e the PD leader). func (s *Server) GetLeader() *pdpb.Member { return s.member.GetLeader() diff --git a/tests/client/global_config_test.go b/tests/client/global_config_test.go new file mode 100644 index 00000000000..2c57033865a --- /dev/null +++ b/tests/client/global_config_test.go @@ -0,0 +1,215 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client_test + +import ( + "strconv" + "testing" + "time" + + pd "github.com/tikv/pd/client" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/utils/assertutil" + "github.com/tikv/pd/server" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const globalConfigPath = "global/config/" + +type testReceiver struct { + re *require.Assertions + grpc.ServerStream +} + +func (s testReceiver) Send(m *pdpb.WatchGlobalConfigResponse) error { + log.Info("received", zap.Any("received", m.GetChanges())) + for _, change := range m.GetChanges() { + switch change.GetKind() { + case pdpb.EventType_PUT: + s.re.Contains(change.Name, globalConfigPath+change.Value) + case pdpb.EventType_DELETE: + s.re.Empty(change.Value) + } + } + return nil +} + +type globalConfigTestSuite struct { + suite.Suite + server *server.GrpcServer + client pd.Client + cleanup server.CleanupFunc +} + +func TestGlobalConfigTestSuite(t *testing.T) { + suite.Run(t, new(globalConfigTestSuite)) +} + +func (suite *globalConfigTestSuite) SetupSuite() { + var err error + var gsi *server.Server + checker := assertutil.NewChecker() + checker.FailNow = func() {} + gsi, suite.cleanup, err = server.NewTestServer(checker) + suite.server = &server.GrpcServer{Server: gsi} + suite.NoError(err) + addr := suite.server.GetAddr() + suite.client, err = pd.NewClientWithContext(suite.server.Context(), []string{addr}, pd.SecurityOption{}) + suite.NoError(err) +} + +func (suite *globalConfigTestSuite) TearDownSuite() { + suite.client.Close() + suite.cleanup() +} + +func (suite *globalConfigTestSuite) GetEtcdPath(configPath string) string { + return suite.server.GetFinalPathWithinPD(globalConfigPath + configPath) +} + +func (suite *globalConfigTestSuite) TestLoad() { + defer func() { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + }() + r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.NoError(err) + suite.Len(res.Items, 1) + suite.Equal(r.Header.GetRevision(), res.Revision) + suite.Equal("test", res.Items[0].Value) +} + +func (suite *globalConfigTestSuite) TestStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), globalConfigPath+"test") + suite.NoError(err) + } + }() + changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Value: "0"}, {Kind: pdpb.EventType_PUT, Name: "1", Value: "1"}, {Kind: pdpb.EventType_PUT, Name: "2", Value: "2"}} + _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: changes, + }) + suite.NoError(err) + for i := 0; i < 3; i++ { + res, err := suite.server.GetClient().Get(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + suite.Equal(suite.GetEtcdPath(string(res.Kvs[0].Value)), string(res.Kvs[0].Key)) + } +} + +func (suite *globalConfigTestSuite) TestWatch() { + defer func() { + for i := 0; i < 3; i++ { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + server := testReceiver{re: suite.Require()} + go suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Revision: 0, + }, server) + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } +} + +func (suite *globalConfigTestSuite) TestClientLoad() { + defer func() { + _, err := suite.server.GetClient().Delete(suite.server.Context(), globalConfigPath+"test") + suite.NoError(err) + }() + r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + suite.NoError(err) + res, revision, err := suite.client.LoadGlobalConfig(suite.server.Context(), globalConfigPath) + suite.NoError(err) + suite.Len(res, 1) + suite.Equal(r.Header.GetRevision(), revision) + suite.Equal(pd.GlobalConfigItem{Name: suite.GetEtcdPath("test"), Value: "test", EventType: pdpb.EventType_PUT}, res[0]) +} + +func (suite *globalConfigTestSuite) TestClientStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + err := suite.client.StoreGlobalConfig(suite.server.Context(), globalConfigPath, + []pd.GlobalConfigItem{{Name: "0", Value: "0"}, {Name: "1", Value: "1"}, {Name: "2", Value: "2"}}) + suite.NoError(err) + for i := 0; i < 3; i++ { + res, err := suite.server.GetClient().Get(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + suite.Equal(suite.GetEtcdPath(string(res.Kvs[0].Value)), string(res.Kvs[0].Key)) + } +} + +func (suite *globalConfigTestSuite) TestClientWatchWithRevision() { + defer func() { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + + for i := 0; i < 9; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + // Mock get revision by loading + r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + suite.NoError(err) + res, revision, err := suite.client.LoadGlobalConfig(suite.server.Context(), globalConfigPath) + suite.NoError(err) + suite.Len(res, 1) + suite.Equal(r.Header.GetRevision(), revision) + suite.Equal(pd.GlobalConfigItem{Name: suite.GetEtcdPath("test"), Value: "test", EventType: pdpb.EventType_PUT}, res[0]) + // Mock when start watcher there are existed some keys, will load firstly + for i := 3; i < 6; i++ { + _, err = suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + // Start watcher at next revision + configChan, err := suite.client.WatchGlobalConfig(suite.server.Context(), globalConfigPath, revision) + suite.NoError(err) + // Mock put + for i := 6; i < 9; i++ { + _, err = suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + for { + select { + case <-time.After(time.Second): + return + case res := <-configChan: + for _, r := range res { + suite.Equal(suite.GetEtcdPath(r.Value), r.Name) + } + } + } +} diff --git a/tests/client/go.mod b/tests/client/go.mod index 1b7be7e9dbe..d2e58cec21e 100644 --- a/tests/client/go.mod +++ b/tests/client/go.mod @@ -4,12 +4,14 @@ go 1.19 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 + github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 + github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.1 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793 go.uber.org/goleak v1.1.12 + go.uber.org/zap v1.20.0 google.golang.org/grpc v1.51.0 ) @@ -94,7 +96,6 @@ require ( github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect - github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d // indirect github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect @@ -132,7 +133,6 @@ require ( go.uber.org/dig v1.9.0 // indirect go.uber.org/fx v1.12.0 // indirect go.uber.org/multierr v1.7.0 // indirect - go.uber.org/zap v1.20.0 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/exp v0.0.0-20220321173239-a90fa8a75705 // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect diff --git a/tests/client/go.sum b/tests/client/go.sum index 8a2a082f1ae..aa8890c181d 100644 --- a/tests/client/go.sum +++ b/tests/client/go.sum @@ -789,8 +789,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/msc/go.mod b/tests/msc/go.mod index f8bb15e5ae4..55ab5995b54 100644 --- a/tests/msc/go.mod +++ b/tests/msc/go.mod @@ -3,7 +3,7 @@ module github.com/tikv/pd/tests/msc go 1.19 require ( - github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 + github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 github.com/stretchr/testify v1.8.1 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/msc/go.sum b/tests/msc/go.sum index 205cbb7468e..8db354013f3 100644 --- a/tests/msc/go.sum +++ b/tests/msc/go.sum @@ -791,8 +791,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/msc/resource_manager/resource_manager_test.go b/tests/msc/resource_manager/resource_manager_test.go index 68a8a7a2cc1..0bff664820a 100644 --- a/tests/msc/resource_manager/resource_manager_test.go +++ b/tests/msc/resource_manager/resource_manager_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "io" "net/http" + "strconv" "strings" "testing" "time" @@ -76,6 +77,87 @@ func (suite *resourceManagerClientTestSuite) TearDownSuite() { suite.cluster.Destroy() } +func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { + re := suite.Require() + cli := suite.client + group := &rmpb.ResourceGroup{ + Name: "test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RRU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, + } + // Mock get revision by listing + for i := 0; i < 3; i++ { + group.Name += strconv.Itoa(i) + resp, err := cli.AddResourceGroup(suite.ctx, group) + group.Name = "test" + re.NoError(err) + re.Contains(resp, "Success!") + } + lresp, err := cli.ListResourceGroups(suite.ctx) + re.NoError(err) + re.Equal(len(lresp), 3) + // Start watcher + watchChan, err := suite.client.WatchResourceGroup(suite.ctx, int64(0)) + suite.NoError(err) + // Mock add resource groups + for i := 3; i < 9; i++ { + group.Name = "test" + strconv.Itoa(i) + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + // Mock modify resource groups + modifySettings := func(gs *rmpb.ResourceGroup) { + gs.RUSettings = &rmpb.GroupRequestUnitSettings{ + RRU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 20000, + }, + }, + } + } + for i := 0; i < 9; i++ { + group.Name = "test" + strconv.Itoa(i) + modifySettings(group) + resp, err := cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + // Mock delete resource groups + for i := 0; i < 9; i++ { + resp, err := cli.DeleteResourceGroup(suite.ctx, "test"+strconv.Itoa(i)) + re.NoError(err) + re.Contains(resp, "Success!") + } + // Check watch result + i := 0 + for { + select { + case <-time.After(time.Second): + return + case res := <-watchChan: + if i < 6 { + for _, r := range res { + suite.Equal(uint64(10000), r.RUSettings.RRU.Settings.FillRate) + i++ + } + } else { // after modify + for _, r := range res { + suite.Equal(uint64(20000), r.RUSettings.RRU.Settings.FillRate) + i++ + } + } + } + } +} + func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re := suite.Require() cli := suite.client @@ -170,7 +252,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { } } -func (suite *resourceManagerClientTestSuite) TestBasicReourceGroupCURD() { +func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { re := suite.Require() cli := suite.client @@ -289,7 +371,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicReourceGroupCURD() { // Last one, Check list and delete all resource groups if i == len(testCasesSet1)-1 { - // List Resource Group + // List Resource Groups lresp, err := cli.ListResourceGroups(suite.ctx) re.NoError(err) re.Equal(finalNum, len(lresp)) diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index f7f78e24d6d..ad620659cbf 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -561,8 +561,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6 h1:jilku71qYv56a7uM+Q3AJxf4J+iXe3aGovkd0f8hcgc= -github.com/pingcap/kvproto v0.0.0-20230117104311-1bc802baaad6/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw= +github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=