Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to delete many series with predicate #9643

Merged
merged 1 commit into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Engine interface {
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
DeleteSeriesRangeWithPredicate(itr SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error

MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
Expand Down
8 changes: 8 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,12 @@ func (e *Engine) WritePoints(points []models.Point) error {

// DeleteSeriesRange deletes the values whose timestamps fall between min and
// max (inclusive) from every series yielded by itr. It is the unfiltered form
// of DeleteSeriesRangeWithPredicate: a nil predicate is supplied, so no series
// produced by the iterator is skipped.
func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error {
	err := e.DeleteSeriesRangeWithPredicate(itr, min, max, nil)
	return err
}

// DeleteSeriesRangeWithPredicate removes the values between min and max (inclusive) from all series
// for which predicate() returns true. If predicate() is nil, then all values in range are removed.
func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error {
var disableOnce bool

// Ensure that the index does not compact away the measurement or series we're
Expand All @@ -1277,6 +1283,8 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
return err
} else if elem == nil {
break
} else if predicate != nil && !predicate(elem.Name(), elem.Tags()) {
continue
}

if elem.Expr() != nil {
Expand Down
125 changes: 125 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,131 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
}
}

// TestEngine_DeleteSeriesRangeWithPredicate checks that
// DeleteSeriesRangeWithPredicate only removes data for series that are both
// yielded by the iterator and accepted by the predicate. It runs once, as a
// subtest, for each index name returned by tsdb.RegisteredIndexes().
//
// Outline:
//  1. Write eight points across six series and snapshot them.
//  2. Delete with a predicate that accepts every "mem" series and "cpu"
//     series tagged host=B; expect three series keys to remain.
//  3. Confirm the "cpu" measurement still has a surviving series in the index.
//  4. Delete the remaining "cpu" series without a predicate and confirm no
//     "cpu" series ID is returned by the index afterwards.
func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted
p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted
p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted
p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted

e, err := NewEngine(index)
if err != nil {
t.Fatal(err)
}

// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if err := e.Open(); err != nil {
t.Fatal(err)
}
defer e.Close()

// Register every series before writing its points.
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
t.Fatalf("create series index error: %v", err)
}
}

if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// Snapshot so the written points show up in e.FileStore.Keys() below.
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}

// Eight points were written across six distinct series keys.
keys := e.FileStore.Keys()
if exp, got := 6, len(keys); exp != got {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}

// The iterator offers five of the six series; "disk,host=C" is left out, so
// it is never presented to the predicate.
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
// predicate accepts every "mem" series, and "cpu" series only when they carry
// the tag host=B. Everything else is rejected.
predicate := func(name []byte, tags models.Tags) bool {
if bytes.Equal(name, []byte("mem")) {
return true
}
if bytes.Equal(name, []byte("cpu")) {
for _, tag := range tags {
if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) {
return true
}
}
}
return false
}
// Use the widest possible time range so only the predicate limits the delete.
if err := e.DeleteSeriesRangeWithPredicate(itr, math.MinInt64, math.MaxInt64, predicate); err != nil {
t.Fatalf("failed to delete series: %v", err)
}

// cpu,host=B plus both mem series should be gone, leaving three keys.
keys = e.FileStore.Keys()
if exp, got := 3, len(keys); exp != got {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}

// Each surviving key is the series key joined to the field name "value".
exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"}
for _, exp := range exps {
if _, ok := keys[exp]; !ok {
t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys)
}
}

// Check that the series still exists in the index
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
if err != nil {
t.Fatalf("iterator error: %v", err)
}
defer iter.Close()

elem, err := iter.Next()
if err != nil {
t.Fatal(err)
}
// A zero SeriesID is treated as the iterator being exhausted.
if elem.SeriesID == 0 {
t.Fatalf("series index mismatch: EOF, exp 2 series")
}

// Lookup series.
name, tags := e.sfile.Series(elem.SeriesID)
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
}

// Only the first returned series is inspected; it may be either survivor.
if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) {
t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags)
}
// NOTE(review): this iterator is also closed by the deferred call above, so
// Close runs twice on it. Presumably Close is safe to repeat and the early
// close is wanted before the delete below — confirm.
iter.Close()

// Deleting the remaining cpu series should remove them from the index.
// The range 0..9000000000 covers every timestamp written above.
itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}}
if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
t.Fatalf("failed to delete series: %v", err)
}

indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
t.Fatalf("iterator error: %v", err)
}
// A nil iterator is accepted as a pass. Presumably it means the measurement
// is no longer present in the index — confirm.
if iter == nil {
return
}

defer iter.Close()
if elem, err = iter.Next(); err != nil {
t.Fatal(err)
}
// Any non-zero ID means a cpu series is still present in the index.
if elem.SeriesID != 0 {
t.Fatalf("got an undeleted series id, but series should be dropped from index")
}
})
}
}

func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,16 @@ func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error {
return engine.DeleteSeriesRange(itr, min, max)
}

// DeleteSeriesRangeWithPredicate deletes the values between min and max
// (inclusive) for the series yielded by itr for which predicate() returns
// true. If predicate is nil, all values in the range are deleted.
//
// The call is forwarded to the shard's engine; an error obtaining the engine
// is returned as-is and nothing is deleted.
func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error {
	eng, engErr := s.engine()
	if engErr != nil {
		return engErr
	}

	return eng.DeleteSeriesRangeWithPredicate(itr, min, max, predicate)
}

// DeleteMeasurement deletes a measurement and all underlying series.
func (s *Shard) DeleteMeasurement(name []byte) error {
engine, err := s.engine()
Expand Down