Skip to content

Commit

Permalink
Deprecate DeduplicateSeriesSet() in favor of NewMergeSeriesSet().
Browse files Browse the repository at this point in the history
Federation makes use of dedupedSeriesSet to merge SeriesSets for every
query into one output stream. If many match[] arguments are provided,
many dedupedSeriesSet objects will get chained. This has the downside of
causing a potential O(n*k) running time, where n is the number of series
and k the number of match[] arguments.

Meanwhile, the storage package provides a mergeSeriesSet that
accomplishes the same with an O(n*log(k)) running time by making use of
a binary heap. Let's just get rid of dedupedSeriesSet and change all
existing callers to use mergeSeriesSet.
  • Loading branch information
EdSchouten committed Dec 10, 2017
1 parent e0d917e commit bb724f1
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 83 deletions.
6 changes: 4 additions & 2 deletions storage/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) {
}
seriesSets = append(seriesSets, set)
}
return newMergeSeriesSet(seriesSets), nil
return NewMergeSeriesSet(seriesSets), nil
}

// LabelValues returns all potential values for a label name.
Expand Down Expand Up @@ -300,7 +300,9 @@ type mergeSeriesSet struct {
sets []SeriesSet
}

func newMergeSeriesSet(sets []SeriesSet) SeriesSet {
// NewMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating.
func NewMergeSeriesSet(sets []SeriesSet) SeriesSet {
// Sets need to be pre-advanced, so we can introspect the label of the
// series under the cursor.
var h seriesSetHeap
Expand Down
2 changes: 1 addition & 1 deletion storage/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestMergeSeriesSet(t *testing.T) {
),
},
} {
merged := newMergeSeriesSet(tc.input)
merged := NewMergeSeriesSet(tc.input)
for merged.Next() {
require.True(t, tc.expected.Next())
actualSeries := merged.At()
Expand Down
70 changes: 0 additions & 70 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,73 +110,3 @@ type SeriesIterator interface {
// Err returns the current error.
Err() error
}

// dedupedSeriesSet merges two series sets into one deduplicated stream.
// Both inputs must be sorted. When a series is present in both inputs
// (equal label sets), its datapoints must be equal as well, because only
// one of the two copies is emitted.
type dedupedSeriesSet struct {
	// a is the first input set.
	a SeriesSet
	// b is the second input set.
	b SeriesSet

	// cur is the series most recently selected by Next.
	cur Series
	// adone records that a has no further series.
	adone bool
	// bdone records that b has no further series.
	bdone bool
}

// DeduplicateSeriesSet combines two SeriesSets into one and drops
// duplicates. A series that appears in both inputs must carry equal
// datapoints in each.
//
// If either argument is nil the other one is returned unchanged.
func DeduplicateSeriesSet(a, b SeriesSet) SeriesSet {
	switch {
	case a == nil:
		return b
	case b == nil:
		return a
	}

	// Advance both inputs once up front so that the series under each
	// cursor can be compared on the first call to Next.
	merged := &dedupedSeriesSet{a: a, b: b}
	merged.adone = !a.Next()
	merged.bdone = !b.Next()
	return merged
}

// At returns the series selected by the most recent call to Next.
// Before the first call to Next it returns the zero value of Series.
func (s *dedupedSeriesSet) At() Series {
return s.cur
}

// Err returns the error of the first input set if it has one, otherwise
// the error (possibly nil) of the second input set.
func (s *dedupedSeriesSet) Err() error {
	// Query a only once so the value that was checked is the value that
	// is returned.
	if err := s.a.Err(); err != nil {
		return err
	}
	return s.b.Err()
}

// compare reports which input holds the series that should be emitted
// next: a negative value selects a, a positive value selects b, and zero
// means both cursors point at series with equal label sets. An exhausted
// input always loses against the other one.
func (s *dedupedSeriesSet) compare() int {
	switch {
	case s.adone:
		return 1
	case s.bdone:
		return -1
	default:
		return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
	}
}

// Next selects the next series of the merged stream and makes it
// available through At. It returns false once both inputs are exhausted
// or either of them reports an error.
func (s *dedupedSeriesSet) Next() bool {
	if s.adone && s.bdone {
		return false
	}
	if s.Err() != nil {
		return false
	}

	switch order := s.compare(); {
	case order > 0:
		// b holds the smaller series (or a is exhausted).
		s.cur = s.b.At()
		s.bdone = !s.b.Next()
	case order < 0:
		// a holds the smaller series (or b is exhausted).
		s.cur = s.a.At()
		s.adone = !s.a.Next()
	default:
		// Both sets contain the current series. Emit it once, taken from
		// a, and step past it in both inputs.
		s.cur = s.a.At()
		s.adone = !s.a.Next()
		s.bdone = !s.b.Next()
	}
	return true
}
7 changes: 3 additions & 4 deletions web/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,18 +394,17 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
}
defer q.Close()

var set storage.SeriesSet

var sets []storage.SeriesSet
for _, mset := range matcherSets {
s, err := q.Select(mset...)
if err != nil {
return nil, &apiError{errorExec, err}
}
set = storage.DeduplicateSeriesSet(set, s)
sets = append(sets, s)
}

set := storage.NewMergeSeriesSet(sets)
metrics := []labels.Labels{}

for set.Next() {
metrics = append(metrics, set.At().Labels())
}
Expand Down
9 changes: 3 additions & 6 deletions web/federate.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,18 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {

vec := make(promql.Vector, 0, 8000)

var set storage.SeriesSet

var sets []storage.SeriesSet
for _, mset := range matcherSets {
s, err := q.Select(mset...)
if err != nil {
federationErrors.Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
set = storage.DeduplicateSeriesSet(set, s)
}
if set == nil {
return
sets = append(sets, s)
}

set := storage.NewMergeSeriesSet(sets)
for set.Next() {
s := set.At()

Expand Down

0 comments on commit bb724f1

Please sign in to comment.