Skip to content

Commit

Permalink
Merge pull request #7095 from influxdata/dgn-cardinality-limits
Browse files Browse the repository at this point in the history
feat #6679: add series limit config setting
  • Loading branch information
dgnorton authored Aug 5, 2016
2 parents c6034c1 + 43809b0 commit 064db3c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## v1.0.0 [unreleased]

### Breaking changes

* `max-series-per-database` was added with a default of 1M but can be disabled by setting it to `0`. Existing databases with series that exceed this limit will continue to load but writes that would create new series will fail.

### Release Notes

* Config option `[cluster]` has been replaced with `[coordinator]`
Expand Down Expand Up @@ -46,6 +50,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#7050](https://github.com/influxdata/influxdb/pull/7050): Update go package library dependencies.
- [#5750](https://github.com/influxdata/influxdb/issues/5750): Support wildcards in aggregate functions.
- [#7605](https://github.com/influxdata/influxdb/issues/7605): Remove IF EXISTS/IF NOT EXISTS from influxql language.
- [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting.

### Bugfixes

Expand Down
12 changes: 12 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
// DefaultMaxPointsPerBlock is the maximum number of points in an encoded
// block in a TSM file
DefaultMaxPointsPerBlock = 1000

// DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
DefaultMaxSeriesPerDatabase = 1000000
)

// Config holds the configuration for the tsdb package.
Expand All @@ -57,6 +60,13 @@ type Config struct {
CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"`
MaxPointsPerBlock int `toml:"max-points-per-block"`

// Limits

// MaxSeriesPerDatabase is the maximum number of series a node can hold per database.
// When this limit is exceeded, writes return a 'max series per database exceeded' error.
// A value of 0 disables the limit.
MaxSeriesPerDatabase int `toml:"max-series-per-database"`

TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
}

Expand All @@ -74,6 +84,8 @@ func NewConfig() Config {
CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration),
CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration),

MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,

TraceLoggingEnabled: false,
}
}
Expand Down
4 changes: 4 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
key := string(p.Key())
ss := s.index.Series(key)
if ss == nil {
if s.options.Config.MaxSeriesPerDatabase > 0 && len(s.index.series)+1 > s.options.Config.MaxSeriesPerDatabase {
return nil, fmt.Errorf("max series per database exceeded: %s", key)
}

ss = NewSeries(key, p.Tags())
atomic.AddInt64(&s.stats.SeriesCreated, 1)
}
Expand Down
54 changes: 54 additions & 0 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsdb_test

import (
"fmt"
"io/ioutil"
"os"
"path"
Expand Down Expand Up @@ -96,6 +97,59 @@ func TestShardWriteAndIndex(t *testing.T) {
}
}

// TestMaxSeriesLimit verifies the max-series-per-database limit: writes that
// stay at or under the configured limit succeed, and the first write that
// would create a series beyond the limit fails with a
// "max series per database exceeded" error.
func TestMaxSeriesLimit(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "shard_test")
	if err != nil {
		t.Fatalf("error creating temp dir: %s", err.Error())
	}
	defer os.RemoveAll(tmpDir)
	tmpShard := path.Join(tmpDir, "shard")
	tmpWal := path.Join(tmpDir, "wal")

	index := tsdb.NewDatabaseIndex("db")
	opts := tsdb.NewEngineOptions()
	opts.Config.WALDir = tmpWal
	opts.Config.MaxSeriesPerDatabase = 1000

	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)

	if err := sh.Open(); err != nil {
		t.Fatalf("error opening shard: %s", err.Error())
	}
	// Deferred so the shard is closed even when an assertion below fails.
	defer sh.Close()

	// Writing exactly 1K distinct series (the configured limit) should succeed.
	points := make([]models.Point, 0, 1000)
	for i := 0; i < 1000; i++ {
		pt := models.MustNewPoint(
			"cpu",
			map[string]string{"host": fmt.Sprintf("server%d", i)},
			map[string]interface{}{"value": 1.0},
			time.Unix(1, 2),
		)
		points = append(points, pt)
	}

	if err := sh.WritePoints(points); err != nil {
		t.Fatalf(err.Error())
	}

	// Writing one more new series should exceed the series limit.
	pt := models.MustNewPoint(
		"cpu",
		map[string]string{"host": "server9999"},
		map[string]interface{}{"value": 1.0},
		time.Unix(1, 2),
	)

	exp := "max series per database exceeded: cpu,host=server9999"
	if err := sh.WritePoints([]models.Point{pt}); err == nil {
		t.Fatal("expected error")
	} else if err.Error() != exp {
		t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, err.Error())
	}
}

func TestShardWriteAddNewField(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
Expand Down

0 comments on commit 064db3c

Please sign in to comment.