Skip to content

Commit

Permalink
Merge branch 'master' into fix_hashagg_pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Jan 31, 2023
2 parents ef4e5c9 + 4fd710c commit c097d4b
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 20 deletions.
19 changes: 18 additions & 1 deletion domain/historical_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package domain

import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

var (
Expand All @@ -35,7 +38,21 @@ type HistoricalStatsWorker struct {

// SendTblToDumpHistoricalStats queues tableID for the worker to dump its
// historical stats.
//
// Nothing is sent when enableDumpHistoricalStats is false, unless the
// "sendHistoricalStats" failpoint evaluates to true (used by tests to force
// the send). The send never blocks the caller: if tblCH cannot accept the ID
// immediately, the task is discarded and a warning is logged.
func (w *HistoricalStatsWorker) SendTblToDumpHistoricalStats(tableID int64) {
	send := enableDumpHistoricalStats.Load()
	failpoint.Inject("sendHistoricalStats", func(val failpoint.Value) {
		if val.(bool) {
			send = true
		}
	})
	if !send {
		return
	}
	// Non-blocking send only. An unconditional `w.tblCH <- tableID` here would
	// bypass the enable check, enqueue the ID twice, and could block forever
	// when the channel is full.
	select {
	case w.tblCH <- tableID:
	default:
		logutil.BgLogger().Warn("discard dump historical stats task", zap.Int64("table-id", tableID))
	}
}

// DumpHistoricalStats dumps stats for the given tableID
Expand Down
59 changes: 59 additions & 0 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
Expand All @@ -30,6 +31,8 @@ import (
)

func TestRecordHistoryStatsAfterAnalyze(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)")
defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats")
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -150,6 +153,8 @@ func TestRecordHistoryStatsMetaAfterAnalyze(t *testing.T) {
}

func TestGCHistoryStatsAfterDropTable(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)")
defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats")
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
Expand All @@ -174,6 +179,7 @@ func TestGCHistoryStatsAfterDropTable(t *testing.T) {
tableInfo.Meta().ID)).Check(testkit.Rows("1"))
// drop the table and gc stats
tk.MustExec("drop table t")
is = dom.InfoSchema()
h.GCStats(is, 0)

// assert stats_history tables delete the record of dropped table
Expand All @@ -183,7 +189,56 @@ func TestGCHistoryStatsAfterDropTable(t *testing.T) {
tableInfo.Meta().ID)).Check(testkit.Rows("0"))
}

// TestAssertHistoricalStatsAfterAlterTable checks that the version of the
// historical stats dumped for a table stays the same after a column, and then
// an index, is dropped from that table and stats GC has run.
func TestAssertHistoricalStatsAfterAlterTable(t *testing.T) {
	const sendFailpoint = "github.com/pingcap/tidb/domain/sendHistoricalStats"
	// Fail fast if the failpoint cannot be toggled instead of silently running
	// the test without it.
	require.NoError(t, failpoint.Enable(sendFailpoint, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(sendFailpoint))
	}()

	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("set global tidb_enable_historical_stats = 1")
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int, b varchar(10),c int, KEY `idx` (`c`))")
	tk.MustExec("analyze table test.t")
	is := dom.InfoSchema()
	tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)

	// Dump historical stats for the table queued by the analyze above.
	h := dom.StatsHandle()
	hsWorker := dom.GetHistoricalStatsWorker()
	tblID := hsWorker.GetOneHistoricalStatsTable()
	err = hsWorker.DumpHistoricalStats(tblID, h)
	require.NoError(t, err)

	// NOTE(review): presumably the sleep ensures the snapshot TS taken below is
	// later than the dumped stats; confirm before shortening it.
	time.Sleep(1 * time.Second)
	snapshot := oracle.GoTimeToTS(time.Now())
	jsTable, err := h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot)
	require.NoError(t, err)
	require.NotNil(t, jsTable)
	require.NotEqual(t, uint64(0), jsTable.Version)
	originVersion := jsTable.Version

	// Historical stats must be unchanged after dropping a column.
	// NOTE(review): `is` and `tableInfo` were captured before the ALTER
	// statements; confirm whether GC should run against a refreshed schema.
	tk.MustExec("alter table t drop column b")
	require.NoError(t, h.GCStats(is, 0))
	snapshot = oracle.GoTimeToTS(time.Now())
	jsTable, err = h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot)
	require.NoError(t, err)
	require.NotNil(t, jsTable)
	require.Equal(t, originVersion, jsTable.Version)

	// Historical stats must be unchanged after dropping an index.
	tk.MustExec("alter table t drop index idx")
	require.NoError(t, h.GCStats(is, 0))
	snapshot = oracle.GoTimeToTS(time.Now())
	jsTable, err = h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot)
	require.NoError(t, err)
	require.NotNil(t, jsTable)
	require.Equal(t, originVersion, jsTable.Version)
}

