Skip to content

Commit

Permalink
add metrics for task manager
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Jan 30, 2023
1 parent fd5e675 commit ae5f65d
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 12 deletions.
102 changes: 102 additions & 0 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -18049,6 +18049,108 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The TTL task statuses in each worker",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 100
},
"hiddenSeries": false,
"id": 294,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "running",
"color": "#5794F2"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(tidb_server_ttl_task_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type, instance)",
"interval": "",
"legendFormat": "{{ instance }} {{ type }}",
"queryType": "randomWalk",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "TTL Task Count By Status",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "TTL",
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLQueryDuration)
prometheus.MustRegister(TTLProcessedExpiredRowsCounter)
prometheus.MustRegister(TTLJobStatus)
prometheus.MustRegister(TTLTaskStatus)
prometheus.MustRegister(TTLPhaseTime)

prometheus.MustRegister(EMACPUUsageGauge)
Expand Down
8 changes: 8 additions & 0 deletions metrics/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ var (
Help: "The jobs count in the specified status",
}, []string{LblType})

// TTLTaskStatus is a gauge vector holding the TTL task count, partitioned by
// the LblType label (the task status).
TTLTaskStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "ttl_task_status",
Help: "The tasks count in the specified status",
}, []string{LblType})

TTLPhaseTime = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Expand Down
2 changes: 2 additions & 0 deletions ttl/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var (

// RunningJobsCnt is the TTLJobStatus child metric whose LblType label is "running".
RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"})
// CancellingJobsCnt is the TTLJobStatus child metric whose LblType label is "cancelling".
CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"})

// RunningTaskCnt is the TTLTaskStatus child metric whose LblType label is "running".
RunningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "running"})
)

func initWorkerPhases(workerType string) map[string]prometheus.Counter {
Expand Down
3 changes: 3 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
],
embed = [":ttlworker"],
flaky = True,
race = "on",
deps = [
"//domain",
"//infoschema",
Expand All @@ -69,13 +70,15 @@ go_test(
"//testkit",
"//ttl/cache",
"//ttl/client",
"//ttl/metrics",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
Expand Down
5 changes: 3 additions & 2 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = %?,
current_job_owner_id = %?,
current_job_start_time = %?,
current_job_status = 'waiting',
current_job_status = 'running',
current_job_status_update_time = %?,
current_job_ttl_expire = %?,
current_job_owner_hb_time = %?
Expand Down Expand Up @@ -161,6 +161,7 @@ func (m *JobManager) jobLoop() error {
m.taskManager.resizeWorkersWithSysVar()
for {
m.reportMetrics()
m.taskManager.reportMetrics()
now := se.Now()

select {
Expand Down Expand Up @@ -651,7 +652,7 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca
// information from schema cache directly
tbl: table,

status: cache.JobStatusWaiting,
status: cache.JobStatusRunning,
}
}

Expand Down
40 changes: 40 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/client"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util/logutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -574,3 +576,41 @@ func TestGCTTLHistory(t *testing.T) {
ttlworker.DoGC(context.TODO(), se)
tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5"))
}

// TestJobMetrics checks that after a job is scheduled for a TTL table, the
// table status row reports the job as running and owned by this manager, and
// that ReportMetrics sets the running-jobs gauge to 1.
func TestJobMetrics(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	sessionFactory := sessionFactory(t, store)

	waitAndStopTTLManager(t, dom)

	now := time.Now()
	tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
	table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)

	se := sessionFactory()
	m := ttlworker.NewJobManager("manager-1", nil, store, nil)
	m.TaskManager().ResizeWorkersWithSysVar()
	require.NoError(t, m.InfoSchemaCache().Update(se))
	// schedule jobs
	m.RescheduleJobs(se, now)
	// set the worker to be empty, so none of the tasks will be scheduled
	m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{})

	sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
	rows, err := se.ExecuteSQL(ctx, sql, args...)
	require.NoError(t, err)
	// Fail with a readable assertion rather than an index-out-of-range panic
	// if no status row exists for the table.
	require.NotEmpty(t, rows)
	tableStatus, err := cache.RowToTableStatus(se, rows[0])
	require.NoError(t, err)

	require.NotEmpty(t, tableStatus.CurrentJobID)
	require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID)
	require.Equal(t, cache.JobStatusRunning, tableStatus.CurrentJobStatus)

	// The gauge is only refreshed when ReportMetrics is called explicitly.
	m.ReportMetrics()
	out := &dto.Metric{}
	require.NoError(t, metrics.RunningJobsCnt.Write(out))
	require.Equal(t, float64(1), out.GetGauge().GetValue())
}
5 changes: 5 additions & 0 deletions ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no
return m.updateHeartBeat(ctx, se, now)
}

