Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: hard limit on field size while parsing line protocol #22525

Merged
merged 1 commit into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/influxd/launcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,11 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
Flag: "storage-validate-keys",
Desc: "Validates incoming writes to ensure keys only have valid unicode characters.",
},
{
DestP: &o.StorageConfig.Data.SkipFieldSizeValidation,
Flag: "storage-no-validate-field-size",
Desc: "Skip field-size validation on incoming writes.",
},
{
DestP: &o.StorageConfig.Data.CacheMaxMemorySize,
Flag: "storage-cache-max-memory-size",
Expand Down
56 changes: 56 additions & 0 deletions cmd/influxd/launcher/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
nethttp "net/http"
"strings"
"testing"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -54,6 +56,60 @@ func TestStorage_WriteAndQuery(t *testing.T) {
}
}

// TestStorage_PartialWrite ensures the server writes every point it can,
// rejecting only points that have a field type conflict or a field value
// exceeding the hard size limit.
func TestStorage_PartialWrite(t *testing.T) {
	srv := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
	defer srv.ShutdownOrFail(t, ctx)

	// Seed the measurement with an integer-typed field.
	srv.WritePointsOrFail(t, `cpu value=1i 946684800000000000`)

	// A batch mixing integer and float values for the same field must error.
	require.Error(t, srv.WritePoints("cpu value=2i 946684800000000001\ncpu value=3 946684800000000002\ncpu value=4i 946684800000000003"))

	// A string field one byte over the limit must error.
	oversized := strings.Repeat("a", tsdb.MaxFieldValueLength+1)
	require.Error(t, srv.WritePoints(fmt.Sprintf(`cpu str="%s" 946684800000000004`, oversized)))

	// A string field exactly at the limit must be accepted.
	srv.WritePointsOrFail(t, fmt.Sprintf(`cpu str="%s" 946684800000000005`, strings.Repeat("a", tsdb.MaxFieldValueLength)))

	// Query back and confirm only the valid points survived.
	qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z) |> keep(columns: ["_time","_field"])`

	want := `,result,table,_time,_field` + "\r\n" +
		`,_result,0,2000-01-01T00:00:00.000000005Z,str` + "\r\n" + // str=max-length string
		`,_result,1,2000-01-01T00:00:00Z,value` + "\r\n" + // value=1
		`,_result,1,2000-01-01T00:00:00.000000001Z,value` + "\r\n" + // value=2
		`,_result,1,2000-01-01T00:00:00.000000003Z,value` + "\r\n\r\n" // value=4

	got, err := http.SimpleQuery(srv.URL(), qs, srv.Org.Name, srv.Auth.Token)
	require.NoError(t, err)
	require.Equal(t, want, string(got))
}

// TestStorage_DisableMaxFieldValueSize verifies that setting
// SkipFieldSizeValidation allows a field value larger than
// tsdb.MaxFieldValueLength to be written and queried back intact.
func TestStorage_DisableMaxFieldValueSize(t *testing.T) {
	srv := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
		o.StorageConfig.Data.SkipFieldSizeValidation = true
	})
	defer srv.ShutdownOrFail(t, ctx)

	// With validation disabled, an over-limit string field must be accepted.
	huge := strings.Repeat("a", tsdb.MaxFieldValueLength+1)
	srv.WritePointsOrFail(t, fmt.Sprintf(`cpu str="%s" 946684800000000000`, huge))

	// The oversized value must round-trip through a query unchanged.
	qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z) |> keep(columns: ["_value"])`
	want := `,result,table,_value` + "\r\n" +
		fmt.Sprintf(`,_result,0,%s`, huge) + "\r\n\r\n"

	got, err := http.SimpleQuery(srv.URL(), qs, srv.Org.Name, srv.Auth.Token)
	require.NoError(t, err)
	require.Equal(t, want, string(got))
}

func TestLauncher_WriteAndQuery(t *testing.T) {
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l.ShutdownOrFail(t, ctx)
Expand Down
3 changes: 3 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type Config struct {
// Enables unicode validation on series keys on write.
ValidateKeys bool `toml:"validate-keys"`

// When true, skips size validation on fields
SkipFieldSizeValidation bool `toml:"skip-field-size-validation"`

// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`

Expand Down
5 changes: 2 additions & 3 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,8 @@ type EngineOptions struct {
// nil will allow all combinations to pass.
ShardFilter func(database, rp string, id uint64) bool

Config Config
SeriesIDSets SeriesIDSets
FieldValidator FieldValidator
Config Config
SeriesIDSets SeriesIDSets

OnNewEngine func(Engine)

Expand Down
31 changes: 22 additions & 9 deletions tsdb/field_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,31 @@ import (
"github.com/influxdata/influxql"
)

// FieldValidator should return a PartialWriteError if the point should not be written.
type FieldValidator interface {
Validate(mf *MeasurementFields, point models.Point) error
}

// defaultFieldValidator ensures that points do not use different types for fields that already exist.
type defaultFieldValidator struct{}
const MaxFieldValueLength = 1048576

// Validate will return a PartialWriteError if the point has inconsistent fields.
func (defaultFieldValidator) Validate(mf *MeasurementFields, point models.Point) error {
// ValidateFields will return a PartialWriteError if:
// - the point has inconsistent fields, or
// - the point has fields that are too long
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) error {
pointSize := point.StringSize()
iter := point.FieldIterator()
for iter.Next() {
if !skipSizeValidation {
// Check for size of field too large. Note it is much cheaper to check the whole point size
// than checking the StringValue size (StringValue potentially takes an allocation if it must
// unescape the string, and must at least parse the string)
if pointSize > MaxFieldValueLength && iter.Type() == models.String {
if sz := len(iter.StringValue()); sz > MaxFieldValueLength {
return PartialWriteError{
Reason: fmt.Sprintf(
"input field \"%s\" on measurement \"%s\" is too long, %d > %d",
iter.FieldKey(), point.Name(), sz, MaxFieldValueLength),
Dropped: 1,
}
}
}
}

// Skip fields name "time", they are illegal.
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
Expand Down
5 changes: 1 addition & 4 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ type Shard struct {
func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {
db, rp := decodeStorePath(path)
logger := zap.NewNop()
if opt.FieldValidator == nil {
opt.FieldValidator = defaultFieldValidator{}
}

s := &Shard{
id: id,
Expand Down Expand Up @@ -646,7 +643,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
mf := engine.MeasurementFields(name)

// Check with the field validator.
if err := s.options.FieldValidator.Validate(mf, p); err != nil {
if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
switch err := err.(type) {
case PartialWriteError:
if reason == "" {
Expand Down