-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store TiDB server info to PD and add http api handle #7082
Changes from 12 commits
3918624
2370bb0
b574aff
aca6b97
d5a1922
3c8f418
24eea21
f0007a4
159a7b4
f143709
256798e
8bd6c3d
80cdd6a
20e372f
839f298
bb2f009
ed0217e
d0c1925
a925b92
5fdba0b
73ea84f
440953f
9815df2
321cf31
027c983
c2ff1e5
8baa5be
be06280
8697127
38e30f1
c814267
9d091c9
ea275c1
33d26a0
750407c
ec488e9
b40cf74
c32d982
7403bb1
3801efe
b27b804
11dfd86
7fb9c1a
c4586ea
b7b35e3
d54c1ce
01eed94
6060b57
2a97788
2da0623
26c8b25
93c76ce
f8b7b98
9831a1f
34d12be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import ( | |
"github.com/juju/errors" | ||
"github.com/ngaut/pools" | ||
"github.com/pingcap/tidb/ast" | ||
"github.com/pingcap/tidb/config" | ||
"github.com/pingcap/tidb/ddl/util" | ||
"github.com/pingcap/tidb/infoschema" | ||
"github.com/pingcap/tidb/kv" | ||
|
@@ -38,6 +39,7 @@ import ( | |
"github.com/pingcap/tidb/sessionctx/binloginfo" | ||
"github.com/pingcap/tidb/sessionctx/variable" | ||
"github.com/pingcap/tidb/terror" | ||
"github.com/pingcap/tidb/util/printer" | ||
log "github.com/sirupsen/logrus" | ||
"github.com/twinj/uuid" | ||
"golang.org/x/net/context" | ||
|
@@ -207,6 +209,15 @@ type DDL interface { | |
|
||
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing. | ||
SetBinlogClient(interface{}) | ||
|
||
// GetServerInfo get self DDL server static information. | ||
GetServerInfo() *util.DDLServerInfo | ||
// StoreServerInfoToPD store self DDL server static information to PD when DDL server Started. | ||
StoreServerInfoToPD() error | ||
// GetOwnerServerInfo get owner DDL server static information from PD. | ||
GetOwnerServerInfo() (*util.DDLServerInfo, error) | ||
// GetAllServerInfo get all DDL servers static information from PD. | ||
GetAllServerInfo() (map[string]*util.DDLServerInfo, error) | ||
} | ||
|
||
// ddl is used to handle the statements that define the structure or schema of the database. | ||
|
@@ -376,6 +387,10 @@ func (d *ddl) close() { | |
if err != nil { | ||
log.Errorf("[ddl] remove self version path failed %v", err) | ||
} | ||
err = d.schemaSyncer.RemoveSelfServerInfo() | ||
if err != nil { | ||
log.Errorf("[ddl] remove self server info path failed %v", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove |
||
} | ||
|
||
for _, worker := range d.workers { | ||
worker.close() | ||
|
@@ -510,6 +525,47 @@ func (d *ddl) SetBinlogClient(binlogCli interface{}) { | |
d.binlogCli = binlogCli | ||
} | ||
|
||
func (d *ddl) GetServerInfo() *util.DDLServerInfo { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could write it like this. func (d *ddl) GetServerInfo() *util.DDLServerInfo {
cfg := config.GetGlobalConfig()
info := &util.DDLServerInfo{
ID: d.uuid,
IP: cfg.AdvertiseAddress,
StatusPort: cfg.Status.StatusPort,
Lease: cfg.Lease,
}
info.Version = mysql.ServerVersion
info.GitHash = printer.TiDBGitHash
return info
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
cfg := config.GetGlobalConfig() | ||
info := &util.DDLServerInfo{ | ||
ID: d.uuid, | ||
IP: cfg.AdvertiseAddress, | ||
StatusPort: cfg.Status.StatusPort, | ||
Lease: cfg.Lease, | ||
} | ||
info.Version = mysql.ServerVersion | ||
info.GitHash = printer.TiDBGitHash | ||
return info | ||
} | ||
|
||
func (d *ddl) GetOwnerServerInfo() (*util.DDLServerInfo, error) { | ||
ctx := context.Background() | ||
ddlOwnerID, err := d.ownerManager.GetOwnerID(ctx) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
ownerInfo, err := d.schemaSyncer.GetServerInfoFromPD(ctx, ddlOwnerID) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
return ownerInfo, nil | ||
} | ||
|
||
func (d *ddl) GetAllServerInfo() (map[string]*util.DDLServerInfo, error) { | ||
ctx := context.Background() | ||
AllDDLServerInfo, err := d.schemaSyncer.GetAllServerInfoFromPD(ctx) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
return AllDDLServerInfo, nil | ||
} | ||
|
||
func (d *ddl) StoreServerInfoToPD() error { | ||
info := d.GetServerInfo() | ||
ctx := context.Background() | ||
return d.schemaSyncer.UpdateSelfServerInfo(ctx, info) | ||
} | ||
|
||
// DDL error codes. | ||
const ( | ||
codeInvalidWorker terror.ErrCode = 1 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"github.com/coreos/etcd/clientv3" | ||
"github.com/juju/errors" | ||
"github.com/pingcap/tidb/ast" | ||
"github.com/pingcap/tidb/ddl/util" | ||
"github.com/pingcap/tidb/model" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"golang.org/x/net/context" | ||
|
@@ -32,6 +33,8 @@ const mockCheckVersInterval = 2 * time.Millisecond | |
type mockSchemaSyncer struct { | ||
selfSchemaVersion int64 | ||
globalVerCh chan clientv3.WatchResponse | ||
// selfServerInfo used to save self DDL server information. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. selfServerInfo is self DDL server information There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's mean by |
||
selfServerInfo *util.DDLServerInfo | ||
} | ||
|
||
// NewMockSchemaSyncer creates a new mock SchemaSyncer. | ||
|
@@ -102,6 +105,29 @@ func (s *mockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer | |
} | ||
} | ||
|
||
// GetServerInfoFromPD implements SchemaSyncer.GetServerInfoFromPD interface. | ||
func (s *mockSchemaSyncer) GetServerInfoFromPD(ctx context.Context, _ string) (*util.DDLServerInfo, error) { | ||
return s.selfServerInfo, nil | ||
} | ||
|
||
// GetAllServerInfoFromPD implements SchemaSyncer.GetAllServerInfoFromPD interface. | ||
func (s *mockSchemaSyncer) GetAllServerInfoFromPD(ctx context.Context) (map[string]*util.DDLServerInfo, error) { | ||
allDDLInfo := make(map[string]*util.DDLServerInfo) | ||
allDDLInfo[s.selfServerInfo.ID] = s.selfServerInfo | ||
return allDDLInfo, nil | ||
} | ||
|
||
// UpdateSelfServerInfo implements SchemaSyncer.UpdateSelfServerInfo interface. | ||
func (s *mockSchemaSyncer) UpdateSelfServerInfo(ctx context.Context, info *util.DDLServerInfo) error { | ||
s.selfServerInfo = info | ||
return nil | ||
} | ||
|
||
// RemoveSelfServerInfo implements SchemaSyncer.RemoveSelfServerInfo interface. | ||
func (s *mockSchemaSyncer) RemoveSelfServerInfo() error { | ||
return nil | ||
} | ||
|
||
type mockDelRange struct { | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,11 +20,14 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"encoding/json" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put this line to line 17 |
||
"github.com/coreos/etcd/clientv3" | ||
"github.com/coreos/etcd/clientv3/concurrency" | ||
"github.com/juju/errors" | ||
"github.com/pingcap/tidb/ddl/util" | ||
"github.com/pingcap/tidb/metrics" | ||
"github.com/pingcap/tidb/owner" | ||
"github.com/pingcap/tidb/util/hack" | ||
log "github.com/sirupsen/logrus" | ||
"golang.org/x/net/context" | ||
) | ||
|
@@ -33,6 +36,8 @@ const ( | |
// DDLAllSchemaVersions is the path on etcd that is used to store all servers current schema versions. | ||
// It's exported for testing. | ||
DDLAllSchemaVersions = "/tidb/ddl/all_schema_versions" | ||
//DDLServerInformation store DDL server information such as IP,port and so on | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/store/stores There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And there should be a space between |
||
DDLServerInformation = "/tidb/ddl/info" | ||
// DDLGlobalSchemaVersion is the path on etcd that is used to store the latest schema versions. | ||
// It's exported for testing. | ||
DDLGlobalSchemaVersion = "/tidb/ddl/global_schema_version" | ||
|
@@ -83,13 +88,23 @@ type SchemaSyncer interface { | |
// the latest schema version. If the result is false, wait for a while and check again util the processing time reach 2 * lease. | ||
// It returns until all servers' versions are equal to the latest version or the ctx is done. | ||
OwnerCheckAllVersions(ctx context.Context, latestVer int64) error | ||
|
||
// GetServerInfoFromPD get the DDL_id server information from PD. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/get/gets |
||
GetServerInfoFromPD(ctx context.Context, id string) (*util.DDLServerInfo, error) | ||
// GetAllServerInfoFromPD get all DDL servers information from PD. | ||
GetAllServerInfoFromPD(ctx context.Context) (map[string]*util.DDLServerInfo, error) | ||
// UpdateSelfServerInfo store DDL server information to PD. | ||
UpdateSelfServerInfo(ctx context.Context, info *util.DDLServerInfo) error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the info will not be updated during running, |
||
// RemoveSelfServerInfo remove DDL server information from PD. | ||
RemoveSelfServerInfo() error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we make these four methods name consistent? Either all with |
||
} | ||
|
||
type schemaVersionSyncer struct { | ||
selfSchemaVerPath string | ||
etcdCli *clientv3.Client | ||
session *concurrency.Session | ||
mu struct { | ||
selfSchemaVerPath string | ||
selfServerInfoPath string | ||
etcdCli *clientv3.Client | ||
session *concurrency.Session | ||
mu struct { | ||
sync.RWMutex | ||
globalVerCh clientv3.WatchChan | ||
} | ||
|
@@ -98,8 +113,9 @@ type schemaVersionSyncer struct { | |
// NewSchemaSyncer creates a new SchemaSyncer. | ||
func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer { | ||
return &schemaVersionSyncer{ | ||
etcdCli: etcdCli, | ||
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id), | ||
etcdCli: etcdCli, | ||
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id), | ||
selfServerInfoPath: fmt.Sprintf("%s/%s", DDLServerInformation, id), | ||
} | ||
} | ||
|
||
|
@@ -153,6 +169,92 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error { | |
return errors.Trace(err) | ||
} | ||
|
||
// GetServerInfoFromPD implements SchemaSyncer.GetServerInfoFromPD interface. | ||
func (s *schemaVersionSyncer) GetServerInfoFromPD(ctx context.Context, id string) (*util.DDLServerInfo, error) { | ||
var err error | ||
var resp *clientv3.GetResponse | ||
ddlPath := fmt.Sprintf("%s/%s", DDLServerInformation, id) | ||
for { | ||
if isContextDone(ctx) { | ||
err = errors.Trace(ctx.Err()) | ||
return nil, err | ||
} | ||
|
||
resp, err = s.etcdCli.Get(ctx, ddlPath) | ||
if err != nil { | ||
log.Infof("[syncer] get ddl server info, ddl %s failed %v, continue checking.", ddlPath, err) | ||
time.Sleep(200 * time.Millisecond) | ||
continue | ||
} | ||
if err == nil && len(resp.Kvs) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need to check |
||
info := &util.DDLServerInfo{} | ||
err := json.Unmarshal(resp.Kvs[0].Value, info) | ||
if err != nil { | ||
log.Infof("[syncer] get ddl server info, ddl %s json.Unmarshal %v failed %v.", resp.Kvs[0].Key, resp.Kvs[0].Value, err) | ||
return nil, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. errors.Trace(err) |
||
} | ||
return info, nil | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cloud this(line176-198) be extracted into a function? It will be used in GetAllServerInfoFromPD too. |
||
} | ||
|
||
// GetAllServerInfoFromPD implements SchemaSyncer.GetAllServerInfoFromPD interface. | ||
func (s *schemaVersionSyncer) GetAllServerInfoFromPD(ctx context.Context) (map[string]*util.DDLServerInfo, error) { | ||
var err error | ||
allDDLInfo := make(map[string]*util.DDLServerInfo) | ||
for { | ||
if isContextDone(ctx) { | ||
// ctx is canceled or timeout. | ||
err = errors.Trace(ctx.Err()) | ||
return nil, err | ||
} | ||
|
||
resp, err := s.etcdCli.Get(ctx, DDLServerInformation, clientv3.WithPrefix()) | ||
if err != nil { | ||
log.Infof("[syncer] get all ddl server info failed %v, continue checking.", err) | ||
time.Sleep(200 * time.Millisecond) | ||
continue | ||
} | ||
|
||
for _, kv := range resp.Kvs { | ||
info := &util.DDLServerInfo{} | ||
err := json.Unmarshal(kv.Value, info) | ||
if err != nil { | ||
log.Infof("[syncer] get all ddl server info, ddl %s json.Unmarshal %v failed %v.", kv.Key, kv.Value, err) | ||
return nil, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. errors.Trace(err) |
||
} | ||
allDDLInfo[info.ID] = info | ||
} | ||
return allDDLInfo, nil | ||
} | ||
} | ||
|
||
// UpdateSelfServerInfo implements SchemaSyncer.UpdateSelfServerInfo interface. | ||
func (s *schemaVersionSyncer) UpdateSelfServerInfo(ctx context.Context, info *util.DDLServerInfo) error { | ||
infoBuf, err := json.Marshal(info) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfServerInfoPath, hack.String(infoBuf)) | ||
return errors.Trace(err) | ||
} | ||
|
||
// RemoveSelfServerInfo implements SchemaSyncer.RemoveSelfServerInfo interface. | ||
func (s *schemaVersionSyncer) RemoveSelfServerInfo() error { | ||
var err error | ||
ctx := context.Background() | ||
for i := 0; i < keyOpDefaultRetryCnt; i++ { | ||
childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout) | ||
_, err = s.etcdCli.Delete(childCtx, s.selfServerInfoPath) | ||
cancel() | ||
if err == nil { | ||
return nil | ||
} | ||
log.Warnf("[syncer] remove server info path %s failed %v no.%d", s.selfServerInfoPath, err, i) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cloud this(line176-198) be extracted into a function? It will be used in RemoveSelfVersionPath too. |
||
return errors.Trace(err) | ||
} | ||
|
||
// Done implements SchemaSyncer.Done interface. | ||
func (s *schemaVersionSyncer) Done() <-chan struct{} { | ||
return s.session.Done() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,3 +109,20 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old | |
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) | ||
return errors.Trace(err) | ||
} | ||
|
||
// DDLServerInfo is DDL server static information. | ||
// DDLServerInfo will store into PD when server start up and delete when DDL closed. | ||
// It will not update when DDL server running. So please only put static information in DDLServerInfo struct. | ||
type DDLServerInfo struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please update this name too. |
||
ServerVersionInfo | ||
ID string `json:"ddl_id"` | ||
IP string `json:"ip"` | ||
StatusPort uint `json:"status_port"` | ||
Lease string `json:"lease"` | ||
} | ||
|
||
// ServerVersionInfo is the server version and git_hash | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add |
||
type ServerVersionInfo struct { | ||
Version string `json:"version"` | ||
GitHash string `json:"git_hash"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not related with DDL.