Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mysql: type conversion follow-up #9966

Merged
merged 6 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 41 additions & 31 deletions plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mysql

import (
"bytes"
"database/sql"
"fmt"
"strconv"
Expand Down Expand Up @@ -657,11 +656,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu

func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
if m.MetricVersion < 2 {
v, ok := v1.ParseValue(value)
if ok {
return v, nil
}
return v, fmt.Errorf("could not parse value: %q", string(value))
return v1.ParseValue(value)
}
return v2.ConvertGlobalVariables(key, value)
}
Expand Down Expand Up @@ -692,7 +687,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// scanning keys and values separately

// get columns names, and create an array with its length
cols, err := rows.Columns()
cols, err := rows.ColumnTypes()
if err != nil {
return err
}
Expand All @@ -706,20 +701,31 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
}
// range over columns, and try to parse values
for i, col := range cols {
colName := col.Name()

if m.MetricVersion >= 2 {
col = strings.ToLower(col)
colName = strings.ToLower(colName)
}

colValue := *vals[i].(*sql.RawBytes)

if m.GatherAllSlaveChannels &&
(strings.ToLower(col) == "channel_name" || strings.ToLower(col) == "connection_name") {
(strings.ToLower(colName) == "channel_name" || strings.ToLower(colName) == "connection_name") {
// Since the default channel name is empty, we need this block
channelName := "default"
if len(*vals[i].(*sql.RawBytes)) > 0 {
channelName = string(*vals[i].(*sql.RawBytes))
if len(colValue) > 0 {
channelName = string(colValue)
}
tags["channel"] = channelName
} else if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok {
fields["slave_"+col] = value
continue
}

if colValue == nil || len(colValue) == 0 {
continue
}

if value, err := m.parseValueByDatabaseTypeName(colValue, col.DatabaseTypeName()); err != nil {
fields["slave_"+colName] = value
}
}
acc.AddFields("mysql", fields, tags)
Expand Down Expand Up @@ -1901,34 +1907,38 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
return nil
}

func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
func (m *Mysql) parseValueByDatabaseTypeName(value sql.RawBytes, databaseTypeName string) (interface{}, error) {
if m.MetricVersion < 2 {
return v1.ParseValue(value)
}
return parseValue(value)
}

// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
func parseValue(value sql.RawBytes) (interface{}, bool) {
if bytes.EqualFold(value, []byte("YES")) || bytes.Equal(value, []byte("ON")) {
return 1, true
switch databaseTypeName {
case "INT":
return v2.ParseInt(value)
case "BIGINT":
return v2.ParseUint(value)
case "VARCHAR":
return v2.ParseString(value)
default:
return v2.ParseValue(value)
}
}

if bytes.EqualFold(value, []byte("NO")) || bytes.Equal(value, []byte("OFF")) {
return 0, true
}
func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
var data interface{}
var err error

if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
return val, true
}
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
return val, true
if m.MetricVersion < 2 {
data, err = v1.ParseValue(value)
} else {
data, err = v2.ParseValue(value)
}

if len(string(value)) > 0 {
return string(value), true
if err != nil {
return nil, false
}
return nil, false

return data, true
}

// findThreadState can be used to find thread state by command and plain state
Expand Down
27 changes: 1 addition & 26 deletions plugins/inputs/mysql/mysql_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mysql

import (
"database/sql"
"fmt"
"testing"

Expand Down Expand Up @@ -178,31 +177,7 @@ func TestMysqlDNSAddTimeout(t *testing.T) {
}
}
}
func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
boolValue bool
}{
{sql.RawBytes("123"), int64(123), true},
{sql.RawBytes("abc"), "abc", true},
{sql.RawBytes("10.1"), 10.1, true},
{sql.RawBytes("ON"), 1, true},
{sql.RawBytes("OFF"), 0, true},
{sql.RawBytes("NO"), 0, true},
{sql.RawBytes("YES"), 1, true},
{sql.RawBytes("No"), 0, true},
{sql.RawBytes("Yes"), 1, true},
{sql.RawBytes("-794"), int64(-794), true},
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), true},
{sql.RawBytes(""), nil, false},
}
for _, cases := range testCases {
if got, ok := parseValue(cases.rawByte); got != cases.output && ok != cases.boolValue {
t.Errorf("for %s wanted %t, got %t", string(cases.rawByte), cases.output, got)
}
}
}

