-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: mysql: type conversion follow-up #9966
Merged
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
9f16cbe
mysql: drop mysql.parseValue which is basically v2.ParseValue
fxedel 76a2a43
mysql: Parse slave status by column type (fixes #6671)
fxedel 4359e2b
Parse some wsrep variables as float (see #5055)
fxedel c619286
mysql: fix requested changes
fxedel 34ef7b9
mysql: fix convert test
fxedel 6f78334
Add errors to accumulator
fxedel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
package mysql | ||
|
||
import ( | ||
"bytes" | ||
"database/sql" | ||
"fmt" | ||
"strconv" | ||
|
@@ -657,11 +656,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu | |
|
||
func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) { | ||
if m.MetricVersion < 2 { | ||
v, ok := v1.ParseValue(value) | ||
if ok { | ||
return v, nil | ||
} | ||
return v, fmt.Errorf("could not parse value: %q", string(value)) | ||
return v1.ParseValue(value) | ||
} | ||
return v2.ConvertGlobalVariables(key, value) | ||
} | ||
|
@@ -692,35 +687,53 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu | |
// scanning keys and values separately | ||
|
||
// get columns names, and create an array with its length | ||
cols, err := rows.Columns() | ||
cols, err := rows.ColumnTypes() | ||
if err != nil { | ||
return err | ||
} | ||
vals := make([]interface{}, len(cols)) | ||
vals := make([]sql.RawBytes, len(cols)) | ||
valPtrs := make([]interface{}, len(cols)) | ||
// fill the array with sql.Rawbytes | ||
for i := range vals { | ||
vals[i] = &sql.RawBytes{} | ||
vals[i] = sql.RawBytes{} | ||
valPtrs[i] = &vals[i] | ||
} | ||
if err = rows.Scan(vals...); err != nil { | ||
if err = rows.Scan(valPtrs...); err != nil { | ||
return err | ||
} | ||
|
||
// range over columns, and try to parse values | ||
for i, col := range cols { | ||
colName := col.Name() | ||
|
||
if m.MetricVersion >= 2 { | ||
col = strings.ToLower(col) | ||
colName = strings.ToLower(colName) | ||
} | ||
|
||
colValue := vals[i] | ||
|
||
if m.GatherAllSlaveChannels && | ||
(strings.ToLower(col) == "channel_name" || strings.ToLower(col) == "connection_name") { | ||
(strings.ToLower(colName) == "channel_name" || strings.ToLower(colName) == "connection_name") { | ||
// Since the default channel name is empty, we need this block | ||
channelName := "default" | ||
if len(*vals[i].(*sql.RawBytes)) > 0 { | ||
channelName = string(*vals[i].(*sql.RawBytes)) | ||
if len(colValue) > 0 { | ||
channelName = string(colValue) | ||
} | ||
tags["channel"] = channelName | ||
} else if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok { | ||
fields["slave_"+col] = value | ||
continue | ||
} | ||
|
||
if colValue == nil || len(colValue) == 0 { | ||
continue | ||
} | ||
|
||
value, err := m.parseValueByDatabaseTypeName(colValue, col.DatabaseTypeName()) | ||
if err != nil { | ||
m.Log.Debugf("Error parsing %s=%q: %v", colName, string(colValue), err) | ||
continue | ||
} | ||
|
||
fields["slave_"+colName] = value | ||
} | ||
acc.AddFields("mysql", fields, tags) | ||
|
||
|
@@ -1343,10 +1356,16 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu | |
if err := rows.Scan(&key, &val); err != nil { | ||
return err | ||
} | ||
|
||
key = strings.ToLower(key) | ||
if value, ok := m.parseValue(val); ok { | ||
fields[key] = value | ||
value, err := m.parseValue(val) | ||
if err != nil { | ||
m.Log.Debugf("Error parsing %s=%q: %v", key, string(val), err) | ||
continue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. |
||
} | ||
|
||
fields[key] = value | ||
|
||
// Send 20 fields at a time | ||
if len(fields) >= 20 { | ||
acc.AddFields("mysql_innodb", fields, tags) | ||
|
@@ -1901,34 +1920,29 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula | |
return nil | ||
} | ||
|
||
func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) { | ||
func (m *Mysql) parseValueByDatabaseTypeName(value sql.RawBytes, databaseTypeName string) (interface{}, error) { | ||
if m.MetricVersion < 2 { | ||
return v1.ParseValue(value) | ||
} | ||
return parseValue(value) | ||
} | ||
|
||
// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1 | ||
func parseValue(value sql.RawBytes) (interface{}, bool) { | ||
if bytes.EqualFold(value, []byte("YES")) || bytes.Equal(value, []byte("ON")) { | ||
return 1, true | ||
} | ||
|
||
if bytes.EqualFold(value, []byte("NO")) || bytes.Equal(value, []byte("OFF")) { | ||
return 0, true | ||
switch databaseTypeName { | ||
case "INT": | ||
return v2.ParseInt(value) | ||
case "BIGINT": | ||
return v2.ParseUint(value) | ||
case "VARCHAR": | ||
return v2.ParseString(value) | ||
default: | ||
return v2.ParseValue(value) | ||
} | ||
} | ||
|
||
if val, err := strconv.ParseInt(string(value), 10, 64); err == nil { | ||
return val, true | ||
} | ||
if val, err := strconv.ParseFloat(string(value), 64); err == nil { | ||
return val, true | ||
func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, error) { | ||
if m.MetricVersion < 2 { | ||
return v1.ParseValue(value) | ||
} | ||
|
||
if len(string(value)) > 0 { | ||
return string(value), true | ||
} | ||
return nil, false | ||
return v2.ParseValue(value) | ||
} | ||
|
||
// findThreadState can be used to find thread state by command and plain state | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is really an error please use
acc.AddError()
to make those visible. In any case, I think debugging is too invisible as you can only see those when starting telegraf in debug mode. So eitheracc.AddError()
orm.Log.Errorf()
here.