Skip to content

Commit

Permalink
Merge branch 'master' into fix-test20
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored Jun 25, 2024
2 parents dc810a1 + 49c3b5a commit 9aa8c16
Show file tree
Hide file tree
Showing 181 changed files with 4,150 additions and 3,088 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ coverage
*.txt
go.work*
embedded_assets_handler.go
*.log
13 changes: 12 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ linters:
- testifylint
- gofmt
- revive
disable:
- errcheck
linters-settings:
gocritic:
Expand Down Expand Up @@ -199,3 +198,15 @@ linters-settings:
severity: warning
disabled: false
exclude: [""]
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
linters:
- errcheck
# following path will enable in the future
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
- path: (pkg/tso/admin.go|pkg/schedule/schedulers/split_bucket.go|server/api/plugin_disable.go|server/api/plugin_disable.go|server/api/operator.go|server/api/region.go|pkg/schedule/schedulers/balance_leader.go|server/api/.*\.go|pkg/replication/replication_mode.go|pkg/storage/endpoint/gc_safe_point.go|server/.*\.go|pkg/schedule/schedulers/.*\.go|pkg/syncer/server.go)
linters:
- errcheck
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ test-tso-consistency: install-tools
REAL_CLUSTER_TEST_PATH := $(ROOT_PATH)/tests/integrations/realcluster

test-real-cluster:
@ rm -rf ~/.tiup/data/pd_real_cluster_test
# testing with the real cluster...
cd $(REAL_CLUSTER_TEST_PATH) && $(MAKE) check

Expand Down
26 changes: 26 additions & 0 deletions OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- AndreMouche
- binshi-bing
- bufferflies
- CabinfeverB
- Connor1996
- disksing
- huachaohuang
- HunDunDM
- HuSharp
- JmPotato
- lhy1024
- nolouch
- overvenus
- qiuyesuifeng
- rleungx
- siddontang
- Yisaer
- zhouqiang-cl
reviewers:
- BusyJay
- howardlau1999
- Luffbee
- shafreeck
- xhebox
6 changes: 6 additions & 0 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Sort the member alphabetically.
aliases:
sig-critical-approvers-config:
- easonn7
- kevin-xianliu
- niubell
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ If you're interested in contributing to PD, see [CONTRIBUTING.md](./CONTRIBUTING

## Build

1. Make sure [*Go*](https://golang.org/) (version 1.20) is installed.
2. Use `make` to install PD. PD is installed in the `bin` directory.
1. Make sure [*Go*](https://golang.org/) (version 1.21) is installed.
2. Use `make` to install PD. `pd-server` will be installed in the `bin` directory.

## Usage

Expand Down
139 changes: 127 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package pd
import (
"context"
"crypto/tls"
"encoding/hex"
"fmt"
"net/url"
"runtime/trace"
"strings"
"sync"
Expand Down Expand Up @@ -85,11 +87,18 @@ type RPCClient interface {
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error)
// Deprecated: use BatchScanRegions instead.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error)
// BatchScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
// The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned.
BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -259,6 +268,15 @@ func WithForwardingOption(enableForwarding bool) ClientOption {
}
}

// WithTSOServerProxyOption configures the client to use TSO server proxy,
// i.e., the client will send TSO requests to the API leader (the TSO server
// proxy) which will forward the requests to the TSO servers.
func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
return func(c *client) {
c.option.useTSOServerProxy = useTSOServerProxy
}
}

// WithMaxErrorRetry configures the client max retry times when connect meets error.
func WithMaxErrorRetry(count int) ClientOption {
return func(c *client) {
Expand Down Expand Up @@ -337,6 +355,38 @@ type SecurityOption struct {
SSLKEYBytes []byte
}

// KeyRange defines a range of keys in bytes.
type KeyRange struct {
StartKey []byte
EndKey []byte
}

// NewKeyRange creates a new key range structure with the given start key and end key bytes.
// Notice: the actual encoding of the key range is not specified here. It should be either UTF-8 or hex.
// - UTF-8 means the key has already been encoded into a string with UTF-8 encoding, like:
// []byte{52 56 54 53 54 99 54 99 54 102 50 48 53 55 54 102 55 50 54 99 54 52}, which will later be converted to "48656c6c6f20576f726c64"
// by using `string()` method.
// - Hex means the key is just a raw hex bytes without encoding to a UTF-8 string, like:
// []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}, which will later be converted to "48656c6c6f20576f726c64"
// by using `hex.EncodeToString()` method.
func NewKeyRange(startKey, endKey []byte) *KeyRange {
return &KeyRange{startKey, endKey}
}

// EscapeAsUTF8Str returns the URL escaped key strings as they are UTF-8 encoded.
func (r *KeyRange) EscapeAsUTF8Str() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(string(r.StartKey))
endKeyStr = url.QueryEscape(string(r.EndKey))
return
}

// EscapeAsHexStr returns the URL escaped key strings as they are hex encoded.
func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) {
startKeyStr = url.QueryEscape(hex.EncodeToString(r.StartKey))
endKeyStr = url.QueryEscape(hex.EncodeToString(r.EndKey))
return
}

// NewClient creates a PD client.
func NewClient(
svrAddrs []string, security SecurityOption, opts ...ClientOption,
Expand Down Expand Up @@ -607,6 +657,11 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if c.option.useTSOServerProxy {
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE
}

if newMode == c.serviceMode {
return
}
Expand Down Expand Up @@ -1094,6 +1149,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
//nolint:staticcheck
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
Expand All @@ -1103,6 +1159,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
//nolint:staticcheck
resp, err = protoClient.ScanRegions(cctx, req)
}

Expand All @@ -1113,6 +1170,74 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
return handleRegionsResponse(resp), nil
}

