diff --git a/CHANGELOG.md b/CHANGELOG.md index f3fc2b7b530..dc2517e6fb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## v1.0.0 [unreleased] +### Breaking changes + +* `max-series-per-database` was added with a default of 1M but can be disabled by setting it to `0`. Existing databases with series that exceed this limit will continue to load but writes that would create new series will fail. + ### Release Notes * Config option `[cluster]` has been replaced with `[coordinator]` @@ -46,6 +50,7 @@ With this release the systemd configuration files for InfluxDB will use the syst - [#7050](https://github.com/influxdata/influxdb/pull/7050): Update go package library dependencies. - [#5750](https://github.com/influxdata/influxdb/issues/5750): Support wildcards in aggregate functions. - [#7605](https://github.com/influxdata/influxdb/issues/7605): Remove IF EXISTS/IF NOT EXISTS from influxql language. +- [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting. ### Bugfixes diff --git a/tsdb/config.go b/tsdb/config.go index c7e5a5ed58a..c02b6a7865c 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -36,6 +36,9 @@ const ( // DefaultMaxPointsPerBlock is the maximum number of points in an encoded // block in a TSM file DefaultMaxPointsPerBlock = 1000 + + // DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database. + DefaultMaxSeriesPerDatabase = 1000000 ) // Config holds the configuration for the tsbd package. @@ -57,6 +60,13 @@ type Config struct { CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"` MaxPointsPerBlock int `toml:"max-points-per-block"` + // Limits + + // MaxSeriesPerDatabase is the maximum number of series a node can hold per database. + // When this limit is exceeded, writes return a 'max series per database exceeded' error. + // A value of 0 disables the limit. + MaxSeriesPerDatabase int `toml:"max-series-per-database"` + TraceLoggingEnabled bool `toml:"trace-logging-enabled"` } @@ -74,6 +84,8 @@ func NewConfig() Config { CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration), CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration), + MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase, + TraceLoggingEnabled: false, } } diff --git a/tsdb/shard.go b/tsdb/shard.go index f63ea72c7d0..39ca5819980 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -451,6 +451,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate, key := string(p.Key()) ss := s.index.Series(key) if ss == nil { + if s.options.Config.MaxSeriesPerDatabase > 0 && len(s.index.series)+1 > s.options.Config.MaxSeriesPerDatabase { + return nil, fmt.Errorf("max series per database exceeded: %s", key) + } + ss = NewSeries(key, p.Tags()) atomic.AddInt64(&s.stats.SeriesCreated, 1) } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 2c4dab9a4ae..714048364cf 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1,6 +1,7 @@ package tsdb_test import ( + "fmt" "io/ioutil" "os" "path" @@ -96,6 +97,59 @@ func TestShardWriteAndIndex(t *testing.T) { } } +func TestMaxSeriesLimit(t *testing.T) { + tmpDir, _ := ioutil.TempDir("", "shard_test") + defer os.RemoveAll(tmpDir) + tmpShard := path.Join(tmpDir, "shard") + tmpWal := path.Join(tmpDir, "wal") + + index := tsdb.NewDatabaseIndex("db") + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = filepath.Join(tmpDir, "wal") + opts.Config.MaxSeriesPerDatabase = 1000 + + sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + + if err := sh.Open(); err != nil { + t.Fatalf("error opening shard: %s", err.Error()) + } + + // Writing 1K series should succeed. + points := []models.Point{} + + for i := 0; i < 1000; i++ { + pt := models.MustNewPoint( + "cpu", + map[string]string{"host": fmt.Sprintf("server%d", i)}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + points = append(points, pt) + } + + err := sh.WritePoints(points) + if err != nil { + t.Fatalf(err.Error()) + } + + // Writing one more series should exceed the series limit. + pt := models.MustNewPoint( + "cpu", + map[string]string{"host": "server9999"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + + err = sh.WritePoints([]models.Point{pt}) + if err == nil { + t.Fatal("expected error") + } else if err.Error() != "max series per database exceeded: cpu,host=server9999" { + t.Fatalf("unexpected error messag:\n\texp = max series per database exceeded: cpu,host=server9999\n\tgot = %s", err.Error()) + } + + sh.Close() +} + func TestShardWriteAddNewField(t *testing.T) { tmpDir, _ := ioutil.TempDir("", "shard_test") defer os.RemoveAll(tmpDir)