infosync, router: remove tombstone check for TiDB topology (#311)
djshow832 authored Jun 19, 2023
1 parent d418199 commit 2ac6896
Showing 12 changed files with 597 additions and 578 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -26,7 +26,6 @@ require (
 	go.uber.org/atomic v1.10.0
 	go.uber.org/ratelimit v0.2.0
 	go.uber.org/zap v1.24.0
-	golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e
 	google.golang.org/grpc v1.51.0
 )

@@ -128,6 +127,7 @@ require (
 	go.opentelemetry.io/proto/otlp v0.19.0 // indirect
 	go.uber.org/multierr v1.9.0 // indirect
 	golang.org/x/crypto v0.4.0 // indirect
+	golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e // indirect
 	golang.org/x/net v0.4.0 // indirect
 	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.3.0 // indirect
26 changes: 10 additions & 16 deletions lib/config/namespace.go
@@ -33,30 +33,27 @@ const (
 	healthCheckMaxRetries    = 3
 	healthCheckRetryInterval = 1 * time.Second
 	healthCheckTimeout       = 2 * time.Second
-	tombstoneThreshold       = 5 * time.Minute
 )
 
 // HealthCheck contains some configurations for health check.
 // Some general configurations of them may be exposed to users in the future.
 // We can use shorter durations to speed up unit tests.
 type HealthCheck struct {
-	Enable             bool          `yaml:"enable" json:"enable" toml:"enable"`
-	Interval           time.Duration `yaml:"interval" json:"interval" toml:"interval"`
-	MaxRetries         int           `yaml:"max-retries" json:"max-retries" toml:"max-retries"`
-	RetryInterval      time.Duration `yaml:"retry-interval" json:"retry-interval" toml:"retry-interval"`
-	DialTimeout        time.Duration `yaml:"dial-timeout" json:"dial-timeout" toml:"dial-timeout"`
-	TombstoneThreshold time.Duration `yaml:"tombstone-threshold" json:"tombstone-threshold" toml:"tombstone-threshold"`
+	Enable        bool          `yaml:"enable" json:"enable" toml:"enable"`
+	Interval      time.Duration `yaml:"interval" json:"interval" toml:"interval"`
+	MaxRetries    int           `yaml:"max-retries" json:"max-retries" toml:"max-retries"`
+	RetryInterval time.Duration `yaml:"retry-interval" json:"retry-interval" toml:"retry-interval"`
+	DialTimeout   time.Duration `yaml:"dial-timeout" json:"dial-timeout" toml:"dial-timeout"`
 }
 
 // NewDefaultHealthCheckConfig creates a default HealthCheck.
 func NewDefaultHealthCheckConfig() *HealthCheck {
 	return &HealthCheck{
-		Enable:             true,
-		Interval:           healthCheckInterval,
-		MaxRetries:         healthCheckMaxRetries,
-		RetryInterval:      healthCheckRetryInterval,
-		DialTimeout:        healthCheckTimeout,
-		TombstoneThreshold: tombstoneThreshold,
+		Enable:        true,
+		Interval:      healthCheckInterval,
+		MaxRetries:    healthCheckMaxRetries,
+		RetryInterval: healthCheckRetryInterval,
+		DialTimeout:   healthCheckTimeout,
 	}
 }

@@ -73,9 +70,6 @@ func (hc *HealthCheck) Check() {
 	if hc.DialTimeout == 0 {
 		hc.DialTimeout = healthCheckTimeout
 	}
-	if hc.TombstoneThreshold == 0 {
-		hc.TombstoneThreshold = tombstoneThreshold
-	}
 }

func NewNamespace(data []byte) (*Namespace, error) {
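With the tombstone threshold gone, the health check is configured by the five remaining fields above. A minimal sketch of how a caller picks up the new defaults (the demo program is illustrative, not part of this commit; the import path matches the one used in info.go below):

    package main

    import (
    	"fmt"

    	"github.com/pingcap/TiProxy/lib/config"
    )

    func main() {
    	hc := config.NewDefaultHealthCheckConfig()
    	// Check() backfills zero-valued fields with the package defaults;
    	// there is no longer a TombstoneThreshold to default or validate.
    	hc.Check()
    	fmt.Printf("%+v\n", *hc)
    }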
2 changes: 1 addition & 1 deletion pkg/manager/infosync/etcd.go
@@ -17,7 +17,7 @@ import (
 	"google.golang.org/grpc/keepalive"
 )
 
-// InitEtcdClient initializes an etcd client that fetches TiDB instance topology from PD.
+// InitEtcdClient initializes an etcd client that connects to the PD ETCD server.
 func InitEtcdClient(logger *zap.Logger, cfg *config.Config, certMgr *cert.CertManager) (*clientv3.Client, error) {
 	pdAddr := cfg.Proxy.PDAddrs
 	if len(pdAddr) == 0 {
86 changes: 75 additions & 11 deletions pkg/manager/infosync/info.go
@@ -10,14 +10,17 @@ import (
 	"net"
 	"os"
 	"path"
+	"strings"
 	"time"
 
 	"github.com/pingcap/TiProxy/lib/config"
 	"github.com/pingcap/TiProxy/lib/util/retry"
 	"github.com/pingcap/TiProxy/lib/util/sys"
 	"github.com/pingcap/TiProxy/lib/util/waitgroup"
+	"github.com/pingcap/TiProxy/pkg/manager/cert"
 	"github.com/pingcap/TiProxy/pkg/util/versioninfo"
 	"github.com/pingcap/errors"
+	tidbinfo "github.com/pingcap/tidb/domain/infosync"
 	"github.com/siddontang/go/hack"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/client/v3/concurrency"
@@ -34,12 +37,12 @@ const (
 	topologyPutRetryCnt = 3
 	logInterval         = 10
 
-	TTLSuffix  = "ttl"
-	InfoSuffix = "info"
+	ttlSuffix  = "ttl"
+	infoSuffix = "info"
 )
 
-// InfoSyncer syncs TiProxy topology to ETCD.
-// It writes 2 items: `/topology/tiproxy/.../info` and `/topology/tiproxy/.../ttl`.
+// InfoSyncer syncs TiProxy topology to ETCD and queries TiDB topology from ETCD.
+// It writes 2 items to ETCD: `/topology/tiproxy/.../info` and `/topology/tiproxy/.../ttl`.
 // `info` is written once and `ttl` will be erased automatically after TiProxy is down.
 // The code is modified from github.com/pingcap/tidb/domain/infosync/info.go.
 type InfoSyncer struct {
@@ -59,6 +62,7 @@ type syncConfig struct {
 	putRetryCnt uint64
 }
 
+// TopologyInfo is the info of TiProxy.
 type TopologyInfo struct {
 	Version string `json:"version"`
 	GitHash string `json:"git_hash"`
@@ -69,10 +73,17 @@
 	StartTimestamp int64 `json:"start_timestamp"`
 }
 
-func NewInfoSyncer(etcdCli *clientv3.Client, lg *zap.Logger) *InfoSyncer {
+// TiDBInfo is the info of TiDB.
+type TiDBInfo struct {
+	// TopologyInfo is parsed from the /info path.
+	*tidbinfo.TopologyInfo
+	// TTL is parsed from the /ttl path.
+	TTL string
+}
+
+func NewInfoSyncer(lg *zap.Logger) *InfoSyncer {
 	return &InfoSyncer{
-		etcdCli: etcdCli,
-		lg:      lg,
+		lg: lg,
 		syncConfig: syncConfig{
 			sessionTTL:   topologySessionTTL,
 			refreshIntvl: topologyRefreshIntvl,
@@ -83,7 +94,13 @@ func NewInfoSyncer(etcdCli *clientv3.Client, lg *zap.Logger) *InfoSyncer {
 	}
 }
 
-func (is *InfoSyncer) Init(ctx context.Context, cfg *config.Config) error {
+func (is *InfoSyncer) Init(ctx context.Context, cfg *config.Config, certMgr *cert.CertManager) error {
+	etcdCli, err := InitEtcdClient(is.lg, cfg, certMgr)
+	if err != nil {
+		return err
+	}
+	is.etcdCli = etcdCli
+
 	topologyInfo, err := is.getTopologyInfo(cfg)
 	if err != nil {
 		is.lg.Error("get topology failed", zap.Error(err))
@@ -124,6 +141,7 @@ func (is *InfoSyncer) updateTopologyLivenessLoop(ctx context.Context, topologyInfo *TopologyInfo) {
 func (is *InfoSyncer) initTopologySession(ctx context.Context) error {
 	// Infinitely retry until cancelled.
 	return retry.RetryNotify(func() error {
+		// Do not use context.WithTimeout, otherwise the session will be cancelled after timeout, even if the session is created successfully.
 		topologySession, err := concurrency.NewSession(is.etcdCli, concurrency.WithTTL(is.syncConfig.sessionTTL), concurrency.WithContext(ctx))
 		if err == nil {
 			is.topologySession = topologySession
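The new comment above the concurrency.NewSession call is worth unpacking. A hypothetical counter-example of the pitfall it warns against (not part of this commit; the imports match the ones this file already uses):

    // badSession shows the bug: the WithTimeout context keeps governing the
    // session's lease keep-alive after NewSession returns, so the deferred
    // cancel (or the timeout itself) stops lease renewal and the ttl key
    // silently expires even though the session was created successfully.
    func badSession(ctx context.Context, cli *clientv3.Client) (*concurrency.Session, error) {
    	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    	defer cancel()
    	return concurrency.NewSession(cli, concurrency.WithTTL(45), concurrency.WithContext(timeoutCtx))
    }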
@@ -179,7 +197,7 @@ func (is *InfoSyncer) storeTopologyInfo(ctx context.Context, topologyInfo *TopologyInfo) error {
 		return errors.Trace(err)
 	}
 	value := hack.String(infoBuf)
-	key := fmt.Sprintf("%s/%s/%s", tiproxyTopologyPath, net.JoinHostPort(topologyInfo.IP, topologyInfo.Port), InfoSuffix)
+	key := fmt.Sprintf("%s/%s/%s", tiproxyTopologyPath, net.JoinHostPort(topologyInfo.IP, topologyInfo.Port), infoSuffix)
 	return retry.Retry(func() error {
 		childCtx, cancel := context.WithTimeout(ctx, is.syncConfig.putTimeout)
 		_, err := is.etcdCli.Put(childCtx, key, value)
@@ -189,7 +207,7 @@
 }
 
 func (is *InfoSyncer) updateTopologyAliveness(ctx context.Context, topologyInfo *TopologyInfo) error {
-	key := fmt.Sprintf("%s/%s/%s", tiproxyTopologyPath, net.JoinHostPort(topologyInfo.IP, topologyInfo.Port), TTLSuffix)
+	key := fmt.Sprintf("%s/%s/%s", tiproxyTopologyPath, net.JoinHostPort(topologyInfo.IP, topologyInfo.Port), ttlSuffix)
 	// The lease may be not found and the session won't be recreated, so do not retry infinitely.
 	return retry.Retry(func() error {
 		value := fmt.Sprintf("%v", time.Now().UnixNano())
@@ -200,9 +218,55 @@
 	}, ctx, is.syncConfig.putRetryIntvl, is.syncConfig.putRetryCnt)
 }
 
-func (is *InfoSyncer) Close() {
+func (is *InfoSyncer) GetTiDBTopology(ctx context.Context) (map[string]*TiDBInfo, error) {
+	// etcdCli.Get will retry infinitely internally.
+	res, err := is.etcdCli.Get(ctx, tidbinfo.TopologyInformationPath, clientv3.WithPrefix())
+	if err != nil {
+		return nil, err
+	}
+
+	infos := make(map[string]*TiDBInfo, len(res.Kvs)/2)
+	for _, kv := range res.Kvs {
+		var ttl, addr string
+		var topology *tidbinfo.TopologyInfo
+		key := hack.String(kv.Key)
+		switch {
+		case strings.HasSuffix(key, ttlSuffix):
+			addr = key[len(tidbinfo.TopologyInformationPath)+1 : len(key)-len(ttlSuffix)-1]
+			ttl = hack.String(kv.Value)
+		case strings.HasSuffix(key, infoSuffix):
+			addr = key[len(tidbinfo.TopologyInformationPath)+1 : len(key)-len(infoSuffix)-1]
+			if err = json.Unmarshal(kv.Value, &topology); err != nil {
+				is.lg.Error("unmarshal topology info failed", zap.String("key", key),
+					zap.String("value", hack.String(kv.Value)), zap.Error(err))
+				continue
+			}
+		default:
+			continue
+		}
+
+		info, ok := infos[addr]
+		if !ok {
+			info = &TiDBInfo{}
+			infos[addr] = info
+		}
+
+		if len(ttl) > 0 {
+			info.TTL = hack.String(kv.Value)
+		} else {
+			info.TopologyInfo = topology
+		}
+	}
+	return infos, nil
+}
+
+func (is *InfoSyncer) Close() error {
 	if is.cancelFunc != nil {
 		is.cancelFunc()
 	}
 	is.wg.Wait()
+	if is.etcdCli != nil {
+		return is.etcdCli.Close()
+	}
+	return nil
 }
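Taken together, the reworked lifecycle looks like this from a caller's side. A hypothetical end-to-end sketch (the helper name and the lg/cfg/certMgr arguments are assumptions for illustration; NewInfoSyncer, Init, GetTiDBTopology, Close, and the imports all appear in the diff above):

    // listLiveTiDB returns only TiDB instances that have both topology keys:
    // the ttl key proves liveness and the info key carries the details, in
    // the spirit of the commit title's removal of tombstone bookkeeping.
    func listLiveTiDB(ctx context.Context, lg *zap.Logger, cfg *config.Config,
    	certMgr *cert.CertManager) (map[string]*infosync.TiDBInfo, error) {
    	is := infosync.NewInfoSyncer(lg) // the etcd client is no longer passed in
    	if err := is.Init(ctx, cfg, certMgr); err != nil { // Init opens the client itself
    		return nil, err
    	}
    	defer is.Close() // Close now also closes the etcd client and returns its error

    	tidbs, err := is.GetTiDBTopology(ctx)
    	if err != nil {
    		return nil, err
    	}
    	live := make(map[string]*infosync.TiDBInfo, len(tidbs))
    	for addr, info := range tidbs {
    		if info.TTL != "" && info.TopologyInfo != nil {
    			live[addr] = info
    		}
    	}
    	return live, nil
    }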

