Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store TiDB server info to PD and add http api handle #7082

Merged
merged 55 commits into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
3918624
add get all server info api
crazycs520 Jul 17, 2018
2370bb0
rename and add comment
crazycs520 Jul 17, 2018
b574aff
add cluster info
crazycs520 Jul 18, 2018
aca6b97
remove isOwner from server static info
crazycs520 Jul 18, 2018
d5a1922
refine comment
crazycs520 Jul 18, 2018
3c8f418
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Jul 18, 2018
24eea21
add lease to server info and change tidb_http_api.md
crazycs520 Jul 18, 2018
f0007a4
refine log
crazycs520 Jul 18, 2018
159a7b4
refine http-api ddl/info
crazycs520 Jul 18, 2018
f143709
refine code
crazycs520 Jul 19, 2018
256798e
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Jul 19, 2018
8bd6c3d
rename variable
crazycs520 Jul 19, 2018
80cdd6a
refactor code
crazycs520 Jul 22, 2018
20e372f
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Jul 23, 2018
839f298
refactor and add comment
crazycs520 Jul 23, 2018
bb2f009
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Jul 23, 2018
ed0217e
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Jul 31, 2018
d0c1925
refine code
crazycs520 Aug 2, 2018
a925b92
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 2, 2018
5fdba0b
refine code
crazycs520 Aug 2, 2018
73ea84f
add http test
crazycs520 Aug 2, 2018
440953f
fmt code
crazycs520 Aug 2, 2018
9815df2
add test and comment
crazycs520 Aug 5, 2018
321cf31
update test
crazycs520 Aug 5, 2018
027c983
refactor code
crazycs520 Aug 5, 2018
c2ff1e5
refactor code
crazycs520 Aug 6, 2018
8baa5be
refine code
crazycs520 Aug 6, 2018
be06280
refine code and add comment
crazycs520 Aug 7, 2018
8697127
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 7, 2018
38e30f1
update test and comment
crazycs520 Aug 7, 2018
c814267
refine code
crazycs520 Aug 7, 2018
9d091c9
rename function
crazycs520 Aug 8, 2018
ea275c1
refine log
crazycs520 Aug 8, 2018
33d26a0
refine code
crazycs520 Aug 8, 2018
750407c
refine code and add comment
crazycs520 Aug 8, 2018
ec488e9
refine code and comment
crazycs520 Aug 8, 2018
b40cf74
refine code
crazycs520 Aug 8, 2018
c32d982
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 9, 2018
7403bb1
only return self server info when do /info http request
crazycs520 Aug 9, 2018
3801efe
refine code and comment
crazycs520 Aug 9, 2018
b27b804
refine comment
crazycs520 Aug 9, 2018
11dfd86
store server info with lease
crazycs520 Aug 9, 2018
7fb9c1a
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 9, 2018
c4586ea
use new seesion for server info syncer
crazycs520 Aug 10, 2018
b7b35e3
address comment
crazycs520 Aug 10, 2018
d54c1ce
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 11, 2018
01eed94
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 13, 2018
6060b57
address comment
crazycs520 Aug 13, 2018
2a97788
address comment
crazycs520 Aug 14, 2018
2da0623
address comment
crazycs520 Aug 14, 2018
26c8b25
address comment
crazycs520 Aug 14, 2018
93c76ce
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 14, 2018
f8b7b98
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 15, 2018
9831a1f
Merge branch 'master' of https://github.com/pingcap/tidb into put_ser…
crazycs520 Aug 15, 2018
34d12be
Merge branch 'master' into put_server_info_to_pd
crazycs520 Aug 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/juju/errors"
"github.com/ngaut/pools"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/printer"
log "github.com/sirupsen/logrus"
"github.com/twinj/uuid"
"golang.org/x/net/context"
Expand Down Expand Up @@ -207,6 +209,15 @@ type DDL interface {

// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(interface{})

// GetServerInfo get self DDL server static information.
GetServerInfo() *util.DDLServerInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related with DDL.

// StoreServerInfoToPD store self DDL server static information to PD when DDL server Started.
StoreServerInfoToPD() error
// GetOwnerServerInfo get owner DDL server static information from PD.
GetOwnerServerInfo() (*util.DDLServerInfo, error)
// GetAllServerInfo get all DDL servers static information from PD.
GetAllServerInfo() (map[string]*util.DDLServerInfo, error)
}