func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()

var cancel context.CancelFunc
scanCtx := ctx
if _, ok := ctx.Deadline(); !ok {
scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout)
defer cancel()
}
options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
pbRanges := make([]*pdpb.KeyRange, 0, len(ranges))
for _, r := range ranges {
pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey})
}
req := &pdpb.BatchScanRegionsRequest{
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
}
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
})
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(scanCtx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.BatchScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

return handleBatchRegionsResponse(resp), nil
}

func handleBatchRegionsResponse(resp *pdpb.BatchScanRegionsResponse) []*Region {
regions := make([]*Region, 0, len(resp.GetRegions()))
for _, r := range resp.GetRegions() {
region := &Region{
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Buckets: r.Buckets,
}
for _, p := range r.DownPeers {
region.DownPeers = append(region.DownPeers, p.Peer)
}
regions = append(regions, region)
}
return regions
}

func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
var regions []*Region
if len(resp.GetRegions()) == 0 {
Expand All @@ -1131,6 +1256,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
Meta: r.Region,
Leader: r.Leader,
PendingPeers: r.PendingPeers,
Buckets: r.Buckets,
}
for _, p := range r.DownPeers {
region.DownPeers = append(region.DownPeers, p.Peer)
Expand Down Expand Up @@ -1431,17 +1557,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == errs.ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
Expand Down
13 changes: 6 additions & 7 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ import (
"github.com/pingcap/errors"
)

// Note: keep the same as the ones defined on the server side to ensure the client can use them correctly.
const (
// NoLeaderErr indicates there is no leader in the cluster currently.
NoLeaderErr = "no leader"
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
NotLeaderErr = "not leader"
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
NotPrimaryErr = "not primary"
)

// client errors
Expand Down
18 changes: 18 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,29 @@
package errs

import (
"strings"

"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// IsLeaderChange will determine whether there is a leader/primary change.
func IsLeaderChange(err error) bool {
if err == nil {
return false
}
if err == ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, NoLeaderErr) ||
strings.Contains(errMsg, NotLeaderErr) ||
strings.Contains(errMsg, MismatchLeaderErr) ||
strings.Contains(errMsg, NotServedErr) ||
strings.Contains(errMsg, NotPrimaryErr)
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.62.1
Expand All @@ -34,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
Loading

0 comments on commit 9aa8c16

Please sign in to comment.