diff --git a/client/http/client.go b/client/http/client.go index 4fea7a65086d..2e7a76957cf1 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -26,10 +26,17 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) -const defaultTimeout = 30 * time.Second +const ( + httpScheme = "http" + httpsScheme = "https" + networkErrorStatus = "network error" + + defaultTimeout = 30 * time.Second +) // Client is a PD (Placement Driver) HTTP client. type Client interface { @@ -51,6 +58,9 @@ type client struct { pdAddrs []string tlsConf *tls.Config cli *http.Client + + requestCounter *prometheus.CounterVec + executionDuration *prometheus.HistogramVec } // ClientOption configures the HTTP client. @@ -71,6 +81,17 @@ func WithTLSConfig(tlsConf *tls.Config) ClientOption { } } +// WithMetrics configures the client with metrics. +func WithMetrics( + requestCounter *prometheus.CounterVec, + executionDuration *prometheus.HistogramVec, +) ClientOption { + return func(c *client) { + c.requestCounter = requestCounter + c.executionDuration = executionDuration + } +} + // NewClient creates a PD HTTP client with the given PD addresses and TLS config. func NewClient( pdAddrs []string, @@ -83,13 +104,14 @@ func NewClient( } // Normalize the addresses with correct scheme prefix. for i, addr := range pdAddrs { - if !strings.HasPrefix(addr, "http") { + if !strings.HasPrefix(addr, httpScheme) { + var scheme string if c.tlsConf != nil { - addr = "https://" + addr + scheme = httpsScheme } else { - addr = "http://" + addr + scheme = httpScheme } - pdAddrs[i] = addr + pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr) } } c.pdAddrs = pdAddrs @@ -114,17 +136,49 @@ func (c *client) Close() { log.Info("[pd] http client closed") } -func (c *client) pdAddr() string { - // TODO: support the customized PD address selection strategy. 
- return c.pdAddrs[0] +func (c *client) reqCounter(name, status string) { + if c.requestCounter == nil { + return + } + c.requestCounter.WithLabelValues(name, status).Inc() } -func (c *client) request( +func (c *client) execDuration(name string, duration time.Duration) { + if c.executionDuration == nil { + return + } + c.executionDuration.WithLabelValues(name).Observe(duration.Seconds()) +} + +// At present, we will use the retry strategy of polling by default to keep +// it consistent with the current implementation of some clients (e.g. TiDB). +func (c *client) requestWithRetry( ctx context.Context, name, uri string, res interface{}, ) error { - reqURL := fmt.Sprintf("%s%s", c.pdAddr(), uri) + var ( + err error + addr string + ) + for idx := 0; idx < len(c.pdAddrs); idx++ { + addr = c.pdAddrs[idx] + err = c.request(ctx, name, addr, uri, res) + if err == nil { + break + } + log.Debug("[pd] request one addr failed", + zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err)) + } + return err +} + +func (c *client) request( + ctx context.Context, + name, addr, uri string, + res interface{}, +) error { + reqURL := fmt.Sprintf("%s%s", addr, uri) logFields := []zap.Field{ zap.String("name", name), zap.String("url", reqURL), @@ -135,12 +189,15 @@ func (c *client) request( log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) return errors.Trace(err) } - // TODO: integrate the metrics. + start := time.Now() resp, err := c.cli.Do(req) if err != nil { + c.reqCounter(name, "network error") log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...) return errors.Trace(err) } + c.execDuration(name, time.Since(start)) + c.reqCounter(name, resp.Status) defer func() { err = resp.Body.Close() if err != nil { @@ -172,7 +229,7 @@ func (c *client) request( // GetRegionByID gets the region info by ID. 
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { var region RegionInfo - err := c.request(ctx, "GetRegionByID", RegionByID(regionID), &region) + err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), &region) if err != nil { return nil, err } @@ -182,7 +239,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInf // GetRegionByKey gets the region info by key. func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) { var region RegionInfo - err := c.request(ctx, "GetRegionByKey", RegionByKey(key), &region) + err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), &region) if err != nil { return nil, err } @@ -192,7 +249,7 @@ func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, e // GetRegions gets the regions info. func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) { var regions RegionsInfo - err := c.request(ctx, "GetRegions", Regions, &regions) + err := c.requestWithRetry(ctx, "GetRegions", Regions, &regions) if err != nil { return nil, err } @@ -202,7 +259,7 @@ func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) { // GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range. func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) { var regions RegionsInfo - err := c.request(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions) + err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions) if err != nil { return nil, err } @@ -212,7 +269,7 @@ func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, l // GetRegionsByStoreID gets the regions info by store ID. 
func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) { var regions RegionsInfo - err := c.request(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions) + err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions) if err != nil { return nil, err } @@ -222,7 +279,7 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi // GetHotReadRegions gets the hot read region statistics info. func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) { var hotReadRegions StoreHotPeersInfos - err := c.request(ctx, "GetHotReadRegions", HotRead, &hotReadRegions) + err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, &hotReadRegions) if err != nil { return nil, err } @@ -232,7 +289,7 @@ func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, er // GetHotWriteRegions gets the hot write region statistics info. func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) { var hotWriteRegions StoreHotPeersInfos - err := c.request(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions) + err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions) if err != nil { return nil, err } @@ -242,7 +299,7 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e // GetStores gets the stores info. 
func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { var stores StoresInfo - err := c.request(ctx, "GetStores", Stores, &stores) + err := c.requestWithRetry(ctx, "GetStores", Stores, &stores) if err != nil { return nil, err } @@ -269,7 +326,7 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin IsRealTime bool `json:"is_real_time,omitempty"` StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` }{} - err := c.request(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp) + err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp) if err != nil { return 0, nil, err }