Skip to content

Commit

Permalink
Merge branch 'master' into fix-member
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored May 25, 2023
2 parents 6b32e4c + 99e2419 commit cd208c4
Show file tree
Hide file tree
Showing 44 changed files with 1,345 additions and 470 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type Client interface {
MetaStorageClient
// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// GCClient manages gcSafePointV2 and serviceSafePointV2
GCClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
// Close closes the client.
Expand Down
29 changes: 15 additions & 14 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,21 @@ const (

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
)

// grpcutil errors
Expand Down
137 changes: 137 additions & 0 deletions client/gc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// GCClient is a client for doing GC
type GCClient interface {
UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error)
UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error)
WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error)
}

// UpdateGCSafePointV2 update gc safe point for the given keyspace.
func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.UpdateGCSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
}

// UpdateServiceSafePointV2 update service safe point for the given keyspace.
func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.UpdateServiceSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
ServiceId: []byte(serviceID),
SafePoint: safePoint,
Ttl: ttl,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
cancel()
if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
}

// WatchGCSafePointV2 watch gc safe point change.
func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) {
SafePointEventsChan := make(chan []*pdpb.SafePointEvent)
req := &pdpb.WatchGCSafePointV2Request{
Header: c.requestHeader(),
Revision: revision,
}

protoClient := c.getClient()
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
stream, err := protoClient.WatchGCSafePointV2(ctx, req)
if err != nil {
close(SafePointEventsChan)
return nil, err
}
go func() {
defer func() {
close(SafePointEventsChan)
if r := recover(); r != nil {
log.Error("[pd] panic in gc client `WatchGCSafePointV2`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := stream.Recv()
if err != nil {
log.Error("watch gc safe point v2 error", errs.ZapError(errs.ErrClientWatchGCSafePointV2Stream, err))
return
}
SafePointEventsChan <- resp.GetEvents()
}
}
}()
return SafePointEventsChan, err
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
8 changes: 8 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ var (
cmdDurationUpdateKeyspaceState prometheus.Observer
cmdDurationGet prometheus.Observer
cmdDurationPut prometheus.Observer
cmdDurationUpdateGCSafePointV2 prometheus.Observer
cmdDurationUpdateServiceSafePointV2 prometheus.Observer

cmdFailDurationGetRegion prometheus.Observer
cmdFailDurationTSO prometheus.Observer
Expand All @@ -157,6 +159,8 @@ var (
requestDurationTSO prometheus.Observer
cmdFailedDurationGet prometheus.Observer
cmdFailedDurationPut prometheus.Observer
cmdFailedDurationUpdateGCSafePointV2 prometheus.Observer
cmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer
)

func initCmdDurations() {
Expand All @@ -182,6 +186,8 @@ func initCmdDurations() {
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
cmdDurationGet = cmdDuration.WithLabelValues("get")
cmdDurationPut = cmdDuration.WithLabelValues("put")
cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
cmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -198,6 +204,8 @@ func initCmdDurations() {
requestDurationTSO = requestDuration.WithLabelValues("tso")
cmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get")
cmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put")
cmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
cmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")
}

func registerMetrics() {
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ error = '''
TiKV cluster not bootstrapped, please start TiKV first
'''

["PD:cluster:ErrSchedulingIsHalted"]
error = '''
scheduling is halted
'''

["PD:cluster:ErrStoreIsUp"]
error = '''
store is still up, please remove store gracefully
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
109 changes: 108 additions & 1 deletion metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -2340,6 +2340,113 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": true,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The allowance status of the scheduling.",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 41
},
"hiddenSeries": false,
"id": 1464,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": false,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 1,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": true,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "pd_scheduling_allowance_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{kind}}",
"metric": "pd_scheduling_allowance_status",
"refId": "A",
"step": 2
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Scheduling Allowance Status",
"tooltip": {
"shared": true,
"sort": 1,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:533",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"$$hashKey": "object:534",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"cacheTimeout": null,
"colorBackground": false,
Expand Down Expand Up @@ -2967,7 +3074,7 @@
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{event}}",
"metric": "pd_scheduler_status",
"metric": "pd_schedule_operators_count",
"refId": "A",
"step": 4
}
Expand Down
Loading

0 comments on commit cd208c4

Please sign in to comment.