func TestNewNamespace(t *testing.T) {
testCases := []struct {
words []string
Expand Down
8 changes: 4 additions & 4 deletions plugins/inputs/mysql/v1/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ var Mappings = []*Mapping{
},
}

func ParseValue(value sql.RawBytes) (float64, bool) {
func ParseValue(value sql.RawBytes) (float64, error) {
if bytes.Equal(value, []byte("Yes")) || bytes.Equal(value, []byte("ON")) {
return 1, true
return 1, nil
}

if bytes.Equal(value, []byte("No")) || bytes.Equal(value, []byte("OFF")) {
return 0, true
return 0, nil
}
n, err := strconv.ParseFloat(string(value), 64)
return n, err == nil
return n, err
}
12 changes: 10 additions & 2 deletions plugins/inputs/mysql/v2/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func ParseUint(value sql.RawBytes) (interface{}, error) {
return strconv.ParseUint(string(value), 10, 64)
}

func ParseFloat(value sql.RawBytes) (interface{}, error) {
return strconv.ParseFloat(string(value), 64)
}

func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) {
if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) {
return int64(1), nil
Expand Down Expand Up @@ -86,11 +90,15 @@ var GlobalStatusConversions = map[string]ConversionFunc{
"innodb_data_pending_fsyncs": ParseUint,
"ssl_ctx_verify_depth": ParseUint,
"ssl_verify_depth": ParseUint,

// see https://galeracluster.com/library/documentation/galera-status-variables.html
"wsrep_local_index": ParseUint,
"wsrep_local_send_queue_avg": ParseFloat,
}

// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
var GlobalVariableConversions = map[string]ConversionFunc{
// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
"delay_key_write": ParseString, // ON, OFF, ALL
"enforce_gtid_consistency": ParseString, // ON, OFF, WARN
"event_scheduler": ParseString, // YES, NO, DISABLED
Expand Down
38 changes: 38 additions & 0 deletions plugins/inputs/mysql/v2/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,41 @@ func TestCovertGlobalVariables(t *testing.T) {
})
}
}

func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
err string
}{
{sql.RawBytes("123"), int64(123), ""},
{sql.RawBytes("abc"), "abc", ""},
{sql.RawBytes("10.1"), 10.1, ""},
{sql.RawBytes("ON"), 1, ""},
{sql.RawBytes("OFF"), 0, ""},
{sql.RawBytes("NO"), 0, ""},
{sql.RawBytes("YES"), 1, ""},
{sql.RawBytes("No"), 0, ""},
{sql.RawBytes("Yes"), 1, ""},
{sql.RawBytes("-794"), int64(-794), ""},
{sql.RawBytes("2147483647"), int64(2147483647), ""}, // max int32
{sql.RawBytes("2147483648"), int64(2147483648), ""}, // too big for int32
{sql.RawBytes("9223372036854775807"), int64(9223372036854775807), ""}, // max int64
{sql.RawBytes("9223372036854775808"), uint64(9223372036854775808), ""}, // too big for int64
{sql.RawBytes("18446744073709551615"), uint64(18446744073709551615), ""}, // max uint64
{sql.RawBytes("18446744073709551616"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes(""), nil, "foo"},
}
for _, cases := range testCases {
got, err := ParseValue(cases.rawByte)

if err != nil && cases.err == "" {
t.Errorf("for %s got unexpected error: %s", string(cases.rawByte), err)
} else if err == nil && cases.err != "" {
t.Errorf("for %s did not get expected error: %s", string(cases.rawByte), cases.err)
} else if got != cases.output {
t.Errorf("for %s wanted %#v (%T), got %#v (%T)", string(cases.rawByte), cases.output, cases.output, got, got)
}
}
}