Skip to content

Commit

Permalink
Merge pull request pingcap#19 from lonng/hackthon
Browse files Browse the repository at this point in the history
infoschema: add virtual table cluster_config
  • Loading branch information
qiuyesuifeng authored Oct 24, 2019
2 parents adaea5c + b3703aa commit 43dd3bf
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/grpc-gateway v1.5.1 // indirect
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf
github.com/json-iterator/go v1.1.6 // indirect
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf h1:Ut4tTtPNmInWiEWJRernsWm688R0RN6PFO8sZhwI0sk=
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf/go.mod h1:4AmD/VxjWcI5SRB0n6szE2A6s2fsNHDLO0nAlMHgfLQ=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs=
Expand Down
12 changes: 7 additions & 5 deletions infoschema/perfschema/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,10 @@ const tableGoroutine = "CREATE TABLE IF NOT EXISTS " + tableNameGoroutines + " (
"LOCATION VARCHAR(512));"

const tableTiKVCpuProfile = "CREATE TABLE IF NOT EXISTS " + tableNameTiKVCpuProfile + " (" +
"FUNCTION VARCHAR(512) NOT NULL," +
"PERCENT_ABS VARCHAR(8) NOT NULL," +
"PERCENT_REL VARCHAR(8) NOT NULL," +
"DEPTH INT(8) NOT NULL," +
"FILE VARCHAR(512) NOT NULL);"
"NAME VARCHAR(16) NOT NULL," +
"ADDRESS VARCHAR(64) NOT NULL," +
"FUNCTION VARCHAR(512) NOT NULL," +
"PERCENT_ABS VARCHAR(8) NOT NULL," +
"PERCENT_REL VARCHAR(8) NOT NULL," +
"DEPTH INT(8) NOT NULL," +
"FILE VARCHAR(512) NOT NULL);"
57 changes: 53 additions & 4 deletions infoschema/perfschema/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"

"github.com/google/pprof/graph"
"github.com/google/pprof/measurement"
"github.com/google/pprof/profile"
"github.com/google/pprof/report"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
)

