From 465614c173700fe751812679de16a39e69667af5 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 29 Aug 2014 18:17:07 -0400 Subject: [PATCH 1/7] Modify merge to work with regex --- cluster/cluster_configuration.go | 1 + common/heap/heap.go | 58 ++++++++ common/heap/heap_test.go | 58 ++++++++ coordinator/coordinator.go | 19 +++ datastore/point_iterator.go | 47 +++++-- engine/common_merge_engine.go | 232 ++++--------------------------- engine/engine.go | 14 +- engine/join_engine.go | 2 +- engine/merge_engine.go | 6 +- engine/merger.go | 191 +++++++++++++++++++++++++ engine/series_heap.go | 79 +++++++++++ engine/stream.go | 68 +++++++++ integration/data_test.go | 47 +++++++ parser/frees.c | 5 +- parser/from_clause.go | 3 + parser/parser.go | 25 +++- parser/parser_test.go | 8 ++ parser/query.yacc | 18 ++- parser/query_types.h | 4 +- parser/test_memory_leaks.sh | 3 + 20 files changed, 652 insertions(+), 236 deletions(-) create mode 100644 common/heap/heap.go create mode 100644 common/heap/heap_test.go create mode 100644 engine/merger.go create mode 100644 engine/series_heap.go create mode 100644 engine/stream.go diff --git a/cluster/cluster_configuration.go b/cluster/cluster_configuration.go index 1126cf3c02b..b0bf2115287 100644 --- a/cluster/cluster_configuration.go +++ b/cluster/cluster_configuration.go @@ -901,6 +901,7 @@ func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec) if err != nil { return nil, err } + log.Debug("Querying %d shards for query", len(shards)) shards = self.getShardRange(querySpec, shards) if querySpec.IsAscending() { SortShardsByTimeAscending(shards) diff --git a/common/heap/heap.go b/common/heap/heap.go new file mode 100644 index 00000000000..c5f0e941594 --- /dev/null +++ b/common/heap/heap.go @@ -0,0 +1,58 @@ +// Implement a heap structure by providing three methods, Initialize, +// BubbleDown and BubbleUp. Initialize is O(n), while both Bubble +// methods are O(log n). The methods take any type that implement the +// sort.Interface interface. You can get a reverse Heap (a Max heap) +// by using sort.Reverse() on a sort.Interface to reverse the result +// of the Less() method. +package heap + +import "sort" + +func Initialize(s sort.Interface) { + l := s.Len() + // start with the right most node + for n := l - 1; n >= 0; n-- { + BubbleDown(s, n) + } +} + +func parent(s sort.Interface, i int) int { + if i%2 == 0 { + return (i - 1) / 2 + } + + return i / 2 +} + +func BubbleUp(s sort.Interface, i int) { + p := parent(s, i) + if p < 0 { + return + } + + if s.Less(i, p) { + s.Swap(i, p) + BubbleUp(s, p) + } +} + +func BubbleDown(s sort.Interface, i int) { + // length + l := s.Len() + lc := i*2 + 1 + if lc >= l { + return + } + + rc := i*2 + 2 + // the smaller child + sc := lc + if rc < l && s.Less(rc, lc) { + sc = rc + } + + if s.Less(sc, i) { + s.Swap(sc, i) + BubbleDown(s, sc) + } +} diff --git a/common/heap/heap_test.go b/common/heap/heap_test.go new file mode 100644 index 00000000000..ecc332f9e44 --- /dev/null +++ b/common/heap/heap_test.go @@ -0,0 +1,58 @@ +package heap + +import ( + "math/rand" + "sort" + "testing" + "time" + + "launchpad.net/gocheck" +) + +type HeapSuite struct{} + +// Hook up gocheck into the gotest runner. 
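As a quick illustration of the heap API above (not part of the patch): heap-sorting a small int slice, using the same pop idiom the test below uses. The import path is the one this commit adds.

```go
package main

import (
	"fmt"
	"sort"

	"github.com/influxdb/influxdb/common/heap"
)

func main() {
	is := []int{9, 4, 7, 1, 8}
	// O(n) heapify; afterwards is[0] is the minimum
	heap.Initialize(sort.IntSlice(is))

	for len(is) > 0 {
		fmt.Print(is[0], " ") // pops in ascending order: 1 4 7 8 9
		// move the last element to the root, shrink the slice, and
		// restore the heap property
		l := len(is)
		is, is[0] = is[:l-1], is[l-1]
		heap.BubbleDown(sort.IntSlice(is), 0)
	}
}
```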
+func Test(t *testing.T) { + gocheck.TestingT(t) +} + +var _ = gocheck.Suite(&HeapSuite{}) + +func (_ *HeapSuite) SetUpSuite(c *gocheck.C) { + rand.Seed(time.Now().Unix()) +} + +func randomInts(bubbleUp bool) []int { + is := make([]int, 0, 100) + for i := 0; i < 100; i++ { + is = is[:i+1] + is[i] = rand.Intn(1000) + if bubbleUp { + BubbleUp(sort.IntSlice(is), i) + } + } + return is +} + +func (_ *HeapSuite) TestHeapSort(c *gocheck.C) { + for _, bubble := range []bool{true, false} { + is := randomInts(bubble) + temp := make([]int, len(is)) + copy(temp, is) + + if !bubble { + Initialize(sort.IntSlice(is)) + } + + sorted := make([]int, len(is)) + for i := range sorted { + sorted[i] = is[0] + l := len(is) + is, is[0] = is[:l-1], is[l-1] + BubbleDown(sort.IntSlice(is), 0) + } + + sort.Ints(temp) + c.Assert(sorted, gocheck.DeepEquals, temp) + } +} diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index d0a8815b32e..e1cb755a0ef 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -268,8 +268,27 @@ func (self *Coordinator) queryShards(querySpec *parser.QuerySpec, shards []*clus return nil } +func (self *Coordinator) expandRegex(spec *parser.QuerySpec) { + q := spec.SelectQuery() + if q == nil { + return + } + + if f := q.FromClause; f.Type == parser.FromClauseMergeFun { + series := self.clusterConfiguration.MetaStore.GetSeriesForDatabaseAndRegex(spec.Database(), q.FromClause.Regex) + f.Type = parser.FromClauseMerge + f.Regex = nil + for _, s := range series { + f.Names = append(f.Names, &parser.TableName{ + Name: &parser.Value{Name: s, Type: parser.ValueTableName}, + }) + } + } +} + // We call this function only if we have a Select query (not continuous) or Delete query func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, p engine.Processor) error { + self.expandRegex(querySpec) shards, processor, err := self.getShardsAndProcessor(querySpec, p) if err != nil { return err diff --git a/datastore/point_iterator.go b/datastore/point_iterator.go index 25741cfffa6..4fbd54a3816 100644 --- a/datastore/point_iterator.go +++ b/datastore/point_iterator.go @@ -24,6 +24,12 @@ type PointIterator struct { asc bool } +// Creates a new point iterator using the given column iterator, +// metadata columns, start and end time as well as the ascending +// flag. The iterator returned is already placed at the first point, +// there's no need to call Next() after the call to NewPointIterator, +// but the user should check Valid() to make sure the iterator is +// pointing at a valid point. func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startTime, endTime time.Time, asc bool) PointIterator { pi := PointIterator{ valid: true, @@ -41,6 +47,9 @@ func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startT return pi } +// public api + +// Advance the iterator to the next point func (pi *PointIterator) Next() { valueBuffer := proto.NewBuffer(nil) pi.valid = false @@ -127,6 +136,30 @@ func (pi *PointIterator) Next() { pi.point.SequenceNumber = proto.Uint64(next.sequence) } +// Returns true if the iterator is pointing at a valid +// location. Behavior of Point() is undefined if Valid() is false. +func (pi *PointIterator) Valid() bool { + return pi.valid +} + +// Returns the point that the iterator is pointing to. +func (pi *PointIterator) Point() *protocol.Point { return pi.point } + +// Returns an error if the iterator became invalid due to an error as +// opposed to reaching the end time. 
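A minimal sketch of the consumption pattern the comments above describe (illustration only; `drain` and its `yield` callback are hypothetical, but the shard code later in this series follows the same shape):

```go
// drain reads every remaining point from an already-positioned
// iterator. NewPointIterator has seeked to the first point, so the
// loop checks Valid() before reading Point().
func drain(pi *PointIterator, yield func(*protocol.Point) error) error {
	defer pi.Close()
	for pi.Valid() {
		if err := yield(pi.Point()); err != nil {
			return err
		}
		pi.Next()
	}
	// Valid() turning false means either the end time was reached or
	// an error occurred; Error() disambiguates the two cases.
	return pi.Error()
}
```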
+func (pi *PointIterator) Error() error { return pi.err } + +// Close the iterator and free any resources used by the +// iterator. Behavior of the iterator is undefined if the iterator is +// used after it was closed. +func (pi *PointIterator) Close() { + for _, itr := range pi.itrs { + itr.Close() + } +} + +// private api + func (pi *PointIterator) getIteratorNextValue() error { for i, it := range pi.itrs { if pi.rawColumnValues[i].value != nil { @@ -165,21 +198,7 @@ func (pi *PointIterator) getIteratorNextValue() error { return nil } -func (pi *PointIterator) Valid() bool { - return pi.valid -} - func (pi *PointIterator) setError(err error) { pi.err = err pi.valid = false } - -func (pi *PointIterator) Point() *protocol.Point { return pi.point } - -func (pi *PointIterator) Error() error { return pi.err } - -func (pi *PointIterator) Close() { - for _, itr := range pi.itrs { - itr.Close() - } -} diff --git a/engine/common_merge_engine.go b/engine/common_merge_engine.go index 4434dcb110d..44d0da8802a 100644 --- a/engine/common_merge_engine.go +++ b/engine/common_merge_engine.go @@ -2,224 +2,48 @@ package engine import "github.com/influxdb/influxdb/protocol" -type seriesMergeState struct { - name string - series []*protocol.Series - fields []string - done bool -} - -func (self *seriesMergeState) hasPoints() bool { - return len(self.series) > 0 && len(self.series[0].Points) > 0 -} - -func isEarlier(first *seriesMergeState, other *seriesMergeState) bool { - return *first.series[0].Points[0].Timestamp < *other.series[0].Points[0].Timestamp -} - -func isLater(first *seriesMergeState, other *seriesMergeState) bool { - return *first.series[0].Points[0].Timestamp > *other.series[0].Points[0].Timestamp -} - -func (self *seriesMergeState) flush(state *CommonMergeEngine) (bool, error) { - for _, s := range self.series { - s := state.mergeColumnsInSeries(s) - ok, err := state.next.Yield(s) - if !ok || err != nil { - return ok, err - } - } - self.series = nil - return true, nil -} - -// update the state, the points belong to this seriesMergeState (i.e. the name of the timeseries matches) -func (self *seriesMergeState) updateState(p *protocol.Series) { - if *p.Name != self.name { - return - } - - // setup the fields - if self.fields == nil { - self.fields = p.Fields - } - - // data for current table is exhausted - if len(p.Points) == 0 { - self.done = true - } else { - self.series = append(self.series, p) - } -} - type CommonMergeEngine struct { - next Processor - leftGoFirst func(_, _ *seriesMergeState) bool - fields map[string]int - left *seriesMergeState - right *seriesMergeState - mergeColumns bool -} - -// set the fields of the other time series to null making sure that -// the order of the null values match the order of the field -// definitions, i.e. 
left timeseries first followed by values from the -// right timeseries -func (self *CommonMergeEngine) mergeColumnsInSeries(s *protocol.Series) *protocol.Series { - if !self.mergeColumns { - return s - } - - newSeries := &protocol.Series{ - Name: s.Name, - Fields: self.getFields(), - Points: make([]*protocol.Point, len(s.Points)), - } - - for idx, p := range s.Points { - newPoint := &protocol.Point{ - Timestamp: p.Timestamp, - SequenceNumber: p.SequenceNumber, - } - newValues := make([]*protocol.FieldValue, len(self.fields)) - oldValues := p.Values - for idx, f := range s.Fields { - newIdx := self.fields[f] - newValues[newIdx] = oldValues[idx] - } - newPoint.Values = newValues - newSeries.Points[idx] = newPoint - } - return newSeries -} - -// set the fields of the other time series to null making sure that -// the order of the null values match the order of the field -// definitions, i.e. left timeseries first followed by values from the -// right timeseries -func (self *CommonMergeEngine) getFields() []string { - fields := make([]string, len(self.fields)) - for f, i := range self.fields { - fields[i] = f - } - return fields -} - -func (self *CommonMergeEngine) yieldNextPoints() (bool, error) { - // see which point should be returned next and remove it from the - // series - for self.left.hasPoints() && self.right.hasPoints() { - // state is the state of the series from which the next point - // will be fetched - next := self.right - if self.leftGoFirst(self.left, self.right) { - next = self.left - } - - s := next.removeAndGetFirstPoint() - ok, err := self.next.Yield(self.mergeColumnsInSeries(s)) - if err != nil || !ok { - return ok, err - } - } - return true, nil -} - -// if `other` state is done (i.e. we'll receive no more points for its -// timeseries) then we know that we won't get any points that are -// older than what's in `self` so we can safely flush all `self` -// points. -func (self *CommonMergeEngine) flushIfNecessary() (bool, error) { - if self.left.done && len(self.left.series) == 0 { - if ok, err := self.right.flush(self); err != nil || !ok { - return ok, err - } - } - if self.right.done && len(self.right.series) == 0 { - if ok, err := self.left.flush(self); err != nil || !ok { - return ok, err - } - } - return true, nil -} - -func (self *CommonMergeEngine) updateState(p *protocol.Series) { - self.left.updateState(p) - self.right.updateState(p) - - // create the fields map - if self.fields == nil && self.left.fields != nil && self.right.fields != nil { - self.fields = make(map[string]int) - - i := 0 - for _, s := range []*seriesMergeState{self.left, self.right} { - for _, f := range s.fields { - if _, ok := self.fields[f]; ok { - continue - } - self.fields[f] = i - i++ - } - } - } -} - -func (self *seriesMergeState) removeAndGetFirstPoint() *protocol.Series { - s := &protocol.Series{ - Name: self.series[0].Name, - Fields: self.series[0].Fields, - Points: []*protocol.Point{self.series[0].Points[0]}, - } - // get rid of that point, or get rid of the entire series - // if this is the last point - if len(self.series[0].Points) == 1 { - self.series = self.series[1:] - } else { - self.series[0].Points = self.series[0].Points[1:] - } - return s + merger *Merger + streams map[string]StreamUpdate + next Processor } // returns a yield function that will sort points from table1 and // table2 no matter what the order in which they are received. 
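To make the rewrite below easier to review, a hedged sketch of the contract (table names and timestamps are illustrative): the engine buffers points until every open input has one, then repeatedly emits the earliest.

```go
// Ascending merge over two tables (new signature introduced below):
cme := NewCommonMergeEngine([]string{"cpu_a", "cpu_b"}, true, true, next)

// Points may arrive interleaved in any order per table:
//   Yield(cpu_a @ t=1) -> buffered; cpu_b has nothing yet
//   Yield(cpu_b @ t=2) -> both inputs ready: emits t=1, holds t=2
//   Yield(cpu_a @ t=3) -> emits t=2, holds t=3
//   Close()            -> flushes the remainder in timestamp order
```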
-func NewCommonMergeEngine(table1, table2 string, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine { - state1 := &seriesMergeState{ - name: table1, +func NewCommonMergeEngine(tables []string, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine { + cme := &CommonMergeEngine{ + streams: make(map[string]StreamUpdate, len(tables)), + next: next, } - state2 := &seriesMergeState{ - name: table2, + streams := make([]StreamQuery, len(tables)) + for i, t := range tables { + s := NewStream() + streams[i] = s + cme.streams[t] = s } + h := &SeriesHeap{Ascending: ascending} + cme.merger = NewCME("Engine", streams, h, next, mergeColumns) + return cme +} - whoGoFirst := isEarlier - if !ascending { - whoGoFirst = isLater +func (cme *CommonMergeEngine) Close() error { + for _, s := range cme.streams { + s.Close() } - return &CommonMergeEngine{ - next: next, - left: state1, - right: state2, - leftGoFirst: whoGoFirst, - mergeColumns: mergeColumns, + _, err := cme.merger.Update() + if err != nil { + return err } + return cme.next.Close() } -func (e *CommonMergeEngine) Close() error { - e.Yield(&protocol.Series{Name: &e.left.name, Fields: []string{}}) - e.Yield(&protocol.Series{Name: &e.right.name, Fields: []string{}}) - return e.next.Close() +func (cme *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) { + stream := cme.streams[*s.Name] + stream.Yield(s) + return cme.merger.Update() } -func (e *CommonMergeEngine) Name() string { +func (cme *CommonMergeEngine) Name() string { return "CommonMergeEngine" } - -func (e *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) { - e.updateState(s) - - if ok, err := e.flushIfNecessary(); !ok || err != nil { - return ok, err - } - - return e.yieldNextPoints() -} diff --git a/engine/engine.go b/engine/engine.go index c2227f4b246..3158fa9b03a 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -15,10 +15,18 @@ func NewQueryEngine(next Processor, query *parser.SelectQuery) (Processor, error } fromClause := query.GetFromClause() - if fromClause.Type == parser.FromClauseMerge { - engine = NewMergeEngine(fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name, query.Ascending, engine) - } else if fromClause.Type == parser.FromClauseInnerJoin { + + switch fromClause.Type { + case parser.FromClauseInnerJoin: engine = NewJoinEngine(query, engine) + case parser.FromClauseMerge: + tables := make([]string, len(fromClause.Names)) + for i, name := range fromClause.Names { + tables[i] = name.Name.Name + } + engine = NewMergeEngine(tables, query.Ascending, engine) + case parser.FromClauseMergeFun: + panic("QueryEngine cannot be called with merge function") } if err != nil { diff --git a/engine/join_engine.go b/engine/join_engine.go index 8c0b27cc3c6..4953f183906 100644 --- a/engine/join_engine.go +++ b/engine/join_engine.go @@ -26,7 +26,7 @@ func NewJoinEngine(query *parser.SelectQuery, next Processor) Processor { table2: table2, query: query, } - mergeEngine := NewCommonMergeEngine(table1, table2, false, query.Ascending, joinEngine) + mergeEngine := NewCommonMergeEngine([]string{table1, table2}, false, query.Ascending, joinEngine) return mergeEngine } diff --git a/engine/merge_engine.go b/engine/merge_engine.go index bb748ac326b..33cf903dc4c 100644 --- a/engine/merge_engine.go +++ b/engine/merge_engine.go @@ -7,12 +7,12 @@ type MergeEngine struct { next Processor } -func NewMergeEngine(table1, table2 string, ascending bool, next Processor) Processor { - name := table1 + "_merge_" + table2 +func NewMergeEngine(tables 
[]string, ascending bool, next Processor) Processor {
+	name := "merged"
 
 	me := &MergeEngine{name: name, next: next}
 
-	return NewCommonMergeEngine(table1, table2, true, ascending, me)
+	return NewCommonMergeEngine(tables, true, ascending, me)
 }
 
 func (me *MergeEngine) Yield(s *protocol.Series) (bool, error) {
diff --git a/engine/merger.go b/engine/merger.go
new file mode 100644
index 00000000000..29e9b36aa80
--- /dev/null
+++ b/engine/merger.go
@@ -0,0 +1,191 @@
+package engine
+
+import (
+	"code.google.com/p/log4go"
+	"github.com/influxdb/influxdb/protocol"
+)
+
+// Merger merges a number of StreamQuery into one stream of points
+// where the output stream of points has a monotonic timestamp order
+// (increasing or decreasing depending on the SeriesHeap that is
+// passed to NewCME)
+type Merger struct {
+	name                  string
+	s                     []StreamQuery
+	size                  int
+	h                     *SeriesHeap
+	n                     Processor
+	lastStreamIdx         int
+	initializing          bool
+	mergeColumns          bool
+	fields                map[string]struct{}
+	resultFields          []string
+	resultFieldsPerStream map[int][]int
+}
+
+// Creates a new merger that will merge the given slice of StreamQuery
+// and yield the result to the processor `n`. `name` is used to
+// identify the merger in the logs since it's being used in multiple
+// places. The SeriesHeap `h` is used to keep track of the next point
+// (whether it's the smallest or largest timestamp depends on the
+// SeriesHeap). If `mergeColumns` is true, the resulting time series
+// will have the fields from all StreamQueries, i.e. if the first
+// stream yields `column0` and `column1` while the second stream
+// yields `column2` and `column3` then the result time series will
+// have all 4 columns with two columns set to `nil` depending on which
+// side the point came from.
+func NewCME(name string, s []StreamQuery, h *SeriesHeap, n Processor, mergeColumns bool) *Merger {
+	log4go.Debug("%sMerger: created with %d streams", name, len(s))
+	return &Merger{
+		name:                  name,
+		s:                     s,
+		h:                     h,
+		n:                     n,
+		lastStreamIdx:         0,
+		mergeColumns:          mergeColumns,
+		initializing:          true,
+		fields:                make(map[string]struct{}),
+		resultFieldsPerStream: make(map[int][]int),
+	}
+}
+
+// public api
+
+// Consume and yield as many points as we can until one of the streams
+// runs out of points (i.e. the call to HasPoint() returns false)
+func (cme *Merger) Update() (bool, error) {
+	if cme.initializing {
+		return cme.initialize()
+	}
+	return cme.tryYieldNextPoint()
+}
+
+// private api
+
+// initialize the state of the Merger by reading one point from each
+// StreamQuery. If `mergeColumns` is true, the union of the fields
+// received on the first point from each StreamQuery will be the
+// fields of the result Series
+func (cme *Merger) initialize() (bool, error) {
+	for cme.h.Size() != len(cme.s) {
+		stream := cme.s[cme.lastStreamIdx]
+		if !stream.HasPoint() && !stream.Closed() {
+			log4go.Debug("%sMerger: data not ready for stream %d, still initializing", cme.name, cme.lastStreamIdx)
+			return true, nil
+		}
+
+		if stream.HasPoint() {
+			p := stream.Next()
+			cme.h.Add(cme.lastStreamIdx, p)
+			for _, f := range p.Fields {
+				cme.fields[f] = struct{}{}
+			}
+			cme.lastStreamIdx++
+		} else if stream.Closed() {
+			s := len(cme.s)
+			cme.s[cme.lastStreamIdx] = cme.s[s-1]
+			cme.s = cme.s[:s-1]
+		}
+	}
+
+	if cme.mergeColumns {
+		// finished initialization
+		cme.resultFields = make([]string, 0, len(cme.fields))
+		for f := range cme.fields {
+			cme.resultFields = append(cme.resultFields, f)
+		}
+	}
+
+	log4go.Debug("%sMerger initialization finished", cme.name)
+	cme.initializing = false
+	cme.size = len(cme.s)
+	return cme.yieldNextPoint()
+}
+
+// Try to get more data points from the StreamQueries and if
+// successful call yieldNextPoint()
+func (cme *Merger) tryYieldNextPoint() (bool, error) {
+	stream := cme.s[cme.lastStreamIdx]
+	// If the stream has new points, add them to the heap
+	if stream.HasPoint() {
+		cme.h.Add(cme.lastStreamIdx, stream.Next())
+	} else if stream.Closed() {
+		cme.size--
+	}
+
+	// If all streams have yielded one point, then we can get the next
+	// point with the smallest (or largest) timestamp and yield it to the
+	// next processor.
+	if cme.h.Size() != cme.size {
+		return true, nil
+	}
+
+	return cme.yieldNextPoint()
+}
+
+// yield as many points as we can to the Processor `n`
+func (cme *Merger) yieldNextPoint() (bool, error) {
+	for {
+		var s *protocol.Series
+		cme.lastStreamIdx, s = cme.h.Next()
+		log4go.Debug("cme.lastStreamIdx: %d, s: %s", cme.lastStreamIdx, s)
+		cme.fixFields(s)
+		log4go.Debug("%sMerger yielding to %s: %s", cme.name, cme.n.Name(), s)
+		ok, err := cme.n.Yield(s)
+		if !ok || err != nil {
+			return ok, err
+		}
+
+		stream := cme.s[cme.lastStreamIdx]
+		if stream.HasPoint() {
+			s := stream.Next()
+			log4go.Debug("%sMerger received %s from %d", cme.name, s, cme.lastStreamIdx)
+			cme.h.Add(cme.lastStreamIdx, s)
+			continue
+		} else if stream.Closed() {
+			cme.size--
+			if cme.size != 0 {
+				continue
+			}
+		}
+
+		return true, nil
+	}
+}
+
+// modify the series to have the union of the columns from all
+// StreamQueries
+func (cme *Merger) fixFields(s *protocol.Series) {
+	if !cme.mergeColumns {
+		return
+	}
+
+	idx := cme.lastStreamIdx
+	mapping := cme.resultFieldsPerStream[idx]
+	if mapping == nil {
+		for _, f := range cme.resultFields {
+			index := -1
+			for i, sf := range s.Fields {
+				if sf == f {
+					index = i
+					break
+				}
+			}
+			mapping = append(mapping, index)
+			cme.resultFieldsPerStream[idx] = mapping
+		}
+	}
+
+	s.Fields = cme.resultFields
+	p := s.Points[0]
+	originalValues := p.Values
+	p.Values = nil
+	for _, i := range mapping {
+		if i == -1 {
+			p.Values = append(p.Values, nil)
+			continue
+		}
+		p.Values = append(p.Values, originalValues[i])
+	}
+}
diff --git a/engine/series_heap.go b/engine/series_heap.go
new file mode 100644
index 00000000000..627df759fc5
--- /dev/null
+++ b/engine/series_heap.go
@@ -0,0 +1,79 @@
+package engine
+
+import (
+	"sort"
+
+	"github.com/influxdb/influxdb/common/heap"
+	"github.com/influxdb/influxdb/protocol"
+)
+
+type Value struct {
+	streamId int
+	s        *protocol.Series
+}
+
+type ValueSlice []Value
+
+func (vs ValueSlice) Len() int {
+	return len(vs)
+}
+
+func (vs ValueSlice) Less(i, j int) bool {
+	return vs[i].s.Points[0].GetTimestamp() < vs[j].s.Points[0].GetTimestamp()
+}
+
+func (vs ValueSlice) Swap(i, j int) {
+	vs[i], vs[j] = vs[j], vs[i]
+}
+
+// A heap that holds one-point series (series that have one
+// point only) and their stream ids. See http://en.wikipedia.org/wiki/Heap_(data_structure)
+// for more info on heaps. The heap is used by the Merger to emit
+// points from multiple streams in monotonic order.
+type SeriesHeap struct {
+	Ascending bool
+	values    []Value
+}
+
+// returns the number of values in the heap so far
+func (sh *SeriesHeap) Size() int {
+	return len(sh.values)
+}
+
+// Add another one-point series with the given stream id. TODO: this
+// is a slightly inefficient way to construct the initial value slice;
+// if we had the full value slice we could construct the heap in O(n)
+// instead of O(n log n), which is required if we construct the heap
+// using multiple calls to Add()
+func (sh *SeriesHeap) Add(streamId int, s *protocol.Series) {
+	sh.values = append(sh.values, Value{
+		s:        s,
+		streamId: streamId,
+	})
+	l := sh.Size()
+	if sh.Ascending {
+		heap.BubbleUp(ValueSlice(sh.values), l-1)
+	} else {
+		heap.BubbleUp(sort.Reverse(ValueSlice(sh.values)), l-1)
+	}
+}
+
+// Get and remove the next one-point series that has the smallest (or
+// largest) timestamp, according to the Ascending field. TODO: this is
+// slightly inefficient since we remove a value from the values slice
+// and do a BubbleDown, which is wasted work since the next value from
+// the stream will be added immediately after and will cause a
+// BubbleUp. In big O() notation this step doesn't change much, it
+// only adds a constant to the upper bound.
+func (sh *SeriesHeap) Next() (int, *protocol.Series) {
+	idx := 0
+	s := sh.Size()
+	v := sh.values[idx]
+	sh.values, sh.values[0] = sh.values[:s-1], sh.values[s-1]
+	if sh.Ascending {
+		heap.BubbleDown(ValueSlice(sh.values), 0)
+	} else {
+		heap.BubbleDown(sort.Reverse(ValueSlice(sh.values)), 0)
+	}
+	return v.streamId, v.s
+}
diff --git a/engine/stream.go b/engine/stream.go
new file mode 100644
index 00000000000..4154d215eba
--- /dev/null
+++ b/engine/stream.go
@@ -0,0 +1,68 @@
+package engine
+
+import "github.com/influxdb/influxdb/protocol"
+
+// See documentation of Stream
+type StreamQuery interface {
+	// interface that queries the state
+	HasPoint() bool
+	// Returns a series with one point only
+	Next() *protocol.Series
+	Closed() bool
+}
+
+// See documentation of Stream
+type StreamUpdate interface {
+	// interface that updates the state
+	Close()
+	Yield(*protocol.Series)
+}
+
+// A Stream keeps track of the state of a stream of points. A stream is
+// defined as an ordered stream of points that have the same set of
+// fields. For simplicity this will be the stream of points for one
+// time series. This will be extended to include streams of points from
+// multiple series that are merged at the shard level and multiple
+// streams that are merged together again at the coordinator level.
+type Stream struct {
+	s      *protocol.Series
+	closed bool
+}
+
+func NewStream() *Stream {
+	return &Stream{closed: false}
+}
+
+func (stream *Stream) Yield(s *protocol.Series) {
+	if stream.s != nil {
+		stream.s.Points = append(stream.s.Points, s.Points...)
+ return + } + stream.s = s +} + +func (stream *Stream) Close() { + stream.closed = true +} + +func (stream *Stream) HasPoint() bool { + return stream.s != nil && len(stream.s.Points) > 0 +} + +func (stream *Stream) Next() *protocol.Series { + var p []*protocol.Point + p, stream.s.Points = stream.s.Points[:1:1], stream.s.Points[1:] + s := &protocol.Series{ + Name: stream.s.Name, + Fields: stream.s.Fields, + Points: p, + } + if len(stream.s.Points) == 0 { + stream.s = nil + } + return s +} + +func (stream *Stream) Closed() bool { + return stream.closed +} diff --git a/integration/data_test.go b/integration/data_test.go index 8c7a32a3460..2a3c5a86f9f 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -267,6 +267,53 @@ func (self *DataTestSuite) TestModeWithNils(c *C) { c.Assert(maps[0]["m2"], Equals, nil) } +func (self *DataTestSuite) TestMergeRegex(c *C) { + data := ` +[ + { + "name": "test_merge_1", + "columns": ["time", "value"], + "points": [ + [1401321600000, "m11"], + [1401321800000, "m12"] + ] + }, + { + "name": "test_merge_2", + "columns": ["time", "value"], + "points": [ + [1401321700000, "m21"], + [1401321900000, "m22"] + ] + }, + { + "name": "test_merge_3", + "columns": ["time", "value"], + "points": [ + [1401321500000, "m31"], + [1401322000000, "m32"] + ] + } + ]` + self.client.WriteJsonData(data, c, influxdb.Millisecond) + serieses := self.client.RunQuery("select * from merge(/.*/)", c, "m") + c.Assert(serieses, HasLen, 1) + maps := ToMap(serieses[0]) + c.Assert(maps, HasLen, 6) + t := make([]float64, len(maps)) + for i, m := range maps { + t[i] = m["time"].(float64) + } + c.Assert(t, DeepEquals, []float64{ + 1401322000000, + 1401321900000, + 1401321800000, + 1401321700000, + 1401321600000, + 1401321500000, + }) +} + func (self *DataTestSuite) TestMergingOldData(c *C) { data := ` [ diff --git a/parser/frees.c b/parser/frees.c index be992917a2a..51ca0a70559 100644 --- a/parser/frees.c +++ b/parser/frees.c @@ -29,7 +29,10 @@ free_table_name_array(table_name_array *array) void free_from_clause(from_clause *f) { - free_table_name_array(f->names); + if (f->names) + free_table_name_array(f->names); + if (f->regex_value) + free_value(f->regex_value); free(f); } diff --git a/parser/from_clause.go b/parser/from_clause.go index 41c88507b08..c058abf7cbc 100644 --- a/parser/from_clause.go +++ b/parser/from_clause.go @@ -5,6 +5,7 @@ package parser import "C" import ( "bytes" + "regexp" "strings" ) import "fmt" @@ -15,6 +16,7 @@ const ( FromClauseArray FromClauseType = C.FROM_ARRAY FromClauseMerge FromClauseType = C.FROM_MERGE FromClauseInnerJoin FromClauseType = C.FROM_INNER_JOIN + FromClauseMergeFun FromClauseType = C.FROM_MERGE_FUNCTION ) func (self *TableName) GetAlias() string { @@ -39,6 +41,7 @@ type TableName struct { type FromClause struct { Type FromClauseType Names []*TableName + Regex *regexp.Regexp } func (self *FromClause) GetString() string { diff --git a/parser/parser.go b/parser/parser.go index 41761e88d80..200b9da9e5c 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -468,11 +468,28 @@ func GetTableNameArray(array *C.table_name_array) ([]*TableName, error) { } func GetFromClause(fromClause *C.from_clause) (*FromClause, error) { - arr, err := GetTableNameArray(fromClause.names) - if err != nil { - return nil, err + t := FromClauseType(fromClause.from_clause_type) + var arr []*TableName + var regex *regexp.Regexp + + switch t { + case FromClauseMergeFun: + val, err := GetValue(fromClause.regex_value) + if err != nil { + return nil, err + } + if 
val.Type != ValueRegex { + return nil, fmt.Errorf("merge() accepts regex only") + } + regex = val.compiledRegex + default: + var err error + arr, err = GetTableNameArray(fromClause.names) + if err != nil { + return nil, err + } } - return &FromClause{FromClauseType(fromClause.from_clause_type), arr}, nil + return &FromClause{t, arr, regex}, nil } func GetIntoClause(intoClause *C.into_clause) (*IntoClause, error) { diff --git a/parser/parser_test.go b/parser/parser_test.go index 6086bc160ac..adf72016b5c 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -262,6 +262,14 @@ func (self *QueryParserSuite) TestParseFromWithMergedTable(c *C) { c.Assert(fromClause.Names[1].Name.Name, Equals, "user.signups") } +func (self *QueryParserSuite) TestParseFromWithMergeRegex(c *C) { + q, err := ParseSelectQuery("select count(*) from merge(/.*/) where time>now()-1d;") + c.Assert(err, IsNil) + fromClause := q.GetFromClause() + c.Assert(fromClause.Type, Equals, FromClauseMergeFun) + c.Assert(fromClause.Regex, NotNil) +} + func (self *QueryParserSuite) TestMultipleAggregateFunctions(c *C) { q, err := ParseSelectQuery("select first(bar), last(bar) from foo") c.Assert(err, IsNil) diff --git a/parser/query.yacc b/parser/query.yacc index 6e149b55734..fe2a086ce77 100644 --- a/parser/query.yacc +++ b/parser/query.yacc @@ -379,7 +379,7 @@ ALIAS_CLAUSE: FROM_CLAUSE: FROM TABLE_VALUE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = malloc(sizeof(table_name_array)); $$->names->elems = malloc(sizeof(table_name*)); $$->names->size = 1; @@ -391,14 +391,14 @@ FROM_CLAUSE: | FROM SIMPLE_TABLE_VALUES { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = $2; $$->from_clause_type = FROM_ARRAY; } | FROM SIMPLE_TABLE_VALUE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = malloc(sizeof(table_name_array)); $$->names->elems = malloc(sizeof(table_name*)); $$->names->size = 1; @@ -410,7 +410,7 @@ FROM_CLAUSE: | FROM SIMPLE_TABLE_VALUE MERGE SIMPLE_TABLE_VALUE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = malloc(sizeof(table_name_array)); $$->names->elems = malloc(2 * sizeof(table_name*)); $$->names->size = 2; @@ -423,9 +423,17 @@ FROM_CLAUSE: $$->from_clause_type = FROM_MERGE; } | + FROM MERGE '(' REGEX_VALUE ')' + { + $$ = calloc(1, sizeof(from_clause)); + $$->from_clause_type = FROM_MERGE_FUNCTION; + $$->regex_value = $4; + } + | FROM SIMPLE_TABLE_VALUE ALIAS_CLAUSE INNER JOIN SIMPLE_TABLE_VALUE ALIAS_CLAUSE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); + $$->regex_value = NULL; $$->names = malloc(sizeof(table_name_array)); $$->names->elems = malloc(2 * sizeof(value*)); $$->names->size = 2; diff --git a/parser/query_types.h b/parser/query_types.h index adc15f10c50..85a5eb03b00 100644 --- a/parser/query_types.h +++ b/parser/query_types.h @@ -72,11 +72,13 @@ typedef struct { enum { FROM_ARRAY, FROM_MERGE, - FROM_INNER_JOIN + FROM_INNER_JOIN, + FROM_MERGE_FUNCTION } from_clause_type; // in case of merge or join, it's guaranteed that the names array // will have two table names only and they aren't regex. 
table_name_array *names; + value *regex_value; /* regex merge */ } from_clause; typedef struct { diff --git a/parser/test_memory_leaks.sh b/parser/test_memory_leaks.sh index 58d6a2989b7..a3b87545726 100755 --- a/parser/test_memory_leaks.sh +++ b/parser/test_memory_leaks.sh @@ -14,6 +14,9 @@ int main(int argc, char **argv) { q = parse_query("select * from foo where time < -1s"); close_queries(&q); + q = parse_query("select * from merge(/.*/) where time < -1s"); + close_queries(&q); + // test partial regex q = parse_query("list series /"); close_queries(&q); From a9d92301f65dfc90246087f92e9b0998f32048e3 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 10 Sep 2014 13:23:45 -0400 Subject: [PATCH 2/7] More logging --- datastore/shard.go | 1 + datastore/storage_key.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/datastore/shard.go b/datastore/shard.go index fcfcf605977..5bdb212fd0a 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -344,6 +344,7 @@ func (self *Shard) getIterators(fields []*metastore.Field, start, end time.Time, tmicro := common.TimeToMicroseconds(t) sk := newStorageKey(field.Id, tmicro, seq) + log.Debug("Initializing iterator to %v", sk.bytes()) iterators[i].Seek(sk.bytes()) if !isAscendingQuery && iterators[i].Valid() { diff --git a/datastore/storage_key.go b/datastore/storage_key.go index 12c0e7c9970..f5232e9d357 100644 --- a/datastore/storage_key.go +++ b/datastore/storage_key.go @@ -7,6 +7,8 @@ import ( "math" "time" + "code.google.com/p/log4go" + "github.com/influxdb/influxdb/common" ) @@ -47,6 +49,7 @@ func parseKey(b []byte) (storageKey, error) { sk.timestamp = convertUintTimestampToInt64(t) binary.Read(buf, binary.BigEndian, &sk.seq) sk.bytesBuf = b + log4go.Debug("Parsed %v to %v", b, sk) return sk, nil } From 78b99f60c60b49962762946f99b35b3ff8bc7902 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Thu, 11 Sep 2014 17:33:21 -0400 Subject: [PATCH 3/7] Merge points at the shard level --- cluster/shard.go | 4 +- cluster/shard_id_inserter.go | 31 +++++ coordinator/coordinator.go | 6 +- datastore/point_iterator.go | 4 +- datastore/point_iterator_stream.go | 35 +++++ datastore/shard.go | 208 +++++++++++++++++++---------- engine/common_merge_engine.go | 14 +- engine/engine.go | 8 +- engine/join_engine.go | 4 +- engine/merge_engine.go | 4 +- engine/merger.go | 9 ++ protocol/protocol.proto | 1 + 12 files changed, 237 insertions(+), 91 deletions(-) create mode 100644 cluster/shard_id_inserter.go create mode 100644 datastore/point_iterator_stream.go diff --git a/cluster/shard.go b/cluster/shard.go index 6f75bf424cb..a4e7e76feb3 100644 --- a/cluster/shard.go +++ b/cluster/shard.go @@ -238,7 +238,7 @@ func (self *ShardData) getProcessor(querySpec *parser.QuerySpec, processor engin // We should aggregate at the shard level if self.ShouldAggregateLocally(querySpec) { log.Debug("creating a query engine") - processor, err = engine.NewQueryEngine(processor, query) + processor, err = engine.NewQueryEngine(processor, query, nil) if err != nil { return nil, err } @@ -300,6 +300,8 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Res var processor engine.Processor = NewResponseChannelProcessor(NewResponseChannelWrapper(response)) var err error + processor = NewShardIdInserterProcessor(self.Id(), processor) + processor, err = self.getProcessor(querySpec, processor) if err != nil { response <- &p.Response{ diff --git a/cluster/shard_id_inserter.go b/cluster/shard_id_inserter.go new file mode 100644 index 00000000000..b62c60afaa6 --- 
/dev/null +++ b/cluster/shard_id_inserter.go @@ -0,0 +1,31 @@ +package cluster + +import ( + "fmt" + + "github.com/influxdb/influxdb/engine" + "github.com/influxdb/influxdb/protocol" +) + +// A processor to set the ShardId on the series to `id` +type ShardIdInserterProcessor struct { + id uint32 + next engine.Processor +} + +func NewShardIdInserterProcessor(id uint32, next engine.Processor) ShardIdInserterProcessor { + return ShardIdInserterProcessor{id, next} +} + +func (sip ShardIdInserterProcessor) Yield(s *protocol.Series) (bool, error) { + s.ShardId = &sip.id + return sip.next.Yield(s) +} + +func (sip ShardIdInserterProcessor) Close() error { + return sip.next.Close() +} + +func (sip ShardIdInserterProcessor) Name() string { + return fmt.Sprintf("ShardIdInserterProcessor (%d)", sip.id) +} diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index e1cb755a0ef..5b6f77e3b23 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -237,7 +237,11 @@ func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writ if !shouldAggregateLocally { // if we should aggregate in the coordinator (i.e. aggregation // isn't happening locally at the shard level), create an engine - writer, err = engine.NewQueryEngine(writer, q) + shardIds := make([]uint32, len(shards)) + for i, s := range shards { + shardIds[i] = s.Id() + } + writer, err = engine.NewQueryEngine(writer, q, shardIds) return shards, writer, err } diff --git a/datastore/point_iterator.go b/datastore/point_iterator.go index 4fbd54a3816..4458b3c966d 100644 --- a/datastore/point_iterator.go +++ b/datastore/point_iterator.go @@ -30,7 +30,7 @@ type PointIterator struct { // there's no need to call Next() after the call to NewPointIterator, // but the user should check Valid() to make sure the iterator is // pointing at a valid point. 
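A minimal sketch of the shard-side chain that ShardData.Query builds above, and why it matters: every series a shard emits is stamped with the shard's id, which the coordinator-side merge (patch hunk further down) keys its streams on.

```go
var p engine.Processor = NewResponseChannelProcessor(NewResponseChannelWrapper(response))
// stamp every series this shard emits with its id
p = NewShardIdInserterProcessor(self.Id(), p)
// downstream, CommonMergeEngine.Yield looks up the stream registered
// for s.GetShardId(), so identical series names coming from different
// shards no longer collide on a single stream.
```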
-func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startTime, endTime time.Time, asc bool) PointIterator { +func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startTime, endTime time.Time, asc bool) *PointIterator { pi := PointIterator{ valid: true, err: nil, @@ -44,7 +44,7 @@ func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startT // seek to the first point pi.Next() - return pi + return &pi } // public api diff --git a/datastore/point_iterator_stream.go b/datastore/point_iterator_stream.go new file mode 100644 index 00000000000..16f94f4cf05 --- /dev/null +++ b/datastore/point_iterator_stream.go @@ -0,0 +1,35 @@ +package datastore + +import "github.com/influxdb/influxdb/protocol" + +// PointIteratorStream is a struct that implements the StreamQuery +// interface and is used by the shard with the Merger to merge the +// data points locally to form a monotic stream of points (increasing +// or decreasing timestamps) +type PointIteratorStream struct { + pi *PointIterator + name string + fields []string +} + +// Returns true if the point iterator is still valid +func (pis PointIteratorStream) HasPoint() bool { + return pis.pi.Valid() +} + +// Returns the next point from the point iterator +func (pis PointIteratorStream) Next() *protocol.Series { + p := pis.pi.Point() + s := &protocol.Series{ + Name: &pis.name, + Fields: pis.fields, + Points: []*protocol.Point{p}, + } + pis.pi.Next() + return s +} + +// Returns true if the point iterator is not valid +func (pis PointIteratorStream) Closed() bool { + return !pis.pi.Valid() +} diff --git a/datastore/shard.go b/datastore/shard.go index 5bdb212fd0a..47b916bb7d1 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -102,12 +102,57 @@ func (self *Shard) Query(querySpec *parser.QuerySpec, processor engine.Processor return self.executeDeleteQuery(querySpec, processor) } - seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() - if !self.hasReadAccess(querySpec) { return errors.New("User does not have access to one or more of the series requested.") } + switch t := querySpec.SelectQuery().FromClause.Type; t { + case parser.FromClauseArray: + log.Debug("Shard %s: running a regular query", self.db.Path()) + return self.executeArrayQuery(querySpec, processor) + case parser.FromClauseMerge, parser.FromClauseInnerJoin: + log.Debug("Shard %s: running a merge query", self.db.Path()) + return self.executeMergeQuery(querySpec, processor, t) + default: + panic(fmt.Errorf("Unknown from clause type %s", t)) + } +} + +func (self *Shard) IsClosed() bool { + return self.closed +} + +func (self *Shard) executeMergeQuery(querySpec *parser.QuerySpec, processor engine.Processor, t parser.FromClauseType) error { + seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() + iterators := make([]*PointIterator, len(seriesAndColumns)) + streams := make([]engine.StreamQuery, len(iterators)) + i := 0 + var err error + for s, c := range seriesAndColumns { + c, iterators[i], err = self.getPointIteratorForSeries(querySpec, s.Name, c) + if err != nil { + log.Error(err) + return err + } + streams[i] = PointIteratorStream{ + pi: iterators[i], + name: s.Name, + fields: c, + } + i++ + } + + h := &engine.SeriesHeap{Ascending: querySpec.IsAscending()} + merger := engine.NewCME("Shard", streams, h, processor, t == parser.FromClauseMerge) + if _, err := merger.Update(); err != nil { + return err + } + return nil +} + +func (self *Shard) executeArrayQuery(querySpec *parser.QuerySpec, processor 
engine.Processor) error { + seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() + for series, columns := range seriesAndColumns { if regex, ok := series.GetCompiledRegex(); ok { seriesNames := self.metaStore.GetSeriesForDatabaseAndRegex(querySpec.Database(), regex) @@ -127,37 +172,27 @@ func (self *Shard) Query(querySpec *parser.QuerySpec, processor engine.Processor } } } - return nil -} -func (self *Shard) IsClosed() bool { - return self.closed + return nil } -func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor engine.Processor) error { - fields, err := self.getFieldsForSeries(querySpec.Database(), seriesName, columns) +func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, name string, columns []string, processor engine.Processor) error { + if querySpec.IsSinglePointQuery() { + log.Debug("Running single query for series %s", name) + return self.executeSinglePointQuery(querySpec, name, columns, processor) + } + var pi *PointIterator + var err error + columns, pi, err = self.getPointIteratorForSeries(querySpec, name, columns) if err != nil { - log.Error("Error looking up fields for %s: %s", seriesName, err) return err } - - if querySpec.IsSinglePointQuery() { - log.Debug("Running single query for series %s, fields %v", seriesName, fields) - return self.executeSinglePointQuery(querySpec, seriesName, fields, processor) - } - - startTime := querySpec.GetStartTime() - endTime := querySpec.GetEndTime() + defer pi.Close() query := querySpec.SelectQuery() + aliases := query.GetTableAliases(name) - aliases := query.GetTableAliases(seriesName) - - fieldNames, iterators := self.getIterators(fields, startTime, endTime, query.Ascending) - seriesOutgoing := &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} - pi := NewPointIterator(iterators, fields, querySpec.GetStartTime(), querySpec.GetEndTime(), query.Ascending) - defer pi.Close() - + seriesOutgoing := &protocol.Series{Name: protocol.String(name), Fields: columns, Points: make([]*protocol.Point, 0, self.pointBatchSize)} for pi.Valid() { p := pi.Point() seriesOutgoing.Points = append(seriesOutgoing.Points, p) @@ -170,7 +205,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName return err } } - seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} + seriesOutgoing = &protocol.Series{Name: protocol.String(name), Fields: columns, Points: make([]*protocol.Point, 0, self.pointBatchSize)} } pi.Next() @@ -193,6 +228,79 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName return nil } +func (self *Shard) executeSinglePointQuery(querySpec *parser.QuerySpec, name string, columns []string, p engine.Processor) error { + fields, err := self.getFieldsForSeries(querySpec.Database(), name, columns) + if err != nil { + log.Error("Error looking up fields for %s: %s", name, err) + return err + } + + query := querySpec.SelectQuery() + fieldCount := len(fields) + fieldNames := make([]string, 0, fieldCount) + point := &protocol.Point{Values: make([]*protocol.FieldValue, 0, fieldCount)} + timestamp := common.TimeToMicroseconds(query.GetStartTime()) + sequenceNumber, err := query.GetSinglePointQuerySequenceNumber() + if err != nil { + return err + } + + // set the timestamp and sequence number + point.SequenceNumber = &sequenceNumber + 
point.SetTimestampInMicroseconds(timestamp) + + for _, field := range fields { + sk := newStorageKey(field.Id, timestamp, sequenceNumber) + data, err := self.db.Get(sk.bytes()) + if err != nil { + return err + } + + if data == nil { + continue + } + + fieldValue := &protocol.FieldValue{} + err = proto.Unmarshal(data, fieldValue) + if err != nil { + return err + } + fieldNames = append(fieldNames, field.Name) + point.Values = append(point.Values, fieldValue) + } + + result := &protocol.Series{Name: &name, Fields: fieldNames, Points: []*protocol.Point{point}} + + if len(result.Points) > 0 { + _, err := p.Yield(result) + return err + } + return nil +} + +func (self *Shard) getPointIteratorForSeries(querySpec *parser.QuerySpec, name string, columns []string) ([]string, *PointIterator, error) { + fields, err := self.getFieldsForSeries(querySpec.Database(), name, columns) + if err != nil { + log.Error("Error looking up fields for %s: %s", name, err) + return nil, nil, err + } + + startTime := querySpec.GetStartTime() + endTime := querySpec.GetEndTime() + + query := querySpec.SelectQuery() + + iterators := self.getIterators(fields, startTime, endTime, query.Ascending) + pi := NewPointIterator(iterators, fields, querySpec.GetStartTime(), querySpec.GetEndTime(), query.Ascending) + + columns = make([]string, len(fields)) + for i := range fields { + columns[i] = fields[i].Name + } + + return columns, pi, nil +} + func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor engine.Processor) error { query := querySpec.DeleteQuery() series := query.GetFromClause() @@ -282,57 +390,11 @@ func (self *Shard) close() { self.db = nil } -func (self *Shard) executeSinglePointQuery(querySpec *parser.QuerySpec, series string, fields []*metastore.Field, p engine.Processor) error { - query := querySpec.SelectQuery() - fieldCount := len(fields) - fieldNames := make([]string, 0, fieldCount) - point := &protocol.Point{Values: make([]*protocol.FieldValue, 0, fieldCount)} - timestamp := common.TimeToMicroseconds(query.GetStartTime()) - sequenceNumber, err := query.GetSinglePointQuerySequenceNumber() - if err != nil { - return err - } - - // set the timestamp and sequence number - point.SequenceNumber = &sequenceNumber - point.SetTimestampInMicroseconds(timestamp) - - for _, field := range fields { - sk := newStorageKey(field.Id, timestamp, sequenceNumber) - data, err := self.db.Get(sk.bytes()) - if err != nil { - return err - } - - if data == nil { - continue - } - - fieldValue := &protocol.FieldValue{} - err = proto.Unmarshal(data, fieldValue) - if err != nil { - return err - } - fieldNames = append(fieldNames, field.Name) - point.Values = append(point.Values, fieldValue) - } - - result := &protocol.Series{Name: &series, Fields: fieldNames, Points: []*protocol.Point{point}} - - if len(result.Points) > 0 { - _, err := p.Yield(result) - return err - } - return nil -} - -func (self *Shard) getIterators(fields []*metastore.Field, start, end time.Time, isAscendingQuery bool) (fieldNames []string, iterators []storage.Iterator) { +func (self *Shard) getIterators(fields []*metastore.Field, start, end time.Time, isAscendingQuery bool) (iterators []storage.Iterator) { iterators = make([]storage.Iterator, len(fields)) - fieldNames = make([]string, len(fields)) // start the iterators to go through the series data for i, field := range fields { - fieldNames[i] = field.Name iterators[i] = self.db.Iterator() t := start @@ -353,7 +415,7 @@ func (self *Shard) getIterators(fields []*metastore.Field, start, end 
time.Time, if err := iterators[i].Error(); err != nil { log.Error("Error while getting iterators: %s", err) - return nil, nil + return nil } } return diff --git a/engine/common_merge_engine.go b/engine/common_merge_engine.go index 44d0da8802a..93051f6aed8 100644 --- a/engine/common_merge_engine.go +++ b/engine/common_merge_engine.go @@ -4,22 +4,22 @@ import "github.com/influxdb/influxdb/protocol" type CommonMergeEngine struct { merger *Merger - streams map[string]StreamUpdate + streams map[uint32]StreamUpdate next Processor } // returns a yield function that will sort points from table1 and // table2 no matter what the order in which they are received. -func NewCommonMergeEngine(tables []string, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine { +func NewCommonMergeEngine(shards []uint32, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine { cme := &CommonMergeEngine{ - streams: make(map[string]StreamUpdate, len(tables)), + streams: make(map[uint32]StreamUpdate, len(shards)), next: next, } - streams := make([]StreamQuery, len(tables)) - for i, t := range tables { + streams := make([]StreamQuery, len(shards)) + for i, sh := range shards { s := NewStream() streams[i] = s - cme.streams[t] = s + cme.streams[sh] = s } h := &SeriesHeap{Ascending: ascending} cme.merger = NewCME("Engine", streams, h, next, mergeColumns) @@ -39,7 +39,7 @@ func (cme *CommonMergeEngine) Close() error { } func (cme *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) { - stream := cme.streams[*s.Name] + stream := cme.streams[s.GetShardId()] stream.Yield(s) return cme.merger.Update() } diff --git a/engine/engine.go b/engine/engine.go index 3158fa9b03a..d46f822bb65 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -2,7 +2,7 @@ package engine import "github.com/influxdb/influxdb/parser" -func NewQueryEngine(next Processor, query *parser.SelectQuery) (Processor, error) { +func NewQueryEngine(next Processor, query *parser.SelectQuery, shards []uint32) (Processor, error) { limit := query.Limit var engine Processor = NewPassthroughEngineWithLimit(next, 1, limit) @@ -18,14 +18,16 @@ func NewQueryEngine(next Processor, query *parser.SelectQuery) (Processor, error switch fromClause.Type { case parser.FromClauseInnerJoin: - engine = NewJoinEngine(query, engine) + engine = NewJoinEngine(shards, query, engine) case parser.FromClauseMerge: tables := make([]string, len(fromClause.Names)) for i, name := range fromClause.Names { tables[i] = name.Name.Name } - engine = NewMergeEngine(tables, query.Ascending, engine) + engine = NewMergeEngine(shards, query.Ascending, engine) case parser.FromClauseMergeFun: + // At this point the regex should be expanded to the list of + // tables that will be queries panic("QueryEngine cannot be called with merge function") } diff --git a/engine/join_engine.go b/engine/join_engine.go index 4953f183906..774308195f0 100644 --- a/engine/join_engine.go +++ b/engine/join_engine.go @@ -14,7 +14,7 @@ type JoinEngine struct { lastFields1, lastFields2 []string } -func NewJoinEngine(query *parser.SelectQuery, next Processor) Processor { +func NewJoinEngine(shards []uint32, query *parser.SelectQuery, next Processor) Processor { table1 := query.GetFromClause().Names[0].GetAlias() table2 := query.GetFromClause().Names[1].GetAlias() name := table1 + "_join_" + table2 @@ -26,7 +26,7 @@ func NewJoinEngine(query *parser.SelectQuery, next Processor) Processor { table2: table2, query: query, } - mergeEngine := NewCommonMergeEngine([]string{table1, table2}, false, 
query.Ascending, joinEngine) + mergeEngine := NewCommonMergeEngine(shards, false, query.Ascending, joinEngine) return mergeEngine } diff --git a/engine/merge_engine.go b/engine/merge_engine.go index 33cf903dc4c..1962d63830e 100644 --- a/engine/merge_engine.go +++ b/engine/merge_engine.go @@ -7,12 +7,12 @@ type MergeEngine struct { next Processor } -func NewMergeEngine(tables []string, ascending bool, next Processor) Processor { +func NewMergeEngine(shards []uint32, ascending bool, next Processor) Processor { name := "merged" me := &MergeEngine{name: name, next: next} - return NewCommonMergeEngine(tables, true, ascending, me) + return NewCommonMergeEngine(shards, true, ascending, me) } func (me *MergeEngine) Yield(s *protocol.Series) (bool, error) { diff --git a/engine/merger.go b/engine/merger.go index 29e9b36aa80..e8b78a763fc 100644 --- a/engine/merger.go +++ b/engine/merger.go @@ -126,6 +126,15 @@ func (cme *Merger) tryYieldNextPoint() (bool, error) { // yield as many points as we can to the Processor `n` func (cme *Merger) yieldNextPoint() (bool, error) { + // If we consumed all the input data points, return + // immediately. This can be the case for example if we finished + // initialization and the first call to yieldNextPoint() consumed + // all the data points. Without this check the call to the heap's + // Next() method will cause a panic + if cme.size == 0 { + return true, nil + } + for { var s *protocol.Series cme.lastStreamIdx, s = cme.h.Next() diff --git a/protocol/protocol.proto b/protocol/protocol.proto index 5ea92c19ec1..8c8de9d94ce 100644 --- a/protocol/protocol.proto +++ b/protocol/protocol.proto @@ -19,6 +19,7 @@ message Series { required string name = 2; repeated string fields = 3; repeated uint64 fieldIds = 4; + optional uint32 shard_id = 5; } message QueryResponseChunk { From 783967780488dbc23369359dea03ce1638593c74 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Thu, 11 Sep 2014 17:56:26 -0400 Subject: [PATCH 4/7] Return the merged data using the alias Since we return an error if the same series has more than one alias this fixes #270 --- datastore/shard.go | 6 +++++- integration/data_test.go | 18 ++++++++++++++++++ integration/legacy_data_test.go | 26 -------------------------- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/datastore/shard.go b/datastore/shard.go index 47b916bb7d1..6dc94a92f64 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -134,9 +134,13 @@ func (self *Shard) executeMergeQuery(querySpec *parser.QuerySpec, processor engi log.Error(err) return err } + aliases := querySpec.SelectQuery().GetTableAliases(s.Name) + if len(aliases) > 1 { + return fmt.Errorf("Cannot have the same table joined more than once") + } streams[i] = PointIteratorStream{ pi: iterators[i], - name: s.Name, + name: aliases[0], fields: c, } i++ diff --git a/integration/data_test.go b/integration/data_test.go index 2a3c5a86f9f..cdce02a9465 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -1182,6 +1182,24 @@ func (self *DataTestSuite) TestWhereConditionWithExpression(c *C) { c.Assert(maps[0]["b"], Equals, 1.0) } +func (self *DataTestSuite) JoinedWithSelf(c *C) (Fun, Fun) { + return func(client Client) { + client.WriteJsonData(` +[ + { + "name": "t", + "columns": ["time", "value"], + "points":[ + [1381346706000, 3], + [1381346701000, 1] + ] + } +]`, c, influxdb.Millisecond) + }, func(client Client) { + client.RunInvalidQuery("select * from t as foo inner join t as bar", c, "m") + } +} + // issue #740 and #781 func (self *DataTestSuite) 
TestJoiningDifferentFields(c *C) { // TODO: why do we get a different error if we remove all but the first values diff --git a/integration/legacy_data_test.go b/integration/legacy_data_test.go index 0eab7b0afd3..06f866ab977 100644 --- a/integration/legacy_data_test.go +++ b/integration/legacy_data_test.go @@ -1649,32 +1649,6 @@ func (self *DataTestSuite) QueryWithJoinedTablesWithWhereClause(c *C) (Fun, Fun) } } -func (self *DataTestSuite) JoinedWithSelf(c *C) (Fun, Fun) { - return func(client Client) { - createEngine(client, c, `[ - { - "points": [ - { "values": [{ "int64_value": 3 }], "timestamp": 1381346706000000 }, - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000 } - ], - "name": "t", - "fields": ["value"] - } - ]`) - }, func(client Client) { - runQuery(client, "select * from t as foo inner join t as bar", c, `[ - { - "points": [ - { "values": [{ "int64_value": 3 }, { "int64_value": 3 }], "timestamp": 1381346706000000 }, - { "values": [{ "int64_value": 1 }, { "int64_value": 1 }], "timestamp": 1381346701000000 } - ], - "name": "foo_join_bar", - "fields": ["foo.value", "bar.value"] - } - ]`) - } -} - func (self *DataTestSuite) QueryWithMergedTablesWithPointsAppend(c *C) (Fun, Fun) { return func(client Client) { createEngine(client, c, `[ From e8be7575c84a532a0a212fc1b2826d7bab5df1a3 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Thu, 9 Oct 2014 17:43:17 -0400 Subject: [PATCH 5/7] rename a test file --- integration/{test_missing_points.go => missing_points_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename integration/{test_missing_points.go => missing_points_test.go} (100%) diff --git a/integration/test_missing_points.go b/integration/missing_points_test.go similarity index 100% rename from integration/test_missing_points.go rename to integration/missing_points_test.go From 911e1b90556f3e92fefdf3b4a6e45dd773475904 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 10 Oct 2014 02:54:01 -0400 Subject: [PATCH 6/7] Use the standard heap implementation --- common/heap/heap.go | 58 ------------------------- common/heap/heap_test.go | 58 ------------------------- datastore/shard.go | 2 +- engine/common_merge_engine.go | 2 +- engine/merger.go | 4 +- engine/series_heap.go | 82 +++++++++++++++++++++++------------ 6 files changed, 58 insertions(+), 148 deletions(-) delete mode 100644 common/heap/heap.go delete mode 100644 common/heap/heap_test.go diff --git a/common/heap/heap.go b/common/heap/heap.go deleted file mode 100644 index c5f0e941594..00000000000 --- a/common/heap/heap.go +++ /dev/null @@ -1,58 +0,0 @@ -// Implement a heap structure by providing three methods, Initialize, -// BubbleDown and BubbleUp. Initialize is O(n), while both Bubble -// methods are O(log n). The methods take any type that implement the -// sort.Interface interface. You can get a reverse Heap (a Max heap) -// by using sort.Reverse() on a sort.Interface to reverse the result -// of the Less() method. 
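For side-by-side review of the switch to the standard library, a minimal sketch of the canonical container/heap pattern (Go of this era, pre-generics) that replaces the deleted helpers:

```go
package main

import (
	"container/heap"
	"fmt"
)

// IntHeap is the canonical container/heap example: a min-heap of ints.
type IntHeap []int

func (h IntHeap) Len() int           { return len(h) }
func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *IntHeap) Push(x interface{}) { *h = append(*h, x.(int)) }

func (h *IntHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &IntHeap{9, 4, 7, 1, 8}
	heap.Init(h) // O(n), like the deleted Initialize()
	heap.Push(h, 3)
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h)) // 1 3 4 7 8 9
	}
}
```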
-package heap - -import "sort" - -func Initialize(s sort.Interface) { - l := s.Len() - // start with the right most node - for n := l - 1; n >= 0; n-- { - BubbleDown(s, n) - } -} - -func parent(s sort.Interface, i int) int { - if i%2 == 0 { - return (i - 1) / 2 - } - - return i / 2 -} - -func BubbleUp(s sort.Interface, i int) { - p := parent(s, i) - if p < 0 { - return - } - - if s.Less(i, p) { - s.Swap(i, p) - BubbleUp(s, p) - } -} - -func BubbleDown(s sort.Interface, i int) { - // length - l := s.Len() - lc := i*2 + 1 - if lc >= l { - return - } - - rc := i*2 + 2 - // the smaller child - sc := lc - if rc < l && s.Less(rc, lc) { - sc = rc - } - - if s.Less(sc, i) { - s.Swap(sc, i) - BubbleDown(s, sc) - } -} diff --git a/common/heap/heap_test.go b/common/heap/heap_test.go deleted file mode 100644 index ecc332f9e44..00000000000 --- a/common/heap/heap_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package heap - -import ( - "math/rand" - "sort" - "testing" - "time" - - "launchpad.net/gocheck" -) - -type HeapSuite struct{} - -// Hook up gocheck into the gotest runner. -func Test(t *testing.T) { - gocheck.TestingT(t) -} - -var _ = gocheck.Suite(&HeapSuite{}) - -func (_ *HeapSuite) SetUpSuite(c *gocheck.C) { - rand.Seed(time.Now().Unix()) -} - -func randomInts(bubbleUp bool) []int { - is := make([]int, 0, 100) - for i := 0; i < 100; i++ { - is = is[:i+1] - is[i] = rand.Intn(1000) - if bubbleUp { - BubbleUp(sort.IntSlice(is), i) - } - } - return is -} - -func (_ *HeapSuite) TestHeapSort(c *gocheck.C) { - for _, bubble := range []bool{true, false} { - is := randomInts(bubble) - temp := make([]int, len(is)) - copy(temp, is) - - if !bubble { - Initialize(sort.IntSlice(is)) - } - - sorted := make([]int, len(is)) - for i := range sorted { - sorted[i] = is[0] - l := len(is) - is, is[0] = is[:l-1], is[l-1] - BubbleDown(sort.IntSlice(is), 0) - } - - sort.Ints(temp) - c.Assert(sorted, gocheck.DeepEquals, temp) - } -} diff --git a/datastore/shard.go b/datastore/shard.go index 6dc94a92f64..1d8d94d8a16 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -146,7 +146,7 @@ func (self *Shard) executeMergeQuery(querySpec *parser.QuerySpec, processor engi i++ } - h := &engine.SeriesHeap{Ascending: querySpec.IsAscending()} + h := engine.NewSeriesHeap(querySpec.IsAscending()) merger := engine.NewCME("Shard", streams, h, processor, t == parser.FromClauseMerge) if _, err := merger.Update(); err != nil { return err diff --git a/engine/common_merge_engine.go b/engine/common_merge_engine.go index 93051f6aed8..05b6b59d270 100644 --- a/engine/common_merge_engine.go +++ b/engine/common_merge_engine.go @@ -21,7 +21,7 @@ func NewCommonMergeEngine(shards []uint32, mergeColumns bool, ascending bool, ne streams[i] = s cme.streams[sh] = s } - h := &SeriesHeap{Ascending: ascending} + h := NewSeriesHeap(ascending) cme.merger = NewCME("Engine", streams, h, next, mergeColumns) return cme } diff --git a/engine/merger.go b/engine/merger.go index e8b78a763fc..85ae75af2d9 100644 --- a/engine/merger.go +++ b/engine/merger.go @@ -13,7 +13,7 @@ type Merger struct { name string s []StreamQuery size int - h *SeriesHeap + h SeriesHeap n Processor lastStreamIdx int initializing bool @@ -34,7 +34,7 @@ type Merger struct { // yields `column2` and `column3` then the result time series will // have all 4 columns with two columns set to `nil` depending on which // side the point came from. 
-func NewCME(name string, s []StreamQuery, h *SeriesHeap, n Processor, mergeColumns bool) *Merger {
+func NewCME(name string, s []StreamQuery, h SeriesHeap, n Processor, mergeColumns bool) *Merger {
 	log4go.Debug("%sMerger: created with %d streams", name, len(s))
 	return &Merger{
 		name:          name,
diff --git a/engine/series_heap.go b/engine/series_heap.go
index 627df759fc5..9850677d6d0 100644
--- a/engine/series_heap.go
+++ b/engine/series_heap.go
@@ -1,9 +1,8 @@
 package engine
 
 import (
-	"sort"
+	"container/heap"
 
-	"github.com/influxdb/influxdb/common/heap"
 	"github.com/influxdb/influxdb/protocol"
 )
 
@@ -12,32 +11,61 @@ type Value struct {
 	s        *protocol.Series
 }
 
-type ValueSlice []Value
+type MinValueSlice struct {
+	values []Value
+}
+
+func (mvs *MinValueSlice) Len() int {
+	return len(mvs.values)
+}
+
+func (mvs *MinValueSlice) Less(i, j int) bool {
+	return mvs.values[i].s.Points[0].GetTimestamp() < mvs.values[j].s.Points[0].GetTimestamp()
+}
+
+func (mvs *MinValueSlice) Swap(i, j int) {
+	mvs.values[i], mvs.values[j] = mvs.values[j], mvs.values[i]
+}
+
+func (mvs *MinValueSlice) Push(x interface{}) {
+	mvs.values = append(mvs.values, x.(Value))
+}
 
-func (vs ValueSlice) Len() int {
-	return len(vs)
+func (mvs *MinValueSlice) Pop() interface{} {
+	l := len(mvs.values)
+	var v interface{}
+	v, mvs.values = mvs.values[l-1], mvs.values[:l-1]
+	return v
 }
 
-func (vs ValueSlice) Less(i, j int) bool {
-	return vs[i].s.Points[0].GetTimestamp() < vs[j].s.Points[0].GetTimestamp()
+type MaxValueSlice struct {
+	*MinValueSlice
 }
 
-func (vs ValueSlice) Swap(i, j int) {
-	vs[i], vs[j] = vs[j], vs[i]
+func (mvs MaxValueSlice) Less(i, j int) bool {
+	return mvs.values[i].s.Points[0].GetTimestamp() >
+		mvs.values[j].s.Points[0].GetTimestamp()
 }
 
-// A heap that keeps holds one point series' (series that have one
+// A heap that holds one point series' (series that have one
 // point only) and their stream ids. See http://en.wikipedia.org/wiki/Heap_(data_structure)
 // for more info on heaps. The heap is used by the Merger to emit
 // points from multiple streams in monotonic order.
 type SeriesHeap struct {
 	Ascending bool
-	values    []Value
+	values    *MinValueSlice
+}
+
+func NewSeriesHeap(asc bool) SeriesHeap {
+	return SeriesHeap{
+		Ascending: asc,
+		values:    &MinValueSlice{},
+	}
 }
 
 // returns the number of values in the heap so far
-func (sh *SeriesHeap) Size() int {
-	return len(sh.values)
+func (sh SeriesHeap) Size() int {
+	return sh.values.Len()
 }
 
 // Add another one point series with the given stream id. TODO: This
@@ -45,17 +73,17 @@ func (sh *SeriesHeap) Size() int {
 // if we had a value slice we can construct the heap in O(n) instead
 // of O(n log n) which is required if we construct the heap using
 // multiple calls to Add()
-func (sh *SeriesHeap) Add(streamId int, s *protocol.Series) {
-	sh.values = append(sh.values, Value{
-		s:        s,
-		streamId: streamId,
-	})
-	l := sh.Size()
+func (sh SeriesHeap) Add(streamId int, s *protocol.Series) {
+	var vs heap.Interface
 	if sh.Ascending {
-		heap.BubbleUp(ValueSlice(sh.values), l-1)
+		vs = sh.values
 	} else {
-		heap.BubbleUp(sort.Reverse(ValueSlice(sh.values)), l-1)
+		vs = MaxValueSlice{sh.values}
 	}
+	heap.Push(vs, Value{
+		s:        s,
+		streamId: streamId,
+	})
 }
 
 // Get and remove the next one point series that has smallest (or
@@ -65,15 +93,13 @@ func (sh *SeriesHeap) Add(streamId int, s *protocol.Series) {
 // the stream will be added immediately after and will cause a
 // BubbleUp. In big O() notation this step doesn't change much, it
 // only adds a constant to the upper bound. 
-func (sh *SeriesHeap) Next() (int, *protocol.Series) { - idx := 0 - s := sh.Size() - v := sh.values[idx] - sh.values, sh.values[0] = sh.values[:s-1], sh.values[s-1] +func (sh SeriesHeap) Next() (int, *protocol.Series) { + var vs heap.Interface if sh.Ascending { - heap.BubbleDown(ValueSlice(sh.values), 0) + vs = sh.values } else { - heap.BubbleDown(sort.Reverse(ValueSlice(sh.values)), 0) + vs = MaxValueSlice{sh.values} } + v := heap.Remove(vs, 0).(Value) return v.streamId, v.s } From 44643de073c0d5cefa386ea56dd51ce7c34ee970 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 10 Oct 2014 16:29:36 -0400 Subject: [PATCH 7/7] make sure we close the point iterators --- datastore/shard.go | 1 + 1 file changed, 1 insertion(+) diff --git a/datastore/shard.go b/datastore/shard.go index 1d8d94d8a16..bd79e7f9825 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -134,6 +134,7 @@ func (self *Shard) executeMergeQuery(querySpec *parser.QuerySpec, processor engi log.Error(err) return err } + defer iterators[i].Close() aliases := querySpec.SelectQuery().GetTableAliases(s.Name) if len(aliases) > 1 { return fmt.Errorf("Cannot have the same table joined more than once")
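
A note on the heap rewrite in PATCH 6/7: the standard container/heap package
works against any heap.Interface, so a max-heap can reuse a min-heap's storage
by embedding the min-ordered type and overriding only Less(), which is the
MinValueSlice/MaxValueSlice arrangement above. The following is a minimal,
self-contained sketch of that same pattern, not part of the patch series; the
names ints and maxInts are hypothetical and appear nowhere in the codebase.

package main

import (
	"container/heap"
	"fmt"
)

// ints is a min-heap of ints; it implements heap.Interface.
type ints []int

func (h ints) Len() int            { return len(h) }
func (h ints) Less(i, j int) bool  { return h[i] < h[j] }
func (h ints) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *ints) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *ints) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// maxInts reverses Less to view the same storage as a max-heap,
// mirroring how MaxValueSlice embeds *MinValueSlice.
type maxInts struct{ *ints }

func (h maxInts) Less(i, j int) bool { return (*h.ints)[i] > (*h.ints)[j] }

func main() {
	vals := &ints{5, 2, 9}
	heap.Init(vals) // heapify in O(n); smallest element ends up on top
	heap.Push(vals, 1)
	fmt.Println(heap.Pop(vals)) // prints 1

	// Re-heapify the remaining values under the reversed ordering.
	mx := maxInts{vals}
	heap.Init(mx)
	fmt.Println(heap.Pop(mx)) // prints 9
}

Because maxInts embeds *ints, the Len, Swap, Push, and Pop methods are
promoted from the min type and both views mutate the same backing slice; only
the ordering differs. That is why SeriesHeap holds a single *MinValueSlice and
wraps it on the fly in Add() and Next() when Ascending is false.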