Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into add-lock-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Mar 7, 2024
2 parents bd59983 + bbd3bdb commit eecb7d7
Show file tree
Hide file tree
Showing 57 changed files with 697 additions and 385 deletions.
64 changes: 32 additions & 32 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,10 +773,10 @@ func (c *client) GetTSAsync(ctx context.Context) TSFuture {
}

func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(span.Context()))
ctx = opentracing.ContextWithSpan(ctx, span)
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

req := tsoReqPool.Get().(*tsoRequest)
Expand Down Expand Up @@ -875,8 +875,8 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -913,8 +913,8 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -951,8 +951,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -989,8 +989,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
}

func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1027,8 +1027,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1102,8 +1102,8 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
}

func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1146,8 +1146,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
opt(options)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1173,8 +1173,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
}

func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1204,8 +1204,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

Expand Down Expand Up @@ -1234,8 +1234,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithGroup(ctx, regionID, "")
Expand Down Expand Up @@ -1268,16 +1268,16 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
}

func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithOptions(ctx, regionsID, opts...)
}

func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1304,8 +1304,8 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
}

func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1327,8 +1327,8 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe

// SplitRegions split regions by given split keys
func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type GCClient interface {

// UpdateGCSafePointV2 update gc safe point for the given keyspace.
func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -63,8 +63,8 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf

// UpdateServiceSafePointV2 update service safe point for the given keyspace.
func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
3 changes: 2 additions & 1 deletion client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

Expand All @@ -54,7 +55,7 @@ const (
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...grpc.DialOption) (*grpc.ClientConn, error) {
opt := grpc.WithInsecure() //nolint
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
if tlsCfg != nil {
creds := credentials.NewTLS(tlsCfg)
opt = grpc.WithTransportCredentials(creds)
Expand Down
15 changes: 14 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ const (
operators = "/pd/api/v1/operators"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
// Keyspace
KeyspaceConfig = "/pd/api/v2/keyspaces/%s/config"
GetKeyspaceMetaByName = "/pd/api/v2/keyspaces/%s"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
Expand All @@ -95,7 +98,7 @@ func RegionByKey(key []byte) string {
// RegionsByKeyRange returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKeyRange(keyRange *KeyRange, limit int) string {
startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str()
return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d",
return fmt.Sprintf("%s?key=%s&end_key=%s&limit=%d",
regionsByKey, startKeyStr, endKeyStr, limit)
}

Expand Down Expand Up @@ -201,3 +204,13 @@ func MicroServiceMembers(service string) string {
func MicroServicePrimary(service string) string {
return fmt.Sprintf("%s/primary/%s", microServicePrefix, service)
}

// GetUpdateKeyspaceConfigURL returns the path of PD HTTP API to update keyspace config.
func GetUpdateKeyspaceConfigURL(keyspaceName string) string {
return fmt.Sprintf(KeyspaceConfig, keyspaceName)
}

// GetKeyspaceMetaByNameURL returns the path of PD HTTP API to get keyspace meta by keyspace name.
func GetKeyspaceMetaByNameURL(keyspaceName string) string {
return fmt.Sprintf(GetKeyspaceMetaByName, keyspaceName)
}
15 changes: 10 additions & 5 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ func NewClient(
}
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init service discovery failed", zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
log.Error("[pd] init service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
return nil
}
c.inner.init(sd)
Expand Down Expand Up @@ -382,9 +383,8 @@ func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client {
}
}

// newClientWithoutInitServiceDiscovery creates a PD HTTP client
// with the given PD addresses and TLS config without init service discovery.
func newClientWithoutInitServiceDiscovery(
// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock PD service discovery.
func newClientWithMockServiceDiscovery(
source string,
pdAddrs []string,
opts ...ClientOption,
Expand All @@ -395,7 +395,12 @@ func newClientWithoutInitServiceDiscovery(
for _, opt := range opts {
opt(c)
}
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
sd := pd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init mock service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
return nil
}
c.inner.init(sd)
return c
}
18 changes: 12 additions & 6 deletions client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

func TestPDAllowFollowerHandleHeader(t *testing.T) {
re := require.New(t)
checked := 0
httpClient := NewHTTPClientWithRequestChecker(func(req *http.Request) error {
var expectedVal string
if req.URL.Path == HotHistory {
Expand All @@ -38,16 +39,19 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) {
re.Failf("PD allow follower handler header check failed",
"should be %s, but got %s", expectedVal, val)
}
checked++
return nil
})
c := newClientWithoutInitServiceDiscovery("test-header", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c := newClientWithMockServiceDiscovery("test-header", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
defer c.Close()
c.GetRegions(context.Background())
c.GetHistoryHotRegions(context.Background(), &HistoryHotRegionsRequest{})
c.Close()
re.Equal(2, checked)
}

func TestCallerID(t *testing.T) {
re := require.New(t)
checked := 0
expectedVal := atomic.NewString(defaultCallerID)
httpClient := NewHTTPClientWithRequestChecker(func(req *http.Request) error {
val := req.Header.Get(xCallerIDKey)
Expand All @@ -56,20 +60,23 @@ func TestCallerID(t *testing.T) {
re.Failf("Caller ID header check failed",
"should be %s, but got %s", expectedVal, val)
}
checked++
return nil
})
c := newClientWithoutInitServiceDiscovery("test-caller-id", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
c := newClientWithMockServiceDiscovery("test-caller-id", []string{"http://127.0.0.1"}, WithHTTPClient(httpClient))
defer c.Close()
c.GetRegions(context.Background())
expectedVal.Store("test")
c.WithCallerID(expectedVal.Load()).GetRegions(context.Background())
c.Close()
re.Equal(2, checked)
}

func TestWithBackoffer(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newClientWithoutInitServiceDiscovery("test-with-backoffer", []string{"http://127.0.0.1"})
c := newClientWithMockServiceDiscovery("test-with-backoffer", []string{"http://127.0.0.1"})
defer c.Close()

base := 100 * time.Millisecond
max := 500 * time.Millisecond
Expand All @@ -88,5 +95,4 @@ func TestWithBackoffer(t *testing.T) {
_, err = c.WithBackoffer(bo).GetPDVersion(timeoutCtx)
re.InDelta(3*time.Second, time.Since(start), float64(250*time.Millisecond))
re.ErrorIs(err, context.DeadlineExceeded)
c.Close()
}
Loading

0 comments on commit eecb7d7

Please sign in to comment.