Skip to content

Commit

Permalink
domain: report min start timestamp of TiDB server (#12133)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and sre-bot committed Sep 17, 2019
1 parent d438e10 commit bc4f8ce
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
5 changes: 5 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,12 @@ func (do *Domain) topNSlowQueryLoop() {
func (do *Domain) infoSyncerKeeper() {
defer do.wg.Done()
defer recoverInDomain("infoSyncerKeeper", false)
ticker := time.NewTicker(time.Second * time.Duration(InfoSessionTTL) / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
do.info.ReportMinStartTS()
case <-do.info.Done():
logutil.BgLogger().Info("server info syncer need to restart")
if err := do.info.Restart(context.Background()); err != nil {
Expand Down Expand Up @@ -547,6 +551,7 @@ func (do *Domain) Close() {
}
if do.info != nil {
do.info.RemoveServerInfo()
do.info.RemoveMinStartTS()
}
close(do.exit)
if do.etcdClient != nil {
Expand Down
55 changes: 55 additions & 0 deletions domain/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"time"

"github.com/coreos/etcd/clientv3"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
util2 "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/printer"
Expand All @@ -35,6 +38,8 @@ import (
const (
// ServerInformationPath store server information such as IP, port and so on.
ServerInformationPath = "/tidb/server/info"
// ServerMinStartTSPath store the server min start timestamp.
ServerMinStartTSPath = "/tidb/server/minstartts"
// keyOpDefaultRetryCnt is the default retry count for etcd store.
keyOpDefaultRetryCnt = 2
// keyOpDefaultTimeout is the default time out for etcd store.
Expand All @@ -49,6 +54,9 @@ type InfoSyncer struct {
etcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
minStartTSPath string
manager util2.SessionManager
session *concurrency.Session
}

Expand All @@ -75,6 +83,7 @@ func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer {
etcdCli: etcdCli,
info: getServerInfo(id),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
}
}

Expand All @@ -83,6 +92,11 @@ func (is *InfoSyncer) Init(ctx context.Context) error {
return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
}

// SetSessionManager set the session manager for InfoSyncer.
func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) {
is.manager = manager
}

// GetServerInfo gets self server static information.
func (is *InfoSyncer) GetServerInfo() *ServerInfo {
return is.info
Expand Down Expand Up @@ -144,6 +158,47 @@ func (is *InfoSyncer) RemoveServerInfo() {
}
}

// storeMinStartTS stores self server min start timestamp to etcd.
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error {
if is.etcdCli == nil {
return nil
}
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
strconv.FormatUint(is.minStartTS, 10),
clientv3.WithLease(is.session.Lease()))
}

// RemoveMinStartTS removes self server min start timestamp from etcd.
func (is *InfoSyncer) RemoveMinStartTS() {
if is.etcdCli == nil {
return
}
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err))
}
}

// ReportMinStartTS reports self server min start timestamp to ETCD.
func (is *InfoSyncer) ReportMinStartTS() {
if is.manager == nil {
// Server may not start in time.
return
}
pl := is.manager.ShowProcessList()
var minStartTS uint64 = math.MaxUint64
for _, info := range pl {
if info.CurTxnStartTS < minStartTS {
minStartTS = info.CurTxnStartTS
}
}
is.minStartTS = minStartTS
err := is.storeMinStartTS(context.Background())
if err != nil {
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
}
}

// Done returns a channel that closes when the info syncer is no longer being refreshed.
func (is InfoSyncer) Done() <-chan struct{} {
if is.etcdCli == nil {
Expand Down
1 change: 1 addition & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ func createServer() {
// Both domain and storage have started, so we have to clean them before exiting.
terror.MustNil(err, closeDomainAndStorage)
go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run()
dom.InfoSyncer().SetSessionManager(svr)
}

func serverShutdown(isgraceful bool) {
Expand Down

0 comments on commit bc4f8ce

Please sign in to comment.