From d6b708b196ff58bdc732a58c4572c0358ac4fb5a Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Mon, 9 Sep 2019 12:12:52 +0800 Subject: [PATCH 1/6] domain: report min start timestamp for TiDB server Signed-off-by: Shuaipeng Yu --- domain/domain.go | 5 ++++ domain/info.go | 60 +++++++++++++++++++++++++++++++++++++++++++++- go.mod | 2 ++ tools/check/go.mod | 2 ++ 4 files changed, 68 insertions(+), 1 deletion(-) diff --git a/domain/domain.go b/domain/domain.go index f3fa3578eab6d..8cb33c2c39859 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -420,8 +420,12 @@ func (do *Domain) topNSlowQueryLoop() { func (do *Domain) infoSyncerKeeper() { defer do.wg.Done() defer recoverInDomain("infoSyncerKeeper", false) + ticker := time.NewTicker(time.Second * time.Duration(InfoSessionTTL) / 2) + defer ticker.Stop() for { select { + case <-ticker.C: + do.info.UpdateMinStartTS(do.SysSessionPool()) case <-do.info.Done(): logutil.BgLogger().Info("server info syncer need to restart") if err := do.info.Restart(context.Background()); err != nil { @@ -547,6 +551,7 @@ func (do *Domain) Close() { } if do.info != nil { do.info.RemoveServerInfo() + do.info.RemoveMinStartTS() } close(do.exit) if do.etcdClient != nil { diff --git a/domain/info.go b/domain/info.go index f92b65a11b372..8d8d8c00cce27 100644 --- a/domain/info.go +++ b/domain/info.go @@ -17,6 +17,8 @@ import ( "context" "encoding/json" "fmt" + "math" + "strconv" "time" "github.com/coreos/etcd/clientv3" @@ -26,6 +28,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/owner" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/printer" @@ -35,6 +38,8 @@ import ( const ( // ServerInformationPath store server information such as IP, port and so on. ServerInformationPath = "/tidb/server/info" + // ServerMinStartTSPath store the server min start timestamp. + ServerMinStartTSPath = "/tidb/server/minstartts" // keyOpDefaultRetryCnt is the default retry count for etcd store. keyOpDefaultRetryCnt = 2 // keyOpDefaultTimeout is the default time out for etcd store. @@ -49,6 +54,8 @@ type InfoSyncer struct { etcdCli *clientv3.Client info *ServerInfo serverInfoPath string + minStartTS uint64 + minStartTSPath string session *concurrency.Session } @@ -75,6 +82,8 @@ func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer { etcdCli: etcdCli, info: getServerInfo(id), serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), + minStartTS: 0, + minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), } } @@ -144,6 +153,48 @@ func (is *InfoSyncer) RemoveServerInfo() { } } +// storeMinStartTS stores self server min start timestamp to etcd. +func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error { + if is.etcdCli == nil { + return nil + } + return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, + strconv.FormatUint(is.minStartTS, 10), + clientv3.WithLease(is.session.Lease())) +} + +// RemoveMinStartTS removes self server min start timestamp from etcd. +func (is *InfoSyncer) RemoveMinStartTS() { + if is.etcdCli == nil { + return + } + err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) + if err != nil { + logutil.BgLogger().Error("remove min start timestamp failed", zap.Error(err)) + } +} + +// UpdateMinStartTS updates self server min start timestamp from etcd. +func (is *InfoSyncer) UpdateMinStartTS(pool *sessionPool) { + ctx, err := pool.Get() + if err != nil { + logutil.BgLogger().Error("update min start timestamp failed", zap.Error(err)) + } + defer pool.Put(ctx) + pl := ctx.(sessionctx.Context).GetSessionManager().ShowProcessList() + var minStartTS uint64 = math.MaxUint64 + for _, info := range pl { + if info.CurTxnStartTS < minStartTS { + minStartTS = info.CurTxnStartTS + } + } + is.minStartTS = minStartTS + err = is.storeMinStartTS(context.Background()) + if err != nil { + logutil.BgLogger().Error("update min start timestamp failed", zap.Error(err)) + } +} + // Done returns a channel that closes when the info syncer is no longer being refreshed. func (is InfoSyncer) Done() <-chan struct{} { if is.etcdCli == nil { @@ -170,7 +221,14 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt is.session = session err = is.storeServerInfo(ctx) - return err + if err != nil { + return err + } + err = is.storeMinStartTS(ctx) + if err != nil { + return err + } + return nil } // getInfo gets server information from etcd according to the key and opts. diff --git a/go.mod b/go.mod index 9c00d5faacc1a..127852e29014e 100644 --- a/go.mod +++ b/go.mod @@ -75,3 +75,5 @@ require ( sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) + +go 1.13 diff --git a/tools/check/go.mod b/tools/check/go.mod index ca5d580f6d6a4..6dfc12cecadbd 100644 --- a/tools/check/go.mod +++ b/tools/check/go.mod @@ -20,3 +20,5 @@ require ( gopkg.in/yaml.v2 v2.2.2 // indirect honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3 ) + +go 1.13 From bec9246cd04cfec791480b9823d882cc6d612efc Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Tue, 10 Sep 2019 13:36:56 +0800 Subject: [PATCH 2/6] add debug logs Signed-off-by: Shuaipeng Yu --- domain/domain.go | 2 ++ domain/info.go | 1 + 2 files changed, 3 insertions(+) diff --git a/domain/domain.go b/domain/domain.go index 8cb33c2c39859..8e51f155c3aae 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -425,7 +425,9 @@ func (do *Domain) infoSyncerKeeper() { for { select { case <-ticker.C: + logutil.BgLogger().Info("updating minStartTS") do.info.UpdateMinStartTS(do.SysSessionPool()) + logutil.BgLogger().Info("updating minStartTS finished") case <-do.info.Done(): logutil.BgLogger().Info("server info syncer need to restart") if err := do.info.Restart(context.Background()); err != nil { diff --git a/domain/info.go b/domain/info.go index 8d8d8c00cce27..306b165926612 100644 --- a/domain/info.go +++ b/domain/info.go @@ -158,6 +158,7 @@ func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error { if is.etcdCli == nil { return nil } + logutil.Logger(ctx).Info("store minStartTS", zap.Uint64("minStartTS", is.minStartTS)) return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, strconv.FormatUint(is.minStartTS, 10), clientv3.WithLease(is.session.Lease())) From 4269a2905c8a5b75d66f85870b53526086ae60ff Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Tue, 10 Sep 2019 17:09:07 +0800 Subject: [PATCH 3/6] set session manager for it Signed-off-by: Shuaipeng Yu --- domain/domain.go | 2 +- domain/info.go | 20 ++++++++++++-------- tidb-server/main.go | 1 + 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 8e51f155c3aae..518eb785f6039 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -426,7 +426,7 @@ func (do *Domain) infoSyncerKeeper() { select { case <-ticker.C: logutil.BgLogger().Info("updating minStartTS") - do.info.UpdateMinStartTS(do.SysSessionPool()) + do.info.UpdateMinStartTS() logutil.BgLogger().Info("updating minStartTS finished") case <-do.info.Done(): logutil.BgLogger().Info("server info syncer need to restart") diff --git a/domain/info.go b/domain/info.go index 306b165926612..44792e97be877 100644 --- a/domain/info.go +++ b/domain/info.go @@ -28,7 +28,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/owner" - "github.com/pingcap/tidb/sessionctx" + util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/printer" @@ -57,6 +57,7 @@ type InfoSyncer struct { minStartTS uint64 minStartTSPath string session *concurrency.Session + manager util2.SessionManager } // ServerInfo is server static information. @@ -92,6 +93,10 @@ func (is *InfoSyncer) Init(ctx context.Context) error { return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) } +func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) { + is.manager = manager +} + // GetServerInfo gets self server static information. func (is *InfoSyncer) GetServerInfo() *ServerInfo { return is.info @@ -176,13 +181,12 @@ func (is *InfoSyncer) RemoveMinStartTS() { } // UpdateMinStartTS updates self server min start timestamp from etcd. -func (is *InfoSyncer) UpdateMinStartTS(pool *sessionPool) { - ctx, err := pool.Get() - if err != nil { - logutil.BgLogger().Error("update min start timestamp failed", zap.Error(err)) +func (is *InfoSyncer) UpdateMinStartTS() { + if is.manager == nil { + // Server may not start in time. + return } - defer pool.Put(ctx) - pl := ctx.(sessionctx.Context).GetSessionManager().ShowProcessList() + pl := is.manager.ShowProcessList() var minStartTS uint64 = math.MaxUint64 for _, info := range pl { if info.CurTxnStartTS < minStartTS { @@ -190,7 +194,7 @@ func (is *InfoSyncer) UpdateMinStartTS(pool *sessionPool) { } } is.minStartTS = minStartTS - err = is.storeMinStartTS(context.Background()) + err := is.storeMinStartTS(context.Background()) if err != nil { logutil.BgLogger().Error("update min start timestamp failed", zap.Error(err)) } diff --git a/tidb-server/main.go b/tidb-server/main.go index cdfe515e030d0..0a7eb765acdf9 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -554,6 +554,7 @@ func createServer() { // Both domain and storage have started, so we have to clean them before exiting. terror.MustNil(err, closeDomainAndStorage) go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run() + dom.InfoSyncer().SetSessionManager(svr) } func serverShutdown(isgraceful bool) { From bcf9859d99644b5a2babfa5d61b19212baa90a29 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Tue, 10 Sep 2019 17:21:40 +0800 Subject: [PATCH 4/6] remove debug logs Signed-off-by: Shuaipeng Yu --- domain/domain.go | 2 -- domain/info.go | 8 +++----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 518eb785f6039..34cbbcbb6507f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -425,9 +425,7 @@ func (do *Domain) infoSyncerKeeper() { for { select { case <-ticker.C: - logutil.BgLogger().Info("updating minStartTS") do.info.UpdateMinStartTS() - logutil.BgLogger().Info("updating minStartTS finished") case <-do.info.Done(): logutil.BgLogger().Info("server info syncer need to restart") if err := do.info.Restart(context.Background()); err != nil { diff --git a/domain/info.go b/domain/info.go index 44792e97be877..3c4d1c35f6112 100644 --- a/domain/info.go +++ b/domain/info.go @@ -56,8 +56,8 @@ type InfoSyncer struct { serverInfoPath string minStartTS uint64 minStartTSPath string - session *concurrency.Session manager util2.SessionManager + session *concurrency.Session } // ServerInfo is server static information. @@ -83,7 +83,6 @@ func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer { etcdCli: etcdCli, info: getServerInfo(id), serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), - minStartTS: 0, minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), } } @@ -163,7 +162,6 @@ func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error { if is.etcdCli == nil { return nil } - logutil.Logger(ctx).Info("store minStartTS", zap.Uint64("minStartTS", is.minStartTS)) return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, strconv.FormatUint(is.minStartTS, 10), clientv3.WithLease(is.session.Lease())) @@ -176,7 +174,7 @@ func (is *InfoSyncer) RemoveMinStartTS() { } err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) if err != nil { - logutil.BgLogger().Error("remove min start timestamp failed", zap.Error(err)) + logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err)) } } @@ -196,7 +194,7 @@ func (is *InfoSyncer) UpdateMinStartTS() { is.minStartTS = minStartTS err := is.storeMinStartTS(context.Background()) if err != nil { - logutil.BgLogger().Error("update min start timestamp failed", zap.Error(err)) + logutil.BgLogger().Error("update minStartTS failed", zap.Error(err)) } } From fbc6a84630cdcaec45ded9faf1b8e244c44c02bd Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Tue, 10 Sep 2019 17:27:21 +0800 Subject: [PATCH 5/6] fix lint Signed-off-by: Shuaipeng Yu --- domain/info.go | 1 + 1 file changed, 1 insertion(+) diff --git a/domain/info.go b/domain/info.go index 3c4d1c35f6112..9dce716ae7147 100644 --- a/domain/info.go +++ b/domain/info.go @@ -92,6 +92,7 @@ func (is *InfoSyncer) Init(ctx context.Context) error { return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) } +// SetSessionManager set the session manager for InfoSyncer. func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) { is.manager = manager } From 6e159ca6cf3dcf1d3c7ff545972de190c230960d Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Sat, 14 Sep 2019 15:13:27 +0800 Subject: [PATCH 6/6] do not report when server starts Signed-off-by: Shuaipeng Yu --- domain/domain.go | 2 +- domain/info.go | 13 +++---------- go.mod | 2 -- tools/check/go.mod | 2 -- 4 files changed, 4 insertions(+), 15 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 34cbbcbb6507f..e5124ba0d7e30 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -425,7 +425,7 @@ func (do *Domain) infoSyncerKeeper() { for { select { case <-ticker.C: - do.info.UpdateMinStartTS() + do.info.ReportMinStartTS() case <-do.info.Done(): logutil.BgLogger().Info("server info syncer need to restart") if err := do.info.Restart(context.Background()); err != nil { diff --git a/domain/info.go b/domain/info.go index 9dce716ae7147..bf0e91acf7e88 100644 --- a/domain/info.go +++ b/domain/info.go @@ -179,8 +179,8 @@ func (is *InfoSyncer) RemoveMinStartTS() { } } -// UpdateMinStartTS updates self server min start timestamp from etcd. -func (is *InfoSyncer) UpdateMinStartTS() { +// ReportMinStartTS reports self server min start timestamp to ETCD. +func (is *InfoSyncer) ReportMinStartTS() { if is.manager == nil { // Server may not start in time. return @@ -225,14 +225,7 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt is.session = session err = is.storeServerInfo(ctx) - if err != nil { - return err - } - err = is.storeMinStartTS(ctx) - if err != nil { - return err - } - return nil + return err } // getInfo gets server information from etcd according to the key and opts. diff --git a/go.mod b/go.mod index 127852e29014e..9c00d5faacc1a 100644 --- a/go.mod +++ b/go.mod @@ -75,5 +75,3 @@ require ( sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) - -go 1.13 diff --git a/tools/check/go.mod b/tools/check/go.mod index 6dfc12cecadbd..ca5d580f6d6a4 100644 --- a/tools/check/go.mod +++ b/tools/check/go.mod @@ -20,5 +20,3 @@ require ( gopkg.in/yaml.v2 v2.2.2 // indirect honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3 ) - -go 1.13