diff --git a/tsdb/engine.go b/tsdb/engine.go index 25a010297f3..9d3548f2b67 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -55,6 +55,7 @@ type Engine interface { CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error DeleteSeriesRange(itr SeriesIterator, min, max int64) error + DeleteSeriesRangeWithPredicate(itr SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cdcc6060fc2..4c1bb65ed20 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1253,6 +1253,12 @@ func (e *Engine) WritePoints(points []models.Point) error { // DeleteSeriesRange removes the values between min and max (inclusive) from all series func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error { + return e.DeleteSeriesRangeWithPredicate(itr, min, max, nil) +} + +// DeleteSeriesRangeWithPredicate removes the values between min and max (inclusive) from all series +// for which predicate() returns true. If predicate() is nil, then all values in range are removed. +func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error { var disableOnce bool // Ensure that the index does not compact away the measurement or series we're @@ -1277,6 +1283,8 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro return err } else if elem == nil { break + } else if predicate != nil && !predicate(elem.Name(), elem.Tags()) { + continue } if elem.Expr() != nil { diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index a0836427012..aefb2db83f0 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1189,6 +1189,131 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { } } +func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) { + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { + // Create a few points. + p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted + p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted + p3 := MustParsePointString("cpu,host=B value=1.3 3000000000") + p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") + p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted + p6 := MustParsePointString("mem,host=B value=1.3 1000000000") + p7 := MustParsePointString("mem,host=C value=1.3 1000000000") + p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted + + e, err := NewEngine(index) + if err != nil { + t.Fatal(err) + } + + // mock the planner so compactions don't run during the test + e.CompactionPlan = &mockPlanner{} + if err := e.Open(); err != nil { + t.Fatal(err) + } + defer e.Close() + + for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { + if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { + t.Fatalf("create series index error: %v", err) + } + } + + if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + if err := e.WriteSnapshot(); err != nil { + t.Fatalf("failed to snapshot: %s", err.Error()) + } + + keys := e.FileStore.Keys() + if exp, got := 6, len(keys); exp != got { + t.Fatalf("series count mismatch: exp %v, got %v", exp, got) + } + + itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}} + predicate := func(name []byte, tags models.Tags) bool { + if bytes.Equal(name, []byte("mem")) { + return true + } + if bytes.Equal(name, []byte("cpu")) { + for _, tag := range tags { + if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) { + return true + } + } + } + return false + } + if err := e.DeleteSeriesRangeWithPredicate(itr, math.MinInt64, math.MaxInt64, predicate); err != nil { + t.Fatalf("failed to delete series: %v", err) + } + + keys = e.FileStore.Keys() + if exp, got := 3, len(keys); exp != got { + t.Fatalf("series count mismatch: exp %v, got %v", exp, got) + } + + exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"} + for _, exp := range exps { + if _, ok := keys[exp]; !ok { + t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys) + } + } + + // Check that the series still exists in the index + indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} + iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) + if err != nil { + t.Fatalf("iterator error: %v", err) + } + defer iter.Close() + + elem, err := iter.Next() + if err != nil { + t.Fatal(err) + } + if elem.SeriesID == 0 { + t.Fatalf("series index mismatch: EOF, exp 2 series") + } + + // Lookup series. + name, tags := e.sfile.Series(elem.SeriesID) + if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { + t.Fatalf("series mismatch: got %s, exp %s", got, exp) + } + + if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) { + t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags) + } + iter.Close() + + // Deleting remaining series should remove them from the series. + itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}} + if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil { + t.Fatalf("failed to delete series: %v", err) + } + + indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} + if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { + t.Fatalf("iterator error: %v", err) + } + if iter == nil { + return + } + + defer iter.Close() + if elem, err = iter.Next(); err != nil { + t.Fatal(err) + } + if elem.SeriesID != 0 { + t.Fatalf("got an undeleted series id, but series should be dropped from index") + } + }) + } +} + func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { diff --git a/tsdb/shard.go b/tsdb/shard.go index 907584c78be..0b80e0d0fcd 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -696,6 +696,16 @@ func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error { return engine.DeleteSeriesRange(itr, min, max) } +// DeleteSeriesRangeWithPredicate deletes all values from for seriesKeys between min and max (inclusive) +// for which predicate() returns true. If predicate() is nil, then all values in range are deleted. +func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error { + engine, err := s.engine() + if err != nil { + return err + } + return engine.DeleteSeriesRangeWithPredicate(itr, min, max, predicate) +} + // DeleteMeasurement deletes a measurement and all underlying series. func (s *Shard) DeleteMeasurement(name []byte) error { engine, err := s.engine()