diff --git a/go.mod b/go.mod index a8ecf223bcd31..47a07309c45f6 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/gorilla/websocket v1.4.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 github.com/grpc-ecosystem/grpc-gateway v1.5.1 // indirect + github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf github.com/json-iterator/go v1.1.6 // indirect github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index d4a6b390e87c7..96ffad5590c69 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf h1:Ut4tTtPNmInWiEWJRernsWm688R0RN6PFO8sZhwI0sk= +github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf/go.mod h1:4AmD/VxjWcI5SRB0n6szE2A6s2fsNHDLO0nAlMHgfLQ= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= diff --git a/infoschema/perfschema/const.go b/infoschema/perfschema/const.go index 4c4a3335cf1d4..938884bfab2ca 100644 --- a/infoschema/perfschema/const.go +++ b/infoschema/perfschema/const.go @@ -447,8 +447,10 @@ const tableGoroutine = "CREATE TABLE IF NOT EXISTS " + tableNameGoroutines + " ( "LOCATION VARCHAR(512));" const tableTiKVCpuProfile = "CREATE TABLE IF NOT EXISTS " + tableNameTiKVCpuProfile + " (" + -"FUNCTION VARCHAR(512) NOT NULL," + -"PERCENT_ABS VARCHAR(8) NOT NULL," + -"PERCENT_REL VARCHAR(8) NOT NULL," + -"DEPTH INT(8) NOT NULL," + -"FILE VARCHAR(512) NOT NULL);" \ No newline at end of file + "NAME VARCHAR(16) NOT NULL," + + "ADDRESS VARCHAR(64) NOT NULL," + + "FUNCTION VARCHAR(512) NOT NULL," + + "PERCENT_ABS VARCHAR(8) NOT NULL," + + "PERCENT_REL VARCHAR(8) NOT NULL," + + "DEPTH INT(8) NOT NULL," + + "FILE VARCHAR(512) NOT NULL);" diff --git a/infoschema/perfschema/profile.go b/infoschema/perfschema/profile.go index 5f840366059c5..4924d2cf561e1 100644 --- a/infoschema/perfschema/profile.go +++ b/infoschema/perfschema/profile.go @@ -21,6 +21,7 @@ import ( "runtime/pprof" "strconv" "strings" + "sync" "time" "github.com/google/pprof/graph" @@ -28,7 +29,9 @@ import ( "github.com/google/pprof/profile" "github.com/google/pprof/report" "github.com/pingcap/errors" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" ) type Node struct { @@ -200,13 +203,59 @@ func cpuProfileGraph() ([][]types.Datum, error) { } // TODO: use cluster info to get all tikv profile -func tikvCpuProfileGraph() ([][]types.Datum, error) { - resp, err := http.Get("http://127.0.0.1:49904/pprof/cpu?seconds=20") +func tikvCpuProfileGraph(ctx sessionctx.Context) ([][]types.Datum, error) { + sql := "SELECT name, address, status_address FROM INFORMATION_SCHEMA.TIDB_CLUSTER_INFO WHERE type='tikv'" + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) if err != nil { return nil, err } - defer resp.Body.Close() - return profileReaderToDatums(resp.Body) + + type result struct{rows [][]types.Datum; err error} + + var finalRows [][]types.Datum + wg := sync.WaitGroup{} + ch := make(chan result, len(rows)) + for _, row := range rows { + name := row.GetString(0) + address := row.GetString(1) + statusAddr := row.GetString(2) + if len(statusAddr) == 0 { + ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("tikv node %s(%s) does not contain status address", name, address)) + continue + } + + wg.Add(1) + go func () { + defer wg.Done() + resp, err := http.Get(fmt.Sprintf("http://%s/pprof/cpu?seconds=20", statusAddr)) + if err != nil { + ch <- result{err: err} + return + } + defer resp.Body.Close() + rows, err := profileReaderToDatums(resp.Body) + if err != nil { + ch <- result{err: err} + return + } + // add extra info + for i := range rows { + rows[i] = append(types.MakeDatums(name, address), rows[i]...) + } + ch <- result{rows: rows} + }() + } + + wg.Wait() + close(ch) + for result := range ch { + if result.err != nil { + ctx.GetSessionVars().StmtCtx.AppendWarning(err) + continue + } + finalRows = append(finalRows, result.rows...) + } + return finalRows, nil } func profileGraph(name string) ([][]types.Datum, error) { diff --git a/infoschema/perfschema/tables.go b/infoschema/perfschema/tables.go index 5dc0503873f7b..6dfc5e8f65d38 100644 --- a/infoschema/perfschema/tables.go +++ b/infoschema/perfschema/tables.go @@ -110,7 +110,7 @@ func (vt *perfSchemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) case tableNameGoroutines: fullRows, err = goroutinesList() case tableNameTiKVCpuProfile: - fullRows, err = tikvCpuProfileGraph() + fullRows, err = tikvCpuProfileGraph(ctx) } if err != nil { return diff --git a/infoschema/tables.go b/infoschema/tables.go index 2d9cd9ef1db9f..583ed9eb0cc63 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -17,11 +17,13 @@ import ( "context" "encoding/json" "fmt" + "net/http" "sort" "strings" "sync" "time" + "github.com/jeremywohl/flatten" "github.com/pingcap/errors" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/model" @@ -87,6 +89,7 @@ const ( tableTiKVRegionPeers = "TIKV_REGION_PEERS" tableTiDBServersInfo = "TIDB_SERVERS_INFO" tableTiDBClusterInfo = "TIDB_CLUSTER_INFO" + tableTiDBClusterConfig = "TIDB_CLUSTER_CONFIG" ) type columnInfo struct { @@ -672,6 +675,14 @@ var tableTiDBClusterInfoCols = []columnInfo{ {"GIT_HASH", mysql.TypeVarchar, 64, 0, nil, nil}, } +var tableTiDBClusterConfigCols = []columnInfo{ + {"TYPE", mysql.TypeVarchar, 64, 0, nil, nil}, + {"NAME", mysql.TypeVarchar, 64, 0, nil, nil}, + {"ADDRESS", mysql.TypeVarchar, 64, 0, nil, nil}, + {"KEY", mysql.TypeVarchar, 256, 0, nil, nil}, + {"VALUE", mysql.TypeVarchar, 128, 0, nil, nil}, +} + func dataForTiKVRegionStatus(ctx sessionctx.Context) (records [][]types.Datum, err error) { tikvStore, ok := ctx.GetStore().(tikv.Storage) if !ok { @@ -1939,10 +1950,90 @@ func dataForTiDBClusterInfo(ctx sessionctx.Context) ([][]types.Datum, error) { rows = append(rows, row) idx++ } - return rows, nil } +func dataForClusterConfig(ctx sessionctx.Context) ([][]types.Datum, error) { + sql := "SELECT type, name, address, status_address FROM INFORMATION_SCHEMA.TIDB_CLUSTER_INFO" + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) + if err != nil { + return nil, err + } + + type result struct { + rows [][]types.Datum + err error + } + + var finalRows [][]types.Datum + wg := sync.WaitGroup{} + ch := make(chan result, len(rows)) + for _, row := range rows { + typ := row.GetString(0) + name := row.GetString(1) + address := row.GetString(2) + statusAddr := row.GetString(3) + if len(statusAddr) == 0 { + ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s(%s) does not contain status address", typ, name, address)) + continue + } + + wg.Add(1) + go func() { + defer wg.Done() + var url string + switch typ { + case "pd": + url = fmt.Sprintf("http://%s/pd/api/v1/config", statusAddr) + case "tikv", "tidb": + url = fmt.Sprintf("http://%s/config", statusAddr) + default: + ch <- result{err: errors.Errorf("unknown node: %s(%s)", typ, address)} + return + } + resp, err := http.Get(url) + if err != nil { + ch <- result{err: err} + return + } + defer resp.Body.Close() + + var nested map[string]interface{} + if err = json.NewDecoder(resp.Body).Decode(&nested); err != nil { + ch <- result{err: err} + return + } + data, err := flatten.Flatten(nested, "", flatten.DotStyle) + if err != nil { + ch <- result{err: err} + return + } + var rows [][]types.Datum + for key, val := range data { + rows = append(rows, types.MakeDatums( + typ, + name, + address, + key, + fmt.Sprintf("%v", val), + )) + } + ch <- result{rows: rows} + }() + } + + wg.Wait() + close(ch) + for result := range ch { + if result.err != nil { + ctx.GetSessionVars().StmtCtx.AppendWarning(err) + continue + } + finalRows = append(finalRows, result.rows...) + } + return finalRows, nil +} + var tableNameToColumns = map[string][]columnInfo{ tableSchemata: schemataCols, tableTables: tablesCols, @@ -1985,6 +2076,7 @@ var tableNameToColumns = map[string][]columnInfo{ tableTiKVRegionPeers: tableTiKVRegionPeersCols, tableTiDBServersInfo: tableTiDBServersInfoCols, tableTiDBClusterInfo: tableTiDBClusterInfoCols, + tableTiDBClusterConfig: tableTiDBClusterConfigCols, } func createInfoSchemaTable(handle *Handle, meta *model.TableInfo) *infoschemaTable { @@ -2092,6 +2184,8 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) fullRows, err = dataForServersInfo() case tableTiDBClusterInfo: fullRows, err = dataForTiDBClusterInfo(ctx) + case tableTiDBClusterConfig: + fullRows, err = dataForClusterConfig(ctx) } if err != nil { return nil, err