Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: report min start timestamp of TiDB server #12133

Merged
merged 8 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,12 @@ func (do *Domain) topNSlowQueryLoop() {
func (do *Domain) infoSyncerKeeper() {
defer do.wg.Done()
defer recoverInDomain("infoSyncerKeeper", false)
ticker := time.NewTicker(time.Second * time.Duration(InfoSessionTTL) / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
do.info.ReportMinStartTS()
case <-do.info.Done():
logutil.BgLogger().Info("server info syncer need to restart")
if err := do.info.Restart(context.Background()); err != nil {
Expand Down Expand Up @@ -547,6 +551,7 @@ func (do *Domain) Close() {
}
if do.info != nil {
do.info.RemoveServerInfo()
do.info.RemoveMinStartTS()
}
close(do.exit)
if do.etcdClient != nil {
Expand Down
55 changes: 55 additions & 0 deletions domain/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"time"

"github.com/coreos/etcd/clientv3"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
util2 "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/printer"
Expand All @@ -35,6 +38,8 @@ import (
const (
// ServerInformationPath store server information such as IP, port and so on.
ServerInformationPath = "/tidb/server/info"
// ServerMinStartTSPath store the server min start timestamp.
ServerMinStartTSPath = "/tidb/server/minstartts"
// keyOpDefaultRetryCnt is the default retry count for etcd store.
keyOpDefaultRetryCnt = 2
// keyOpDefaultTimeout is the default time out for etcd store.
Expand All @@ -49,6 +54,9 @@ type InfoSyncer struct {
etcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we don't need this field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be used by the next PR.

minStartTSPath string
manager util2.SessionManager
session *concurrency.Session
}

Expand All @@ -75,6 +83,7 @@ func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer {
etcdCli: etcdCli,
info: getServerInfo(id),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
}
}

Expand All @@ -83,6 +92,11 @@ func (is *InfoSyncer) Init(ctx context.Context) error {
return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
}

// SetSessionManager set the session manager for InfoSyncer.
func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) {
is.manager = manager
}

// GetServerInfo gets self server static information.
func (is *InfoSyncer) GetServerInfo() *ServerInfo {
return is.info
Expand Down Expand Up @@ -144,6 +158,47 @@ func (is *InfoSyncer) RemoveServerInfo() {
}
}

// storeMinStartTS stores self server min start timestamp to etcd.
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error {
if is.etcdCli == nil {
return nil
}
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
strconv.FormatUint(is.minStartTS, 10),
clientv3.WithLease(is.session.Lease()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does all TiDB servers updates the same key in etcd? If so, how do TiDB servers know whether there's another TiDB server who has a smaller minStartTs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I got it. LGTM.

}

// RemoveMinStartTS removes self server min start timestamp from etcd.
func (is *InfoSyncer) RemoveMinStartTS() {
if is.etcdCli == nil {
return
}
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err))
}
}

// ReportMinStartTS reports self server min start timestamp to ETCD.
func (is *InfoSyncer) ReportMinStartTS() {
if is.manager == nil {
// Server may not start in time.
return
}
pl := is.manager.ShowProcessList()
var minStartTS uint64 = math.MaxUint64
for _, info := range pl {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if len(pl) == 0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Report max uint64 to PD.

if info.CurTxnStartTS < minStartTS {
minStartTS = info.CurTxnStartTS
}
}
is.minStartTS = minStartTS
err := is.storeMinStartTS(context.Background())
if err != nil {
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
}
}

// Done returns a channel that closes when the info syncer is no longer being refreshed.
func (is InfoSyncer) Done() <-chan struct{} {
if is.etcdCli == nil {
Expand Down
1 change: 1 addition & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ func createServer() {
// Both domain and storage have started, so we have to clean them before exiting.
terror.MustNil(err, closeDomainAndStorage)
go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run()
dom.InfoSyncer().SetSessionManager(svr)
}

func serverShutdown(isgraceful bool) {
Expand Down