Skip to content

Commit

Permalink
retrieve cluster config
Browse files Browse the repository at this point in the history
Signed-off-by: Lonng <heng@lonng.org>
  • Loading branch information
lonng committed Oct 23, 2019
1 parent 456ab97 commit b3703aa
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 35 deletions.
12 changes: 7 additions & 5 deletions infoschema/perfschema/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,10 @@ const tableGoroutine = "CREATE TABLE IF NOT EXISTS " + tableNameGoroutines + " (
"LOCATION VARCHAR(512));"

const tableTiKVCpuProfile = "CREATE TABLE IF NOT EXISTS " + tableNameTiKVCpuProfile + " (" +
"FUNCTION VARCHAR(512) NOT NULL," +
"PERCENT_ABS VARCHAR(8) NOT NULL," +
"PERCENT_REL VARCHAR(8) NOT NULL," +
"DEPTH INT(8) NOT NULL," +
"FILE VARCHAR(512) NOT NULL);"
"NAME VARCHAR(16) NOT NULL," +
"ADDRESS VARCHAR(64) NOT NULL," +
"FUNCTION VARCHAR(512) NOT NULL," +
"PERCENT_ABS VARCHAR(8) NOT NULL," +
"PERCENT_REL VARCHAR(8) NOT NULL," +
"DEPTH INT(8) NOT NULL," +
"FILE VARCHAR(512) NOT NULL);"
57 changes: 53 additions & 4 deletions infoschema/perfschema/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"

"github.com/google/pprof/graph"
"github.com/google/pprof/measurement"
"github.com/google/pprof/profile"
"github.com/google/pprof/report"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
)