func TestGCOutdatedHistoryStats(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)")
defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats")
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
Expand Down Expand Up @@ -219,6 +274,8 @@ func TestGCOutdatedHistoryStats(t *testing.T) {
}

func TestPartitionTableHistoricalStats(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)")
defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats")
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
Expand Down Expand Up @@ -246,6 +303,8 @@ PARTITION p0 VALUES LESS THAN (6)
}

func TestDumpHistoricalStatsByTable(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)")
defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats")
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
Expand Down
2 changes: 1 addition & 1 deletion executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func TestSetVar(t *testing.T) {
tk.MustQuery("select @@tidb_enable_tso_follower_proxy").Check(testkit.Rows("0"))
require.Error(t, tk.ExecToErr("set tidb_enable_tso_follower_proxy = 1"))

tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("1"))
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("1"))
tk.MustExec("set global tidb_enable_historical_stats = 0")
Expand Down
102 changes: 102 additions & 0 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -18049,6 +18049,108 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The TTL task statuses in each worker",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 100
},
"hiddenSeries": false,
"id": 294,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "running",
"color": "#5794F2"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(tidb_server_ttl_task_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type, instance)",
"interval": "",
"legendFormat": "{{ instance }} {{ type }}",
"queryType": "randomWalk",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "TTL Task Count By Status",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "TTL",
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLQueryDuration)
prometheus.MustRegister(TTLProcessedExpiredRowsCounter)
prometheus.MustRegister(TTLJobStatus)
prometheus.MustRegister(TTLTaskStatus)
prometheus.MustRegister(TTLPhaseTime)

prometheus.MustRegister(EMACPUUsageGauge)
Expand Down
8 changes: 8 additions & 0 deletions metrics/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ var (
Help: "The jobs count in the specified status",
}, []string{LblType})

// TTLTaskStatus is a gauge vector labeled by LblType. Its help text describes
// it as the count of tasks in the specified status, and it is built from
// namespace "tidb", subsystem "server" and name "ttl_task_status".
TTLTaskStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "ttl_task_status",
Help: "The tasks count in the specified status",
}, []string{LblType})

TTLPhaseTime = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal, Name: TiDBEnableTelemetry, Value: BoolToOnOff(DefTiDBEnableTelemetry), Type: TypeBool},
{Scope: ScopeGlobal, Name: TiDBEnableHistoricalStats, Value: Off, Type: TypeBool},
{Scope: ScopeGlobal, Name: TiDBEnableHistoricalStats, Value: On, Type: TypeBool},
/* tikv gc metrics */
{Scope: ScopeGlobal, Name: TiDBGCEnable, Value: On, Type: TypeBool, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
return getTiDBTableValue(s, "tikv_gc_enable", On)
Expand Down
7 changes: 5 additions & 2 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error
if err := h.gcTableStats(is, row.GetInt64(0)); err != nil {
return errors.Trace(err)
}
if err := h.gcHistoryStatsFromKV(row.GetInt64(0)); err != nil {
return errors.Trace(err)
_, existed := is.TableByID(row.GetInt64(0))
if !existed {
if err := h.gcHistoryStatsFromKV(row.GetInt64(0)); err != nil {
return errors.Trace(err)
}
}
}
if err := h.ClearOutdatedHistoryStats(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions ttl/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var (

RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"})
CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"})

// RunningTaskCnt is the TTLTaskStatus gauge whose LblType label is "running".
RunningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "running"})
)

func initWorkerPhases(workerType string) map[string]prometheus.Counter {
Expand Down
3 changes: 3 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
],
embed = [":ttlworker"],
flaky = True,
race = "on",
deps = [
"//domain",
"//infoschema",
Expand All @@ -69,13 +70,15 @@ go_test(
"//testkit",
"//ttl/cache",
"//ttl/client",
"//ttl/metrics",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
Expand Down
5 changes: 3 additions & 2 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = %?,
current_job_owner_id = %?,
current_job_start_time = %?,
current_job_status = 'waiting',
current_job_status = 'running',
current_job_status_update_time = %?,
current_job_ttl_expire = %?,
current_job_owner_hb_time = %?
Expand Down Expand Up @@ -161,6 +161,7 @@ func (m *JobManager) jobLoop() error {
m.taskManager.resizeWorkersWithSysVar()
for {
m.reportMetrics()
m.taskManager.reportMetrics()
now := se.Now()

select {
Expand Down Expand Up @@ -651,7 +652,7 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca
// information from schema cache directly
tbl: table,

status: cache.JobStatusWaiting,
status: cache.JobStatusRunning,
}
}

Expand Down
Loading

0 comments on commit c097d4b

Please sign in to comment.