Skip to content

Commit

Permalink
Merge branch 'master' into add-lock-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Mar 4, 2024
2 parents ecf93b6 + 2851db2 commit 7a5d15a
Show file tree
Hide file tree
Showing 78 changed files with 949 additions and 319 deletions.
7 changes: 5 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ linters-settings:
- require-error
- suite-dont-use-pkg
- suite-extra-assert-call
disable:
- float-compare
- go-require
gofmt:
# https://golangci-lint.run/usage/linters/#gofmt
# disable for faster check
simplify: false
rewrite-rules:
- pattern: 'interface{}'
replacement: 'any'
- pattern: "interface{}"
replacement: "any"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)

install-tools:
@mkdir -p $(GO_TOOLS_BIN_PATH)
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.55.2
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.56.2
@grep '_' tools.go | sed 's/"//g' | awk '{print $$2}' | xargs go install

.PHONY: install-tools
Expand Down
9 changes: 6 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func newClientWithKeyspaceName(
return nil
}

// Create a PD service discovery with null keyspace id, then query the real id wth the keyspace name,
// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
c.pdSvcDiscovery = newPDServiceDiscovery(
clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
Expand Down Expand Up @@ -702,6 +702,9 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return err
}
case EnableTSOFollowerProxy:
if c.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD service mode")
}
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
Expand Down Expand Up @@ -788,10 +791,10 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
return req
}

if err := tsoClient.dispatchRequest(dcLocation, req); err != nil {
if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
// Wait for a while and try again
time.Sleep(50 * time.Millisecond)
if err = tsoClient.dispatchRequest(dcLocation, req); err != nil {
if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
req.done <- err
}
}
Expand Down
14 changes: 13 additions & 1 deletion client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -62,7 +63,18 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g
if err != nil {
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}
cc, err := grpc.DialContext(ctx, u.Host, append(do, opt)...)
// Here we use a shorter MaxDelay to make the connection recover faster.
// The default MaxDelay is 120s, which is too long for us.
backoffOpts := grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Multiplier: 1.6,
Jitter: 0.2,
MaxDelay: 3 * time.Second,
},
})
do = append(do, opt, backoffOpts)
cc, err := grpc.DialContext(ctx, u.Host, do...)
if err != nil {
return nil, errs.ErrGRPCDial.Wrap(err).GenWithStackByCause()
}
Expand Down
6 changes: 3 additions & 3 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Client interface {
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetPDVersion(context.Context) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]string, error)
GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error)
GetMicroServicePrimary(context.Context, string) (string, error)
DeleteOperators(context.Context) error

Expand Down Expand Up @@ -856,8 +856,8 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}

// GetMicroServiceMembers gets the members of the microservice.
func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]string, error) {
var members []string
func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]MicroServiceMember, error) {
var members []MicroServiceMember
err := c.request(ctx, newRequestInfo().
WithName(getMicroServiceMembersName).
WithURI(MicroServiceMembers(service)).
Expand Down
9 changes: 9 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,12 @@ type MembersInfo struct {
Leader *pdpb.Member `json:"leader,omitempty"`
EtcdLeader *pdpb.Member `json:"etcd_leader,omitempty"`
}

// MicroServiceMember is the member info of a micro service.
type MicroServiceMember struct {
ServiceAddr string `json:"service-addr"`
Version string `json:"version"`
GitHash string `json:"git-hash"`
DeployPath string `json:"deploy-path"`
StartTimestamp int64 `json:"start-timestamp"`
}
2 changes: 1 addition & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func (c *pdServiceDiscovery) GetServiceClient() ServiceClient {
return leaderClient
}

// GetAllServiceClients implments ServiceDiscovery
// GetAllServiceClients implements ServiceDiscovery
func (c *pdServiceDiscovery) GetAllServiceClients() []ServiceClient {
all := c.all.Load()
if all == nil {
Expand Down
31 changes: 26 additions & 5 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() {
}
}

func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) error {
func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, request *tsoRequest) error {
dispatcher, ok := c.tsoDispatcher.Load(dcLocation)
if !ok {
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation))
Expand All @@ -83,7 +83,11 @@ func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) erro
}

defer trace.StartRegion(request.requestCtx, "tsoReqEnqueue").End()
dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
select {
case <-ctx.Done():
return ctx.Err()
case dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request:
}
return nil
}

Expand Down Expand Up @@ -311,6 +315,14 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) {
make(chan *tsoRequest, defaultMaxTSOBatchSize*2),
defaultMaxTSOBatchSize),
}
failpoint.Inject("shortDispatcherChannel", func() {
dispatcher = &tsoDispatcher{
dispatcherCancel: dispatcherCancel,
tsoBatchController: newTSOBatchController(
make(chan *tsoRequest, 1),
defaultMaxTSOBatchSize),
}
})