type Node struct {
Expand Down Expand Up @@ -200,13 +203,59 @@ func cpuProfileGraph() ([][]types.Datum, error) {
}

// TODO: use cluster info to get all tikv profile
func tikvCpuProfileGraph() ([][]types.Datum, error) {
resp, err := http.Get("http://127.0.0.1:49904/pprof/cpu?seconds=20")
func tikvCpuProfileGraph(ctx sessionctx.Context) ([][]types.Datum, error) {
sql := "SELECT name, address, status_address FROM INFORMATION_SCHEMA.TIDB_CLUSTER_INFO WHERE type='tikv'"
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return profileReaderToDatums(resp.Body)

type result struct{rows [][]types.Datum; err error}

var finalRows [][]types.Datum
wg := sync.WaitGroup{}
ch := make(chan result, len(rows))
for _, row := range rows {
name := row.GetString(0)
address := row.GetString(1)
statusAddr := row.GetString(2)
if len(statusAddr) == 0 {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("tikv node %s(%s) does not contain status address", name, address))
continue
}

wg.Add(1)
go func () {
defer wg.Done()
resp, err := http.Get(fmt.Sprintf("http://%s/pprof/cpu?seconds=20", statusAddr))
if err != nil {
ch <- result{err: err}
return
}
defer resp.Body.Close()
rows, err := profileReaderToDatums(resp.Body)
if err != nil {
ch <- result{err: err}
return
}
// add extra info
for i := range rows {
rows[i] = append(types.MakeDatums(name, address), rows[i]...)
}
ch <- result{rows: rows}
}()
}

wg.Wait()
close(ch)
for result := range ch {
if result.err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
finalRows = append(finalRows, result.rows...)
}
return finalRows, nil
}

func profileGraph(name string) ([][]types.Datum, error) {
Expand Down
2 changes: 1 addition & 1 deletion infoschema/perfschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (vt *perfSchemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
case tableNameGoroutines:
fullRows, err = goroutinesList()
case tableNameTiKVCpuProfile:
fullRows, err = tikvCpuProfileGraph()
fullRows, err = tikvCpuProfileGraph(ctx)
}
if err != nil {
return
Expand Down
102 changes: 77 additions & 25 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,10 @@ var tableTiDBClusterInfoCols = []columnInfo{
}

var tableTiDBClusterConfigCols = []columnInfo{
{"SERVER_TYPE", mysql.TypeVarchar, 8, 0, nil, nil},
{"INSTANCE", mysql.TypeVarchar, 64, 0, nil, nil},
{"NAME", mysql.TypeVarchar, 256, 0, nil, nil},
{"TYPE", mysql.TypeVarchar, 64, 0, nil, nil},
{"NAME", mysql.TypeVarchar, 64, 0, nil, nil},
{"ADDRESS", mysql.TypeVarchar, 64, 0, nil, nil},
{"KEY", mysql.TypeVarchar, 256, 0, nil, nil},
{"VALUE", mysql.TypeVarchar, 128, 0, nil, nil},
}

Expand Down Expand Up @@ -1952,34 +1953,85 @@ func dataForTiDBClusterInfo(ctx sessionctx.Context) ([][]types.Datum, error) {
return rows, nil
}

// TODO: poc for config flatten, fetch all servers
func dataForClusterConfig() ([][]types.Datum, error) {
resp, err := http.Get("http://127.0.0.1:49904/config")
func dataForClusterConfig(ctx sessionctx.Context) ([][]types.Datum, error) {
sql := "SELECT type, name, address, status_address FROM INFORMATION_SCHEMA.TIDB_CLUSTER_INFO"
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var nested map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&nested); err != nil {
return nil, err
type result struct {
rows [][]types.Datum
err error
}
data, err := flatten.Flatten(nested, "", flatten.DotStyle)
if err != nil {
return nil, err

var finalRows [][]types.Datum
wg := sync.WaitGroup{}
ch := make(chan result, len(rows))
for _, row := range rows {
typ := row.GetString(0)
name := row.GetString(1)
address := row.GetString(2)
statusAddr := row.GetString(3)
if len(statusAddr) == 0 {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s(%s) does not contain status address", typ, name, address))
continue
}

wg.Add(1)
go func() {
defer wg.Done()
var url string
switch typ {
case "pd":
url = fmt.Sprintf("http://%s/pd/api/v1/config", statusAddr)
case "tikv", "tidb":
url = fmt.Sprintf("http://%s/config", statusAddr)
default:
ch <- result{err: errors.Errorf("unknown node: %s(%s)", typ, address)}
return
}
resp, err := http.Get(url)
if err != nil {
ch <- result{err: err}
return
}
defer resp.Body.Close()

var nested map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&nested); err != nil {
ch <- result{err: err}
return
}
data, err := flatten.Flatten(nested, "", flatten.DotStyle)
if err != nil {
ch <- result{err: err}
return
}
var rows [][]types.Datum
for key, val := range data {
rows = append(rows, types.MakeDatums(
typ,
name,
address,
key,
fmt.Sprintf("%v", val),
))
}
ch <- result{rows: rows}
}()
}
var rows [][]types.Datum
instance := types.NewStringDatum("127.0.0.1:49904")
serverTp := types.NewStringDatum("TiKV")
for key, val := range data {
rows = append(rows, []types.Datum{
serverTp,
instance,
types.NewStringDatum(key),
types.NewStringDatum(fmt.Sprintf("%v", val)),
})

wg.Wait()
close(ch)
for result := range ch {
if result.err != nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
finalRows = append(finalRows, result.rows...)
}
return rows, nil
return finalRows, nil
}

var tableNameToColumns = map[string][]columnInfo{
Expand Down Expand Up @@ -2133,7 +2185,7 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
case tableTiDBClusterInfo:
fullRows, err = dataForTiDBClusterInfo(ctx)
case tableTiDBClusterConfig:
fullRows, err = dataForClusterConfig()
fullRows, err = dataForClusterConfig(ctx)
}
if err != nil {
return nil, err
Expand Down

0 comments on commit b3703aa

Please sign in to comment.