statistics: support merge global topn in concurrency (#38358)
ref #35142
Yisaer authored Oct 18, 2022
1 parent d2295ca commit e8d2659
Showing 7 changed files with 405 additions and 6 deletions.
76 changes: 76 additions & 0 deletions executor/analyze_test.go
@@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
"sync/atomic"
"testing"
@@ -338,3 +339,78 @@ func TestAnalyzePartitionTableForFloat(t *testing.T) {
}
tk.MustExec("analyze table t1")
}

func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec("use test")
tk.MustExec("create table t(id int) partition by hash(id) partitions 4")
testcases := []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
// assert the empty-table case under each concurrency setting
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
}

for i := 1; i <= 500; i++ {
for j := 1; j <= 20; j++ {
tk.MustExec(fmt.Sprintf("insert into t (id) values (%v)", j))
}
}
var expected [][]interface{}
for i := 1; i <= 20; i++ {
expected = append(expected, []interface{}{
strconv.FormatInt(int64(i), 10), "500",
})
}
testcases = []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
}
}
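The test above pins the session scope only. Since the variable is registered with both global and session scope (see sysvar.go below), a companion sketch along these lines would cover the global side as well; the test name and body are hypothetical, not part of this commit:

package executor_test

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

// Hypothetical companion test (not in this commit): checks that the new
// variable can be set at both session and global scope.
func TestMergePartitionStatsConcurrencyScopes(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	// Session scope takes effect immediately for this session.
	tk.MustExec("set @@tidb_merge_partition_stats_concurrency = 4")
	tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows("4"))
	// Global scope becomes the default for new sessions.
	tk.MustExec("set @@global.tidb_merge_partition_stats_concurrency = 4")
	tk.MustQuery("select @@global.tidb_merge_partition_stats_concurrency").Check(testkit.Rows("4"))
}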
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
@@ -1271,6 +1271,9 @@ type SessionVars struct {
// LastPlanReplayerToken indicates the last plan replayer token
LastPlanReplayerToken string

// AnalyzePartitionMergeConcurrency indicates the concurrency for merging partition stats
AnalyzePartitionMergeConcurrency int

HookContext
}

7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -1917,6 +1917,13 @@ var defaultSysVars = []*SysVar{
s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency,
SetSession: func(s *SessionVars, val string) error {
s.AnalyzePartitionMergeConcurrency = TidbOptInt(val, DefTiDBMergePartitionStatsConcurrency)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -749,6 +749,9 @@ const (
// ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no memory
// limit for ranges.
TiDBOptRangeMaxSize = "tidb_opt_range_max_size"

// TiDBMergePartitionStatsConcurrency indicates the concurrency when merging partition stats into global stats
TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency"
)

// TiDB vars that have only global scope
@@ -1057,6 +1060,7 @@ const (
DefTiDBOptRangeMaxSize = 0
DefTiDBCostModelVer = 1
DefTiDBServerMemoryLimitSessMinSize = 128 << 20
DefTiDBMergePartitionStatsConcurrency = 1
DefTiDBServerMemoryLimitGCTrigger = 0.7
DefTiDBEnableGOGCTuner = true
)
21 changes: 19 additions & 2 deletions statistics/cmsketch.go
@@ -21,6 +21,7 @@ import (
"reflect"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -729,7 +730,7 @@ func NewTopN(n int) *TopN {
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the leftover topN values from the partition-level TopNs that were not placed into the global-level TopN. We should put them back into the histograms later.
// 3. `[]*Histogram` are the partition-level histograms, from which some values were deleted while merging the global-level topN.
func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
if checkEmptyTopNs(topNs) {
return nil, nil, hists, nil
}
@@ -781,7 +782,7 @@ func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs [
var err error
if types.IsTypeTime(hists[0].Tp.GetType()) {
// handle datetime values specially since they are encoded to int and we'll get int values if using DecodeOne.
_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), sc.TimeZone)
_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), loc)
} else if types.IsTypeFloat(hists[0].Tp.GetType()) {
_, d, err = codec.DecodeAsFloat32(val.Encoded, hists[0].Tp.GetType())
} else {
@@ -866,6 +867,22 @@ func checkEmptyTopNs(topNs []*TopN) bool {
return count == 0
}

// SortTopnMeta sorts TopNMetas by count in descending order, breaking ties by the encoded value in ascending order.
func SortTopnMeta(topnMetas []TopNMeta) []TopNMeta {
slices.SortFunc(topnMetas, func(i, j TopNMeta) bool {
if i.Count != j.Count {
return i.Count > j.Count
}
return bytes.Compare(i.Encoded, j.Encoded) < 0
})
return topnMetas
}

// GetMergedTopNFromSortedSlice returns the merged global topN and the leftover TopNMeta entries that did not fit into it.
func GetMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
return getMergedTopNFromSortedSlice(sorted, n)
}

func getMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
slices.SortFunc(sorted, func(i, j TopNMeta) bool {
if i.Count != j.Count {
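As an aside, a standalone sketch (not part of this commit) of the ordering SortTopnMeta establishes; it assumes TopNMeta's exported Encoded and Count fields as used in the diff above:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/statistics"
)

func main() {
	metas := []statistics.TopNMeta{
		{Encoded: []byte("b"), Count: 10},
		{Encoded: []byte("a"), Count: 10},
		{Encoded: []byte("c"), Count: 42},
	}
	// Sorted by count descending, then by encoded bytes ascending on ties.
	for _, m := range statistics.SortTopnMeta(metas) {
		fmt.Printf("%s -> %d\n", m.Encoded, m.Count)
	}
	// Prints: c -> 42, a -> 10, b -> 10
}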
112 changes: 108 additions & 4 deletions statistics/handle/handle.go
@@ -15,6 +15,7 @@
package handle

import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -28,7 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
ddlUtil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
@@ -41,6 +42,7 @@
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
@@ -56,6 +58,9 @@
const (
// TiDBGlobalStats represents the global-stats for a partitioned table.
TiDBGlobalStats = "global"

// maxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
maxPartitionMergeBatchSize = 256
)

// Handle can update stats info periodically.
@@ -83,7 +88,7 @@ type Handle struct {

// ddlEventCh is a channel to notify a ddl operation has happened.
// It is sent only by owner or the drop stats executor, and read by stats handle.
ddlEventCh chan *util.Event
ddlEventCh chan *ddlUtil.Event
// listHead contains all the stats collector required by session.
listHead *SessionStatsCollector
// globalMap contains all the delta map from collectors when we dump them to KV.
@@ -197,7 +202,7 @@ type sessionPool interface {
func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker, serverIDGetter func() uint64) (*Handle, error) {
cfg := config.GetGlobalConfig()
handle := &Handle{
ddlEventCh: make(chan *util.Event, 100),
ddlEventCh: make(chan *ddlUtil.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
pool: pool,
@@ -547,7 +552,8 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
// Because after merging TopN, some numbers will be left.
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
globalStats.TopN[i], popedTopN, allHg[i], err = statistics.MergePartTopN2GlobalTopN(sc.GetSessionVars().StmtCtx, sc.GetSessionVars().AnalyzeVersion, allTopN[i], uint32(opts[ast.AnalyzeOptNumTopN]), allHg[i], isIndex == 1)
wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], popedTopN, allHg[i], err = h.mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
if err != nil {
return
}
@@ -579,6 +585,104 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
return
}

func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
// Use the original single-threaded method if the concurrency is 1 or for version 1 stats.
if mergeConcurrency < 2 {
return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex)
}
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
batchSize = 1
} else if batchSize > maxPartitionMergeBatchSize {
batchSize = maxPartitionMergeBatchSize
}
return h.mergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex)
}
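For illustration (not part of this commit), a minimal standalone sketch of how the clamping above plays out:

package main

import "fmt"

// batchSize mirrors the clamping in mergeGlobalStatsTopN: an even split
// across workers, floored at 1 and capped at maxPartitionMergeBatchSize (256).
func batchSize(partitions, concurrency int) int {
	size := partitions / concurrency
	if size < 1 {
		return 1
	}
	if size > 256 {
		return 256
	}
	return size
}

func main() {
	fmt.Println(batchSize(3, 4))    // 1: fewer partitions than workers
	fmt.Println(batchSize(100, 4))  // 25: even split
	fmt.Println(batchSize(4096, 4)) // 256: capped, so each worker runs several tasks
}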

// mergeGlobalStatsTopNByConcurrency merges the partition topN in parallel.
// To merge the global stats topN concurrently, we split the partition topNs into batches and hand each batch to a worker.
// mergeConcurrency controls the total number of running workers, and mergeBatchSize controls
// the number of partition topNs each worker handles per task.
func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
mergeConcurrency = len(wrapper.AllTopN)
}
tasks := make([]*statistics.TopnStatsMergeTask, 0)
for start := 0; start < len(wrapper.AllTopN); {
end := start + mergeBatchSize
if end > len(wrapper.AllTopN) {
end = len(wrapper.AllTopN)
}
task := statistics.NewTopnStatsMergeTask(start, end)
tasks = append(tasks, task)
start = end
}
var wg util.WaitGroupWrapper
taskNum := len(tasks)
taskCh := make(chan *statistics.TopnStatsMergeTask, taskNum)
respCh := make(chan *statistics.TopnStatsMergeResponse, taskNum)
for i := 0; i < mergeConcurrency; i++ {
worker := statistics.NewTopnStatsMergeWorker(taskCh, respCh, wrapper)
wg.Run(func() {
worker.Run(timeZone, isIndex, n, version)
})
}
for _, task := range tasks {
taskCh <- task
}
close(taskCh)
wg.Wait()
close(respCh)
resps := make([]*statistics.TopnStatsMergeResponse, 0)

// handle errors reported by the workers
hasErr := false
for resp := range respCh {
if resp.Err != nil {
hasErr = true
}
resps = append(resps, resp)
}
if hasErr {
errMsg := make([]string, 0)
for _, resp := range resps {
if resp.Err != nil {
errMsg = append(errMsg, resp.Err.Error())
}
}
return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
}

// merge the responses from each worker into the global topN stats
sorted := make([]statistics.TopNMeta, 0, mergeConcurrency)
leftTopn := make([]statistics.TopNMeta, 0)
for _, resp := range resps {
if resp.TopN != nil {
sorted = append(sorted, resp.TopN.TopN...)
}
leftTopn = append(leftTopn, resp.PopedTopn...)
for i, removeTopn := range resp.RemoveVals {
// Remove the value from the Hists.
if len(removeTopn) > 0 {
tmp := removeTopn
slices.SortFunc(tmp, func(i, j statistics.TopNMeta) bool {
cmpResult := bytes.Compare(i.Encoded, j.Encoded)
return cmpResult < 0
})
wrapper.AllHg[i].RemoveVals(tmp)
}
}
}

globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)
return globalTopN, statistics.SortTopnMeta(append(leftTopn, popedTopn...)), wrapper.AllHg, nil
}
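The fan-out/fan-in shape of the function above, reduced to a self-contained sketch (not the commit's code): buffered task and response channels sized to the task count, a fixed worker pool, then one error-checking pass over the collected responses.

package main

import (
	"fmt"
	"sync"
)

type task struct{ start, end int }

type response struct {
	sum int
	err error
}

func main() {
	tasks := []task{{0, 3}, {3, 6}, {6, 9}}
	taskCh := make(chan task, len(tasks))     // buffered: producers never block
	respCh := make(chan response, len(tasks)) // buffered: workers never block

	const workers = 2
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskCh {
				// Stand-in for merging one batch of partition topNs.
				respCh <- response{sum: t.end - t.start}
			}
		}()
	}
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh) // workers exit once the channel drains
	wg.Wait()
	close(respCh) // safe: every send has completed

	total := 0
	for r := range respCh {
		if r.err != nil { // collect-then-fail, as in the function above
			fmt.Println("worker failed:", r.err)
			return
		}
		total += r.sum
	}
	fmt.Println("merged:", total) // 9
}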

func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
if is.SchemaMetaVersion() != h.mu.schemaVersion {
h.mu.schemaVersion = is.SchemaMetaVersion()