type Node struct {
Expand Down Expand Up @@ -200,13 +203,59 @@ func cpuProfileGraph() ([][]types.Datum, error) {
}

// TODO: use cluster info to get all tikv profile
func tikvCpuProfileGraph() ([][]types.Datum, error) {
resp, err := http.Get("http://127.0.0.1:49904/pprof/cpu?seconds=20")
func tikvCpuProfileGraph(ctx sessionctx.Context) ([][]types.Datum, error) {
sql := "SELECT name, address, status_address FROM INFORMATION_SCHEMA.TIDB_CLUSTER_INFO WHERE type='tikv'"
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return profileReaderToDatums(resp.Body)

type result struct{rows [][]types.Datum; err error}

var finalRows [][]types.Datum
wg := sync.WaitGroup{}
ch := make(chan result, len(rows))
for _, row := range rows {
name := row.GetString(0)
address := row.GetString(1)
statusAddr := row.GetString(2)
if len(statusAddr) == 0 {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("tikv node %s(%s) does not contain status address", name, address))
continue
}

wg.Add(1)
go func () {
defer wg.Done()
resp, err := http.Get(fmt.Sprintf("http://%s/pprof/cpu?seconds=20", statusAddr))
if err != nil {
ch <- result{err: err}
return
}
defer resp.Body.Close()
rows, err := profileReaderToDatums(resp.Body)
if err != nil {
ch <- result{err: err}
return
}
// add extra info
for i := range rows {
rows[i] = append(types.MakeDatums(name, address), rows[i]...)
}
ch <- result{rows: rows}
}()
}

wg.Wait()
close(ch)
for result := range ch {
if result.err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
finalRows = append(finalRows, result.rows...)
}
return finalRows, nil
}

func profileGraph(name string) ([][]types.Datum, error) {
Expand Down
2 changes: 1 addition & 1 deletion infoschema/perfschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (vt *perfSchemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
case tableNameGoroutines:
fullRows, err = goroutinesList()
case tableNameTiKVCpuProfile:
fullRows, err = tikvCpuProfileGraph()
fullRows, err = tikvCpuProfileGraph(ctx)
}
if err != nil {
return
Expand Down
96 changes: 95 additions & 1 deletion infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"

"github.com/jeremywohl/flatten"
"github.com/pingcap/errors"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -87,6 +89,7 @@ const (
tableTiKVRegionPeers = "TIKV_REGION_PEERS"
tableTiDBServersInfo = "TIDB_SERVERS_INFO"
tableTiDBClusterInfo = "TIDB_CLUSTER_INFO"
tableTiDBClusterConfig = "TIDB_CLUSTER_CONFIG"
)

type columnInfo struct {
Expand Down Expand Up @@ -672,6 +675,14 @@ var tableTiDBClusterInfoCols = []columnInfo{
{"GIT_HASH", mysql.TypeVarchar, 64, 0, nil, nil},
}

var tableTiDBClusterConfigCols = []columnInfo{
{"TYPE", mysql.TypeVarchar, 64, 0, nil, nil},
{"NAME", mysql.TypeVarchar, 64, 0, nil, nil},
{"ADDRESS", mysql.TypeVarchar, 64, 0, nil, nil},
{"KEY", mysql.TypeVarchar, 256, 0, nil, nil},
{"VALUE", mysql.TypeVarchar, 128, 0, nil, nil},
}

func dataForTiKVRegionStatus(ctx sessionctx.Context) (records [][]types.Datum, err error) {
tikvStore, ok := ctx.GetStore().(tikv.Storage)
if !ok {
Expand Down Expand Up @@ -1939,10 +1950,90 @@ func dataForTiDBClusterInfo(ctx sessionctx.Context) ([][]types.Datum, error) {
rows = append(rows, row)
idx++
}

return rows, nil
}

func dataForClusterConfig(ctx sessionctx.Context) ([][]types.Datum, error) {
sql := "SELECT type, name, address, status_address FROM INFORMATION_SCHEMA.TIDB_CLUSTER_INFO"
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return nil, err
}

type result struct {
rows [][]types.Datum
err error
}

var finalRows [][]types.Datum
wg := sync.WaitGroup{}
ch := make(chan result, len(rows))
for _, row := range rows {
typ := row.GetString(0)
name := row.GetString(1)
address := row.GetString(2)
statusAddr := row.GetString(3)
if len(statusAddr) == 0 {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s(%s) does not contain status address", typ, name, address))
continue
}

wg.Add(1)
go func() {
defer wg.Done()
var url string
switch typ {
case "pd":
url = fmt.Sprintf("http://%s/pd/api/v1/config", statusAddr)
case "tikv", "tidb":
url = fmt.Sprintf("http://%s/config", statusAddr)
default:
ch <- result{err: errors.Errorf("unknown node: %s(%s)", typ, address)}
return
}
resp, err := http.Get(url)
if err != nil {
ch <- result{err: err}
return
}
defer resp.Body.Close()

var nested map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&nested); err != nil {
ch <- result{err: err}
return
}
data, err := flatten.Flatten(nested, "", flatten.DotStyle)
if err != nil {
ch <- result{err: err}
return
}
var rows [][]types.Datum
for key, val := range data {
rows = append(rows, types.MakeDatums(
typ,
name,
address,
key,
fmt.Sprintf("%v", val),
))
}
ch <- result{rows: rows}
}()
}

wg.Wait()
close(ch)
for result := range ch {
if result.err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
finalRows = append(finalRows, result.rows...)
}
return finalRows, nil
}

var tableNameToColumns = map[string][]columnInfo{
tableSchemata: schemataCols,
tableTables: tablesCols,
Expand Down Expand Up @@ -1985,6 +2076,7 @@ var tableNameToColumns = map[string][]columnInfo{
tableTiKVRegionPeers: tableTiKVRegionPeersCols,
tableTiDBServersInfo: tableTiDBServersInfoCols,
tableTiDBClusterInfo: tableTiDBClusterInfoCols,
tableTiDBClusterConfig: tableTiDBClusterConfigCols,
}

func createInfoSchemaTable(handle *Handle, meta *model.TableInfo) *infoschemaTable {
Expand Down Expand Up @@ -2092,6 +2184,8 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
fullRows, err = dataForServersInfo()
case tableTiDBClusterInfo:
fullRows, err = dataForTiDBClusterInfo(ctx)
case tableTiDBClusterConfig:
fullRows, err = dataForClusterConfig(ctx)
}
if err != nil {
return nil, err
Expand Down

0 comments on commit 43dd3bf

Please sign in to comment.