Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: report min start timestamp of TiDB server #12133

Merged
merged 8 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,12 @@ func (do *Domain) topNSlowQueryLoop() {
func (do *Domain) infoSyncerKeeper() {
defer do.wg.Done()
defer recoverInDomain("infoSyncerKeeper", false)
ticker := time.NewTicker(time.Second * time.Duration(InfoSessionTTL) / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
do.info.UpdateMinStartTS()
case <-do.info.Done():
logutil.BgLogger().Info("server info syncer need to restart")
if err := do.info.Restart(context.Background()); err != nil {
Expand Down Expand Up @@ -547,6 +551,7 @@ func (do *Domain) Close() {
}
if do.info != nil {
do.info.RemoveServerInfo()
do.info.RemoveMinStartTS()
}
close(do.exit)
if do.etcdClient != nil {
Expand Down
64 changes: 63 additions & 1 deletion domain/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"time"

"github.com/coreos/etcd/clientv3"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
util2 "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/printer"
Expand All @@ -35,6 +38,8 @@ import (
const (
// ServerInformationPath store server information such as IP, port and so on.
ServerInformationPath = "/tidb/server/info"
// ServerMinStartTSPath store the server min start timestamp.
ServerMinStartTSPath = "/tidb/server/minstartts"
// keyOpDefaultRetryCnt is the default retry count for etcd store.
keyOpDefaultRetryCnt = 2
// keyOpDefaultTimeout is the default time out for etcd store.
Expand All @@ -49,6 +54,9 @@ type InfoSyncer struct {
etcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we don't need this field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be used by the next PR.

minStartTSPath string
manager util2.SessionManager
session *concurrency.Session
}

Expand All @@ -75,6 +83,7 @@ func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer {
etcdCli: etcdCli,
info: getServerInfo(id),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
}
}

Expand All @@ -83,6 +92,11 @@ func (is *InfoSyncer) Init(ctx context.Context) error {
return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
}

// SetSessionManager set the session manager for InfoSyncer.
func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) {
is.manager = manager
}

// GetServerInfo gets self server static information.
func (is *InfoSyncer) GetServerInfo() *ServerInfo {
return is.info
Expand Down Expand Up @@ -144,6 +158,47 @@ func (is *InfoSyncer) RemoveServerInfo() {
}
}

// storeMinStartTS stores self server min start timestamp to etcd.
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error {
if is.etcdCli == nil {
return nil
}
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
strconv.FormatUint(is.minStartTS, 10),
clientv3.WithLease(is.session.Lease()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does all TiDB servers updates the same key in etcd? If so, how do TiDB servers know whether there's another TiDB server who has a smaller minStartTs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I got it. LGTM.

}

// RemoveMinStartTS removes self server min start timestamp from etcd.
func (is *InfoSyncer) RemoveMinStartTS() {
if is.etcdCli == nil {
return
}
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err))
}
}

// UpdateMinStartTS updates self server min start timestamp from etcd.
func (is *InfoSyncer) UpdateMinStartTS() {
if is.manager == nil {
// Server may not start in time.
return
}
pl := is.manager.ShowProcessList()
var minStartTS uint64 = math.MaxUint64
for _, info := range pl {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if len(pl) == 0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Report max uint64 to PD.

if info.CurTxnStartTS < minStartTS {
minStartTS = info.CurTxnStartTS
}
}
is.minStartTS = minStartTS
err := is.storeMinStartTS(context.Background())
if err != nil {
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
}
}

// Done returns a channel that closes when the info syncer is no longer being refreshed.
func (is InfoSyncer) Done() <-chan struct{} {
if is.etcdCli == nil {
Expand All @@ -170,7 +225,14 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt
is.session = session

err = is.storeServerInfo(ctx)
return err
if err != nil {
return err
}
err = is.storeMinStartTS(ctx)
if err != nil {
return err
}
return nil
}

// getInfo gets server information from etcd according to the key and opts.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ require (
sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
)

go 1.13
1 change: 1 addition & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ func createServer() {
// Both domain and storage have started, so we have to clean them before exiting.
terror.MustNil(err, closeDomainAndStorage)
go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run()
dom.InfoSyncer().SetSessionManager(svr)
}

func serverShutdown(isgraceful bool) {
Expand Down
2 changes: 2 additions & 0 deletions tools/check/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ require (
gopkg.in/yaml.v2 v2.2.2 // indirect
honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3
)

go 1.13