Skip to content

Commit

Permalink
Add ability to delete many series with predicate
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobmarble committed Mar 28, 2018
1 parent 4044d41 commit 470ee7f
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 0 deletions.
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Engine interface {
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
DeleteSeriesRangeWithPredicate(itr SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error

MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
Expand Down
8 changes: 8 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,12 @@ func (e *Engine) WritePoints(points []models.Point) error {

// DeleteSeriesRange removes the values between min and max (inclusive) from
// every series produced by itr. It is the unfiltered form of
// DeleteSeriesRangeWithPredicate and simply delegates to it with a nil
// predicate.
func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error {
	// A nil predicate tells DeleteSeriesRangeWithPredicate not to filter.
	var noFilter func(name []byte, tags models.Tags) bool
	return e.DeleteSeriesRangeWithPredicate(itr, min, max, noFilter)
}

// DeleteSeriesRangeWithPredicate removes the values between min and max (inclusive) from all series
// for which predicate() returns true. If predicate() is nil, then all values in range are removed.
func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error {
var disableOnce bool

// Ensure that the index does not compact away the measurement or series we're
Expand All @@ -1277,6 +1283,8 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
return err
} else if elem == nil {
break
} else if predicate != nil && !predicate(elem.Name(), elem.Tags()) {
continue
}

if elem.Expr() != nil {
Expand Down
125 changes: 125 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,131 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
}
}

// TestEngine_DeleteSeriesRangeWithPredicate checks, once per registered index
// type, that DeleteSeriesRangeWithPredicate only removes the series for which
// the predicate returns true, that the surviving series remain visible in the
// index, and that a follow-up DeleteSeriesRange drops them from the index.
func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted
p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted
p3 := MustParsePointString("cpu,host=B value=1.3 3000000000")
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted
p6 := MustParsePointString("mem,host=B value=1.3 1000000000")
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted

e, err := NewEngine(index)
if err != nil {
t.Fatal(err)
}

// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if err := e.Open(); err != nil {
t.Fatal(err)
}
defer e.Close()

// Register every point's series before writing the points themselves.
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
t.Fatalf("create series index error: %v", err)
}
}

if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}

// Eight points were written, but p1/p2 and p3/p4 share a series, so only
// six distinct series keys are expected.
keys := e.FileStore.Keys()
if exp, got := 6, len(keys); exp != got {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}

// The iterator offers every cpu and mem series; disk,host=C is left out of it.
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}}
// The predicate selects every "mem" series and the "cpu" series tagged host=B.
predicate := func(name []byte, tags models.Tags) bool {
if bytes.Equal(name, []byte("mem")) {
return true
}
if bytes.Equal(name, []byte("cpu")) {
for _, tag := range tags {
if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) {
return true
}
}
}
return false
}
if err := e.DeleteSeriesRangeWithPredicate(itr, math.MinInt64, math.MaxInt64, predicate); err != nil {
t.Fatalf("failed to delete series: %v", err)
}

// cpu,host=B, mem,host=B and mem,host=C matched the predicate, leaving three keys.
keys = e.FileStore.Keys()
if exp, got := 3, len(keys); exp != got {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}

exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"}
for _, exp := range exps {
if _, ok := keys[exp]; !ok {
t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys)
}
}

// Check that the series still exists in the index
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
if err != nil {
t.Fatalf("iterator error: %v", err)
}
defer iter.Close()

elem, err := iter.Next()
if err != nil {
t.Fatal(err)
}
// A zero SeriesID is treated as end of iteration (the message reports EOF).
if elem.SeriesID == 0 {
t.Fatalf("series index mismatch: EOF, exp 2 series")
}

// Lookup series.
name, tags := e.sfile.Series(elem.SeriesID)
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
}

// Only the first element is inspected, so either surviving cpu series is accepted.
if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) {
t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags)
}
// NOTE(review): this iterator is closed here and again by the deferred Close
// above; presumably Close is safe to call twice -- confirm.
iter.Close()

// Deleting remaining series should remove them from the index.
itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}}
if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil {
t.Fatalf("failed to delete series: %v", err)
}

indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
t.Fatalf("iterator error: %v", err)
}
// A nil iterator is accepted as success; presumably it means the measurement
// is no longer in the index -- TODO confirm.
if iter == nil {
return
}

defer iter.Close()
if elem, err = iter.Next(); err != nil {
t.Fatal(err)
}
if elem.SeriesID != 0 {
t.Fatalf("got an undeleted series id, but series should be dropped from index")
}
})
}
}

func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,16 @@ func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error {
return engine.DeleteSeriesRange(itr, min, max)
}

// DeleteSeriesRangeWithPredicate deletes all values between min and max
// (inclusive) from the series in itr for which predicate returns true. If
// predicate is nil, then all values in range are deleted. The work is
// delegated to the engine returned by s.engine(); if that call fails, its
// error is returned unchanged.
func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, min, max int64, predicate func(name []byte, tags models.Tags) bool) error {
	eng, err := s.engine()
	if err != nil {
		// No engine could be obtained, so nothing is deleted.
		return err
	}
	return eng.DeleteSeriesRangeWithPredicate(itr, min, max, predicate)
}

// DeleteMeasurement deletes a measurement and all underlying series.
func (s *Shard) DeleteMeasurement(name []byte) error {
engine, err := s.engine()
Expand Down

0 comments on commit 470ee7f

Please sign in to comment.