Skip to content

Commit

Permalink
add tidbs
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <admin@liudos.us>

*: refine log in metricutil/etcdutil (tikv#2802)

Signed-off-by: Ryan Leung <rleungx@gmail.com>

add error handling
  • Loading branch information
Yisaer committed Aug 20, 2020
1 parent 4b8ad99 commit c200c89
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 54 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/pingcap-incubator/tidb-dashboard v0.0.0-20200807020752-01f0abe88e93
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/errors v0.11.5-0.20200729012136-4e113ddee29e
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
Expand All @@ -54,5 +54,3 @@ require (
)

replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5

replace github.com/pingcap/errors => github.com/pingcap/errors v0.11.5-0.20200812093836-57ec461934ff
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,10 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 h1:rfD9v3+ppLPzoQBgZ
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.5-0.20200812093836-57ec461934ff h1:5MCSOM1ydTR9tXY2IrdWODUrnDdSjb1yluNHDO1sVN4=
github.com/pingcap/errors v0.11.5-0.20200812093836-57ec461934ff/go.mod h1:g4vx//d6VakjJ0mk7iLBlKA8LFavV/sAVINT/1PFxeQ=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd h1:ay+wAVWHI/Z6vIik13hsK+FT9ZCNSPBElGr0qgiZpjg=
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd/go.mod h1:g4vx//d6VakjJ0mk7iLBlKA8LFavV/sAVINT/1PFxeQ=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
Expand Down
13 changes: 8 additions & 5 deletions pkg/autoscaling/calculation.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func filterTiKVInstances(informer core.StoreSetInformer) []instance {
return instances
}

// TODO: get TiDB instances
// TODO: get TiDB instances, we can directly visit prometheus 'up{job="tidb"}' metrics to know the healthy tidb instances
func getTiDBInstances() []instance {
return []instance{}
}
Expand Down Expand Up @@ -207,8 +207,7 @@ func getScaledGroupsByComponent(rc *cluster.RaftCluster, component ComponentType
case TiKV:
return getScaledTiKVGroups(rc, healthyInstances)
case TiDB:
// TODO: support search TiDB Group
return []*Plan{}
return getScaledTiDBGroups(rc, healthyInstances)
default:
return nil
}
Expand All @@ -232,13 +231,17 @@ func getScaledTiKVGroups(informer core.StoreSetInformer, healthyInstances []inst
func getScaledTiDBGroups(informer tidbInformer, healthyInstances []instance) []*Plan {
planMap := make(map[string]map[string]struct{}, len(healthyInstances))
for _, instance := range healthyInstances {
tidb := informer.GetTiDB(instance.address)
tidb, err := informer.GetTiDB(instance.address)
if err != nil {
// TODO: need to handle the error
return nil
}
if tidb == nil {
log.Warn("inconsistency between health instances and tidb status, exit auto-scaling calculation",
zap.String("tidb-address", instance.address))
return nil
}
v := tidb.getLabelValue(groupLabelKey)
v := tidb.GetLabelValue(groupLabelKey)
buildPlanMap(planMap, v, instance.address)
}
return buildPlans(planMap, TiDB)
Expand Down
26 changes: 2 additions & 24 deletions pkg/autoscaling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
package autoscaling

import (
"strings"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/typeutil"
)

// Strategy within a HTTP request provides rules and resources to help make decision for auto scaling.
Expand Down Expand Up @@ -115,26 +114,5 @@ type instance struct {
// TiDBInformer is used to fetch tidb info
// TODO: implement TiDBInformer
type tidbInformer interface {
GetTiDB(address string) *TiDBInfo
}

// TiDBInfo record the detail tidb info
type TiDBInfo struct {
Address string
Labels map[string]string
}

// GetLabelValue returns a label's value (if exists).
func (t *TiDBInfo) getLabelValue(key string) string {
for k, v := range t.getLabels() {
if strings.EqualFold(k, key) {
return v
}
}
return ""
}

// GetLabels returns the labels of the tidb.
func (t *TiDBInfo) getLabels() map[string]string {
return t.Labels
GetTiDB(address string) (*typeutil.TiDBInfo, error)
}
13 changes: 13 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,16 @@ var (
ErrReadHTTPBody = errors.Normalize("read HTTP body failed", errors.RFCCodeText("PD:apiutil:ErrReadHTTPBody"))
ErrWriteHTTPBody = errors.Normalize("write HTTP body failed", errors.RFCCodeText("PD:apiutil:ErrWriteHTTPBody"))
)

// metricutil errors
var (
ErrPushGateway = errors.Normalize("push metrics to gateway failed", errors.RFCCodeText("PD:metricutil:ErrPushGateway"))
)

// etcdutil errors
var (
ErrLoadValue = errors.Normalize("load value from etcd failed", errors.RFCCodeText("PD:etcdutil:ErrLoadValue"))
ErrGetCluster = errors.Normalize("get cluster from remote peer failed", errors.RFCCodeText("PD:etcdutil:ErrGetCluster"))
ErrLoadTiDBInfo = errors.Normalize("load tidb info from etcd failed", errors.RFCCodeText("PD:etcdutil:ErrLoadTiDBInfo"))
ErrParseTiDBInfo = errors.Normalize("parse tidb info from etcd failed", errors.RFCCodeText("PD:etcdutil:ErrParseTiDBInfo"))
)
7 changes: 4 additions & 3 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/errs"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/pkg/types"
Expand Down Expand Up @@ -61,7 +62,7 @@ func CheckClusterID(localClusterID types.ID, um types.URLsMap, tlsConfig *tls.Co
trp.CloseIdleConnections()
if gerr != nil {
// Do not return error, because other members may be not ready.
log.Error("failed to get cluster from remote", zap.Error(gerr))
log.Error("failed to get cluster from remote", errs.ZapError(errs.ErrGetCluster, gerr))
continue
}

Expand Down Expand Up @@ -105,7 +106,7 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clie
start := time.Now()
resp, err := clientv3.NewKV(c).Get(ctx, key, opts...)
if err != nil {
log.Error("load from etcd meet error", zap.Error(err))
log.Error("load from etcd meet error", errs.ZapError(errs.ErrLoadValue, err))
}
if cost := time.Since(start); cost > DefaultSlowRequestTime {
log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/metricutil/metricutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"go.uber.org/zap"
)

const zeroDuration = time.Duration(0)
Expand Down Expand Up @@ -68,7 +68,7 @@ func prometheusPushClient(job, addr string, interval time.Duration) {
for {
err := pusher.Push()
if err != nil {
log.Error("could not push metrics to Prometheus Pushgateway", zap.Error(err))
log.Error("could not push metrics to Prometheus Pushgateway", errs.ZapError(errs.ErrPushGateway, err))
}

time.Sleep(interval)
Expand Down
40 changes: 40 additions & 0 deletions pkg/typeutil/tidb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package typeutil

import "strings"

// TiDBInfo record the detail tidb info
type TiDBInfo struct {
Version string `json:"version"`
StartTimestamp int64 `json:"start_timestamp"`
Labels map[string]string `json:"labels"`
GitHash string `json:"git_hash"`
Address string
}

// GetLabelValue returns a label's value (if exists).
func (t *TiDBInfo) GetLabelValue(key string) string {
for k, v := range t.GetLabels() {
if strings.EqualFold(k, key) {
return v
}
}
return ""
}

// GetLabels returns the labels of the tidb.
func (t *TiDBInfo) GetLabels() map[string]string {
return t.Labels
}
29 changes: 29 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cluster

import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/component"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/keyutil"
"github.com/tikv/pd/pkg/logutil"
Expand Down Expand Up @@ -509,6 +511,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
c.storesStats.UpdateTotalBytesRate(c.core.GetStores)
c.storesStats.UpdateTotalKeysRate(c.core.GetStores)
c.storesStats.FilterUnhealthyStore(c)

// c.limiter is nil before "start" is called
if c.limiter != nil && c.opt.GetStoreLimitMode() == "auto" {
Expand Down Expand Up @@ -829,6 +832,7 @@ func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.Region
}

// GetStoresStats returns stores' statistics from cluster.
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {
c.RLock()
defer c.RUnlock()
Expand Down Expand Up @@ -1787,6 +1791,31 @@ func (c *RaftCluster) GetClusterVersion() string {
return c.opt.GetClusterVersion().String()
}

func (c *RaftCluster) GetTiDB(address string) (*typeutil.TiDBInfo, error) {
key := fmt.Sprintf("/topology/tidb/%s/info", address)
resp, err := etcdutil.EtcdKVGet(c.etcdClient, key)
if err != nil {
return nil, err
}
if resp.Count < 1 {
err := fmt.Errorf("resp loaded for tidb[%s] is empty", address)
log.Error("failed to load tidb info",
zap.String("address", address),
errs.ZapError(errs.ErrLoadTiDBInfo, err))
return nil, err
}
tidb := &typeutil.TiDBInfo{}
err = json.Unmarshal(resp.Kvs[0].Value, tidb)
if err != nil {
log.Error("failed to parse tidb info",
zap.String("address", address),
errs.ZapError(errs.ErrParseTiDBInfo, err))
return nil, err
}
tidb.Address = address
return tidb, nil
}

var healthURL = "/pd/api/v1/ping"

// CheckHealth checks if members are healthy.
Expand Down
32 changes: 32 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,38 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
}
}

func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

stores := newTestStores(3)
for _, store := range stores {
storeStats := &pdpb.StoreStats{
StoreId: store.GetID(),
Capacity: 100,
Available: 50,
RegionCount: 1,
}
c.Assert(cluster.putStoreLocked(store), IsNil)
c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), NotNil)
}

for _, store := range stores {
storeStats := &pdpb.StoreStats{
StoreId: store.GetID(),
Capacity: 100,
Available: 50,
RegionCount: 1,
}
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
c.Assert(cluster.putStoreLocked(newStore), IsNil)
c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), IsNil)
}
}

func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
Expand Down
16 changes: 1 addition & 15 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,7 @@ func (bs *balanceSolver) init() {
case readLeader:
bs.stLoadDetail = bs.sche.stLoadInfos[readLeader]
}
for _, id := range getUnhealthyStores(bs.cluster) {
delete(bs.stLoadDetail, id)
}
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat

bs.maxSrc = &storeLoad{}
bs.minDst = &storeLoad{
Expand All @@ -561,18 +559,6 @@ func (bs *balanceSolver) init() {
}
}

func getUnhealthyStores(cluster opt.Cluster) []uint64 {
ret := make([]uint64, 0)
stores := cluster.GetStores()
for _, store := range stores {
if store.IsTombstone() ||
store.DownTime() > cluster.GetMaxStoreDownTime() {
ret = append(ret, store.GetID())
}
}
return ret
}

func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver {
solver := &balanceSolver{
sche: sche,
Expand Down
16 changes: 16 additions & 0 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ func (s *StoresStats) GetStoresKeysReadStat() map[uint64]float64 {
return res
}

func (s *StoresStats) storeIsUnhealthy(cluster core.StoreSetInformer, storeID uint64) bool {
store := cluster.GetStore(storeID)
return store.IsTombstone() || store.IsUnhealth()
}

// FilterUnhealthyStore filter unhealthy store
func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) {
s.Lock()
defer s.Unlock()
for storeID := range s.rollingStoresStats {
if s.storeIsUnhealthy(cluster, storeID) {
delete(s.rollingStoresStats, storeID)
}
}
}

// RollingStoreStats are multiple sets of recent historical records with specified windows size.
type RollingStoreStats struct {
sync.RWMutex
Expand Down

0 comments on commit c200c89

Please sign in to comment.