// ddl is used to handle the statements that define the structure or schema of the database.
Expand Down Expand Up @@ -376,6 +387,10 @@ func (d *ddl) close() {
if err != nil {
log.Errorf("[ddl] remove self version path failed %v", err)
}
err = d.schemaSyncer.RemoveSelfServerInfo()
if err != nil {
log.Errorf("[ddl] remove self server info path failed %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove path?

}

for _, worker := range d.workers {
worker.close()
Expand Down Expand Up @@ -510,6 +525,46 @@ func (d *ddl) SetBinlogClient(binlogCli interface{}) {
d.binlogCli = binlogCli
}

func (d *ddl) GetServerInfo() *util.DDLServerInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could write it like this.

func (d *ddl) GetServerInfo() *util.DDLServerInfo {
	cfg := config.GetGlobalConfig()
	info := &util.DDLServerInfo{
		ID:         d.uuid,
		IP:         cfg.AdvertiseAddress,
		StatusPort: cfg.Status.StatusPort,
		Lease:      cfg.Lease,
	}
	info.Version = mysql.ServerVersion
	info.GitHash = printer.TiDBGitHash
	return info
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

info := &util.DDLServerInfo{}
cfg := config.GetGlobalConfig()
info.IP = cfg.AdvertiseAddress
info.StatusPort = cfg.Status.StatusPort
info.Lease = cfg.Lease
info.Version = mysql.ServerVersion
info.GitHash = printer.TiDBGitHash
info.ID = d.uuid
return info
}

func (d *ddl) GetOwnerServerInfo() (*util.DDLServerInfo, error) {
ctx := context.Background()
ddlOwnerID, err := d.ownerManager.GetOwnerID(ctx)
if err != nil {
return nil, errors.Trace(err)
}
ownerInfo, err := d.schemaSyncer.GetServerInfoFromPD(ctx, ddlOwnerID)
if err != nil {
return nil, errors.Trace(err)
}
return ownerInfo, nil
}

func (d *ddl) GetAllServerInfo() (map[string]*util.DDLServerInfo, error) {
ctx := context.Background()
AllDDLServerInfo, err := d.schemaSyncer.GetAllServerInfoFromPD(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return AllDDLServerInfo, nil
}

func (d *ddl) StoreServerInfoToPD() error {
info := d.GetServerInfo()
ctx := context.Background()
return d.schemaSyncer.UpdateSelfServerInfo(ctx, info)
}

// DDL error codes.
const (
codeInvalidWorker terror.ErrCode = 1
Expand Down
26 changes: 26 additions & 0 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"golang.org/x/net/context"
Expand All @@ -32,6 +33,8 @@ const mockCheckVersInterval = 2 * time.Millisecond
type mockSchemaSyncer struct {
selfSchemaVersion int64
globalVerCh chan clientv3.WatchResponse
// selfServerInfo used to save self DDL server information.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

selfServerInfo is self DDL server information

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's mean by DDL server?

selfServerInfo *util.DDLServerInfo
}

// NewMockSchemaSyncer creates a new mock SchemaSyncer.
Expand Down Expand Up @@ -102,6 +105,29 @@ func (s *mockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer
}
}

// GetServerInfoFromPD implements SchemaSyncer.GetServerInfoFromPD interface.
func (s *mockSchemaSyncer) GetServerInfoFromPD(ctx context.Context, _ string) (*util.DDLServerInfo, error) {
return s.selfServerInfo, nil
}

// GetAllServerInfoFromPD implements SchemaSyncer.GetAllServerInfoFromPD interface.
func (s *mockSchemaSyncer) GetAllServerInfoFromPD(ctx context.Context) (map[string]*util.DDLServerInfo, error) {
allDDLInfo := make(map[string]*util.DDLServerInfo)
allDDLInfo[s.selfServerInfo.ID] = s.selfServerInfo
return allDDLInfo, nil
}

// UpdateSelfServerInfo implements SchemaSyncer.UpdateSelfServerInfo interface.
func (s *mockSchemaSyncer) UpdateSelfServerInfo(ctx context.Context, info *util.DDLServerInfo) error {
s.selfServerInfo = info
return nil
}

// RemoveSelfServerInfo implements SchemaSyncer.RemoveSelfServerInfo interface.
func (s *mockSchemaSyncer) RemoveSelfServerInfo() error {
return nil
}

type mockDelRange struct {
}

Expand Down
114 changes: 108 additions & 6 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"sync"
"time"

"encoding/json"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put this line to line 17

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/juju/errors"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/util/hack"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand All @@ -33,6 +36,8 @@ const (
// DDLAllSchemaVersions is the path on etcd that is used to store all servers current schema versions.
// It's exported for testing.
DDLAllSchemaVersions = "/tidb/ddl/all_schema_versions"
//DDLServerInformation store DDL server information such as IP,port and so on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/store/stores
s/IP,port/IP, port
Add . at the end of a line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And there should be a space between // and DDL....

DDLServerInformation = "/tidb/ddl/info"
// DDLGlobalSchemaVersion is the path on etcd that is used to store the latest schema versions.
// It's exported for testing.
DDLGlobalSchemaVersion = "/tidb/ddl/global_schema_version"
Expand Down Expand Up @@ -83,13 +88,23 @@ type SchemaSyncer interface {
// the latest schema version. If the result is false, wait for a while and check again util the processing time reach 2 * lease.
// It returns until all servers' versions are equal to the latest version or the ctx is done.
OwnerCheckAllVersions(ctx context.Context, latestVer int64) error

// GetServerInfoFromPD get the DDL_id server information from PD.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/get/gets
And update line 94、line 96、line 98.

GetServerInfoFromPD(ctx context.Context, id string) (*util.DDLServerInfo, error)
// GetAllServerInfoFromPD get all DDL servers information from PD.
GetAllServerInfoFromPD(ctx context.Context) (map[string]*util.DDLServerInfo, error)
// UpdateSelfServerInfo store DDL server information to PD.
UpdateSelfServerInfo(ctx context.Context, info *util.DDLServerInfo) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the info will not be updated during running,
I think that name it as StoreSelfServerInfoToPD is enough

// RemoveSelfServerInfo remove DDL server information from PD.
RemoveSelfServerInfo() error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make these four methods name consistent? Either all with PD suffix or all without the PD suffix.

}

type schemaVersionSyncer struct {
selfSchemaVerPath string
etcdCli *clientv3.Client
session *concurrency.Session
mu struct {
selfSchemaVerPath string
selfServerInfoPath string
etcdCli *clientv3.Client
session *concurrency.Session
mu struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
}
Expand All @@ -98,8 +113,9 @@ type schemaVersionSyncer struct {
// NewSchemaSyncer creates a new SchemaSyncer.
func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer {
return &schemaVersionSyncer{
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
selfServerInfoPath: fmt.Sprintf("%s/%s", DDLServerInformation, id),
}
}

Expand Down Expand Up @@ -153,6 +169,92 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
return errors.Trace(err)
}

// GetServerInfoFromPD implements SchemaSyncer.GetServerInfoFromPD interface.
func (s *schemaVersionSyncer) GetServerInfoFromPD(ctx context.Context, id string) (*util.DDLServerInfo, error) {
var err error
var resp *clientv3.GetResponse
ddlPath := fmt.Sprintf("%s/%s", DDLServerInformation, id)
for {
if isContextDone(ctx) {
err = errors.Trace(ctx.Err())
return nil, err
}

resp, err = s.etcdCli.Get(ctx, ddlPath)
if err != nil {
log.Infof("[syncer] get ddl server info, ddl %s failed %v, continue checking.", ddlPath, err)
time.Sleep(200 * time.Millisecond)
continue
}
if err == nil && len(resp.Kvs) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to check err == nil here

info := &util.DDLServerInfo{}
err := json.Unmarshal(resp.Kvs[0].Value, info)
if err != nil {
log.Infof("[syncer] get ddl server info, ddl %s json.Unmarshal %v failed %v.", resp.Kvs[0].Key, resp.Kvs[0].Value, err)
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errors.Trace(err)

}
return info, nil
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cloud this(line176-198) be extracted into a function? It will be used in GetAllServerInfoFromPD too.

}

// GetAllServerInfoFromPD implements SchemaSyncer.GetAllServerInfoFromPD interface.
func (s *schemaVersionSyncer) GetAllServerInfoFromPD(ctx context.Context) (map[string]*util.DDLServerInfo, error) {
var err error
allDDLInfo := make(map[string]*util.DDLServerInfo)
for {
if isContextDone(ctx) {
// ctx is canceled or timeout.
err = errors.Trace(ctx.Err())
return nil, err
}

resp, err := s.etcdCli.Get(ctx, DDLServerInformation, clientv3.WithPrefix())
if err != nil {
log.Infof("[syncer] get all ddl server info failed %v, continue checking.", err)
time.Sleep(200 * time.Millisecond)
continue
}

for _, kv := range resp.Kvs {
info := &util.DDLServerInfo{}
err := json.Unmarshal(kv.Value, info)
if err != nil {
log.Infof("[syncer] get all ddl server info, ddl %s json.Unmarshal %v failed %v.", kv.Key, kv.Value, err)
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errors.Trace(err)

}
allDDLInfo[info.ID] = info
}
return allDDLInfo, nil
}
}

// UpdateSelfServerInfo implements SchemaSyncer.UpdateSelfServerInfo interface.
func (s *schemaVersionSyncer) UpdateSelfServerInfo(ctx context.Context, info *util.DDLServerInfo) error {
infoBuf, err := json.Marshal(info)
if err != nil {
return errors.Trace(err)
}
err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfServerInfoPath, hack.String(infoBuf))
return errors.Trace(err)
}

// RemoveSelfServerInfo implements SchemaSyncer.RemoveSelfServerInfo interface.
func (s *schemaVersionSyncer) RemoveSelfServerInfo() error {
var err error
ctx := context.Background()
for i := 0; i < keyOpDefaultRetryCnt; i++ {
childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
_, err = s.etcdCli.Delete(childCtx, s.selfServerInfoPath)
cancel()
if err == nil {
return nil
}
log.Warnf("[syncer] remove server info path %s failed %v no.%d", s.selfServerInfoPath, err, i)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cloud this(line176-198) be extracted into a function? It will be used in RemoveSelfVersionPath too.

return errors.Trace(err)
}

// Done implements SchemaSyncer.Done interface.
func (s *schemaVersionSyncer) Done() <-chan struct{} {
return s.session.Done()
Expand Down
17 changes: 17 additions & 0 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,20 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
return errors.Trace(err)
}

// DDLServerInfo is DDL server static information.
// DDLServerInfo will store into PD when server start up and delete when DDL closed.
// It will not update when DDL server running. So please only put static information in DDLServerInfo struct.
type DDLServerInfo struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update this name too.

ServerVersionInfo
ID string `json:"ddl_id"`
IP string `json:"ip"`
StatusPort uint `json:"status_port"`
Lease string `json:"lease"`
}

// ServerVersionInfo is the server version and git_hash
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add . at the end of a line.

type ServerVersionInfo struct {
Version string `json:"version"`
GitHash string `json:"git_hash"`
}
12 changes: 12 additions & 0 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ timezone.*
curl http://{TiDBIP}:10080/settings
```

1. Get TiDB server information and owner server infomation.

```shell
curl http://{TiDBIP}:10080/ddl/info
```

1. Get TiDB cluster all servers information.

```shell
curl http://{TiDBIP}:10080/ddl/info/all
```

1. Enable/Disable TiDB server general log

```shell
Expand Down
4 changes: 4 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,10 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
if err != nil {
return errors.Trace(err)
}
err = do.ddl.StoreServerInfoToPD()
if err != nil {
return errors.Trace(err)
}
err = do.Reload()
if err != nil {
return errors.Trace(err)
Expand Down
Loading