Skip to content

Commit

Permalink
support watch resource group
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 10, 2023
1 parent 1b778f2 commit fb9a3db
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 190 deletions.
21 changes: 14 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type Region struct {

// GlobalConfigItem standard format of KV pair in GlobalConfig client
type GlobalConfigItem struct {
Name string
Value string
Error error
Name string
Value string
ItemKind pdpb.ItemKind
Error error
}

// Client is a PD (Placement Driver) client.
Expand Down Expand Up @@ -126,7 +127,7 @@ type Client interface {
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error)
WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error

Expand All @@ -137,6 +138,8 @@ type Client interface {

// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// ResourceManagerClient manages resource manager metadata.
ResourceManagerClient
// Close closes the client.
Close()
}
Expand Down Expand Up @@ -1855,9 +1858,13 @@ func (c *client) StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem
return err
}

func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) {
func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) {
// register watch components
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{})
res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{
ConfigPath: configPath,
Revision: revision,
})
if err != nil {
close(globalConfigWatcherCh)
return nil, err
Expand All @@ -1881,7 +1888,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem
}
arr := make([]GlobalConfigItem, len(m.Changes))
for j, i := range m.Changes {
arr[j] = GlobalConfigItem{i.GetName(), i.GetValue(), nil}
arr[j] = GlobalConfigItem{i.GetName(), i.GetValue(), i.GetKind(), nil}
}
globalConfigWatcherCh <- arr
}
Expand Down
3 changes: 3 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/client
go 1.16

require (
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
Expand All @@ -14,3 +15,5 @@ require (
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.43.0
)

replace github.com/pingcap/kvproto => github.com/HuSharp/kvproto v0.0.0-20230109023741-068474f998ee
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/HuSharp/kvproto v0.0.0-20230109023741-068474f998ee h1:PXMyepFrImElFEbi3mqHC1iCdbNa5ATelzzoqJlz0pg=
github.com/HuSharp/kvproto v0.0.0-20230109023741-068474f998ee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -104,8 +106,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172 h1:FYgKV9znRQmzVrrJDZ0gUfMIvKLAMU1tu1UKJib8bEQ=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
87 changes: 87 additions & 0 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/pdpb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// TODO: delete it when update proto
const GroupSettingsPathPrefix = "/settings"

// ResourceManagerClient manages resource groups.
type ResourceManagerClient interface {
// WatchResourceGroup watches resource groups changes.
WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
}

// ResourceManagerClient returns the ResourceManagerClient from current PD leader.
func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return rmpb.NewResourceManagerClient(cc.(*grpc.ClientConn))
}
return nil
}

// WatchResourceGroup watches resource groups changes.
// It returns a stream of slices of resource groups.
// The first message in stream contains all current resource groups,
// all subsequent messages contains new put events for all resource groups.
func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
configChan, err := c.WatchGlobalConfig(ctx, GroupSettingsPathPrefix, revision)
resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup, 16)
go func() {
defer func() {
if r := recover(); r != nil {
log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
close(resourceGroupWatcherChan)
return
case res, ok := <-configChan:
if !ok {
close(resourceGroupWatcherChan)
return
}
groups := make([]*rmpb.ResourceGroup, 0, len(res))
for _, item := range res {
switch item.ItemKind {
case pdpb.ItemKind_PUT:
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal([]byte(item.Value), group); err != nil {
return
}
groups = append(groups, group)
case pdpb.ItemKind_DELETE:

}

}
resourceGroupWatcherChan <- groups
}
}
}()
return resourceGroupWatcherChan, err
}
48 changes: 25 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ require (
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.6.0
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
Expand All @@ -43,15 +43,17 @@ require (
github.com/urfave/negroni v0.3.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.19.1
golang.org/x/text v0.3.7
go.uber.org/zap v1.20.0
golang.org/x/text v0.5.0
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.10
google.golang.org/grpc v1.26.0
golang.org/x/tools v0.4.0
google.golang.org/grpc v1.43.0
gotest.tools/gotestsum v1.7.0
)

require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/onsi/gomega v1.20.1 // indirect
Expand Down Expand Up @@ -104,7 +106,7 @@ require (
github.com/google/uuid v1.1.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.12.1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69 // indirect
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand All @@ -115,7 +117,6 @@ require (
github.com/joomcode/errorx v1.0.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
Expand All @@ -134,23 +135,23 @@ require (
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
github.com/prometheus/procfs v0.0.3 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
github.com/shirou/gopsutil v3.21.3+incompatible // indirect
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/thoas/go-funk v0.8.0 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.4 // indirect
github.com/tklauser/numcpus v0.2.1 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.com/urfave/cli/v2 v2.3.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
Expand All @@ -165,15 +166,14 @@ require (
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/exp v0.0.0-20220321173239-a90fa8a75705
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.3.0 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/term v0.3.0 // indirect
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c // indirect
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand All @@ -189,3 +189,5 @@ require (
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/kvproto => github.com/HuSharp/kvproto v0.0.0-20230109023741-068474f998ee
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
Loading

0 comments on commit fb9a3db

Please sign in to comment.