Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: implement disk/cpu/memory/load diagnosis rules #14668

Merged
merged 18 commits on
Feb 10, 2020
Merged
15 changes: 3 additions & 12 deletions executor/cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (e *clusterConfigRetriever) retrieve(_ context.Context, sctx sessionctx.Con
if err != nil {
return nil, err
}
serversInfo = filterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Addresses)
serversInfo = filterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances)

var finalRows [][]types.Datum
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionc
if err != nil {
return nil, err
}
serversInfo = filterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Addresses)
serversInfo = filterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances)

type result struct {
idx int
Expand All @@ -244,22 +244,13 @@ func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionc
}
wg := sync.WaitGroup{}
ch := make(chan result, len(serversInfo))
ipMap := make(map[string]struct{}, len(serversInfo))
infoTp := e.serverInfoType
finalRows := make([][]types.Datum, 0, len(serversInfo)*10)
for i, srv := range serversInfo {
address := srv.Address
if srv.ServerType == "tidb" {
address = srv.StatusAddr
}
ip := address
if idx := strings.Index(address, ":"); idx != -1 {
ip = address[:idx]
}
if _, ok := ipMap[ip]; ok {
continue
}
ipMap[ip] = struct{}{}
wg.Add(1)
go func(index int, address, serverTP string) {
util.WithRecovery(func() {
Expand Down Expand Up @@ -441,7 +432,7 @@ func (e *clusterLogRetriever) startRetrieving(ctx context.Context, sctx sessionc
return nil, err
}

addresses := e.extractor.Addresses
addresses := e.extractor.Instances
lonng marked this conversation as resolved.
Show resolved Hide resolved
nodeTypes := e.extractor.NodeTypes
serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, addresses)

Expand Down
40 changes: 20 additions & 20 deletions executor/cluster_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func (s *testClusterReaderSuite) TestMetricTableData(c *C) {
c.Assert(err, IsNil)
result := tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("execute sql fail"))
result.Check(testkit.Rows(
"2019-12-23 20:11:35.000000 0.1 127.0.0.1:10080 0.9"))
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.9 0.1"))

rs, err = tk.Se.Execute(ctx, "select * from tidb_query_duration where quantile in (0.85, 0.95);")
c.Assert(err, IsNil)
result = tk.ResultSetToResultWithCtx(ctx, rs[0], Commentf("execute sql fail"))
result.Check(testkit.Rows(
"2019-12-23 20:11:35.000000 0.1 127.0.0.1:10080 0.85",
"2019-12-23 20:11:35.000000 0.1 127.0.0.1:10080 0.95"))
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.85 0.1",
"2019-12-23 20:11:35.000000 127.0.0.1:10080 0.95 0.1"))
}

