Skip to content

Commit

Permalink
timer,ttl: add some metrics for timer framework and TTL timer syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Jul 28, 2023
1 parent 50e4911 commit 6d6547a
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 2 deletions.
1 change: 1 addition & 0 deletions metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//parser/terror",
"//timer/metrics",
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
Expand Down
5 changes: 5 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package metrics
import (
"sync"

timermetrics "github.com/pingcap/tidb/timer/metrics"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
Expand Down Expand Up @@ -91,6 +92,7 @@ func InitMetrics() {
InitTelemetryMetrics()
InitTopSQLMetrics()
InitTTLMetrics()
timermetrics.InitTimerMetrics()

PanicCounter = NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -255,6 +257,9 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLPhaseTime)
prometheus.MustRegister(TTLInsertRowsCount)
prometheus.MustRegister(TTLWatermarkDelay)
prometheus.MustRegister(TTLEventCounter)

prometheus.MustRegister(timermetrics.TimerEventCounter)

prometheus.MustRegister(EMACPUUsageGauge)
prometheus.MustRegister(PoolConcurrencyCounter)
Expand Down
17 changes: 17 additions & 0 deletions metrics/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ var (
TTLInsertRowsCount prometheus.Counter

TTLWatermarkDelay *prometheus.GaugeVec

TTLEventCounter *prometheus.CounterVec

TTLSyncTimerCounter prometheus.Counter

TTLFullRefreshTimersCounter prometheus.Counter
)

// InitTTLMetrics initializes ttl metrics.
Expand Down Expand Up @@ -91,4 +97,15 @@ func InitTTLMetrics() {
Name: "ttl_watermark_delay",
Help: "Bucketed delay time in seconds for TTL tables.",
}, []string{LblType, LblName})

TTLEventCounter = NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "ttl_event_count",
Help: "Counter of ttl event.",
}, []string{LblType})

TTLSyncTimerCounter = TTLEventCounter.WithLabelValues("sync_one_timer")
TTLFullRefreshTimersCounter = TTLEventCounter.WithLabelValues("full_refresh_timers")
}
9 changes: 9 additions & 0 deletions timer/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library target for the timer metrics package (metrics.go).
# Publicly visible; its only declared dependency is the Prometheus Go client.
go_library(
name = "metrics",
srcs = ["metrics.go"],
importpath = "github.com/pingcap/tidb/timer/metrics",
visibility = ["//visibility:public"],
deps = ["@com_github_prometheus_client_golang//prometheus"],
)
49 changes: 49 additions & 0 deletions timer/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"
)

// Timer metrics
var (
// TimerEventCounter is the counter vector for timer events, labeled by
// "scope" and "type". It is assigned by InitTimerMetrics.
TimerEventCounter *prometheus.CounterVec

// TimerFullRefreshCounter is the TimerEventCounter child with scope
// "runtime" and type "full_refresh_timers". It is assigned by InitTimerMetrics.
TimerFullRefreshCounter prometheus.Counter
// TimerPartialRefreshCounter is the TimerEventCounter child with scope
// "runtime" and type "partial_refresh_timers". It is assigned by InitTimerMetrics.
TimerPartialRefreshCounter prometheus.Counter
)

// InitTimerMetrics creates the timer event counter vector and the two
// pre-resolved runtime counters derived from it.
//
// Each call replaces TimerEventCounter, TimerFullRefreshCounter and
// TimerPartialRefreshCounter with freshly built values. This function only
// constructs the collectors; it does not register them.
func InitTimerMetrics() {
	// Scope label value shared by the counters owned by the timer runtime.
	const runtimeScope = "runtime"

	opts := prometheus.CounterOpts{
		Namespace: "tidb",
		Subsystem: "server",
		Name:      "timer_event_count",
		Help:      "Counter of timer event.",
	}
	labelNames := []string{"scope", "type"}
	TimerEventCounter = prometheus.NewCounterVec(opts, labelNames)

	// Resolve the runtime children once so callers can Inc() them directly.
	TimerFullRefreshCounter = TimerEventCounter.WithLabelValues(runtimeScope, "full_refresh_timers")
	TimerPartialRefreshCounter = TimerEventCounter.WithLabelValues(runtimeScope, "partial_refresh_timers")
}

