Skip to content

Commit

Permalink
Add the instance struct to handle connections (#859)
Browse files Browse the repository at this point in the history
The intent is to use the instance struct to hold the connection
to the database as well as metadata about the instance:
- version
- flavor (mariadb or mysql)

Change is similar to prometheus-community/postgres_exporter#785

Signed-off-by: Vlad Gusev <vlad.esten@gmail.com>
  • Loading branch information
s10 authored Aug 12, 2024
1 parent dd8afce commit 31bc75a
Show file tree
Hide file tree
Showing 67 changed files with 267 additions and 156 deletions.
4 changes: 2 additions & 2 deletions collector/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -72,8 +71,9 @@ func (ScrapeBinlogSize) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeBinlogSize) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeBinlogSize) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
var logBin uint8
db := instance.getDB()
err := db.QueryRowContext(ctx, logbinQuery).Scan(&logBin)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion collector/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func TestScrapeBinlogSize(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}

mock.ExpectQuery(logbinQuery).WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))

columns := []string{"Log_name", "File_size"}
Expand All @@ -42,7 +44,7 @@ func TestScrapeBinlogSize(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeBinlogSize{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeBinlogSize{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
4 changes: 2 additions & 2 deletions collector/engine_innodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -52,7 +51,8 @@ func (ScrapeEngineInnodbStatus) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeEngineInnodbStatus) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeEngineInnodbStatus) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx, engineInnodbStatusQuery)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions collector/engine_innodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ END OF INNODB MONITOR OUTPUT
rows := sqlmock.NewRows(columns).AddRow("InnoDB", "", sample)

mock.ExpectQuery(sanitizeQuery(engineInnodbStatusQuery)).WillReturnRows(rows)