// ReportMetrics is an exported version of reportMetrics.
// It is declared in a test file so that tests outside the package can trigger
// a metrics report on the JobManager.
func (m *JobManager) ReportMetrics() {
m.reportMetrics()
}

// Finish is an exported version of finish; it forwards se, now and summary unchanged.
func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
j.finish(se, now, summary)
}
Expand Down
48 changes: 38 additions & 10 deletions ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/multierr"
Expand Down Expand Up @@ -327,23 +328,17 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
}

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTaskWithID(task.JobID, task.ScanID)
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return errors.Errorf("didn't find task with jobID: %s, scanID: %d", task.JobID, task.ScanID)
}
task, err = cache.RowToTTLTask(se, rows[0])
var err error

task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, true)
if err != nil {
return err
}
if task.OwnerID != "" && !task.OwnerHBTime.Add(2*jobManagerLoopTickerInterval).Before(now) {
return errors.New("task is already scheduled")
}

sql, args = setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now)
sql, args := setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
Expand All @@ -355,6 +350,12 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
return nil, err
}

// update the task after setting status and owner
task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, false)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(m.ctx)
scanTask := &ttlScanTask{
ctx: ctx,
Expand All @@ -371,6 +372,28 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
}, nil
}

// syncTaskFromTable queries the task identified by jobID and scanID through se
// and converts the first returned row into a *cache.TTLTask.
//
// When detectLock is true, " FOR UPDATE NOWAIT" is appended to the query.
// An error is returned if the query fails, if no row is returned, or if the
// row cannot be converted into a task.
func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, detectLock bool) (*cache.TTLTask, error) {
	query, queryArgs := cache.SelectFromTTLTaskWithID(jobID, scanID)
	if detectLock {
		query += " FOR UPDATE NOWAIT"
	}

	rows, err := se.ExecuteSQL(m.ctx, query, queryArgs...)
	switch {
	case err != nil:
		return nil, errors.Wrapf(err, "execute sql: %s", query)
	case len(rows) == 0:
		return nil, errors.Errorf("didn't find task with jobID: %s, scanID: %d", jobID, scanID)
	}

	loaded, err := cache.RowToTTLTask(se, rows[0])
	if err != nil {
		// Never hand back a partially populated task alongside an error.
		return nil, err
	}
	return loaded, nil
}

// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
for _, task := range m.runningTasks {
Expand Down Expand Up @@ -427,6 +450,7 @@ func (m *taskManager) reportTaskFinished(se session.Session, now time.Time, task
if err != nil {
return err
}
task.Status = cache.TaskStatusFinished

timeoutCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
_, err = se.ExecuteSQL(timeoutCtx, sql, args...)
Expand Down Expand Up @@ -474,6 +498,10 @@ func (m *taskManager) checkInvalidTask(se session.Session) {
m.runningTasks = ownRunningTask
}

// reportMetrics sets the running-task gauge to the current length of
// m.runningTasks.
func (m *taskManager) reportMetrics() {
	running := len(m.runningTasks)
	metrics.RunningTaskCnt.Set(float64(running))
}

type runningScanTask struct {
*ttlScanTask
cancel func()
Expand Down
Loading

0 comments on commit ae5f65d

Please sign in to comment.