Skip to content

Commit

Permalink
group rows before update
Browse files Browse the repository at this point in the history
  • Loading branch information
Haibin Xie committed Jul 24, 2018
1 parent bf35a1b commit c1ccc03
Showing 1 changed file with 68 additions and 67 deletions.
135 changes: 68 additions & 67 deletions statistics/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand Down Expand Up @@ -473,97 +473,98 @@ func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error {
if len(rows) == 0 || err != nil {
return errors.Trace(err)
}
tableID, histID, isIndex := int64(-1), int64(-1), int64(-1)
q := &QueryFeedback{}
var (
cms *CMSketch
hist *Histogram
col *Column
idx *Index
PKIsHandle bool
)
for _, row := range rows {
// merge into previous feedback
if row.GetInt64(0) == tableID && row.GetInt64(1) == histID && row.GetInt64(2) == isIndex {
err = decodeFeedback(row.GetBytes(3), q, cms)
if err != nil {
log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err))
}
continue
}
// Update the NDV of primary key column.
if col != nil && mysql.HasPriKeyFlag(col.Info.Flag) && PKIsHandle {
hist.NDV = int64(hist.totalRowCount())
col = nil
}
err = h.dumpStatsUpdateToKV(tableID, int(isIndex), histID, q, hist, cms)
if err != nil {
return errors.Trace(err)

var groupedRows [][]types.Row
preIdx := 0
tableID, histID, isIndex := rows[0].GetInt64(0), rows[0].GetInt64(1), rows[0].GetInt64(2)
for i := 1; i < len(rows); i++ {
row := rows[i]
if row.GetInt64(0) != tableID || row.GetInt64(1) != histID || row.GetInt64(2) != isIndex {
groupedRows = append(groupedRows, rows[preIdx:i])
tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
preIdx = i
}
// initialize new feedback
tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
}
groupedRows = append(groupedRows, rows[preIdx:])

q := &QueryFeedback{}
for _, rows := range groupedRows {
tableID, histID, isIndex = rows[0].GetInt64(0), rows[0].GetInt64(1), rows[0].GetInt64(2)
table, ok := is.TableByID(tableID)
// The table has been deleted.
if !ok {
hist, cms = nil, nil
if err := h.deleteOutdatedFeedback(tableID, histID, isIndex); err != nil {
return errors.Trace(err)
}
continue
}
PKIsHandle = table.Meta().PKIsHandle
tbl := h.GetTableStats(table.Meta())
var cms *CMSketch
var hist *Histogram
if isIndex == 1 {
idx, ok = tbl.Indices[histID]
if !ok {
hist, cms = nil, nil
continue
idx, ok := tbl.Indices[histID]
if ok {
idxHist := idx.Histogram
hist = &idxHist
cms = idx.CMSketch.copy()
}
idxHist := idx.Histogram
hist = &idxHist
cms = idx.CMSketch.copy()
} else {
col, ok = tbl.Columns[histID]
if !ok {
hist, cms = nil, nil
continue
col, ok := tbl.Columns[histID]
if ok {
colHist := col.Histogram
hist = &colHist
}
colHist := col.Histogram
hist = &colHist
cms = nil
}
err = decodeFeedback(row.GetBytes(3), q, cms)
// The column or index has been deleted.
if hist == nil {
if err := h.deleteOutdatedFeedback(tableID, histID, isIndex); err != nil {
return errors.Trace(err)
}
continue
}
for _, row := range rows {
err = decodeFeedback(row.GetBytes(3), q, cms)
if err != nil {
log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err))
}
}
// Update the NDV of primary key column.
if table.Meta().PKIsHandle && isIndex == 0 {
hist.NDV = int64(hist.totalRowCount())
}
err = h.dumpStatsUpdateToKV(tableID, isIndex, q, hist, cms)
if err != nil {
log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err))
return errors.Trace(err)
}
}
// Update the NDV of primary key column.
if col != nil && mysql.HasPriKeyFlag(col.Info.Flag) && PKIsHandle {
hist.NDV = int64(hist.totalRowCount())
}
// dump the last feedback into kv
err = h.dumpStatsUpdateToKV(tableID, int(isIndex), histID, q, hist, cms)
return nil
}

func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error {
h.mu.Lock()
h.mu.ctx.GetSessionVars().BatchDelete = true
sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d", tableID, histID, isIndex)
_, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
h.mu.ctx.GetSessionVars().BatchDelete = false
h.mu.Unlock()
return errors.Trace(err)
}

func (h *Handle) dumpStatsUpdateToKV(tableID int64, isIndex int, histID int64, q *QueryFeedback, hist *Histogram, cms *CMSketch) (err error) {
func (h *Handle) dumpStatsUpdateToKV(tableID, isIndex int64, q *QueryFeedback, hist *Histogram, cms *CMSketch) (err error) {
defer func() {
if err != nil {
metrics.UpdateStatsCounter.WithLabelValues(metrics.LblError).Inc()
} else {
metrics.UpdateStatsCounter.WithLabelValues(metrics.LblOK).Inc()
}
}()
if hist != nil {
hist = UpdateHistogram(hist, q)
err = h.SaveStatsToStorage(tableID, -1, isIndex, hist, cms, 0)
if err != nil {
return errors.Trace(err)
}
}
h.mu.Lock()
h.mu.ctx.GetSessionVars().BatchDelete = true
sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d", tableID, histID, isIndex)
_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
h.mu.ctx.GetSessionVars().BatchDelete = false
h.mu.Unlock()
hist = UpdateHistogram(hist, q)
q.feedback = q.feedback[:0]
err = h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, 0)
if err != nil {
return errors.Trace(err)
}
err = h.deleteOutdatedFeedback(tableID, hist.ID, isIndex)
return errors.Trace(err)
}

Expand Down

0 comments on commit c1ccc03

Please sign in to comment.