inst := &instance{db: db}
ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeEngineInnodbStatus{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeEngineInnodbStatus{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
3 changes: 2 additions & 1 deletion collector/engine_tokudb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (ScrapeEngineTokudbStatus) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeEngineTokudbStatus) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeEngineTokudbStatus) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
tokudbRows, err := db.QueryContext(ctx, engineTokudbStatusQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/engine_tokudb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestScrapeEngineTokudbStatus(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"Type", "Name", "Status"}
rows := sqlmock.NewRows(columns).
Expand All @@ -59,7 +60,7 @@ func TestScrapeEngineTokudbStatus(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeEngineTokudbStatus{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeEngineTokudbStatus{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
44 changes: 8 additions & 36 deletions collector/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ package collector

import (
"context"
"database/sql"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -38,18 +35,12 @@ const (

// SQL queries and parameters.
const (
versionQuery = `SELECT @@version`

// System variable params formatting.
// See: https://github.com/go-sql-driver/mysql#system-variables
sessionSettingsParam = `log_slow_filter=%27tmp_table_on_disk,filesort_on_disk%27`
timeoutParam = `lock_wait_timeout=%d`
)

var (
versionRE = regexp.MustCompile(`^\d+\.\d+`)
)

// Tunable flags.
var (
exporterLockTimeout = kingpin.Flag(
Expand Down Expand Up @@ -92,6 +83,7 @@ type Exporter struct {
logger log.Logger
dsn string
scrapers []Scraper
instance *instance
}

// New returns a new MySQL exporter for the provided DSN.
Expand Down Expand Up @@ -135,27 +127,23 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) float64 {
var err error
scrapeTime := time.Now()
db, err := sql.Open("mysql", e.dsn)
instance, err := newInstance(e.dsn)
if err != nil {
level.Error(e.logger).Log("msg", "Error opening connection to database", "err", err)
return 0.0
}
defer db.Close()

// By design exporter should use maximum one connection per request.
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
// Set max lifetime for a connection.
db.SetConnMaxLifetime(1 * time.Minute)
defer instance.Close()
e.instance = instance

if err := db.PingContext(ctx); err != nil {
if err := instance.Ping(); err != nil {
level.Error(e.logger).Log("msg", "Error pinging mysqld", "err", err)
return 0.0
}

ch <- prometheus.MustNewConstMetric(mysqlScrapeDurationSeconds, prometheus.GaugeValue, time.Since(scrapeTime).Seconds(), "connection")

version := getMySQLVersion(db, e.logger)
version := instance.versionMajorMinor

var wg sync.WaitGroup
defer wg.Wait()
for _, scraper := range e.scrapers {
Expand All @@ -169,7 +157,7 @@ func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) floa
label := "collect." + scraper.Name()
scrapeTime := time.Now()
collectorSuccess := 1.0
if err := scraper.Scrape(ctx, db, ch, log.With(e.logger, "scraper", scraper.Name())); err != nil {
if err := scraper.Scrape(ctx, instance, ch, log.With(e.logger, "scraper", scraper.Name())); err != nil {
level.Error(e.logger).Log("msg", "Error from scraper", "scraper", scraper.Name(), "target", e.getTargetFromDsn(), "err", err)
collectorSuccess = 0.0
}
Expand All @@ -189,19 +177,3 @@ func (e *Exporter) getTargetFromDsn() string {
}
return dsnConfig.Addr
}

func getMySQLVersion(db *sql.DB, logger log.Logger) float64 {
var versionStr string
var versionNum float64
if err := db.QueryRow(versionQuery).Scan(&versionStr); err == nil {
versionNum, _ = strconv.ParseFloat(versionRE.FindString(versionStr), 64)
} else {
level.Debug(logger).Log("msg", "Error querying version", "err", err)
}
// If we can't match/parse the version, set it some big value that matches all versions.
if versionNum == 0 {
level.Debug(logger).Log("msg", "Error parsing version string", "version", versionStr)
versionNum = 999
}
return versionNum
}
20 changes: 0 additions & 20 deletions collector/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ package collector

import (
"context"
"database/sql"
"os"
"testing"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -68,20 +65,3 @@ func TestExporter(t *testing.T) {
}
})
}

func TestGetMySQLVersion(t *testing.T) {
if testing.Short() {
t.Skip("-short is passed, skipping test")
}

logger := log.NewLogfmtLogger(os.Stderr)
logger = level.NewFilter(logger, level.AllowDebug())

convey.Convey("Version parsing", t, func() {
db, err := sql.Open("mysql", dsn)
convey.So(err, convey.ShouldBeNil)
defer db.Close()

convey.So(getMySQLVersion(db, logger), convey.ShouldBeBetweenOrEqual, 5.6, 12.0)
})
}
3 changes: 2 additions & 1 deletion collector/global_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func (ScrapeGlobalStatus) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeGlobalStatus) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeGlobalStatus) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
globalStatusRows, err := db.QueryContext(ctx, globalStatusQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/global_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeGlobalStatus(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"Variable_name", "Value"}
rows := sqlmock.NewRows(columns).
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestScrapeGlobalStatus(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeGlobalStatus{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeGlobalStatus{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
3 changes: 2 additions & 1 deletion collector/global_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func (ScrapeGlobalVariables) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeGlobalVariables) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeGlobalVariables) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
globalVariablesRows, err := db.QueryContext(ctx, globalVariablesQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/global_variables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeGlobalVariables(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"Variable_name", "Value"}
rows := sqlmock.NewRows(columns).
Expand All @@ -52,7 +53,7 @@ func TestScrapeGlobalVariables(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeGlobalVariables{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeGlobalVariables{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
3 changes: 2 additions & 1 deletion collector/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func nowExpr() string {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
heartbeatRows, err := db.QueryContext(ctx, query)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion collector/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ func TestScrapeHeartbeat(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

rows := sqlmock.NewRows(tt.Columns).
AddRow("1487597613.001320", "1487598113.448042", 1)
mock.ExpectQuery(sanitizeQuery(tt.Query)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
4 changes: 2 additions & 2 deletions collector/info_schema_auto_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -70,7 +69,8 @@ func (ScrapeAutoIncrementColumns) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeAutoIncrementColumns) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeAutoIncrementColumns) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
autoIncrementRows, err := db.QueryContext(ctx, infoSchemaAutoIncrementQuery)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions collector/info_schema_clientstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"
"fmt"
"strings"

Expand Down Expand Up @@ -161,8 +160,9 @@ func (ScrapeClientStat) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeClientStat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeClientStat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
var varName, varVal string
db := instance.getDB()
err := db.QueryRowContext(ctx, userstatCheckQuery).Scan(&varName, &varVal)
if err != nil {
level.Debug(logger).Log("msg", "Detailed client stats are not available.")
Expand Down
3 changes: 2 additions & 1 deletion collector/info_schema_clientstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeClientStat(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

mock.ExpectQuery(sanitizeQuery(userstatCheckQuery)).WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("userstat", "ON"))
Expand All @@ -41,7 +42,7 @@ func TestScrapeClientStat(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeClientStat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeClientStat{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
4 changes: 2 additions & 2 deletions collector/info_schema_innodb_cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -77,7 +76,8 @@ func (ScrapeInnodbCmp) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeInnodbCmp) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeInnodbCmp) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
informationSchemaInnodbCmpRows, err := db.QueryContext(ctx, innodbCmpQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/info_schema_innodb_cmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeInnodbCmp(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"page_size", "compress_ops", "compress_ops_ok", "compress_time", "uncompress_ops", "uncompress_time"}
rows := sqlmock.NewRows(columns).
Expand All @@ -38,7 +39,7 @@ func TestScrapeInnodbCmp(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeInnodbCmp{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeInnodbCmp{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
Loading

0 comments on commit 31bc75a

Please sign in to comment.