Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mysql: type conversion follow-up #9966

Merged
merged 6 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 52 additions & 38 deletions plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mysql

import (
"bytes"
"database/sql"
"fmt"
"strconv"
Expand Down Expand Up @@ -657,11 +656,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu

func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
if m.MetricVersion < 2 {
v, ok := v1.ParseValue(value)
if ok {
return v, nil
}
return v, fmt.Errorf("could not parse value: %q", string(value))
return v1.ParseValue(value)
}
return v2.ConvertGlobalVariables(key, value)
}
Expand Down Expand Up @@ -692,35 +687,53 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// scanning keys and values separately

// get columns names, and create an array with its length
cols, err := rows.Columns()
cols, err := rows.ColumnTypes()
if err != nil {
return err
}
vals := make([]interface{}, len(cols))
vals := make([]sql.RawBytes, len(cols))
valPtrs := make([]interface{}, len(cols))
// fill the array with sql.Rawbytes
for i := range vals {
vals[i] = &sql.RawBytes{}
vals[i] = sql.RawBytes{}
valPtrs[i] = &vals[i]
}
if err = rows.Scan(vals...); err != nil {
if err = rows.Scan(valPtrs...); err != nil {
return err
}

// range over columns, and try to parse values
for i, col := range cols {
colName := col.Name()

if m.MetricVersion >= 2 {
col = strings.ToLower(col)
colName = strings.ToLower(colName)
}

colValue := vals[i]

if m.GatherAllSlaveChannels &&
(strings.ToLower(col) == "channel_name" || strings.ToLower(col) == "connection_name") {
(strings.ToLower(colName) == "channel_name" || strings.ToLower(colName) == "connection_name") {
// Since the default channel name is empty, we need this block
channelName := "default"
if len(*vals[i].(*sql.RawBytes)) > 0 {
channelName = string(*vals[i].(*sql.RawBytes))
if len(colValue) > 0 {
channelName = string(colValue)
}
tags["channel"] = channelName
} else if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok {
fields["slave_"+col] = value
continue
}

if colValue == nil || len(colValue) == 0 {
continue
}

value, err := m.parseValueByDatabaseTypeName(colValue, col.DatabaseTypeName())
if err != nil {
m.Log.Debugf("Error parsing %s=%q: %v", colName, string(colValue), err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is really an error please use acc.AddError() to make those visible. In any case, I think debugging is too invisible as you can only see those when starting telegraf in debug mode. So either acc.AddError() or m.Log.Errorf() here.

continue
}

fields["slave_"+colName] = value
}
acc.AddFields("mysql", fields, tags)

Expand Down Expand Up @@ -1343,10 +1356,16 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
if err := rows.Scan(&key, &val); err != nil {
return err
}

key = strings.ToLower(key)
if value, ok := m.parseValue(val); ok {
fields[key] = value
value, err := m.parseValue(val)
if err != nil {
m.Log.Debugf("Error parsing %s=%q: %v", key, string(val), err)
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

}

fields[key] = value

// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql_innodb", fields, tags)
Expand Down Expand Up @@ -1901,34 +1920,29 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
return nil
}

func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
func (m *Mysql) parseValueByDatabaseTypeName(value sql.RawBytes, databaseTypeName string) (interface{}, error) {
if m.MetricVersion < 2 {
return v1.ParseValue(value)
}
return parseValue(value)
}

// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
func parseValue(value sql.RawBytes) (interface{}, bool) {
if bytes.EqualFold(value, []byte("YES")) || bytes.Equal(value, []byte("ON")) {
return 1, true
}

if bytes.EqualFold(value, []byte("NO")) || bytes.Equal(value, []byte("OFF")) {
return 0, true
switch databaseTypeName {
case "INT":
return v2.ParseInt(value)
case "BIGINT":
return v2.ParseUint(value)
case "VARCHAR":
return v2.ParseString(value)
default:
return v2.ParseValue(value)
}
}

if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
return val, true
}
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
return val, true
func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, error) {
if m.MetricVersion < 2 {
return v1.ParseValue(value)
}

if len(string(value)) > 0 {
return string(value), true
}
return nil, false
return v2.ParseValue(value)
}

// findThreadState can be used to find thread state by command and plain state
Expand Down
27 changes: 1 addition & 26 deletions plugins/inputs/mysql/mysql_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mysql

import (
"database/sql"
"fmt"
"testing"

Expand Down Expand Up @@ -178,31 +177,7 @@ func TestMysqlDNSAddTimeout(t *testing.T) {
}
}
}
func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
boolValue bool
}{
{sql.RawBytes("123"), int64(123), true},
{sql.RawBytes("abc"), "abc", true},
{sql.RawBytes("10.1"), 10.1, true},
{sql.RawBytes("ON"), 1, true},
{sql.RawBytes("OFF"), 0, true},
{sql.RawBytes("NO"), 0, true},
{sql.RawBytes("YES"), 1, true},
{sql.RawBytes("No"), 0, true},
{sql.RawBytes("Yes"), 1, true},
{sql.RawBytes("-794"), int64(-794), true},
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), true},
{sql.RawBytes(""), nil, false},
}
for _, cases := range testCases {
if got, ok := parseValue(cases.rawByte); got != cases.output && ok != cases.boolValue {
t.Errorf("for %s wanted %t, got %t", string(cases.rawByte), cases.output, got)
}
}
}

