Skip to content

Commit

Permalink
Rewrite processlist collector
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Vynar <roman.vynar@quiq.com>
  • Loading branch information
roman-vynar committed Dec 2, 2021
1 parent 492c004 commit 689fc87
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 150 deletions.
237 changes: 87 additions & 150 deletions collector/info_schema_processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"database/sql"
"fmt"
"reflect"
"sort"
"strings"

"github.com/go-kit/log"
Expand All @@ -30,16 +32,15 @@ const infoSchemaProcesslistQuery = `
SELECT
user,
SUBSTRING_INDEX(host, ':', 1) AS host,
COALESCE(command,'') AS command,
COALESCE(state,'') AS state,
count(*) AS processes,
sum(time) AS seconds
COALESCE(command, '') AS command,
COALESCE(state, '') AS state,
COUNT(*) AS processes,
SUM(time) AS seconds
FROM information_schema.processlist
WHERE ID != connection_id()
AND TIME >= %d
GROUP BY user,SUBSTRING_INDEX(host, ':', 1),command,state
ORDER BY null
`
GROUP BY user, SUBSTRING_INDEX(host, ':', 1), command, state
`

// Tunable flags.
var (
Expand All @@ -60,104 +61,23 @@ var (
// Metric descriptors.
var (
processlistCountDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "threads"),
"The number of threads (connections) split by current state.",
[]string{"state"}, nil)
prometheus.BuildFQName(namespace, informationSchema, "processlist_total"),
"The number of threads split by current state.",
[]string{"command", "state"}, nil)
processlistTimeDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "threads_seconds"),
"The number of seconds threads (connections) have used split by current state.",
[]string{"state"}, nil)
prometheus.BuildFQName(namespace, informationSchema, "processlist_seconds_total"),
"The number of seconds threads have used split by current state.",
[]string{"command", "state"}, nil)
processesByUserDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "processes_by_user"),
prometheus.BuildFQName(namespace, informationSchema, "processlist_by_user"),
"The number of processes by user.",
[]string{"mysql_user"}, nil)
processesByHostDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "processes_by_host"),
prometheus.BuildFQName(namespace, informationSchema, "processlist_by_host"),
"The number of processes by host.",
[]string{"client_host"}, nil)
)

// Whitelist for connection/process states in SHOW PROCESSLIST.
// TokuDB uses the state column for "Queried about _______ rows".
var (
// TODO: might need some more keys for other MySQL versions or other storage engines
// see https://dev.mysql.com/doc/refman/5.7/en/general-thread-states.html
//
// threadStateCounterMap lists every accepted state name (all lower-case)
// with a zero value; "other" is included as one of the keys.
threadStateCounterMap = map[string]uint32{
"after create": uint32(0),
"altering table": uint32(0),
"analyzing": uint32(0),
"checking permissions": uint32(0),
"checking table": uint32(0),
"cleaning up": uint32(0),
"closing tables": uint32(0),
"converting heap to myisam": uint32(0),
"copying to tmp table": uint32(0),
"creating sort index": uint32(0),
"creating table": uint32(0),
"creating tmp table": uint32(0),
"deleting": uint32(0),
"executing": uint32(0),
"execution of init_command": uint32(0),
"end": uint32(0),
"freeing items": uint32(0),
"flushing tables": uint32(0),
"fulltext initialization": uint32(0),
"idle": uint32(0),
"init": uint32(0),
"killed": uint32(0),
"waiting for lock": uint32(0),
"logging slow query": uint32(0),
"login": uint32(0),
"manage keys": uint32(0),
"opening tables": uint32(0),
"optimizing": uint32(0),
"preparing": uint32(0),
"reading from net": uint32(0),
"removing duplicates": uint32(0),
"removing tmp table": uint32(0),
"reopen tables": uint32(0),
"repair by sorting": uint32(0),
"repair done": uint32(0),
"repair with keycache": uint32(0),
"replication master": uint32(0),
"rolling back": uint32(0),
"searching rows for update": uint32(0),
"sending data": uint32(0),
"sorting for group": uint32(0),
"sorting for order": uint32(0),
"sorting index": uint32(0),
"sorting result": uint32(0),
"statistics": uint32(0),
"updating": uint32(0),
"waiting for tables": uint32(0),
"waiting for table flush": uint32(0),
"waiting on cond": uint32(0),
"writing to net": uint32(0),
"other": uint32(0),
}
// threadStateMapping maps further lower-case state names onto one of the
// keys of threadStateCounterMap (every value below is such a key).
threadStateMapping = map[string]string{
"user sleep": "idle",
"creating index": "altering table",
"committing alter table to storage engine": "altering table",
"discard or import tablespace": "altering table",
"rename": "altering table",
"setup": "altering table",
"renaming result table": "altering table",
"preparing for alter table": "altering table",
"copying to group table": "copying to tmp table",
"copy to tmp table": "copying to tmp table",
"query end": "end",
"update": "updating",
"updating main table": "updating",
"updating reference tables": "updating",
"system lock": "waiting for lock",
"user lock": "waiting for lock",
"table lock": "waiting for lock",
"deleting from main table": "deleting",
"deleting from reference tables": "deleting",
}
)

// ScrapeProcesslist collects from `information_schema.processlist`.
type ScrapeProcesslist struct{}

Expand Down Expand Up @@ -189,83 +109,100 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome
defer processlistRows.Close()

var (
user string
host string
command string
state string
processes uint32
time uint32
user string
host string
command string
state string
count uint32
time uint32
)
stateCounts := make(map[string]uint32, len(threadStateCounterMap))
stateTime := make(map[string]uint32, len(threadStateCounterMap))
hostCount := make(map[string]uint32)
userCount := make(map[string]uint32)
for k, v := range threadStateCounterMap {
stateCounts[k] = v
stateTime[k] = v
}
// Define maps
stateCounts := make(map[string]map[string]uint32)
stateTime := make(map[string]map[string]uint32)
stateHostCounts := make(map[string]uint32)
stateUserCounts := make(map[string]uint32)

for processlistRows.Next() {
err = processlistRows.Scan(&user, &host, &command, &state, &processes, &time)
err = processlistRows.Scan(&user, &host, &command, &state, &count, &time)
if err != nil {
return err
}
realState := deriveThreadState(command, state)
stateCounts[realState] += processes
stateTime[realState] += time
hostCount[host] = hostCount[host] + processes
userCount[user] = userCount[user] + processes
}
command = strings.ToLower(command)
state = sanitizeState(state)
if host == "" {
host = "blank"
}

if *processesByHostFlag {
for host, processes := range hostCount {
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(processes), host)
// Init maps
if _, ok := stateCounts[command]; !ok {
stateCounts[command] = make(map[string]uint32)
stateTime[command] = make(map[string]uint32)
}
if _, ok := stateCounts[command][state]; !ok {
stateCounts[command][state] = 0
stateTime[command][state] = 0
}
if _, ok := stateHostCounts[host]; !ok {
stateHostCounts[host] = 0
}
if _, ok := stateUserCounts[user]; !ok {
stateUserCounts[user] = 0
}

stateCounts[command][state] += count
stateTime[command][state] += time
stateHostCounts[host] += count
stateUserCounts[user] += count
}

if *processesByUserFlag {
for user, processes := range userCount {
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(processes), user)
for _, command := range sortedMapKeys(stateCounts) {
for _, state := range sortedMapKeys(stateCounts[command]) {
ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(stateCounts[command][state]), command, state)
ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(stateTime[command][state]), command, state)
}
}

for state, processes := range stateCounts {
ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(processes), state)
if *processesByHostFlag {
for _, host := range sortedMapKeys(stateHostCounts) {
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
}
}
for state, time := range stateTime {
ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(time), state)
if *processesByUserFlag {
for _, user := range sortedMapKeys(stateUserCounts) {
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
}
}

return nil
}

func deriveThreadState(command string, state string) string {
var normCmd = strings.Replace(strings.ToLower(command), "_", " ", -1)
var normState = strings.Replace(strings.ToLower(state), "_", " ", -1)
// check if it's already a valid state
_, knownState := threadStateCounterMap[normState]
if knownState {
return normState
}
// check if plain mapping applies
mappedState, canMap := threadStateMapping[normState]
if canMap {
return mappedState
}
// check special waiting for XYZ lock
if strings.Contains(normState, "waiting for") && strings.Contains(normState, "lock") {
return "waiting for lock"
func sortedMapKeys(m interface{}) []string {
v := reflect.ValueOf(m)
keys := make([]string, 0, len(v.MapKeys()))
for _, key := range v.MapKeys() {
keys = append(keys, key.String())
}
if normCmd == "sleep" && normState == "" {
return "idle"
sort.Strings(keys)
return keys
}

func sanitizeState(state string) string {
if state == "" {
state = "blank"
}
if normCmd == "query" {
return "executing"
state = strings.ToLower(state)

replacements := map[string]string{
";": "",
",": "",
":": "",
" ": "_",
"-": "_",
}
if normCmd == "binlog dump" {
return "replication master"
for r := range replacements {
state = strings.Replace(state, r, replacements[r], -1)
}
return "other"
return state
}

// check interface
Expand Down
77 changes: 77 additions & 0 deletions collector/info_schema_processlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"fmt"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

// TestScrapeProcesslist feeds ScrapeProcesslist a mocked
// information_schema.processlist result set and verifies the per-command /
// per-state gauges it emits, in the order they are sent on the channel.
func TestScrapeProcesslist(t *testing.T) {
	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("error opening a stub database connection: %s", err)
	}
	defer db.Close()

	query := fmt.Sprintf(infoSchemaProcesslistQuery, 0)
	columns := []string{"user", "host", "command", "state", "processes", "seconds"}
	rows := sqlmock.NewRows(columns).
		AddRow("manager", "10.0.7.234", "Sleep", "", 10, 87).
		AddRow("foobar", "10.0.7.154", "Sleep", "", 8, 842).
		AddRow("root", "10.0.7.253", "Sleep", "", 1, 20).
		AddRow("feedback", "10.0.7.179", "Sleep", "", 2, 14).
		AddRow("system user", "", "Connect", "waiting for handler commit", 1, 7271248).
		AddRow("message", "10.0.7.234", "Sleep", "", 4, 62).
		AddRow("system user", "", "Query", "Slave has read all relay log; waiting for more updates", 1, 7271248).
		AddRow("event_scheduler", "localhost", "Daemon", "Waiting on empty queue", 1, 7271248)
	mock.ExpectQuery(sanitizeQuery(query)).WillReturnRows(rows)

	ch := make(chan prometheus.Metric)
	go func() {
		// Close the channel however Scrape ends so the reader can never block forever.
		defer close(ch)
		// Keep the error local to this goroutine: assigning to the outer err
		// would be an unsynchronised write to a variable owned by the test goroutine.
		if err := (ScrapeProcesslist{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
			t.Errorf("error calling function on test: %s", err)
		}
	}()

	// Each command/state pair yields a count gauge followed by a seconds gauge.
	// The five "Sleep" rows with an empty state collapse into sleep/blank:
	// 10+8+1+2+4 = 25 processes and 87+842+20+14+62 = 1025 seconds.
	expected := []MetricResult{
		{labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 1, metricType: dto.MetricType_GAUGE},
		{labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 7271248, metricType: dto.MetricType_GAUGE},
		{labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 1, metricType: dto.MetricType_GAUGE},
		{labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 7271248, metricType: dto.MetricType_GAUGE},
		{labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 1, metricType: dto.MetricType_GAUGE},
		{labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 7271248, metricType: dto.MetricType_GAUGE},
		{labels: labelMap{"command": "sleep", "state": "blank"}, value: 25, metricType: dto.MetricType_GAUGE},
		{labels: labelMap{"command": "sleep", "state": "blank"}, value: 1025, metricType: dto.MetricType_GAUGE},
	}
	convey.Convey("Metrics comparison", t, func() {
		for _, expect := range expected {
			got := readMetric(<-ch)
			// So takes (actual, assertion, expected); keeping that order makes
			// failure output label the two sides correctly.
			convey.So(got, convey.ShouldResemble, expect)
		}
	})

	// Scrape may still have metrics to send (the per-host / per-user gauges
	// when those flags are enabled). Drain them so the producer goroutine
	// finishes, and reports any error, before the test returns.
	for range ch {
	}

	// Ensure all SQL queries were executed
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("there were unfulfilled expectations: %s", err)
	}
}

0 comments on commit 689fc87

Please sign in to comment.