-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
domain: report min start timestamp of TiDB server #12133
Changes from 5 commits
d6b708b
bec9246
4269a29
bcf9859
fbc6a84
6e159ca
8912afd
a4b7d09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ import ( | |
"context" | ||
"encoding/json" | ||
"fmt" | ||
"math" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/coreos/etcd/clientv3" | ||
|
@@ -26,6 +28,7 @@ import ( | |
"github.com/pingcap/tidb/config" | ||
"github.com/pingcap/tidb/ddl/util" | ||
"github.com/pingcap/tidb/owner" | ||
util2 "github.com/pingcap/tidb/util" | ||
"github.com/pingcap/tidb/util/hack" | ||
"github.com/pingcap/tidb/util/logutil" | ||
"github.com/pingcap/tidb/util/printer" | ||
|
@@ -35,6 +38,8 @@ import ( | |
const ( | ||
// ServerInformationPath store server information such as IP, port and so on. | ||
ServerInformationPath = "/tidb/server/info" | ||
// ServerMinStartTSPath store the server min start timestamp. | ||
ServerMinStartTSPath = "/tidb/server/minstartts" | ||
// keyOpDefaultRetryCnt is the default retry count for etcd store. | ||
keyOpDefaultRetryCnt = 2 | ||
// keyOpDefaultTimeout is the default time out for etcd store. | ||
|
@@ -49,6 +54,9 @@ type InfoSyncer struct { | |
etcdCli *clientv3.Client | ||
info *ServerInfo | ||
serverInfoPath string | ||
minStartTS uint64 | ||
minStartTSPath string | ||
manager util2.SessionManager | ||
session *concurrency.Session | ||
} | ||
|
||
|
@@ -75,6 +83,7 @@ func NewInfoSyncer(id string, etcdCli *clientv3.Client) *InfoSyncer { | |
etcdCli: etcdCli, | ||
info: getServerInfo(id), | ||
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), | ||
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), | ||
} | ||
} | ||
|
||
|
@@ -83,6 +92,11 @@ func (is *InfoSyncer) Init(ctx context.Context) error { | |
return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) | ||
} | ||
|
||
// SetSessionManager set the session manager for InfoSyncer. | ||
func (is *InfoSyncer) SetSessionManager(manager util2.SessionManager) { | ||
is.manager = manager | ||
} | ||
|
||
// GetServerInfo gets self server static information. | ||
func (is *InfoSyncer) GetServerInfo() *ServerInfo { | ||
return is.info | ||
|
@@ -144,6 +158,47 @@ func (is *InfoSyncer) RemoveServerInfo() { | |
} | ||
} | ||
|
||
// storeMinStartTS stores self server min start timestamp to etcd. | ||
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error { | ||
if is.etcdCli == nil { | ||
return nil | ||
} | ||
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, | ||
strconv.FormatUint(is.minStartTS, 10), | ||
clientv3.WithLease(is.session.Lease())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does all TiDB servers updates the same key in etcd? If so, how do TiDB servers know whether there's another TiDB server who has a smaller There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I got it. LGTM. |
||
} | ||
|
||
// RemoveMinStartTS removes self server min start timestamp from etcd. | ||
func (is *InfoSyncer) RemoveMinStartTS() { | ||
if is.etcdCli == nil { | ||
return | ||
} | ||
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) | ||
if err != nil { | ||
logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err)) | ||
} | ||
} | ||
|
||
// UpdateMinStartTS updates self server min start timestamp from etcd. | ||
func (is *InfoSyncer) UpdateMinStartTS() { | ||
if is.manager == nil { | ||
// Server may not start in time. | ||
return | ||
} | ||
pl := is.manager.ShowProcessList() | ||
var minStartTS uint64 = math.MaxUint64 | ||
for _, info := range pl { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Report max uint64 to PD. |
||
if info.CurTxnStartTS < minStartTS { | ||
minStartTS = info.CurTxnStartTS | ||
} | ||
} | ||
is.minStartTS = minStartTS | ||
err := is.storeMinStartTS(context.Background()) | ||
if err != nil { | ||
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err)) | ||
} | ||
} | ||
|
||
// Done returns a channel that closes when the info syncer is no longer being refreshed. | ||
func (is InfoSyncer) Done() <-chan struct{} { | ||
if is.etcdCli == nil { | ||
|
@@ -170,7 +225,14 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt | |
is.session = session | ||
|
||
err = is.storeServerInfo(ctx) | ||
return err | ||
if err != nil { | ||
return err | ||
} | ||
err = is.storeMinStartTS(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
// getInfo gets server information from etcd according to the key and opts. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,3 +20,5 @@ require ( | |
gopkg.in/yaml.v2 v2.2.2 // indirect | ||
honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3 | ||
) | ||
|
||
go 1.13 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that we don't need this field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be used by the next PR.