// TimerHookWorkerCounter returns the TimerEventCounter child used to count a
// hook's event: the "scope" label is "hook.<hookClass>" and the "type" label
// is event.
//
// TimerEventCounter is assigned by InitTimerMetrics, which is expected to have
// run before this function is called.
func TimerHookWorkerCounter(hookClass string, event string) prometheus.Counter {
	hookScope := fmt.Sprintf("hook.%s", hookClass)
	return TimerEventCounter.WithLabelValues(hookScope, event)
}
4 changes: 4 additions & 0 deletions timer/runtime/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//timer/api",
"//timer/metrics",
"//util/logutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
Expand All @@ -35,6 +37,8 @@ go_test(
deps = [
"//testkit/testsetup",
"//timer/api",
"//timer/metrics",
"//util/mock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//mock",
Expand Down
3 changes: 3 additions & 0 deletions timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/google/uuid"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/timer/metrics"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -210,6 +211,7 @@ func (rt *TimerGroupRuntime) loop() {
}

func (rt *TimerGroupRuntime) fullRefreshTimers() {
metrics.TimerFullRefreshCounter.Inc()
timers, err := rt.store.List(rt.ctx, rt.cond)
if err != nil {
rt.logger.Error("error occurs when fullRefreshTimers", zap.Error(err))
Expand Down Expand Up @@ -354,6 +356,7 @@ func (rt *TimerGroupRuntime) partialRefreshTimers(timerIDs map[string]struct{})
return false
}

metrics.TimerPartialRefreshCounter.Inc()
cond := rt.buildTimerIDsCond(timerIDs)
timers, err := rt.store.List(rt.ctx, cond)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions timer/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/timer/metrics"
mockutil "github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -388,6 +390,14 @@ func TestNextTryTriggerDuration(t *testing.T) {
}

func TestFullRefreshTimers(t *testing.T) {
origFullRefreshCounter := metrics.TimerFullRefreshCounter
defer func() {
metrics.TimerFullRefreshCounter = origFullRefreshCounter
}()

fullRefreshCounter := &mockutil.MetricsCounter{}
metrics.TimerFullRefreshCounter = fullRefreshCounter

mockCore, mockStore := newMockStore()
runtime := NewTimerRuntimeBuilder("g1", mockStore).Build()
runtime.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")}
Expand Down Expand Up @@ -428,11 +438,14 @@ func TestFullRefreshTimers(t *testing.T) {
t6New.Version++

mockCore.On("List", mock.Anything, runtime.cond).Return(timers[0:], errors.New("mockErr")).Once()
require.Equal(t, float64(0), fullRefreshCounter.Val())
runtime.fullRefreshTimers()
require.Equal(t, float64(1), fullRefreshCounter.Val())
require.Equal(t, 7, len(runtime.cache.items))

mockCore.On("List", mock.Anything, runtime.cond).Return([]*api.TimerRecord{t0New, timers[1], t2New, t4New, t6New}, nil).Once()
runtime.fullRefreshTimers()
require.Equal(t, float64(2), fullRefreshCounter.Val())
mockCore.AssertExpectations(t)
require.Equal(t, 5, len(runtime.cache.items))
require.Equal(t, t0New, runtime.cache.items["t0"].timer)
Expand All @@ -446,6 +459,14 @@ func TestFullRefreshTimers(t *testing.T) {
}

func TestBatchHandlerWatchResponses(t *testing.T) {
origPartialRefreshCounter := metrics.TimerPartialRefreshCounter
defer func() {
metrics.TimerPartialRefreshCounter = origPartialRefreshCounter
}()

partialRefreshCounter := &mockutil.MetricsCounter{}
metrics.TimerPartialRefreshCounter = partialRefreshCounter

mockCore, mockStore := newMockStore()
runtime := NewTimerRuntimeBuilder("g1", mockStore).Build()
runtime.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")}
Expand Down Expand Up @@ -509,6 +530,7 @@ func TestBatchHandlerWatchResponses(t *testing.T) {
require.Contains(t, condIDs, "t2")
})

require.Equal(t, float64(0), partialRefreshCounter.Val())
runtime.batchHandleWatchResponses([]api.WatchTimerResponse{
{
Events: []*api.WatchTimerEvent{
Expand All @@ -535,6 +557,7 @@ func TestBatchHandlerWatchResponses(t *testing.T) {
},
},
})
require.Equal(t, float64(1), partialRefreshCounter.Val())

mockCore.AssertExpectations(t)
require.Equal(t, 6, len(runtime.cache.items))
Expand Down
24 changes: 22 additions & 2 deletions timer/runtime/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/timer/metrics"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -68,6 +70,13 @@ type hookWorker struct {
ch chan *triggerEventRequest
logger *zap.Logger
nowFunc func() time.Time
// metrics for worker
triggerRequestCounter prometheus.Counter
onPreSchedEventCounter prometheus.Counter
onPreSchedEventErrCounter prometheus.Counter
onPreSchedEventDelayCounter prometheus.Counter
onSchedEventCounter prometheus.Counter
onSchedEventErrCounter prometheus.Counter
}

func newHookWorker(ctx context.Context, wg *sync.WaitGroup, groupID string, hookClass string, hook api.Hook, nowFunc func() time.Time) *hookWorker {
Expand All @@ -85,7 +94,13 @@ func newHookWorker(ctx context.Context, wg *sync.WaitGroup, groupID string, hook
zap.String("groupID", groupID),
zap.String("hookClass", hookClass),
),
nowFunc: nowFunc,
nowFunc: nowFunc,
triggerRequestCounter: metrics.TimerHookWorkerCounter(hookClass, "trigger"),
onPreSchedEventCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent"),
onPreSchedEventErrCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent_error"),
onPreSchedEventDelayCounter: metrics.TimerHookWorkerCounter(hookClass, "OnPreSchedEvent_delay"),
onSchedEventCounter: metrics.TimerHookWorkerCounter(hookClass, "OnSchedEvent"),
onSchedEventErrCounter: metrics.TimerHookWorkerCounter(hookClass, "OnSchedEvent_error"),
}

wg.Add(1)
Expand Down Expand Up @@ -127,6 +142,7 @@ func (w *hookWorker) loop() {
}

func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger) *triggerEventResponse {
w.triggerRequestCounter.Inc()
timer := req.timer
resp := &triggerEventResponse{
timerID: timer.ID,
Expand Down Expand Up @@ -177,6 +193,7 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger)
var preResult api.PreSchedEventResult
if w.hook != nil {
logger.Debug("call OnPreSchedEvent")
w.onPreSchedEventCounter.Inc()
result, err := w.hook.OnPreSchedEvent(w.ctx, &timerEvent{
eventID: req.eventID,
record: timer,
Expand All @@ -188,15 +205,16 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger)
zap.Error(err),
zap.Duration("retryAfter", workerEventDefaultRetryInterval),
)
w.onPreSchedEventErrCounter.Inc()
resp.retryAfter.Set(workerEventDefaultRetryInterval)
return resp
}

if result.Delay > 0 {
w.onPreSchedEventDelayCounter.Inc()
resp.retryAfter.Set(result.Delay)
return resp
}

preResult = result
}

Expand Down Expand Up @@ -257,12 +275,14 @@ func (w *hookWorker) triggerEvent(req *triggerEventRequest, logger *zap.Logger)

if w.hook != nil {
logger.Debug("call OnSchedEvent")
w.onSchedEventCounter.Inc()
err = w.hook.OnSchedEvent(w.ctx, &timerEvent{
eventID: req.eventID,
record: timer,
})

if err != nil {
w.onSchedEventErrCounter.Inc()
logger.Error(
"error occurs when invoking hook OnTimerEvent",
zap.Error(err),
Expand Down
Loading

0 comments on commit 6d6547a

Please sign in to comment.