func TestNewNamespace(t *testing.T) {
testCases := []struct {
words []string
Expand Down
8 changes: 4 additions & 4 deletions plugins/inputs/mysql/v1/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ var Mappings = []*Mapping{
},
}

func ParseValue(value sql.RawBytes) (float64, bool) {
func ParseValue(value sql.RawBytes) (float64, error) {
if bytes.Equal(value, []byte("Yes")) || bytes.Equal(value, []byte("ON")) {
return 1, true
return 1, nil
}

if bytes.Equal(value, []byte("No")) || bytes.Equal(value, []byte("OFF")) {
return 0, true
return 0, nil
}
n, err := strconv.ParseFloat(string(value), 64)
return n, err == nil
return n, err
}
12 changes: 10 additions & 2 deletions plugins/inputs/mysql/v2/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func ParseUint(value sql.RawBytes) (interface{}, error) {
return strconv.ParseUint(string(value), 10, 64)
}

func ParseFloat(value sql.RawBytes) (interface{}, error) {
return strconv.ParseFloat(string(value), 64)
}

func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) {
if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) {
return int64(1), nil
Expand Down Expand Up @@ -86,11 +90,15 @@ var GlobalStatusConversions = map[string]ConversionFunc{
"innodb_data_pending_fsyncs": ParseUint,
"ssl_ctx_verify_depth": ParseUint,
"ssl_verify_depth": ParseUint,

// see https://galeracluster.com/library/documentation/galera-status-variables.html
"wsrep_local_index": ParseUint,
"wsrep_local_send_queue_avg": ParseFloat,
}

// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
var GlobalVariableConversions = map[string]ConversionFunc{
// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
"delay_key_write": ParseString, // ON, OFF, ALL
"enforce_gtid_consistency": ParseString, // ON, OFF, WARN
"event_scheduler": ParseString, // YES, NO, DISABLED
Expand Down
41 changes: 41 additions & 0 deletions plugins/inputs/mysql/v2/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"database/sql"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -84,3 +85,43 @@ func TestCovertGlobalVariables(t *testing.T) {
})
}
}

func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
err string
}{
{sql.RawBytes("123"), int64(123), ""},
{sql.RawBytes("abc"), "abc", ""},
{sql.RawBytes("10.1"), 10.1, ""},
{sql.RawBytes("ON"), 1, ""},
{sql.RawBytes("OFF"), 0, ""},
{sql.RawBytes("NO"), 0, ""},
{sql.RawBytes("YES"), 1, ""},
{sql.RawBytes("No"), 0, ""},
{sql.RawBytes("Yes"), 1, ""},
{sql.RawBytes("-794"), int64(-794), ""},
{sql.RawBytes("2147483647"), int64(2147483647), ""}, // max int32
{sql.RawBytes("2147483648"), int64(2147483648), ""}, // too big for int32
{sql.RawBytes("9223372036854775807"), int64(9223372036854775807), ""}, // max int64
{sql.RawBytes("9223372036854775808"), uint64(9223372036854775808), ""}, // too big for int64
{sql.RawBytes("18446744073709551615"), uint64(18446744073709551615), ""}, // max uint64
{sql.RawBytes("18446744073709551616"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes(""), nil, "unconvertible value"},
}
for _, cases := range testCases {
got, err := ParseValue(cases.rawByte)

if err != nil && cases.err == "" {
t.Errorf("for %q got unexpected error: %q", string(cases.rawByte), err.Error())
} else if err != nil && !strings.HasPrefix(err.Error(), cases.err) {
t.Errorf("for %q wanted error %q, got %q", string(cases.rawByte), cases.err, err.Error())
} else if err == nil && cases.err != "" {
t.Errorf("for %q did not get expected error: %s", string(cases.rawByte), cases.err)
} else if got != cases.output {
t.Errorf("for %q wanted %#v (%T), got %#v (%T)", string(cases.rawByte), cases.output, cases.output, got, got)
}
}
}