diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d9d3e25520..dc90ab3bf92 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * Config option `[cluster]` has been replaced with `[coordinator]`
 * Support for config options `[collectd]` and `[opentsdb]` has been removed; use `[[collectd]]` and `[[opentsdb]]` instead.
 * Config option `data-logging-enabled` within the `[data]` section, has been renamed to `trace-logging-enabled`, and defaults to `false`.
+* `max-series-per-database` was added with a default of 1M but can be disabled by setting it to `0`.
 
 With this release the systemd configuration files for InfluxDB will use the system configured default for logging and will no longer write files to `/var/log/influxdb` by default. On most systems, the logs will be directed to the systemd journal and can be accessed by `journalctl -u influxdb.service`. Consult the systemd journald documentation for configuring journald.
 
@@ -44,6 +45,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
 - [#7011](https://github.com/influxdata/influxdb/issues/7011): Create man pages for commands.
 - [#7050](https://github.com/influxdata/influxdb/pull/7050): Update go package library dependencies.
 - [#5750](https://github.com/influxdata/influxdb/issues/5750): Support wildcards in aggregate functions.
+- [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting.
 
 ### Bugfixes
 
diff --git a/tsdb/config.go b/tsdb/config.go
index c7e5a5ed58a..c02b6a7865c 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -36,6 +36,9 @@ const (
     // DefaultMaxPointsPerBlock is the maximum number of points in an encoded
     // block in a TSM file
     DefaultMaxPointsPerBlock = 1000
+
+    // DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
+    DefaultMaxSeriesPerDatabase = 1000000
 )
 
 // Config holds the configuration for the tsbd package.
@@ -57,6 +60,13 @@ type Config struct {
     CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"`
     MaxPointsPerBlock            int           `toml:"max-points-per-block"`
 
+    // Limits
+
+    // MaxSeriesPerDatabase is the maximum number of series a node can hold per database.
+    // When this limit is exceeded, writes return a 'max series per database exceeded' error.
+    // A value of 0 disables the limit.
+    MaxSeriesPerDatabase int `toml:"max-series-per-database"`
+
     TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
 }
 
@@ -74,6 +84,8 @@ func NewConfig() Config {
         CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration),
         CompactFullWriteColdDuration:   toml.Duration(DefaultCompactFullWriteColdDuration),
 
+        MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
+
         TraceLoggingEnabled: false,
     }
 }
diff --git a/tsdb/shard.go b/tsdb/shard.go
index f63ea72c7d0..39ca5819980 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -451,6 +451,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
         key := string(p.Key())
         ss := s.index.Series(key)
         if ss == nil {
+            if s.options.Config.MaxSeriesPerDatabase > 0 && len(s.index.series)+1 > s.options.Config.MaxSeriesPerDatabase {
+                return nil, fmt.Errorf("max series per database exceeded: %s", key)
+            }
+
             ss = NewSeries(key, p.Tags())
             atomic.AddInt64(&s.stats.SeriesCreated, 1)
         }
diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go
index 2c4dab9a4ae..714048364cf 100644
--- a/tsdb/shard_test.go
+++ b/tsdb/shard_test.go
@@ -1,6 +1,7 @@
 package tsdb_test
 
 import (
+    "fmt"
     "io/ioutil"
     "os"
     "path"
@@ -96,6 +97,59 @@ func TestShardWriteAndIndex(t *testing.T) {
     }
 }
 
+func TestMaxSeriesLimit(t *testing.T) {
+    tmpDir, _ := ioutil.TempDir("", "shard_test")
+    defer os.RemoveAll(tmpDir)
+    tmpShard := path.Join(tmpDir, "shard")
+    tmpWal := path.Join(tmpDir, "wal")
+
+    index := tsdb.NewDatabaseIndex("db")
+    opts := tsdb.NewEngineOptions()
+    opts.Config.WALDir = filepath.Join(tmpDir, "wal")
+    opts.Config.MaxSeriesPerDatabase = 1000
+
+    sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
+
+    if err := sh.Open(); err != nil {
+        t.Fatalf("error opening shard: %s", err.Error())
+    }
+
+    // Writing 1K series should succeed.
+    points := []models.Point{}
+
+    for i := 0; i < 1000; i++ {
+        pt := models.MustNewPoint(
+            "cpu",
+            map[string]string{"host": fmt.Sprintf("server%d", i)},
+            map[string]interface{}{"value": 1.0},
+            time.Unix(1, 2),
+        )
+        points = append(points, pt)
+    }
+
+    err := sh.WritePoints(points)
+    if err != nil {
+        t.Fatalf(err.Error())
+    }
+
+    // Writing one more series should exceed the series limit.
+    pt := models.MustNewPoint(
+        "cpu",
+        map[string]string{"host": "server9999"},
+        map[string]interface{}{"value": 1.0},
+        time.Unix(1, 2),
+    )
+
+    err = sh.WritePoints([]models.Point{pt})
+    if err == nil {
+        t.Fatal("expected error")
+    } else if err.Error() != "max series per database exceeded: cpu,host=server9999" {
+        t.Fatalf("unexpected error message:\n\texp = max series per database exceeded: cpu,host=server9999\n\tgot = %s", err.Error())
+    }
+
+    sh.Close()
+}
+
 func TestShardWriteAddNewField(t *testing.T) {
     tmpDir, _ := ioutil.TempDir("", "shard_test")
     defer os.RemoveAll(tmpDir)
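
For reviewers: a minimal sketch (not part of the patch) of how the new option might look in `influxdb.conf`. The placement under `[data]` is an assumption inferred from the `toml:"max-series-per-database"` tag on the tsdb `Config` struct, which backs that section; the example value is arbitrary.

```toml
[data]
  # Assumed placement under [data]; 0 disables the limit entirely.
  # Defaults to 1,000,000 (DefaultMaxSeriesPerDatabase) when omitted.
  max-series-per-database = 100000
```

Once the limit is reached, a write that would create an additional series is rejected with a `max series per database exceeded: <series key>` error, as exercised by `TestMaxSeriesLimit` above.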