func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
Expand Down Expand Up @@ -237,7 +237,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
),
},
{
sql: "select * from information_schema.cluster_config where type='pd' or address='" + testServers[0].address + "'",
sql: "select * from information_schema.cluster_config where type='pd' or instance='" + testServers[0].address + "'",
reqCount: 9,
rows: flatten(
rows["tidb"][0],
Expand Down Expand Up @@ -315,7 +315,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
),
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where address='%s'`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where instance='%s'`,
testServers[0].address),
reqCount: 3,
rows: flatten(
Expand All @@ -325,15 +325,15 @@ func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
),
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type='tidb' and address='%s'`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type='tidb' and instance='%s'`,
testServers[0].address),
reqCount: 1,
rows: flatten(
rows["tidb"][0],
),
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and address='%s'`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and instance='%s'`,
testServers[0].address),
reqCount: 2,
rows: flatten(
Expand All @@ -342,7 +342,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
),
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and address in ('%s', '%s')`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and instance in ('%s', '%s')`,
testServers[0].address, testServers[0].address),
reqCount: 2,
rows: flatten(
Expand All @@ -351,7 +351,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
),
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and address in ('%s', '%s')`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and instance in ('%s', '%s')`,
testServers[0].address, testServers[1].address),
reqCount: 4,
rows: flatten(
Expand All @@ -362,17 +362,17 @@ func (s *testClusterReaderSuite) TestTiDBClusterConfig(c *C) {
),
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and type='pd' and address in ('%s', '%s')`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and type='pd' and instance in ('%s', '%s')`,
testServers[0].address, testServers[1].address),
reqCount: 0,
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and address in ('%s', '%s') and address='%s'`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and instance in ('%s', '%s') and instance='%s'`,
testServers[0].address, testServers[1].address, testServers[2].address),
reqCount: 0,
},
{
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and address in ('%s', '%s') and address='%s'`,
sql: fmt.Sprintf(`select * from information_schema.cluster_config where type in ('tidb', 'tikv') and instance in ('%s', '%s') and instance='%s'`,
testServers[0].address, testServers[1].address, testServers[0].address),
reqCount: 2,
rows: flatten(
Expand Down Expand Up @@ -465,7 +465,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
logtime(`2019/08/26 06:19:16.011`) + ` [trace] [test log message tidb 4, foo]`,
logtime(`2019/08/26 06:19:17.011`) + ` [CRITICAL] [test log message tidb 5, foo]`,
})
s.writeTmpFile(c, testServers["tidb"].tmpDir, "tidb.log.1", []string{
s.writeTmpFile(c, testServers["tidb"].tmpDir, "tidb-1.log", []string{
logtime(`2019/08/26 06:25:13.011`) + ` [info] [test log message tidb 10, bar]`,
logtime(`2019/08/26 06:25:14.011`) + ` [debug] [test log message tidb 11, bar]`,
logtime(`2019/08/26 06:25:15.011`) + ` [ERROR] [test log message tidb 12, bar]`,
Expand All @@ -481,7 +481,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
logtime(`2019/08/26 06:22:16.011`) + ` [trace] [test log message tikv 4, foo]`,
logtime(`2019/08/26 06:23:17.011`) + ` [CRITICAL] [test log message tikv 5, foo]`,
})
s.writeTmpFile(c, testServers["tikv"].tmpDir, "tikv.log.1", []string{
s.writeTmpFile(c, testServers["tikv"].tmpDir, "tikv-1.log", []string{
logtime(`2019/08/26 06:24:15.011`) + ` [info] [test log message tikv 10, bar]`,
logtime(`2019/08/26 06:25:16.011`) + ` [debug] [test log message tikv 11, bar]`,
logtime(`2019/08/26 06:26:17.011`) + ` [ERROR] [test log message tikv 12, bar]`,
Expand All @@ -497,7 +497,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
logtime(`2019/08/26 06:21:16.011`) + ` [trace] [test log message pd 4, foo]`,
logtime(`2019/08/26 06:22:17.011`) + ` [CRITICAL] [test log message pd 5, foo]`,
})
s.writeTmpFile(c, testServers["pd"].tmpDir, "pd.log.1", []string{
s.writeTmpFile(c, testServers["pd"].tmpDir, "pd-1.log", []string{
logtime(`2019/08/26 06:23:13.011`) + ` [info] [test log message pd 10, bar]`,
logtime(`2019/08/26 06:24:14.011`) + ` [debug] [test log message pd 11, bar]`,
logtime(`2019/08/26 06:25:15.011`) + ` [ERROR] [test log message pd 12, bar]`,
Expand Down Expand Up @@ -650,7 +650,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
conditions: []string{
"time>='2019/08/26 06:19:13.011'",
"time<='2019/08/26 06:21:15.011'",
fmt.Sprintf("address='%s'", testServers["pd"].address),
fmt.Sprintf("instance='%s'", testServers["pd"].address),
},
expected: [][]string{
{"2019/08/26 06:19:14.011", "pd", "DEBUG", "[test log message pd 2, foo]"},
Expand All @@ -661,7 +661,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
conditions: []string{
"time>='2019/08/26 06:19:13.011'",
"time<='2019/08/26 06:21:15.011'",
fmt.Sprintf("address='%s'", testServers["tidb"].address),
fmt.Sprintf("instance='%s'", testServers["tidb"].address),
},
expected: [][]string{
{"2019/08/26 06:19:13.011", "tidb", "INFO", "[test log message tidb 1, foo]"},
Expand All @@ -675,7 +675,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
conditions: []string{
"time>='2019/08/26 06:19:13.011'",
"time<='2019/08/26 06:21:15.011'",
fmt.Sprintf("address='%s'", testServers["tikv"].address),
fmt.Sprintf("instance='%s'", testServers["tikv"].address),
},
expected: [][]string{
{"2019/08/26 06:19:13.011", "tikv", "INFO", "[test log message tikv 1, foo]"},
Expand All @@ -687,7 +687,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
conditions: []string{
"time>='2019/08/26 06:19:13.011'",
"time<='2019/08/26 06:21:15.011'",
fmt.Sprintf("address in ('%s', '%s')", testServers["pd"].address, testServers["tidb"].address),
fmt.Sprintf("instance in ('%s', '%s')", testServers["pd"].address, testServers["tidb"].address),
},
expected: [][]string{
{"2019/08/26 06:19:13.011", "tidb", "INFO", "[test log message tidb 1, foo]"},
Expand Down Expand Up @@ -819,7 +819,7 @@ func (s *testClusterReaderSuite) TestTiDBClusterLog(c *C) {
expectedRow := []string{
restime(row[0]), // time column
row[1], // type column
testServers[row[1]].address, // address column
testServers[row[1]].address, // instance column
strings.ToUpper(sysutil.ParseLogLevel(row[2]).String()), // level column
row[3], // message column
}
Expand Down
Loading