From b0cc8bd0c81c62ab53ec1aaa6da883913bd5a897 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Tue, 13 Sep 2022 11:32:15 +0800 Subject: [PATCH] *: add metrics to prometheus (#79) --- lib/config/proxy.go | 2 + lib/config/proxy_test.go | 5 +- lib/go.mod | 4 +- lib/go.sum | 10 +- lib/util/logger/logger.go | 40 +++++ lib/util/systimemon/systime_mon.go | 48 ++++++ lib/util/systimemon/systime_mon_test.go | 50 ++++++ pkg/manager/config/manager_test.go | 21 +-- pkg/manager/router/backend_observer.go | 11 +- pkg/manager/router/backend_observer_test.go | 1 + pkg/manager/router/metrics.go | 65 +++++++ pkg/manager/router/router.go | 6 + pkg/manager/router/router_test.go | 28 ++- pkg/metrics/backend.go | 51 ------ pkg/metrics/balance.go | 64 +++++++ pkg/metrics/metrics.go | 182 +++++++++++++++----- pkg/metrics/metrics_test.go | 95 ++++++++++ pkg/metrics/queryctx.go | 106 ------------ pkg/metrics/server.go | 45 ++--- pkg/metrics/session.go | 47 +++-- pkg/proxy/backend/backend_conn_mgr.go | 17 +- pkg/proxy/backend/backend_conn_mgr_test.go | 10 +- pkg/proxy/backend/metrics.go | 63 +++++++ pkg/proxy/client/client_conn.go | 4 +- pkg/proxy/net/packetio.go | 10 +- pkg/proxy/proxy.go | 12 +- pkg/server/server.go | 7 +- 27 files changed, 701 insertions(+), 303 deletions(-) create mode 100644 lib/util/logger/logger.go create mode 100644 lib/util/systimemon/systime_mon.go create mode 100644 lib/util/systimemon/systime_mon_test.go create mode 100644 pkg/manager/router/metrics.go delete mode 100644 pkg/metrics/backend.go create mode 100644 pkg/metrics/balance.go create mode 100644 pkg/metrics/metrics_test.go delete mode 100644 pkg/metrics/queryctx.go create mode 100644 pkg/proxy/backend/metrics.go diff --git a/lib/config/proxy.go b/lib/config/proxy.go index 93bdca35..d2df9960 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -33,6 +33,8 @@ type Config struct { } type Metrics struct { + MetricsAddr string `toml:"metrics-addr" json:"metrics-addr"` + MetricsInterval uint `toml:"metrics-interval" json:"metrics-interval"` } type ProxyServerOnline struct { diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index bb0fb41b..d205e6d5 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -41,7 +41,10 @@ var testProxyConfig = Config{ User: "user", Password: "pwd", }, - Metrics: Metrics{}, + Metrics: Metrics{ + MetricsAddr: "127.0.0.1:9021", + MetricsInterval: 15, + }, Log: Log{ Level: "info", Encoder: "tidb", diff --git a/lib/go.mod b/lib/go.mod index 26debcf1..29cb4ac5 100644 --- a/lib/go.mod +++ b/lib/go.mod @@ -7,6 +7,7 @@ require ( github.com/pingcap/log v1.1.0 github.com/spf13/cobra v1.5.0 github.com/stretchr/testify v1.8.0 + go.uber.org/atomic v1.9.0 go.uber.org/zap v1.23.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -15,9 +16,10 @@ require ( github.com/benbjohnson/clock v1.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect ) diff --git a/lib/go.sum b/lib/go.sum index 68eeebf7..e9b6df37 100644 --- a/lib/go.sum +++ b/lib/go.sum @@ -3,16 +3,19 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= @@ -57,8 +60,9 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/lib/util/logger/logger.go b/lib/util/logger/logger.go new file mode 100644 index 00000000..5f5014ee --- /dev/null +++ b/lib/util/logger/logger.go @@ -0,0 +1,40 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logger + +import ( + "testing" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type testingLog struct { + *testing.T +} + +func (t *testingLog) Write(b []byte) (int, error) { + t.Logf("%s", b) + return len(b), nil +} + +// CreateLoggerForTest creates a logger for unit tests. +func CreateLoggerForTest(t *testing.T) *zap.Logger { + return zap.New(zapcore.NewCore( + zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), + zapcore.AddSync(&testingLog{t}), + zap.InfoLevel, + )).Named(t.Name()) +} diff --git a/lib/util/systimemon/systime_mon.go b/lib/util/systimemon/systime_mon.go new file mode 100644 index 00000000..88fcf8b3 --- /dev/null +++ b/lib/util/systimemon/systime_mon.go @@ -0,0 +1,48 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package systimemon + +import ( + "context" + "time" + + "go.uber.org/zap" +) + +// StartMonitor calls systimeErrHandler if system time jump backward. +func StartMonitor(ctx context.Context, logger *zap.Logger, now func() time.Time, systimeErrHandler func(), successCallback func()) { + logger.Info("start system time monitor") + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + tickCount := 0 + for { + last := now().UnixNano() + select { + case <-tick.C: + case <-ctx.Done(): + return + } + if now().UnixNano() < last { + logger.Error("system time jump backward", zap.Int64("last", last)) + systimeErrHandler() + } + // call successCallback per second. + tickCount++ + if tickCount >= 10 { + tickCount = 0 + successCallback() + } + } +} diff --git a/lib/util/systimemon/systime_mon_test.go b/lib/util/systimemon/systime_mon_test.go new file mode 100644 index 00000000..fd308360 --- /dev/null +++ b/lib/util/systimemon/systime_mon_test.go @@ -0,0 +1,50 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package systimemon + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/TiProxy/lib/util/logger" + "github.com/pingcap/TiProxy/lib/util/waitgroup" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func TestSystimeMonitor(t *testing.T) { + errTriggered := atomic.NewBool(false) + nowTriggered := atomic.NewBool(false) + log := logger.CreateLoggerForTest(t) + ctx, cancel := context.WithCancel(context.Background()) + var wg waitgroup.WaitGroup + wg.Run(func() { + StartMonitor(ctx, log, + func() time.Time { + if !nowTriggered.Load() { + nowTriggered.Store(true) + return time.Now() + } + return time.Now().Add(-2 * time.Second) + }, func() { + errTriggered.Store(true) + }, func() {}) + }) + + require.Eventually(t, errTriggered.Load, time.Second, 10*time.Millisecond) + cancel() + wg.Wait() +} diff --git a/pkg/manager/config/manager_test.go b/pkg/manager/config/manager_test.go index c0d5d412..deca5c06 100644 --- a/pkg/manager/config/manager_test.go +++ b/pkg/manager/config/manager_test.go @@ -23,40 +23,27 @@ import ( "testing" "github.com/pingcap/TiProxy/lib/config" + "github.com/pingcap/TiProxy/lib/util/logger" "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) -type testingLog struct { - *testing.T -} - -func (t *testingLog) Write(b []byte) (int, error) { - t.Logf("%s", b) - return len(b), nil -} - func testConfigManager(t *testing.T, cfg config.Advance) (*ConfigManager, context.Context) { addr, err := url.Parse("http://127.0.0.1:0") require.NoError(t, err) testDir := t.TempDir() - logger := zap.New(zapcore.NewCore( - zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), - zapcore.AddSync(&testingLog{t}), - zap.InfoLevel, - )).Named(t.Name()) + log := logger.CreateLoggerForTest(t) etcd_cfg := embed.NewConfig() etcd_cfg.LCUrls = []url.URL{*addr} etcd_cfg.LPUrls = []url.URL{*addr} etcd_cfg.Dir = filepath.Join(testDir, "etcd") - etcd_cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger.Named("etcd")) + etcd_cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(log.Named("etcd")) etcd, err := embed.StartEtcd(etcd_cfg) require.NoError(t, err) @@ -71,7 +58,7 @@ func testConfigManager(t *testing.T, cfg config.Advance) (*ConfigManager, contex } cfgmgr := NewConfigManager() - require.NoError(t, cfgmgr.Init(ctx, ends, cfg, logger)) + require.NoError(t, cfgmgr.Init(ctx, ends, cfg, log)) t.Cleanup(func() { require.NoError(t, cfgmgr.Close()) diff --git a/pkg/manager/router/backend_observer.go b/pkg/manager/router/backend_observer.go index 563743a0..163f3977 100644 --- a/pkg/manager/router/backend_observer.go +++ b/pkg/manager/router/backend_observer.go @@ -61,7 +61,7 @@ const ( var statusNames = map[BackendStatus]string{ StatusHealthy: "healthy", - StatusCannotConnect: "cannot connect", + StatusCannotConnect: "down", StatusMemoryHigh: "memory high", StatusRunSlow: "run slow", StatusSchemaOutdated: "schema outdated", @@ -419,15 +419,22 @@ func (bo *BackendObserver) notifyIfChanged(backendStatus map[string]BackendStatu if lastStatus == StatusHealthy { if newStatus, ok := backendStatus[addr]; !ok { updatedBackends[addr] = StatusCannotConnect + updateBackendStatusMetrics(addr, lastStatus, StatusCannotConnect) } else if newStatus != StatusHealthy { updatedBackends[addr] = newStatus + updateBackendStatusMetrics(addr, lastStatus, newStatus) } } } for addr, newStatus := range backendStatus { if newStatus == StatusHealthy { - if lastStatus, ok := bo.curBackendInfo[addr]; !ok || lastStatus != StatusHealthy { + lastStatus, ok := bo.curBackendInfo[addr] + if !ok { + lastStatus = StatusCannotConnect + } + if lastStatus != StatusHealthy { updatedBackends[addr] = newStatus + updateBackendStatusMetrics(addr, lastStatus, newStatus) } } } diff --git a/pkg/manager/router/backend_observer_test.go b/pkg/manager/router/backend_observer_test.go index 31aff87f..cca2777d 100644 --- a/pkg/manager/router/backend_observer_test.go +++ b/pkg/manager/router/backend_observer_test.go @@ -364,6 +364,7 @@ func checkStatus(t *testing.T, backendChan chan map[string]BackendStatus, backen status, ok := backends[backend.sqlAddr] require.True(t, ok) require.Equal(t, expectedStatus, status) + require.True(t, checkBackendStatusMetrics(backend.sqlAddr, status)) } // Update the TTL for a backend. diff --git a/pkg/manager/router/metrics.go b/pkg/manager/router/metrics.go new file mode 100644 index 00000000..12175c5f --- /dev/null +++ b/pkg/manager/router/metrics.go @@ -0,0 +1,65 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package router + +import ( + "time" + + "github.com/pingcap/TiProxy/pkg/metrics" +) + +func updateBackendStatusMetrics(addr string, prevStatus, curStatus BackendStatus) { + metrics.BackendStatusGauge.WithLabelValues(addr, prevStatus.String()).Set(0) + metrics.BackendStatusGauge.WithLabelValues(addr, curStatus.String()).Set(1) +} + +func checkBackendStatusMetrics(addr string, status BackendStatus) bool { + val, err := metrics.ReadGauge(metrics.BackendStatusGauge.WithLabelValues(addr, status.String())) + if err != nil { + return false + } + return val == 1 +} + +func addBackendConnMetrics(addr string) { + metrics.BackendConnGauge.WithLabelValues(addr).Add(1) +} + +func subBackendConnMetrics(addr string) { + metrics.BackendConnGauge.WithLabelValues(addr).Sub(1) +} + +func readBackendConnMetrics(addr string) (int, error) { + return metrics.ReadGauge(metrics.BackendConnGauge.WithLabelValues(addr)) +} + +func succeedToLabel(succeed bool) string { + if succeed { + return "succeed" + } + return "fail" +} + +func addMigrateMetrics(from, to string, succeed bool, startTime time.Time) { + resLabel := succeedToLabel(succeed) + metrics.MigrateCounter.WithLabelValues(from, to, resLabel).Inc() + + cost := time.Since(startTime) + metrics.MigrateDurationHistogram.WithLabelValues(from, to, resLabel).Observe(float64(cost.Milliseconds())) +} + +func readMigrateCounter(from, to string, succeed bool) (int, error) { + return metrics.ReadCounter(metrics.MigrateCounter.WithLabelValues(from, to, succeedToLabel(succeed))) +} diff --git a/pkg/manager/router/router.go b/pkg/manager/router/router.go index 5ac66069..e644f667 100644 --- a/pkg/manager/router/router.go +++ b/pkg/manager/router/router.go @@ -153,6 +153,7 @@ func (router *ScoreBasedRouter) Route(conn RedirectableConn) (string, error) { phase: phaseNotRedirected, } router.addConn(be, connWrapper) + addBackendConnMetrics(backend.addr) conn.SetEventReceiver(router) return backend.addr, nil } @@ -256,6 +257,9 @@ func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn Redirect } connWrapper := e.Value.(*connWrapper) connWrapper.phase = phaseRedirectEnd + addMigrateMetrics(from, to, true, connWrapper.lastRedirect) + subBackendConnMetrics(from) + addBackendConnMetrics(to) return nil } @@ -286,6 +290,7 @@ func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn Redirectabl } connWrapper := ce.Value.(*connWrapper) connWrapper.phase = phaseRedirectFail + addMigrateMetrics(from, to, false, connWrapper.lastRedirect) router.addConn(be, connWrapper) return nil } @@ -310,6 +315,7 @@ func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), addr)) } router.removeConn(be, ce) + subBackendConnMetrics(addr) return nil } diff --git a/pkg/manager/router/router_test.go b/pkg/manager/router/router_test.go index 155b7a1f..d1e44783 100644 --- a/pkg/manager/router/router_test.go +++ b/pkg/manager/router/router_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/waitgroup" + "github.com/pingcap/TiProxy/pkg/metrics" "github.com/stretchr/testify/require" ) @@ -115,6 +116,7 @@ func (tester *routerTester) addBackends(num int) { tester.backendID++ addr := strconv.Itoa(tester.backendID) backends[addr] = StatusHealthy + metrics.BackendConnGauge.WithLabelValues(addr).Set(0) } tester.router.OnBackendChanged(backends) tester.checkBackendOrder() @@ -215,15 +217,23 @@ func (tester *routerTester) redirectFinish(num int, succeed bool) { if len(conn.GetRedirectingAddr()) == 0 { continue } + + from, to := conn.from, conn.to + prevCount, err := readMigrateCounter(from, to, succeed) + require.NoError(tester.t, err) if succeed { - err := tester.router.OnRedirectSucceed(conn.from, conn.to, conn) + err = tester.router.OnRedirectSucceed(from, to, conn) require.NoError(tester.t, err) conn.redirectSucceed() } else { - err := tester.router.OnRedirectFail(conn.from, conn.to, conn) + err = tester.router.OnRedirectFail(from, to, conn) require.NoError(tester.t, err) conn.redirectFail() } + curCount, err := readMigrateCounter(from, to, succeed) + require.NoError(tester.t, err) + require.Equal(tester.t, prevCount+1, curCount) + i++ if i >= num { break @@ -264,6 +274,15 @@ func (tester *routerTester) checkBackendNum(num int) { require.Equal(tester.t, num, tester.router.backends.Len()) } +func (tester *routerTester) checkBackendConnMetrics() { + for be := tester.router.backends.Front(); be != nil; be = be.Next() { + backend := be.Value.(*backendWrapper) + val, err := readBackendConnMetrics(backend.addr) + require.NoError(tester.t, err) + require.Equal(tester.t, backend.connList.Len(), val) + } +} + func (tester *routerTester) clear() { tester.conns = make(map[uint64]*mockRedirectableConn) tester.router.backends = list.New() @@ -275,8 +294,10 @@ func TestBackendScore(t *testing.T) { tester.addBackends(3) tester.killBackends(2) tester.addConnections(100) + tester.checkBackendConnMetrics() // 90 not redirecting tester.closeConnections(10, false) + tester.checkBackendConnMetrics() // make sure rebalance will work tester.addBackends(3) // 40 not redirecting, 50 redirecting @@ -310,17 +331,20 @@ func TestConnBalanced(t *testing.T) { tester.rebalance(100) tester.redirectFinish(100, true) tester.checkBalanced() + tester.checkBackendConnMetrics() // balanced after scale out tester.addBackends(1) tester.rebalance(100) tester.redirectFinish(100, true) tester.checkBalanced() + tester.checkBackendConnMetrics() // balanced after closing connections tester.closeConnections(10, false) tester.rebalance(100) tester.checkBalanced() + tester.checkBackendConnMetrics() } // Test that routing fails when there's no healthy backends. diff --git a/pkg/metrics/backend.go b/pkg/metrics/backend.go deleted file mode 100644 index 32d4e8df..00000000 --- a/pkg/metrics/backend.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2020 Ipalfish, Inc. -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metrics - -import "github.com/prometheus/client_golang/prometheus" - -const ( - BackendEventIniting = "initing" - BackendEventInited = "inited" - BackendEventClosing = "closing" - BackendEventClosed = "closed" -) - -var ( - BackendEventCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelBackend, - Name: "backend_event_total", - Help: "Counter of backend event.", - }, []string{LblCluster, LblNamespace, LblType}) - - BackendQueryCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelBackend, - Name: "b_conn_cnt", - Help: "Counter of backend query count.", - }, []string{LblCluster, LblNamespace, LblBackendAddr}) - - BackendConnInUseGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelBackend, - Name: "b_conn_in_use", - Help: "Number of backend conn in use.", - }, []string{LblCluster, LblNamespace, LblBackendAddr}) -) diff --git a/pkg/metrics/balance.go b/pkg/metrics/balance.go new file mode 100644 index 00000000..46712135 --- /dev/null +++ b/pkg/metrics/balance.go @@ -0,0 +1,64 @@ +// Copyright 2020 Ipalfish, Inc. +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// Label constants. +const ( + LblBackend = "backend" + LblFrom = "from" + LblTo = "to" + LblStatus = "status" + LblMigrateResult = "migrate_res" +) + +var ( + BackendStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: ModuleWeirProxy, + Subsystem: LabelBalance, + Name: "b_status", + Help: "Gauge of backend status.", + }, []string{LblBackend, LblStatus}) + + BackendConnGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: ModuleWeirProxy, + Subsystem: LabelBalance, + Name: "b_conn", + Help: "Number of backend connections.", + }, []string{LblBackend}) + + MigrateCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: ModuleWeirProxy, + Subsystem: LabelBalance, + Name: "migrate_total", + Help: "Number and result of session migration.", + }, []string{LblFrom, LblTo, LblMigrateResult}) + + MigrateDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: ModuleWeirProxy, + Subsystem: LabelBalance, + Name: "migrate_duration_millis", + Help: "Bucketed histogram of migrating time (s) of sessions.", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 26), // 0.1ms ~ 1h + }, []string{LblFrom, LblTo, LblMigrateResult}) +) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 95265bf5..4f338d93 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -16,7 +16,21 @@ package metrics import ( + "context" + "fmt" + "net" + "os" + "runtime" + "sync" + "time" + + "github.com/pingcap/TiProxy/lib/util/systimemon" + "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/push" + dto "github.com/prometheus/client_model/go" + "go.uber.org/zap" ) const ( @@ -25,54 +39,136 @@ const ( // metrics labels. const ( - LabelServer = "server" - LabelQueryCtx = "queryctx" - LabelBackend = "backend" - LabelSession = "session" - LabelDomain = "domain" - LabelDDLOwner = "ddl-owner" - LabelDDL = "ddl" - LabelDDLWorker = "ddl-worker" - LabelDDLSyncer = "ddl-syncer" - LabelGCWorker = "gcworker" - LabelAnalyze = "analyze" - - LabelBatchRecvLoop = "batch-recv-loop" - LabelBatchSendLoop = "batch-send-loop" - - opSucc = "ok" - opFailed = "err" - - LableScope = "scope" - ScopeGlobal = "global" - ScopeSession = "session" + LabelServer = "server" + LabelBalance = "balance" + LabelSession = "session" + LabelMonitor = "monitor" ) -// RetLabel returns "ok" when err == nil and "err" when err != nil. -// This could be useful when you need to observe the operation result. -func RetLabel(err error) string { - if err == nil { - return opSucc +// MetricsManager manages metrics. +type MetricsManager struct { + wg waitgroup.WaitGroup + cancel context.CancelFunc + logger *zap.Logger +} + +// NewMetricsManager creates a MetricsManager. +func NewMetricsManager() *MetricsManager { + return &MetricsManager{} +} + +var registerOnce = &sync.Once{} + +// Init registers metrics and pushes metrics to prometheus. +func (mm *MetricsManager) Init(ctx context.Context, logger *zap.Logger, metricsAddr string, metricsInterval uint, proxyAddr string) { + mm.logger = logger + registerOnce.Do(registerProxyMetrics) + ctx, mm.cancel = context.WithCancel(ctx) + mm.setupMonitor(ctx) + mm.pushMetric(ctx, metricsAddr, time.Duration(metricsInterval)*time.Second, proxyAddr) +} + +// Close stops all goroutines. +func (mm *MetricsManager) Close() { + if mm.cancel != nil { + mm.cancel() } - return opFailed + mm.wg.Wait() } -func RegisterProxyMetrics() { - prometheus.MustRegister(PanicCounter) - prometheus.MustRegister(QueryTotalCounter) - prometheus.MustRegister(ExecuteErrorCounter) +func (mm *MetricsManager) setupMonitor(ctx context.Context) { + // Enable the mutex profile, 1/10 of mutex blocking event sampling. + runtime.SetMutexProfileFraction(10) + systimeErrHandler := func() { + TimeJumpBackCounter.Inc() + } + callBackCount := 0 + successCallBack := func() { + callBackCount++ + // It is callback by monitor per second, we increase metrics.KeepAliveCounter per 5s. + if callBackCount >= 5 { + callBackCount = 0 + KeepAliveCounter.Inc() + } + } + mm.wg.Run(func() { + systimemon.StartMonitor(ctx, mm.logger, time.Now, systimeErrHandler, successCallBack) + }) +} + +// pushMetric pushes metrics in background. +func (mm *MetricsManager) pushMetric(ctx context.Context, addr string, interval time.Duration, proxyAddr string) { + if interval == time.Duration(0) || len(addr) == 0 { + mm.logger.Info("disable Prometheus push client") + return + } + mm.logger.Info("start prometheus push client", zap.String("server addr", addr), zap.String("interval", interval.String())) + mm.wg.Run(func() { + prometheusPushClient(ctx, mm.logger, addr, interval, proxyAddr) + }) +} + +// registerProxyMetrics registers metrics. +func registerProxyMetrics() { + prometheus.DefaultRegisterer.Unregister(prometheus.NewGoCollector()) + prometheus.MustRegister(collectors.NewGoCollector(collectors.WithGoCollections(collectors.GoRuntimeMetricsCollection | collectors.GoRuntimeMemStatsCollection))) + prometheus.MustRegister(ConnGauge) + prometheus.MustRegister(TimeJumpBackCounter) + prometheus.MustRegister(KeepAliveCounter) + prometheus.MustRegister(BackendStatusGauge) + prometheus.MustRegister(BackendConnGauge) + prometheus.MustRegister(QueryTotalCounter) + prometheus.MustRegister(QueryDurationHistogram) + prometheus.MustRegister(MigrateCounter) + prometheus.MustRegister(MigrateDurationHistogram) +} - // query ctx metrics - prometheus.MustRegister(QueryCtxQueryCounter) - prometheus.MustRegister(QueryCtxQueryDeniedCounter) - prometheus.MustRegister(QueryCtxQueryDurationHistogram) - prometheus.MustRegister(QueryCtxGauge) - prometheus.MustRegister(QueryCtxAttachedConnGauge) - prometheus.MustRegister(QueryCtxTransactionDuration) - - // backend metrics - prometheus.MustRegister(BackendEventCounter) - prometheus.MustRegister(BackendQueryCounter) - prometheus.MustRegister(BackendConnInUseGauge) +// prometheusPushClient pushes metrics to Prometheus Pushgateway. +func prometheusPushClient(ctx context.Context, logger *zap.Logger, addr string, interval time.Duration, proxyAddr string) { + job := "proxy" + pusher := push.New(addr, job) + pusher = pusher.Gatherer(prometheus.DefaultGatherer) + pusher = pusher.Grouping("instance", instanceName(proxyAddr)) + for ctx.Err() == nil { + err := pusher.Push() + if err != nil { + logger.Error("could not push metrics to prometheus pushgateway", zap.String("err", err.Error())) + } + select { + case <-time.After(interval): + case <-ctx.Done(): + return + } + } +} + +func instanceName(proxyAddr string) string { + hostname, err := os.Hostname() + if err != nil { + return "unknown" + } + _, port, err := net.SplitHostPort(proxyAddr) + if err != nil { + return "unknown" + } + return fmt.Sprintf("%s_%s", hostname, port) +} + +// ReadCounter reads the value from the counter. It is only used for testing. +func ReadCounter(counter prometheus.Counter) (int, error) { + var metric dto.Metric + if err := counter.Write(&metric); err != nil { + return 0, err + } + return int(metric.Counter.GetValue()), nil +} + +// ReadGauge reads the value from the gauge. It is only used for testing. +func ReadGauge(gauge prometheus.Gauge) (int, error) { + var metric dto.Metric + if err := gauge.Write(&metric); err != nil { + return 0, err + } + return int(metric.Gauge.GetValue()), nil } diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go new file mode 100644 index 00000000..8604e12c --- /dev/null +++ b/pkg/metrics/metrics_test.go @@ -0,0 +1,95 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/pingcap/TiProxy/lib/util/logger" + "github.com/stretchr/testify/require" +) + +// Test that the metrics are pushed or not pushed with different configurations. +func TestPushMetrics(t *testing.T) { + proxyAddr := "0.0.0.0:6000" + labelName := fmt.Sprintf("%s_%s_connections", ModuleWeirProxy, LabelServer) + hostname, err := os.Hostname() + require.NoError(t, err) + expectedPath := fmt.Sprintf("/metrics/job/proxy/instance/%s_6000", hostname) + bodyCh := make(chan string) + pgwOK := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + bodyCh <- string(body) + require.Equal(t, expectedPath, r.URL.EscapedPath()) + w.Header().Set("Content-Type", `text/plain; charset=utf-8`) + w.WriteHeader(http.StatusOK) + }), + ) + defer pgwOK.Close() + log := logger.CreateLoggerForTest(t) + + tests := []struct { + metricsAddr string + metricsInterval uint + pushed bool + }{ + { + metricsAddr: pgwOK.URL, + metricsInterval: 1, + pushed: true, + }, + { + metricsAddr: "", + metricsInterval: 1, + pushed: false, + }, + { + metricsAddr: pgwOK.URL, + metricsInterval: 0, + pushed: false, + }, + } + for _, tt := range tests { + for len(bodyCh) > 0 { + <-bodyCh + } + mm := NewMetricsManager() + mm.Init(context.Background(), log, tt.metricsAddr, tt.metricsInterval, proxyAddr) + if tt.pushed { + select { + case body := <-bodyCh: + require.Contains(t, body, labelName) + case <-time.After(2 * time.Second): + t.Fatal("not pushed") + } + } else { + select { + case <-bodyCh: + t.Fatal("pushed") + case <-time.After(2 * time.Second): + } + } + mm.Close() + } +} diff --git a/pkg/metrics/queryctx.go b/pkg/metrics/queryctx.go deleted file mode 100644 index 7e4872f9..00000000 --- a/pkg/metrics/queryctx.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2020 Ipalfish, Inc. -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -type AstStmtType int - -const ( - StmtTypeUnknown AstStmtType = iota - StmtTypeSelect - StmtTypeInsert - StmtTypeUpdate - StmtTypeDelete - StmtTypeDDL - StmtTypeBegin - StmtTypeCommit - StmtTypeRollback - StmtTypeSet - StmtTypeShow - StmtTypeUse - StmtTypeComment -) - -const ( - StmtNameUnknown = "unknown" - StmtNameSelect = "select" - StmtNameInsert = "insert" - StmtNameUpdate = "update" - StmtNameDelete = "delete" - StmtNameDDL = "ddl" - StmtNameBegin = "begin" - StmtNameCommit = "commit" - StmtNameRollback = "rollback" - StmtNameSet = "set" - StmtNameShow = "show" - StmtNameUse = "use" - StmtNameComment = "comment" -) - -var ( - QueryCtxQueryCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelQueryCtx, - Name: "query_total", - Help: "Counter of queries.", - }, []string{LblCluster, LblNamespace, LblDb, LblTable, LblSQLType, LblResult}) - - QueryCtxQueryDeniedCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelQueryCtx, - Name: "query_denied", - Help: "Counter of denied queries.", - }, []string{LblCluster, LblNamespace, LblDb, LblTable, LblSQLType}) - - QueryCtxQueryDurationHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelQueryCtx, - Name: "handle_query_duration_seconds", - Help: "Bucketed histogram of processing time (s) of handled queries.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days - }, []string{LblCluster, LblNamespace, LblDb, LblTable, LblSQLType}) - - QueryCtxGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelQueryCtx, - Name: "queryctx", - Help: "Number of queryctx (equals to client connection).", - }, []string{LblCluster, LblNamespace}) - - QueryCtxAttachedConnGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelQueryCtx, - Name: "attached_connections", - Help: "Number of attached backend connections.", - }, []string{LblCluster, LblNamespace}) - - QueryCtxTransactionDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "tidb", - Subsystem: "session", - Name: "transaction_duration_seconds", - Help: "Bucketed histogram of a transaction execution duration, including retry.", - Buckets: prometheus.ExponentialBuckets(0.001, 2, 28), // 1ms ~ 1.5days - }, []string{LblCluster, LblNamespace, LblDb, LblSQLType}) -) diff --git a/pkg/metrics/server.go b/pkg/metrics/server.go index 44a0feea..6e62beb9 100644 --- a/pkg/metrics/server.go +++ b/pkg/metrics/server.go @@ -20,42 +20,27 @@ import ( ) var ( - // PanicCounter measures the count of panics. - PanicCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ + ConnGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ Namespace: ModuleWeirProxy, Subsystem: LabelServer, - Name: "panic_total", - Help: "Counter of panic.", - }, []string{LblCluster, LblType}) + Name: "connections", + Help: "Number of connections.", + }) - QueryTotalCounter = prometheus.NewCounterVec( + TimeJumpBackCounter = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: ModuleWeirProxy, - Subsystem: LabelServer, - Name: "query_total", - Help: "Counter of queries.", - }, []string{LblCluster, LblType, LblResult}) + Subsystem: LabelMonitor, + Name: "time_jump_back_total", + Help: "Counter of system time jumps backward.", + }) - ExecuteErrorCounter = prometheus.NewCounterVec( + KeepAliveCounter = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: ModuleWeirProxy, - Subsystem: LabelServer, - Name: "execute_error_total", - Help: "Counter of execute errors.", - }, []string{LblCluster, LblType}) - - ConnGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: ModuleWeirProxy, - Subsystem: LabelServer, - Name: "connections", - Help: "Number of connections.", - }, []string{LblCluster}) - - EventStart = "start" - EventGracefulDown = "graceful_shutdown" - // Eventkill occurs when the server.Kill() function is called. - EventKill = "kill" - EventClose = "close" + Subsystem: LabelMonitor, + Name: "keep_alive_total", + Help: "Counter of proxy keep alive.", + }) ) diff --git a/pkg/metrics/session.go b/pkg/metrics/session.go index 577a9a18..431c015e 100644 --- a/pkg/metrics/session.go +++ b/pkg/metrics/session.go @@ -15,31 +15,28 @@ package metrics -// Label constants. +import "github.com/prometheus/client_golang/prometheus" + +// LblCmdType is the label constant. const ( - LblUnretryable = "unretryable" - LblReachMax = "reach_max" - LblOK = "ok" - LblError = "error" - LblCommit = "commit" - LblAbort = "abort" - LblRollback = "rollback" - LblType = "type" - LblDb = "db" - LblTable = "table" - LblResult = "result" - LblSQLType = "sql_type" - LblGeneral = "general" - LblInternal = "internal" - LbTxnMode = "txn_mode" - LblPessimistic = "pessimistic" - LblOptimistic = "optimistic" - LblStore = "store" - LblAddress = "address" - LblBatchGet = "batch_get" - LblGet = "get" - LblNamespace = "namespace" - LblCluster = "cluster" + LblCmdType = "cmd_type" +) + +var ( + QueryTotalCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: ModuleWeirProxy, + Subsystem: LabelSession, + Name: "query_total", + Help: "Counter of queries.", + }, []string{LblBackend, LblCmdType}) - LblBackendAddr = "backend_addr" + QueryDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: ModuleWeirProxy, + Subsystem: LabelSession, + Name: "query_duration_seconds", + Help: "Bucketed histogram of processing time (s) of handled queries.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days + }, []string{LblBackend, LblCmdType}) ) diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index 6c7ae7a1..7b88c90b 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -23,13 +23,14 @@ import ( "strings" "sync" "sync/atomic" + "time" "unsafe" gomysql "github.com/go-mysql-org/go-mysql/mysql" - "github.com/pingcap/TiProxy/pkg/manager/router" - pnet "github.com/pingcap/TiProxy/pkg/proxy/net" "github.com/pingcap/TiProxy/lib/util/errors" "github.com/pingcap/TiProxy/lib/util/waitgroup" + "github.com/pingcap/TiProxy/pkg/manager/router" + pnet "github.com/pingcap/TiProxy/pkg/proxy/net" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -123,15 +124,24 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, serverAddr string, c // ExecuteCmd forwards messages between the client and the backend. // If it finds that the session is ready for redirection, it migrates the session. func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, clientIO *pnet.PacketIO) error { + if len(request) < 1 { + return mysql.ErrMalformPacket + } + cmd := request[0] + startTime := time.Now() mgr.processLock.Lock() defer mgr.processLock.Unlock() + waitingRedirect := atomic.LoadPointer(&mgr.signal) != nil holdRequest, err := mgr.cmdProcessor.executeCmd(request, clientIO, mgr.backendConn.PacketIO(), waitingRedirect) + if !holdRequest { + addCmdMetrics(cmd, mgr.backendConn.Addr(), startTime) + } if err != nil && !IsMySQLError(err) { return err } if err == nil { - switch request[0] { + switch cmd { case mysql.ComQuit: return nil case mysql.ComSetOption: @@ -158,6 +168,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, c // Execute the held request no matter redirection succeeds or not. if holdRequest { _, err = mgr.cmdProcessor.executeCmd(request, clientIO, mgr.backendConn.PacketIO(), false) + addCmdMetrics(cmd, mgr.backendConn.Addr(), startTime) } if err != nil && !IsMySQLError(err) { return err diff --git a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index 6f37481d..ded3efe4 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -19,9 +19,9 @@ import ( "sync/atomic" "testing" + "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/pingcap/TiProxy/pkg/manager/router" pnet "github.com/pingcap/TiProxy/pkg/proxy/net" - "github.com/pingcap/TiProxy/lib/util/waitgroup" "github.com/pingcap/tidb/parser/mysql" "github.com/stretchr/testify/require" ) @@ -152,7 +152,13 @@ func (ts *backendMgrTester) forwardCmd4Proxy(clientIO, backendIO *pnet.PacketIO) clientIO.ResetSequence() request, err := clientIO.ReadPacket() require.NoError(ts.t, err) - return ts.mp.ExecuteCmd(context.Background(), request, clientIO) + prevCounter, err := readCmdCounter(request[0], ts.tc.backendListener.Addr().String()) + require.NoError(ts.t, err) + rsErr := ts.mp.ExecuteCmd(context.Background(), request, clientIO) + curCounter, err := readCmdCounter(request[0], ts.tc.backendListener.Addr().String()) + require.NoError(ts.t, err) + require.Equal(ts.t, prevCounter+1, curCounter) + return rsErr } func (ts *backendMgrTester) respondWithNoTxn4Backend(packetIO *pnet.PacketIO) error { diff --git a/pkg/proxy/backend/metrics.go b/pkg/proxy/backend/metrics.go new file mode 100644 index 00000000..7ef74ff4 --- /dev/null +++ b/pkg/proxy/backend/metrics.go @@ -0,0 +1,63 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import ( + "strconv" + "time" + + "github.com/pingcap/TiProxy/pkg/metrics" + "github.com/pingcap/tidb/parser/mysql" +) + +// The labels are consistent with TiDB. +var ( + cmdToLabel = map[byte]string{ + mysql.ComSleep: "Sleep", + mysql.ComQuit: "Quit", + mysql.ComInitDB: "InitDB", + mysql.ComQuery: "Query", + mysql.ComPing: "Ping", + mysql.ComFieldList: "FieldList", + mysql.ComStmtPrepare: "StmtPrepare", + mysql.ComStmtExecute: "StmtExecute", + mysql.ComStmtFetch: "StmtFetch", + mysql.ComStmtClose: "StmtClose", + mysql.ComStmtSendLongData: "StmtSendLongData", + mysql.ComStmtReset: "StmtReset", + mysql.ComSetOption: "SetOption", + } +) + +func addCmdMetrics(cmd byte, addr string, startTime time.Time) { + label, ok := cmdToLabel[cmd] + if !ok { + label = strconv.Itoa(int(cmd)) + } + metrics.QueryTotalCounter.WithLabelValues(addr, label).Inc() + + // The duration labels are different with TiDB: Labels in TiDB are statement types. + // However, the proxy is not aware of the statement types, so we use command types instead. + cost := time.Since(startTime) + metrics.QueryDurationHistogram.WithLabelValues(addr, label).Observe(cost.Seconds()) +} + +func readCmdCounter(cmd byte, addr string) (int, error) { + label, ok := cmdToLabel[cmd] + if !ok { + label = strconv.Itoa(int(cmd)) + } + return metrics.ReadCounter(metrics.QueryTotalCounter.WithLabelValues(addr, label)) +} diff --git a/pkg/proxy/client/client_conn.go b/pkg/proxy/client/client_conn.go index 28082aaf..e33296cd 100644 --- a/pkg/proxy/client/client_conn.go +++ b/pkg/proxy/client/client_conn.go @@ -19,11 +19,10 @@ import ( "crypto/tls" "net" + "github.com/pingcap/TiProxy/lib/util/errors" "github.com/pingcap/TiProxy/pkg/manager/namespace" "github.com/pingcap/TiProxy/pkg/proxy/backend" pnet "github.com/pingcap/TiProxy/pkg/proxy/net" - "github.com/pingcap/TiProxy/lib/util/errors" - "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -75,7 +74,6 @@ func (cc *ClientConnection) connectBackend(ctx context.Context) error { func (cc *ClientConnection) Run(ctx context.Context) { if err := cc.connectBackend(ctx); err != nil { logutil.Logger(ctx).Info("new connection fails", zap.String("remoteAddr", cc.Addr()), zap.Error(err)) - metrics.HandShakeErrorCounter.Inc() return } diff --git a/pkg/proxy/net/packetio.go b/pkg/proxy/net/packetio.go index 4dbd15bd..92c5af8b 100644 --- a/pkg/proxy/net/packetio.go +++ b/pkg/proxy/net/packetio.go @@ -128,7 +128,7 @@ func (p *PacketIO) ReadOnePacket() ([]byte, bool, error) { var header [4]byte if _, err := io.ReadFull(p.conn, header[:]); err != nil { - return nil, false, errors.WithStack(errors.Wrap(ErrReadConn, err)) + return nil, false, errors.Wrap(ErrReadConn, err) } // probe proxy V2 @@ -150,7 +150,7 @@ func (p *PacketIO) ReadOnePacket() ([]byte, bool, error) { // refill mysql headers if refill { if _, err := io.ReadFull(p.conn, header[:]); err != nil { - return nil, false, errors.WithStack(errors.Wrap(ErrReadConn, err)) + return nil, false, errors.Wrap(ErrReadConn, err) } } @@ -163,7 +163,7 @@ func (p *PacketIO) ReadOnePacket() ([]byte, bool, error) { data := make([]byte, length) if _, err := io.ReadFull(p.conn, data); err != nil { - return nil, false, errors.WithStack(errors.Wrap(ErrReadConn, err)) + return nil, false, errors.Wrap(ErrReadConn, err) } return data, length == mysql.MaxPayloadLen, nil } @@ -202,11 +202,11 @@ func (p *PacketIO) WriteOnePacket(data []byte) (int, bool, error) { p.sequence++ if _, err := io.Copy(p.buf, bytes.NewReader(header[:])); err != nil { - return 0, more, errors.WithStack(errors.Wrap(ErrWriteConn, err)) + return 0, more, errors.Wrap(ErrWriteConn, err) } if _, err := io.Copy(p.buf, bytes.NewReader(data[:length])); err != nil { - return 0, more, errors.WithStack(errors.Wrap(ErrWriteConn, err)) + return 0, more, errors.Wrap(ErrWriteConn, err) } return length, more, nil diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 34f69364..57230dbe 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -21,13 +21,13 @@ import ( "sync" "github.com/pingcap/TiProxy/lib/config" - mgrns "github.com/pingcap/TiProxy/pkg/manager/namespace" - "github.com/pingcap/TiProxy/pkg/proxy/backend" - "github.com/pingcap/TiProxy/pkg/proxy/client" "github.com/pingcap/TiProxy/lib/util/errors" "github.com/pingcap/TiProxy/lib/util/security" "github.com/pingcap/TiProxy/lib/util/waitgroup" - "github.com/pingcap/tidb/metrics" + mgrns "github.com/pingcap/TiProxy/pkg/manager/namespace" + "github.com/pingcap/TiProxy/pkg/metrics" + "github.com/pingcap/TiProxy/pkg/proxy/backend" + "github.com/pingcap/TiProxy/pkg/proxy/client" "go.uber.org/zap" ) @@ -81,8 +81,6 @@ func NewSQLServer(logger *zap.Logger, workdir string, cfg config.ProxyServer, sc } func (s *SQLServer) Run(ctx context.Context, onlineProxyConfig <-chan *config.ProxyServerOnline) { - metrics.ServerEventCounter.WithLabelValues(metrics.EventStart).Inc() - for { select { case <-ctx.Done(): @@ -172,7 +170,5 @@ func (s *SQLServer) Close() error { s.mu.Unlock() s.wg.Wait() - - metrics.ServerEventCounter.WithLabelValues(metrics.EventClose).Inc() return errors.Collect(ErrCloseServer, errs...) } diff --git a/pkg/server/server.go b/pkg/server/server.go index 80e039c0..d8f11aea 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -46,6 +46,7 @@ type Server struct { ConfigManager *mgrcfg.ConfigManager NamespaceManager *mgrns.NamespaceManager ObserverClient *clientv3.Client + MetricsManager *metrics.MetricsManager // HTTP/GRPC services Etcd *embed.Etcd @@ -58,12 +59,13 @@ func NewServer(ctx context.Context, cfg *config.Config, logger *zap.Logger, pubA srv = &Server{ ConfigManager: mgrcfg.NewConfigManager(), NamespaceManager: mgrns.NewNamespaceManager(), + MetricsManager: metrics.NewMetricsManager(), } ready := atomic.NewBool(false) // setup metrics - metrics.RegisterProxyMetrics() + srv.MetricsManager.Init(ctx, logger.Named("metrics"), cfg.Metrics.MetricsAddr, cfg.Metrics.MetricsInterval, cfg.Proxy.Addr) // setup gin and etcd { @@ -215,6 +217,9 @@ func (s *Server) Close() error { s.Etcd.Close() wg.Wait() } + if s.MetricsManager != nil { + s.MetricsManager.Close() + } return errors.Collect(ErrCloseServer, errs...) }