if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok {
// Successfully stored the value. Start the following goroutine.
Expand Down Expand Up @@ -372,6 +384,9 @@ func (c *tsoClient) handleDispatcher(
return
case <-c.option.enableTSOFollowerProxyCh:
enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.String("dc-location", dc),
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
Expand Down Expand Up @@ -412,7 +427,7 @@ tsoBatchLoop:
} else {
log.Error("[tso] fetch pending tso requests error",
zap.String("dc-location", dc),
errs.ZapError(errs.ErrClientGetTSO, err))
zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error())))
}
return
}
Expand Down Expand Up @@ -498,7 +513,7 @@ tsoBatchLoop:
log.Error("[tso] getTS error after processing requests",
zap.String("dc-location", dc),
zap.String("stream-addr", streamAddr),
errs.ZapError(errs.ErrClientGetTSO, err))
zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error())))
// Set `stream` to nil and remove this stream from the `connectionCtxs` due to error.
connectionCtxs.Delete(streamAddr)
cancel()
Expand Down Expand Up @@ -701,7 +716,11 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
}
// GC the stale one.
connectionCtxs.Range(func(addr, cc any) bool {
if _, ok := tsoStreamBuilders[addr.(string)]; !ok {
addrStr := addr.(string)
if _, ok := tsoStreamBuilders[addrStr]; !ok {
log.Info("[tso] remove the stale tso stream",
zap.String("dc", dc),
zap.String("addr", addrStr))
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)
}
Expand All @@ -712,6 +731,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
if _, ok = connectionCtxs.Load(addr); ok {
continue
}
log.Info("[tso] try to create tso stream",
zap.String("dc", dc), zap.String("addr", addr))
cctx, cancel := context.WithCancel(dispatcherCtx)
// Do not proxy the leader client.
if addr != leaderAddr {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20240111062855-41f7c8011953
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo=
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
2 changes: 1 addition & 1 deletion pkg/btree/btree_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ type copyOnWriteContext[T Item[T]] struct {
// The internal tree structure of b is marked read-only and shared between t and
// t2. Writes to both t and t2 use copy-on-write logic, creating new nodes
// whenever one of b's original nodes would have been modified. Read operations
// should have no performance degredation. Write operations for both t and t2
// should have no performance degradation. Write operations for both t and t2
// will initially experience minor slow-downs caused by additional allocs and
// copies due to the aforementioned copy-on-write logic, but should converge to
// the original performance characteristics of the original tree.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c *FIFO) FromElems(key uint64) []*Item {
return elems
}

// FromLastSameElems returns continuous items that have the same comparable attribute with the the lastest one.
// FromLastSameElems returns continuous items that have the same comparable attribute with the last one.
func (c *FIFO) FromLastSameElems(checkFunc func(any) (bool, string)) []*Item {
c.RLock()
defer c.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cgroup/cgroup_cpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func checkKernelVersionNewerThan(re *require.Assertions, t *testing.T, major, mi
re.Len(kernelVersion, 1, fmt.Sprintf("release str is %s", releaseStr))
kernelVersionPartRE := regexp.MustCompile(`[0-9]+`)
kernelVersionParts := kernelVersionPartRE.FindAllString(kernelVersion[0], -1)
re.Len(kernelVersionParts, 3, fmt.Sprintf("kernel verion str is %s", kernelVersion[0]))
re.Len(kernelVersionParts, 3, fmt.Sprintf("kernel version str is %s", kernelVersion[0]))
t.Logf("parsed kernel version parts: major %s, minor %s, patch %s",
kernelVersionParts[0], kernelVersionParts[1], kernelVersionParts[2])
mustConvInt := func(s string) int {
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/rangetree/range_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ func bucketDebrisFactory(startKey, endKey []byte, item RangeItem) []RangeItem {
if bytes.Compare(left, right) >= 0 {
return nil
}
// the left has oen intersection like |010 - 100| and |020 - 100|.
// the left has one intersection like |010 - 100| and |020 - 100|.
if !bytes.Equal(item.GetStartKey(), left) {
res = append(res, newSimpleBucketItem(item.GetStartKey(), left))
}
// the right has oen intersection like |010 - 100| and |010 - 099|.
// the right has one intersection like |010 - 100| and |010 - 099|.
if !bytes.Equal(right, item.GetEndKey()) {
res = append(res, newSimpleBucketItem(right, item.GetEndKey()))
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,16 +1786,16 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 {
// ValidRegion is used to decide if the region is valid.
func (r *RegionsInfo) ValidRegion(region *metapb.Region) error {
startKey := region.GetStartKey()
currnetRegion := r.GetRegionByKey(startKey)
if currnetRegion == nil {
currentRegion := r.GetRegionByKey(startKey)
if currentRegion == nil {
return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region)))
}
// If the request epoch is less than current region epoch, then returns an error.
regionEpoch := region.GetRegionEpoch()
currnetEpoch := currnetRegion.GetMeta().GetRegionEpoch()
if regionEpoch.GetVersion() < currnetEpoch.GetVersion() ||
regionEpoch.GetConfVer() < currnetEpoch.GetConfVer() {
return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currnetEpoch)
currentEpoch := currentRegion.GetMeta().GetRegionEpoch()
if regionEpoch.GetVersion() < currentEpoch.GetVersion() ||
regionEpoch.GetConfVer() < currentEpoch.GetConfVer() {
return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currentEpoch)
}
return nil
}
Expand Down Expand Up @@ -1884,19 +1884,19 @@ func EncodeToString(src []byte) []byte {
return dst
}

// HexRegionKey converts region key to hex format. Used for formating region in
// HexRegionKey converts region key to hex format. Used for formatting region in
// logs.
func HexRegionKey(key []byte) []byte {
return ToUpperASCIIInplace(EncodeToString(key))
}

// HexRegionKeyStr converts region key to hex format. Used for formating region in
// HexRegionKeyStr converts region key to hex format. Used for formatting region in
// logs.
func HexRegionKeyStr(key []byte) string {
return String(HexRegionKey(key))
}

// RegionToHexMeta converts a region meta's keys to hex format. Used for formating
// RegionToHexMeta converts a region meta's keys to hex format. Used for formatting
// region in logs.
func RegionToHexMeta(meta *metapb.Region) HexRegionMeta {
if meta == nil {
Expand All @@ -1905,7 +1905,7 @@ func RegionToHexMeta(meta *metapb.Region) HexRegionMeta {
return HexRegionMeta{meta}
}

// HexRegionMeta is a region meta in the hex format. Used for formating region in logs.
// HexRegionMeta is a region meta in the hex format. Used for formatting region in logs.
type HexRegionMeta struct {
*metapb.Region
}
Expand All @@ -1917,15 +1917,15 @@ func (h HexRegionMeta) String() string {
return strings.TrimSpace(proto.CompactTextString(meta))
}

// RegionsToHexMeta converts regions' meta keys to hex format. Used for formating
// RegionsToHexMeta converts regions' meta keys to hex format. Used for formatting
// region in logs.
func RegionsToHexMeta(regions []*metapb.Region) HexRegionsMeta {
hexRegionMetas := make([]*metapb.Region, len(regions))
copy(hexRegionMetas, regions)
return hexRegionMetas
}

// HexRegionsMeta is a slice of regions' meta in the hex format. Used for formating
// HexRegionsMeta is a slice of regions' meta in the hex format. Used for formatting
// region in logs.
type HexRegionsMeta []*metapb.Region

Expand Down
32 changes: 23 additions & 9 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,22 +600,36 @@ func DistinctScore(labels []string, stores []*StoreInfo, other *StoreInfo) float
return score
}

// MergeLabels merges the passed in labels with origins, overriding duplicated
// ones.
// MergeLabels merges the passed in labels with origins, overriding duplicated ones.
// Note: To prevent potential data races, it is advisable to refrain from directly modifying the 'origin' variable.
func MergeLabels(origin []*metapb.StoreLabel, labels []*metapb.StoreLabel) []*metapb.StoreLabel {
storeLabels := origin
L:
results := make([]*metapb.StoreLabel, 0, len(origin))
for _, label := range origin {
results = append(results, &metapb.StoreLabel{
Key: label.Key,
Value: label.Value,
})
}

for _, newLabel := range labels {
for _, label := range storeLabels {
found := false
for _, label := range results {
if strings.EqualFold(label.Key, newLabel.Key) {
// Update the value for an existing key.
label.Value = newLabel.Value
continue L
found = true
break
}
}
storeLabels = append(storeLabels, newLabel)
// Add a new label if the key doesn't exist in the original slice.
if !found {
results = append(results, newLabel)
}
}
res := storeLabels[:0]
for _, l := range storeLabels {

// Filter out labels with an empty value.
res := results[:0]
for _, l := range results {
if l.Value != "" {
res = append(res, l)
}
Expand Down
Loading

0 comments on commit 7a5d15a

Please sign in to comment.