Skip to content

Commit

Permalink
fix(error): SELECT INTO doesn't return error with unsupported value (#…
Browse files Browse the repository at this point in the history
…20429)

When a SELECT INTO query generates an illegal value that cannot be inserted,
like +/- Inf, it should return an error, rather than failing silently.
This adds a boolean parameter to the [data] section of influxdb.conf:
* strict-error-handling
When false, the default, the old behavior is preserved.  When true,
unsupported values will return an error from SELECT INTO queries

Fixes #20426
  • Loading branch information
davidby-influx authored Dec 31, 2020
1 parent 2e26dc6 commit 9e33be2
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
11 changes: 6 additions & 5 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,12 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
MetaClient: s.MetaClient,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
},
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
StrictErrorHandling: s.TSDBStore.EngineOptions.Config.StrictErrorHandling,
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
}
s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
Expand Down
20 changes: 13 additions & 7 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type StatementExecutor struct {
WritePointsInto(*IntoWriteRequest) error
}

// Disallow INF values in SELECT INTO and other previously ignored errors
StrictErrorHandling bool

// Select statement limits
MaxSelectPointN int
MaxSelectSeriesN int
Expand Down Expand Up @@ -573,7 +576,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen

// Write points back into system for INTO statements.
if stmt.Target != nil {
n, err := e.writeInto(pointsWriter, stmt, row)
n, err := e.writeInto(pointsWriter, stmt, row, e.StrictErrorHandling)
if err != nil {
return err
}
Expand Down Expand Up @@ -1188,7 +1191,7 @@ func (w *BufferedPointsWriter) Len() int { return len(w.buf) }
// Cap returns the capacity (in points) of the buffer.
func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) }

func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) (n int64, err error) {
func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row, strictErrorHandling bool) (n int64, err error) {
if stmt.Target.Measurement.Database == "" {
return 0, errNoDatabaseInTarget
}
Expand All @@ -1205,7 +1208,7 @@ func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectState
name = row.Name
}

points, err := convertRowToPoints(name, row)
points, err := convertRowToPoints(name, row, strictErrorHandling)
if err != nil {
return 0, err
}
Expand All @@ -1224,7 +1227,7 @@ func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectState
var errNoDatabaseInTarget = errors.New("no database in target")

// convertRowToPoints will convert a query result Row into Points that can be written back in.
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
func convertRowToPoints(measurementName string, row *models.Row, strictErrorHandling bool) ([]models.Point, error) {
// figure out which parts of the result are the time and which are the fields
timeIndex := -1
fieldIndexes := make(map[string]int)
Expand Down Expand Up @@ -1256,13 +1259,16 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point

p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
if err != nil {
// Drop points that can't be stored
continue
if !strictErrorHandling {
// Drop points that can't be stored
continue
} else {
return nil, err
}
}

points = append(points, p)
}

return points, nil
}

Expand Down
4 changes: 4 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
# log any sensitive data contained within a query.
# query-log-enabled = true

# Provides more error checking. For example, SELECT INTO will err out inserting an +/-Inf value
# rather than silently failing.
# strict-error-handling = false

# Validates incoming writes to ensure keys only have valid unicode characters.
# This setting will incur a small overhead because every key must be checked.
# validate-keys = false
Expand Down
7 changes: 6 additions & 1 deletion tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type Config struct {
// Enables unicode validation on series keys on write.
ValidateKeys bool `toml:"validate-keys"`

// Enables strict error handling. For example, forces SELECT INTO to err out on INF values.
StrictErrorHandling bool `toml:"strict-error-handling"`

// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`

Expand Down Expand Up @@ -155,7 +158,8 @@ func NewConfig() Config {
Engine: DefaultEngine,
Index: DefaultIndex,

QueryLogEnabled: true,
StrictErrorHandling: false,
QueryLogEnabled: true,

CacheMaxMemorySize: toml.Size(DefaultCacheMaxMemorySize),
CacheSnapshotMemorySize: toml.Size(DefaultCacheSnapshotMemorySize),
Expand Down Expand Up @@ -229,6 +233,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
"dir": c.Dir,
"wal-dir": c.WALDir,
"wal-fsync-delay": c.WALFsyncDelay,
"strict-error-handling": c.StrictErrorHandling,
"cache-max-memory-size": c.CacheMaxMemorySize,
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
Expand Down

0 comments on commit 9e33be2

Please sign in to comment.