diff --git a/collector/info_schema_processlist.go b/collector/info_schema_processlist.go old mode 100644 new mode 100755 index 16fcea5ef..02f829224 --- a/collector/info_schema_processlist.go +++ b/collector/info_schema_processlist.go @@ -19,6 +19,8 @@ import ( "context" "database/sql" "fmt" + "reflect" + "sort" "strings" "github.com/go-kit/log" @@ -30,16 +32,15 @@ const infoSchemaProcesslistQuery = ` SELECT user, SUBSTRING_INDEX(host, ':', 1) AS host, - COALESCE(command,'') AS command, - COALESCE(state,'') AS state, - count(*) AS processes, - sum(time) AS seconds + COALESCE(command, '') AS command, + COALESCE(state, '') AS state, + COUNT(*) AS processes, + SUM(time) AS seconds FROM information_schema.processlist WHERE ID != connection_id() AND TIME >= %d - GROUP BY user,SUBSTRING_INDEX(host, ':', 1),command,state - ORDER BY null - ` + GROUP BY user, SUBSTRING_INDEX(host, ':', 1), command, state + ` // Tunable flags. var ( @@ -60,104 +61,23 @@ var ( // Metric descriptors. var ( processlistCountDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, informationSchema, "threads"), - "The number of threads (connections) split by current state.", - []string{"state"}, nil) + prometheus.BuildFQName(namespace, informationSchema, "processlist_threads"), + "The number of threads split by current state.", + []string{"command", "state"}, nil) processlistTimeDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, informationSchema, "threads_seconds"), - "The number of seconds threads (connections) have used split by current state.", - []string{"state"}, nil) + prometheus.BuildFQName(namespace, informationSchema, "processlist_seconds"), + "The number of seconds threads have used split by current state.", + []string{"command", "state"}, nil) processesByUserDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, informationSchema, "processes_by_user"), + prometheus.BuildFQName(namespace, informationSchema, "processlist_processes_by_user"), "The number of processes by user.", []string{"mysql_user"}, nil) processesByHostDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, informationSchema, "processes_by_host"), + prometheus.BuildFQName(namespace, informationSchema, "processlist_processes_by_host"), "The number of processes by host.", []string{"client_host"}, nil) ) -// whitelist for connection/process states in SHOW PROCESSLIST -// tokudb uses the state column for "Queried about _______ rows" -var ( - // TODO: might need some more keys for other MySQL versions or other storage engines - // see https://dev.mysql.com/doc/refman/5.7/en/general-thread-states.html - threadStateCounterMap = map[string]uint32{ - "after create": uint32(0), - "altering table": uint32(0), - "analyzing": uint32(0), - "checking permissions": uint32(0), - "checking table": uint32(0), - "cleaning up": uint32(0), - "closing tables": uint32(0), - "converting heap to myisam": uint32(0), - "copying to tmp table": uint32(0), - "creating sort index": uint32(0), - "creating table": uint32(0), - "creating tmp table": uint32(0), - "deleting": uint32(0), - "executing": uint32(0), - "execution of init_command": uint32(0), - "end": uint32(0), - "freeing items": uint32(0), - "flushing tables": uint32(0), - "fulltext initialization": uint32(0), - "idle": uint32(0), - "init": uint32(0), - "killed": uint32(0), - "waiting for lock": uint32(0), - "logging slow query": uint32(0), - "login": uint32(0), - "manage keys": uint32(0), - "opening tables": uint32(0), - "optimizing": uint32(0), - "preparing": uint32(0), - "reading from net": uint32(0), - "removing duplicates": uint32(0), - "removing tmp table": uint32(0), - "reopen tables": uint32(0), - "repair by sorting": uint32(0), - "repair done": uint32(0), - "repair with keycache": uint32(0), - "replication master": uint32(0), - "rolling back": uint32(0), - "searching rows for update": uint32(0), - "sending data": uint32(0), - "sorting for group": uint32(0), - "sorting for order": uint32(0), - "sorting index": uint32(0), - "sorting result": uint32(0), - "statistics": uint32(0), - "updating": uint32(0), - "waiting for tables": uint32(0), - "waiting for table flush": uint32(0), - "waiting on cond": uint32(0), - "writing to net": uint32(0), - "other": uint32(0), - } - threadStateMapping = map[string]string{ - "user sleep": "idle", - "creating index": "altering table", - "committing alter table to storage engine": "altering table", - "discard or import tablespace": "altering table", - "rename": "altering table", - "setup": "altering table", - "renaming result table": "altering table", - "preparing for alter table": "altering table", - "copying to group table": "copying to tmp table", - "copy to tmp table": "copying to tmp table", - "query end": "end", - "update": "updating", - "updating main table": "updating", - "updating reference tables": "updating", - "system lock": "waiting for lock", - "user lock": "waiting for lock", - "table lock": "waiting for lock", - "deleting from main table": "deleting", - "deleting from reference tables": "deleting", - } -) - // ScrapeProcesslist collects from `information_schema.processlist`. type ScrapeProcesslist struct{} @@ -189,83 +109,102 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome defer processlistRows.Close() var ( - user string - host string - command string - state string - processes uint32 - time uint32 + user string + host string + command string + state string + count uint32 + time uint32 ) - stateCounts := make(map[string]uint32, len(threadStateCounterMap)) - stateTime := make(map[string]uint32, len(threadStateCounterMap)) - hostCount := make(map[string]uint32) - userCount := make(map[string]uint32) - for k, v := range threadStateCounterMap { - stateCounts[k] = v - stateTime[k] = v - } + // Define maps + stateCounts := make(map[string]map[string]uint32) + stateTime := make(map[string]map[string]uint32) + stateHostCounts := make(map[string]uint32) + stateUserCounts := make(map[string]uint32) for processlistRows.Next() { - err = processlistRows.Scan(&user, &host, &command, &state, &processes, &time) + err = processlistRows.Scan(&user, &host, &command, &state, &count, &time) if err != nil { return err } - realState := deriveThreadState(command, state) - stateCounts[realState] += processes - stateTime[realState] += time - hostCount[host] = hostCount[host] + processes - userCount[user] = userCount[user] + processes - } + command = sanitizeState(command) + state = sanitizeState(state) + if host == "" { + host = "unknown" + } - if *processesByHostFlag { - for host, processes := range hostCount { - ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(processes), host) + // Init maps + if _, ok := stateCounts[command]; !ok { + stateCounts[command] = make(map[string]uint32) + stateTime[command] = make(map[string]uint32) + } + if _, ok := stateCounts[command][state]; !ok { + stateCounts[command][state] = 0 + stateTime[command][state] = 0 + } + if _, ok := stateHostCounts[host]; !ok { + stateHostCounts[host] = 0 } + if _, ok := stateUserCounts[user]; !ok { + stateUserCounts[user] = 0 + } + + stateCounts[command][state] += count + stateTime[command][state] += time + stateHostCounts[host] += count + stateUserCounts[user] += count } - if *processesByUserFlag { - for user, processes := range userCount { - ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(processes), user) + for _, command := range sortedMapKeys(stateCounts) { + for _, state := range sortedMapKeys(stateCounts[command]) { + ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(stateCounts[command][state]), command, state) + ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(stateTime[command][state]), command, state) } } - for state, processes := range stateCounts { - ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(processes), state) + if *processesByHostFlag { + for _, host := range sortedMapKeys(stateHostCounts) { + ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host) + } } - for state, time := range stateTime { - ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(time), state) + if *processesByUserFlag { + for _, user := range sortedMapKeys(stateUserCounts) { + ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user) + } } return nil } -func deriveThreadState(command string, state string) string { - var normCmd = strings.Replace(strings.ToLower(command), "_", " ", -1) - var normState = strings.Replace(strings.ToLower(state), "_", " ", -1) - // check if it's already a valid state - _, knownState := threadStateCounterMap[normState] - if knownState { - return normState - } - // check if plain mapping applies - mappedState, canMap := threadStateMapping[normState] - if canMap { - return mappedState - } - // check special waiting for XYZ lock - if strings.Contains(normState, "waiting for") && strings.Contains(normState, "lock") { - return "waiting for lock" +func sortedMapKeys(m interface{}) []string { + v := reflect.ValueOf(m) + keys := make([]string, 0, len(v.MapKeys())) + for _, key := range v.MapKeys() { + keys = append(keys, key.String()) } - if normCmd == "sleep" && normState == "" { - return "idle" + sort.Strings(keys) + return keys +} + +func sanitizeState(state string) string { + if state == "" { + state = "unknown" } - if normCmd == "query" { - return "executing" + state = strings.ToLower(state) + replacements := map[string]string{ + ";": "", + ",": "", + ":": "", + ".": "", + "(": "", + ")": "", + " ": "_", + "-": "_", } - if normCmd == "binlog dump" { - return "replication master" + for r := range replacements { + state = strings.Replace(state, r, replacements[r], -1) } - return "other" + return state } // check interface diff --git a/collector/info_schema_processlist_test.go b/collector/info_schema_processlist_test.go new file mode 100644 index 000000000..8887ea7be --- /dev/null +++ b/collector/info_schema_processlist_test.go @@ -0,0 +1,97 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "fmt" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" + "gopkg.in/alecthomas/kingpin.v2" +) + +func TestScrapeProcesslist(t *testing.T) { + _, err := kingpin.CommandLine.Parse([]string{ + "--collect.info_schema.processlist.processes_by_user", + "--collect.info_schema.processlist.processes_by_host", + }) + if err != nil { + t.Fatal(err) + } + + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("error opening a stub database connection: %s", err) + } + defer db.Close() + + query := fmt.Sprintf(infoSchemaProcesslistQuery, 0) + columns := []string{"user", "host", "command", "state", "processes", "seconds"} + rows := sqlmock.NewRows(columns). + AddRow("manager", "10.0.7.234", "Sleep", "", 10, 87). + AddRow("feedback", "10.0.7.154", "Sleep", "", 8, 842). + AddRow("root", "10.0.7.253", "Sleep", "", 1, 20). + AddRow("feedback", "10.0.7.179", "Sleep", "", 2, 14). + AddRow("system user", "", "Connect", "waiting for handler commit", 1, 7271248). + AddRow("manager", "10.0.7.234", "Sleep", "", 4, 62). + AddRow("system user", "", "Query", "Slave has read all relay log; waiting for more updates", 1, 7271248). + AddRow("event_scheduler", "localhost", "Daemon", "Waiting on empty queue", 1, 7271248) + mock.ExpectQuery(sanitizeQuery(query)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + if err = (ScrapeProcesslist{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil { + t.Errorf("error calling function on test: %s", err) + } + close(ch) + }() + + expected := []MetricResult{ + {labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 7271248, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 7271248, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 7271248, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"command": "sleep", "state": "unknown"}, value: 25, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"command": "sleep", "state": "unknown"}, value: 1025, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"client_host": "10.0.7.154"}, value: 8, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"client_host": "10.0.7.179"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"client_host": "10.0.7.234"}, value: 14, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"client_host": "10.0.7.253"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"client_host": "localhost"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"client_host": "unknown"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"mysql_user": "event_scheduler"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"mysql_user": "feedback"}, value: 10, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"mysql_user": "manager"}, value: 14, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"mysql_user": "root"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"mysql_user": "system user"}, value: 2, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + got := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, got) + } + }) + + // Ensure all SQL queries were executed + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +}