diff --git a/cluster/cluster_configuration.go b/cluster/cluster_configuration.go index 1126cf3c02b..b0bf2115287 100644 --- a/cluster/cluster_configuration.go +++ b/cluster/cluster_configuration.go @@ -901,6 +901,7 @@ func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec) if err != nil { return nil, err } + log.Debug("Querying %d shards for query", len(shards)) shards = self.getShardRange(querySpec, shards) if querySpec.IsAscending() { SortShardsByTimeAscending(shards) diff --git a/cluster/shard.go b/cluster/shard.go index 6f75bf424cb..a4e7e76feb3 100644 --- a/cluster/shard.go +++ b/cluster/shard.go @@ -238,7 +238,7 @@ func (self *ShardData) getProcessor(querySpec *parser.QuerySpec, processor engin // We should aggregate at the shard level if self.ShouldAggregateLocally(querySpec) { log.Debug("creating a query engine") - processor, err = engine.NewQueryEngine(processor, query) + processor, err = engine.NewQueryEngine(processor, query, nil) if err != nil { return nil, err } @@ -300,6 +300,8 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Res var processor engine.Processor = NewResponseChannelProcessor(NewResponseChannelWrapper(response)) var err error + processor = NewShardIdInserterProcessor(self.Id(), processor) + processor, err = self.getProcessor(querySpec, processor) if err != nil { response <- &p.Response{ diff --git a/cluster/shard_id_inserter.go b/cluster/shard_id_inserter.go new file mode 100644 index 00000000000..b62c60afaa6 --- /dev/null +++ b/cluster/shard_id_inserter.go @@ -0,0 +1,31 @@ +package cluster + +import ( + "fmt" + + "github.com/influxdb/influxdb/engine" + "github.com/influxdb/influxdb/protocol" +) + +// A processor to set the ShardId on the series to `id` +type ShardIdInserterProcessor struct { + id uint32 + next engine.Processor +} + +func NewShardIdInserterProcessor(id uint32, next engine.Processor) ShardIdInserterProcessor { + return ShardIdInserterProcessor{id, next} +} + +func (sip ShardIdInserterProcessor) Yield(s *protocol.Series) (bool, error) { + s.ShardId = &sip.id + return sip.next.Yield(s) +} + +func (sip ShardIdInserterProcessor) Close() error { + return sip.next.Close() +} + +func (sip ShardIdInserterProcessor) Name() string { + return fmt.Sprintf("ShardIdInserterProcessor (%d)", sip.id) +} diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index d0a8815b32e..5b6f77e3b23 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -237,7 +237,11 @@ func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writ if !shouldAggregateLocally { // if we should aggregate in the coordinator (i.e. 
aggregation // isn't happening locally at the shard level), create an engine - writer, err = engine.NewQueryEngine(writer, q) + shardIds := make([]uint32, len(shards)) + for i, s := range shards { + shardIds[i] = s.Id() + } + writer, err = engine.NewQueryEngine(writer, q, shardIds) return shards, writer, err } @@ -268,8 +272,27 @@ func (self *Coordinator) queryShards(querySpec *parser.QuerySpec, shards []*clus return nil } +func (self *Coordinator) expandRegex(spec *parser.QuerySpec) { + q := spec.SelectQuery() + if q == nil { + return + } + + if f := q.FromClause; f.Type == parser.FromClauseMergeFun { + series := self.clusterConfiguration.MetaStore.GetSeriesForDatabaseAndRegex(spec.Database(), q.FromClause.Regex) + f.Type = parser.FromClauseMerge + f.Regex = nil + for _, s := range series { + f.Names = append(f.Names, &parser.TableName{ + Name: &parser.Value{Name: s, Type: parser.ValueTableName}, + }) + } + } +} + // We call this function only if we have a Select query (not continuous) or Delete query func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, p engine.Processor) error { + self.expandRegex(querySpec) shards, processor, err := self.getShardsAndProcessor(querySpec, p) if err != nil { return err diff --git a/datastore/point_iterator.go b/datastore/point_iterator.go index 25741cfffa6..4458b3c966d 100644 --- a/datastore/point_iterator.go +++ b/datastore/point_iterator.go @@ -24,7 +24,13 @@ type PointIterator struct { asc bool } -func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startTime, endTime time.Time, asc bool) PointIterator { +// Creates a new point iterator using the given column iterator, +// metadata columns, start and end time as well as the ascending +// flag. The iterator returned is already placed at the first point, +// there's no need to call Next() after the call to NewPointIterator, +// but the user should check Valid() to make sure the iterator is +// pointing at a valid point. +func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startTime, endTime time.Time, asc bool) *PointIterator { pi := PointIterator{ valid: true, err: nil, @@ -38,9 +44,12 @@ func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startT // seek to the first point pi.Next() - return pi + return &pi } +// public api + +// Advance the iterator to the next point func (pi *PointIterator) Next() { valueBuffer := proto.NewBuffer(nil) pi.valid = false @@ -127,6 +136,30 @@ func (pi *PointIterator) Next() { pi.point.SequenceNumber = proto.Uint64(next.sequence) } +// Returns true if the iterator is pointing at a valid +// location. Behavior of Point() is undefined if Valid() is false. +func (pi *PointIterator) Valid() bool { + return pi.valid +} + +// Returns the point that the iterator is pointing to. +func (pi *PointIterator) Point() *protocol.Point { return pi.point } + +// Returns an error if the iterator became invalid due to an error as +// opposed to reaching the end time. +func (pi *PointIterator) Error() error { return pi.err } + +// Close the iterator and free any resources used by the +// iterator. Behavior of the iterator is undefined if the iterator is +// used after it was closed. 
+func (pi *PointIterator) Close() { + for _, itr := range pi.itrs { + itr.Close() + } +} + +// private api + func (pi *PointIterator) getIteratorNextValue() error { for i, it := range pi.itrs { if pi.rawColumnValues[i].value != nil { @@ -165,21 +198,7 @@ func (pi *PointIterator) getIteratorNextValue() error { return nil } -func (pi *PointIterator) Valid() bool { - return pi.valid -} - func (pi *PointIterator) setError(err error) { pi.err = err pi.valid = false } - -func (pi *PointIterator) Point() *protocol.Point { return pi.point } - -func (pi *PointIterator) Error() error { return pi.err } - -func (pi *PointIterator) Close() { - for _, itr := range pi.itrs { - itr.Close() - } -} diff --git a/datastore/point_iterator_stream.go b/datastore/point_iterator_stream.go new file mode 100644 index 00000000000..16f94f4cf05 --- /dev/null +++ b/datastore/point_iterator_stream.go @@ -0,0 +1,35 @@ +package datastore + +import "github.com/influxdb/influxdb/protocol" + +// PointIteratorStream is a struct that implements the StreamQuery +// interface and is used by the shard with the Merger to merge the +// data points locally to form a monotic stream of points (increasing +// or decreasing timestamps) +type PointIteratorStream struct { + pi *PointIterator + name string + fields []string +} + +// Returns true if the point iterator is still valid +func (pis PointIteratorStream) HasPoint() bool { + return pis.pi.Valid() +} + +// Returns the next point from the point iterator +func (pis PointIteratorStream) Next() *protocol.Series { + p := pis.pi.Point() + s := &protocol.Series{ + Name: &pis.name, + Fields: pis.fields, + Points: []*protocol.Point{p}, + } + pis.pi.Next() + return s +} + +// Returns true if the point iterator is not valid +func (pis PointIteratorStream) Closed() bool { + return !pis.pi.Valid() +} diff --git a/datastore/shard.go b/datastore/shard.go index fcfcf605977..bd79e7f9825 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -102,12 +102,62 @@ func (self *Shard) Query(querySpec *parser.QuerySpec, processor engine.Processor return self.executeDeleteQuery(querySpec, processor) } - seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() - if !self.hasReadAccess(querySpec) { return errors.New("User does not have access to one or more of the series requested.") } + switch t := querySpec.SelectQuery().FromClause.Type; t { + case parser.FromClauseArray: + log.Debug("Shard %s: running a regular query", self.db.Path()) + return self.executeArrayQuery(querySpec, processor) + case parser.FromClauseMerge, parser.FromClauseInnerJoin: + log.Debug("Shard %s: running a merge query", self.db.Path()) + return self.executeMergeQuery(querySpec, processor, t) + default: + panic(fmt.Errorf("Unknown from clause type %s", t)) + } +} + +func (self *Shard) IsClosed() bool { + return self.closed +} + +func (self *Shard) executeMergeQuery(querySpec *parser.QuerySpec, processor engine.Processor, t parser.FromClauseType) error { + seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() + iterators := make([]*PointIterator, len(seriesAndColumns)) + streams := make([]engine.StreamQuery, len(iterators)) + i := 0 + var err error + for s, c := range seriesAndColumns { + c, iterators[i], err = self.getPointIteratorForSeries(querySpec, s.Name, c) + if err != nil { + log.Error(err) + return err + } + defer iterators[i].Close() + aliases := querySpec.SelectQuery().GetTableAliases(s.Name) + if len(aliases) > 1 { + return fmt.Errorf("Cannot have the same table joined more than once") + } + 
streams[i] = PointIteratorStream{ + pi: iterators[i], + name: aliases[0], + fields: c, + } + i++ + } + + h := engine.NewSeriesHeap(querySpec.IsAscending()) + merger := engine.NewCME("Shard", streams, h, processor, t == parser.FromClauseMerge) + if _, err := merger.Update(); err != nil { + return err + } + return nil +} + +func (self *Shard) executeArrayQuery(querySpec *parser.QuerySpec, processor engine.Processor) error { + seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() + for series, columns := range seriesAndColumns { if regex, ok := series.GetCompiledRegex(); ok { seriesNames := self.metaStore.GetSeriesForDatabaseAndRegex(querySpec.Database(), regex) @@ -127,37 +177,27 @@ func (self *Shard) Query(querySpec *parser.QuerySpec, processor engine.Processor } } } - return nil -} -func (self *Shard) IsClosed() bool { - return self.closed + return nil } -func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor engine.Processor) error { - fields, err := self.getFieldsForSeries(querySpec.Database(), seriesName, columns) +func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, name string, columns []string, processor engine.Processor) error { + if querySpec.IsSinglePointQuery() { + log.Debug("Running single query for series %s", name) + return self.executeSinglePointQuery(querySpec, name, columns, processor) + } + var pi *PointIterator + var err error + columns, pi, err = self.getPointIteratorForSeries(querySpec, name, columns) if err != nil { - log.Error("Error looking up fields for %s: %s", seriesName, err) return err } - - if querySpec.IsSinglePointQuery() { - log.Debug("Running single query for series %s, fields %v", seriesName, fields) - return self.executeSinglePointQuery(querySpec, seriesName, fields, processor) - } - - startTime := querySpec.GetStartTime() - endTime := querySpec.GetEndTime() + defer pi.Close() query := querySpec.SelectQuery() + aliases := query.GetTableAliases(name) - aliases := query.GetTableAliases(seriesName) - - fieldNames, iterators := self.getIterators(fields, startTime, endTime, query.Ascending) - seriesOutgoing := &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} - pi := NewPointIterator(iterators, fields, querySpec.GetStartTime(), querySpec.GetEndTime(), query.Ascending) - defer pi.Close() - + seriesOutgoing := &protocol.Series{Name: protocol.String(name), Fields: columns, Points: make([]*protocol.Point, 0, self.pointBatchSize)} for pi.Valid() { p := pi.Point() seriesOutgoing.Points = append(seriesOutgoing.Points, p) @@ -170,7 +210,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName return err } } - seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} + seriesOutgoing = &protocol.Series{Name: protocol.String(name), Fields: columns, Points: make([]*protocol.Point, 0, self.pointBatchSize)} } pi.Next() @@ -193,6 +233,79 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName return nil } +func (self *Shard) executeSinglePointQuery(querySpec *parser.QuerySpec, name string, columns []string, p engine.Processor) error { + fields, err := self.getFieldsForSeries(querySpec.Database(), name, columns) + if err != nil { + log.Error("Error looking up fields for %s: %s", name, err) + return err + } + + query := querySpec.SelectQuery() + fieldCount := 
len(fields) + fieldNames := make([]string, 0, fieldCount) + point := &protocol.Point{Values: make([]*protocol.FieldValue, 0, fieldCount)} + timestamp := common.TimeToMicroseconds(query.GetStartTime()) + sequenceNumber, err := query.GetSinglePointQuerySequenceNumber() + if err != nil { + return err + } + + // set the timestamp and sequence number + point.SequenceNumber = &sequenceNumber + point.SetTimestampInMicroseconds(timestamp) + + for _, field := range fields { + sk := newStorageKey(field.Id, timestamp, sequenceNumber) + data, err := self.db.Get(sk.bytes()) + if err != nil { + return err + } + + if data == nil { + continue + } + + fieldValue := &protocol.FieldValue{} + err = proto.Unmarshal(data, fieldValue) + if err != nil { + return err + } + fieldNames = append(fieldNames, field.Name) + point.Values = append(point.Values, fieldValue) + } + + result := &protocol.Series{Name: &name, Fields: fieldNames, Points: []*protocol.Point{point}} + + if len(result.Points) > 0 { + _, err := p.Yield(result) + return err + } + return nil +} + +func (self *Shard) getPointIteratorForSeries(querySpec *parser.QuerySpec, name string, columns []string) ([]string, *PointIterator, error) { + fields, err := self.getFieldsForSeries(querySpec.Database(), name, columns) + if err != nil { + log.Error("Error looking up fields for %s: %s", name, err) + return nil, nil, err + } + + startTime := querySpec.GetStartTime() + endTime := querySpec.GetEndTime() + + query := querySpec.SelectQuery() + + iterators := self.getIterators(fields, startTime, endTime, query.Ascending) + pi := NewPointIterator(iterators, fields, querySpec.GetStartTime(), querySpec.GetEndTime(), query.Ascending) + + columns = make([]string, len(fields)) + for i := range fields { + columns[i] = fields[i].Name + } + + return columns, pi, nil +} + func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor engine.Processor) error { query := querySpec.DeleteQuery() series := query.GetFromClause() @@ -282,57 +395,11 @@ func (self *Shard) close() { self.db = nil } -func (self *Shard) executeSinglePointQuery(querySpec *parser.QuerySpec, series string, fields []*metastore.Field, p engine.Processor) error { - query := querySpec.SelectQuery() - fieldCount := len(fields) - fieldNames := make([]string, 0, fieldCount) - point := &protocol.Point{Values: make([]*protocol.FieldValue, 0, fieldCount)} - timestamp := common.TimeToMicroseconds(query.GetStartTime()) - sequenceNumber, err := query.GetSinglePointQuerySequenceNumber() - if err != nil { - return err - } - - // set the timestamp and sequence number - point.SequenceNumber = &sequenceNumber - point.SetTimestampInMicroseconds(timestamp) - - for _, field := range fields { - sk := newStorageKey(field.Id, timestamp, sequenceNumber) - data, err := self.db.Get(sk.bytes()) - if err != nil { - return err - } - - if data == nil { - continue - } - - fieldValue := &protocol.FieldValue{} - err = proto.Unmarshal(data, fieldValue) - if err != nil { - return err - } - fieldNames = append(fieldNames, field.Name) - point.Values = append(point.Values, fieldValue) - } - - result := &protocol.Series{Name: &series, Fields: fieldNames, Points: []*protocol.Point{point}} - - if len(result.Points) > 0 { - _, err := p.Yield(result) - return err - } - return nil -} - -func (self *Shard) getIterators(fields []*metastore.Field, start, end time.Time, isAscendingQuery bool) (fieldNames []string, iterators []storage.Iterator) { +func (self *Shard) getIterators(fields []*metastore.Field, start, end time.Time, 
isAscendingQuery bool) (iterators []storage.Iterator) { iterators = make([]storage.Iterator, len(fields)) - fieldNames = make([]string, len(fields)) // start the iterators to go through the series data for i, field := range fields { - fieldNames[i] = field.Name iterators[i] = self.db.Iterator() t := start @@ -344,6 +411,7 @@ func (self *Shard) getIterators(fields []*metastore.Field, start, end time.Time, tmicro := common.TimeToMicroseconds(t) sk := newStorageKey(field.Id, tmicro, seq) + log.Debug("Initializing iterator to %v", sk.bytes()) iterators[i].Seek(sk.bytes()) if !isAscendingQuery && iterators[i].Valid() { @@ -352,7 +420,7 @@ func (self *Shard) getIterators(fields []*metastore.Field, start, end time.Time, if err := iterators[i].Error(); err != nil { log.Error("Error while getting iterators: %s", err) - return nil, nil + return nil } } return diff --git a/datastore/storage_key.go b/datastore/storage_key.go index 12c0e7c9970..f5232e9d357 100644 --- a/datastore/storage_key.go +++ b/datastore/storage_key.go @@ -7,6 +7,8 @@ import ( "math" "time" + "code.google.com/p/log4go" + "github.com/influxdb/influxdb/common" ) @@ -47,6 +49,7 @@ func parseKey(b []byte) (storageKey, error) { sk.timestamp = convertUintTimestampToInt64(t) binary.Read(buf, binary.BigEndian, &sk.seq) sk.bytesBuf = b + log4go.Debug("Parsed %v to %v", b, sk) return sk, nil } diff --git a/engine/common_merge_engine.go b/engine/common_merge_engine.go index 4434dcb110d..05b6b59d270 100644 --- a/engine/common_merge_engine.go +++ b/engine/common_merge_engine.go @@ -2,224 +2,48 @@ package engine import "github.com/influxdb/influxdb/protocol" -type seriesMergeState struct { - name string - series []*protocol.Series - fields []string - done bool -} - -func (self *seriesMergeState) hasPoints() bool { - return len(self.series) > 0 && len(self.series[0].Points) > 0 -} - -func isEarlier(first *seriesMergeState, other *seriesMergeState) bool { - return *first.series[0].Points[0].Timestamp < *other.series[0].Points[0].Timestamp -} - -func isLater(first *seriesMergeState, other *seriesMergeState) bool { - return *first.series[0].Points[0].Timestamp > *other.series[0].Points[0].Timestamp -} - -func (self *seriesMergeState) flush(state *CommonMergeEngine) (bool, error) { - for _, s := range self.series { - s := state.mergeColumnsInSeries(s) - ok, err := state.next.Yield(s) - if !ok || err != nil { - return ok, err - } - } - self.series = nil - return true, nil -} - -// update the state, the points belong to this seriesMergeState (i.e. the name of the timeseries matches) -func (self *seriesMergeState) updateState(p *protocol.Series) { - if *p.Name != self.name { - return - } - - // setup the fields - if self.fields == nil { - self.fields = p.Fields - } - - // data for current table is exhausted - if len(p.Points) == 0 { - self.done = true - } else { - self.series = append(self.series, p) - } -} - type CommonMergeEngine struct { - next Processor - leftGoFirst func(_, _ *seriesMergeState) bool - fields map[string]int - left *seriesMergeState - right *seriesMergeState - mergeColumns bool -} - -// set the fields of the other time series to null making sure that -// the order of the null values match the order of the field -// definitions, i.e. 
left timeseries first followed by values from the -// right timeseries -func (self *CommonMergeEngine) mergeColumnsInSeries(s *protocol.Series) *protocol.Series { - if !self.mergeColumns { - return s - } - - newSeries := &protocol.Series{ - Name: s.Name, - Fields: self.getFields(), - Points: make([]*protocol.Point, len(s.Points)), - } - - for idx, p := range s.Points { - newPoint := &protocol.Point{ - Timestamp: p.Timestamp, - SequenceNumber: p.SequenceNumber, - } - newValues := make([]*protocol.FieldValue, len(self.fields)) - oldValues := p.Values - for idx, f := range s.Fields { - newIdx := self.fields[f] - newValues[newIdx] = oldValues[idx] - } - newPoint.Values = newValues - newSeries.Points[idx] = newPoint - } - return newSeries -} - -// set the fields of the other time series to null making sure that -// the order of the null values match the order of the field -// definitions, i.e. left timeseries first followed by values from the -// right timeseries -func (self *CommonMergeEngine) getFields() []string { - fields := make([]string, len(self.fields)) - for f, i := range self.fields { - fields[i] = f - } - return fields -} - -func (self *CommonMergeEngine) yieldNextPoints() (bool, error) { - // see which point should be returned next and remove it from the - // series - for self.left.hasPoints() && self.right.hasPoints() { - // state is the state of the series from which the next point - // will be fetched - next := self.right - if self.leftGoFirst(self.left, self.right) { - next = self.left - } - - s := next.removeAndGetFirstPoint() - ok, err := self.next.Yield(self.mergeColumnsInSeries(s)) - if err != nil || !ok { - return ok, err - } - } - return true, nil -} - -// if `other` state is done (i.e. we'll receive no more points for its -// timeseries) then we know that we won't get any points that are -// older than what's in `self` so we can safely flush all `self` -// points. -func (self *CommonMergeEngine) flushIfNecessary() (bool, error) { - if self.left.done && len(self.left.series) == 0 { - if ok, err := self.right.flush(self); err != nil || !ok { - return ok, err - } - } - if self.right.done && len(self.right.series) == 0 { - if ok, err := self.left.flush(self); err != nil || !ok { - return ok, err - } - } - return true, nil -} - -func (self *CommonMergeEngine) updateState(p *protocol.Series) { - self.left.updateState(p) - self.right.updateState(p) - - // create the fields map - if self.fields == nil && self.left.fields != nil && self.right.fields != nil { - self.fields = make(map[string]int) - - i := 0 - for _, s := range []*seriesMergeState{self.left, self.right} { - for _, f := range s.fields { - if _, ok := self.fields[f]; ok { - continue - } - self.fields[f] = i - i++ - } - } - } -} - -func (self *seriesMergeState) removeAndGetFirstPoint() *protocol.Series { - s := &protocol.Series{ - Name: self.series[0].Name, - Fields: self.series[0].Fields, - Points: []*protocol.Point{self.series[0].Points[0]}, - } - // get rid of that point, or get rid of the entire series - // if this is the last point - if len(self.series[0].Points) == 1 { - self.series = self.series[1:] - } else { - self.series[0].Points = self.series[0].Points[1:] - } - return s + merger *Merger + streams map[uint32]StreamUpdate + next Processor } // returns a yield function that will sort points from table1 and // table2 no matter what the order in which they are received. 
-func NewCommonMergeEngine(table1, table2 string, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine {
-	state1 := &seriesMergeState{
-		name: table1,
+func NewCommonMergeEngine(shards []uint32, mergeColumns bool, ascending bool, next Processor) *CommonMergeEngine {
+	cme := &CommonMergeEngine{
+		streams: make(map[uint32]StreamUpdate, len(shards)),
+		next:    next,
 	}
-	state2 := &seriesMergeState{
-		name: table2,
+	streams := make([]StreamQuery, len(shards))
+	for i, sh := range shards {
+		s := NewStream()
+		streams[i] = s
+		cme.streams[sh] = s
 	}
+	h := NewSeriesHeap(ascending)
+	cme.merger = NewCME("Engine", streams, h, next, mergeColumns)
+	return cme
+}
-	whoGoFirst := isEarlier
-	if !ascending {
-		whoGoFirst = isLater
+func (cme *CommonMergeEngine) Close() error {
+	for _, s := range cme.streams {
+		s.Close()
 	}
-	return &CommonMergeEngine{
-		next:         next,
-		left:         state1,
-		right:        state2,
-		leftGoFirst:  whoGoFirst,
-		mergeColumns: mergeColumns,
+	_, err := cme.merger.Update()
+	if err != nil {
+		return err
 	}
+	return cme.next.Close()
 }
 
-func (e *CommonMergeEngine) Close() error {
-	e.Yield(&protocol.Series{Name: &e.left.name, Fields: []string{}})
-	e.Yield(&protocol.Series{Name: &e.right.name, Fields: []string{}})
-	return e.next.Close()
+func (cme *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) {
+	stream := cme.streams[s.GetShardId()]
+	stream.Yield(s)
+	return cme.merger.Update()
 }
 
-func (e *CommonMergeEngine) Name() string {
+func (cme *CommonMergeEngine) Name() string {
 	return "CommonMergeEngine"
 }
-
-func (e *CommonMergeEngine) Yield(s *protocol.Series) (bool, error) {
-	e.updateState(s)
-
-	if ok, err := e.flushIfNecessary(); !ok || err != nil {
-		return ok, err
-	}
-
-	return e.yieldNextPoints()
-}
diff --git a/engine/engine.go b/engine/engine.go
index c2227f4b246..d46f822bb65 100644
--- a/engine/engine.go
+++ b/engine/engine.go
@@ -2,7 +2,7 @@ package engine
 
 import "github.com/influxdb/influxdb/parser"
 
-func NewQueryEngine(next Processor, query *parser.SelectQuery) (Processor, error) {
+func NewQueryEngine(next Processor, query *parser.SelectQuery, shards []uint32) (Processor, error) {
 	limit := query.Limit
 
 	var engine Processor = NewPassthroughEngineWithLimit(next, 1, limit)
@@ -15,10 +15,20 @@ func NewQueryEngine(next Processor, query *parser.SelectQuery) (Processor, error
 	}
 
 	fromClause := query.GetFromClause()
-	if fromClause.Type == parser.FromClauseMerge {
-		engine = NewMergeEngine(fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name, query.Ascending, engine)
-	} else if fromClause.Type == parser.FromClauseInnerJoin {
-		engine = NewJoinEngine(query, engine)
+
+	switch fromClause.Type {
+	case parser.FromClauseInnerJoin:
+		engine = NewJoinEngine(shards, query, engine)
+	case parser.FromClauseMerge:
+		tables := make([]string, len(fromClause.Names))
+		for i, name := range fromClause.Names {
+			tables[i] = name.Name.Name
+		}
+		engine = NewMergeEngine(shards, query.Ascending, engine)
+	case parser.FromClauseMergeFun:
+		// At this point the regex should be expanded to the list of
+		// tables that will be queried
+		panic("QueryEngine cannot be called with merge function")
 	}
 
 	if err != nil {
diff --git a/engine/join_engine.go b/engine/join_engine.go
index 8c0b27cc3c6..774308195f0 100644
--- a/engine/join_engine.go
+++ b/engine/join_engine.go
@@ -14,7 +14,7 @@ type JoinEngine struct {
 	lastFields1, lastFields2 []string
 }
 
-func NewJoinEngine(query *parser.SelectQuery, next Processor) Processor {
+func NewJoinEngine(shards []uint32, query *parser.SelectQuery, next Processor) Processor {
 	table1 := query.GetFromClause().Names[0].GetAlias()
 	table2 := query.GetFromClause().Names[1].GetAlias()
 	name := table1 + "_join_" + table2
@@ -26,7 +26,7 @@ func NewJoinEngine(query *parser.SelectQuery, next Processor) Processor {
 		table2: table2,
 		query:  query,
 	}
 
-	mergeEngine := NewCommonMergeEngine(table1, table2, false, query.Ascending, joinEngine)
+	mergeEngine := NewCommonMergeEngine(shards, false, query.Ascending, joinEngine)
 	return mergeEngine
 }
diff --git a/engine/merge_engine.go b/engine/merge_engine.go
index bb748ac326b..1962d63830e 100644
--- a/engine/merge_engine.go
+++ b/engine/merge_engine.go
@@ -7,12 +7,12 @@ type MergeEngine struct {
 	next Processor
 }
 
-func NewMergeEngine(table1, table2 string, ascending bool, next Processor) Processor {
-	name := table1 + "_merge_" + table2
+func NewMergeEngine(shards []uint32, ascending bool, next Processor) Processor {
+	name := "merged"
 
 	me := &MergeEngine{name: name, next: next}
 
-	return NewCommonMergeEngine(table1, table2, true, ascending, me)
+	return NewCommonMergeEngine(shards, true, ascending, me)
 }
 
 func (me *MergeEngine) Yield(s *protocol.Series) (bool, error) {
diff --git a/engine/merger.go b/engine/merger.go
new file mode 100644
index 00000000000..85ae75af2d9
--- /dev/null
+++ b/engine/merger.go
@@ -0,0 +1,200 @@
+package engine
+
+import (
+	"code.google.com/p/log4go"
+	"github.com/influxdb/influxdb/protocol"
+)
+
+// Merger merges a number of StreamQueries into one stream of points
+// where the output stream of points has a monotonic timestamp order
+// (increasing or decreasing depending on the SeriesHeap that is
+// passed to NewCME)
+type Merger struct {
+	name                  string
+	s                     []StreamQuery
+	size                  int
+	h                     SeriesHeap
+	n                     Processor
+	lastStreamIdx         int
+	initializing          bool
+	mergeColumns          bool
+	fields                map[string]struct{}
+	resultFields          []string
+	resultFieldsPerStream map[int][]int
+}
+
+// Creates a new merger that will merge the given slice of StreamQuery
+// and yield the result to the processor `n`. `name` is used to
+// identify the merger in the logs since it's being used in multiple
+// places. The SeriesHeap `h` is used to keep track of the next point
+// (whether it's the smallest or largest timestamp depends on the
+// SeriesHeap). If `mergeColumns` is true, the resulting time series
+// will have the fields from all StreamQueries, i.e. if the first
+// stream yields `column0` and `column1` while the second stream
+// yields `column2` and `column3` then the result time series will
+// have all 4 columns with two columns set to `nil` depending on which
+// side the point came from.
+func NewCME(name string, s []StreamQuery, h SeriesHeap, n Processor, mergeColumns bool) *Merger {
+	log4go.Debug("%sMerger: created with %d streams", name, len(s))
+	return &Merger{
+		name:                  name,
+		s:                     s,
+		h:                     h,
+		n:                     n,
+		lastStreamIdx:         0,
+		mergeColumns:          mergeColumns,
+		initializing:          true,
+		fields:                make(map[string]struct{}),
+		resultFieldsPerStream: make(map[int][]int),
+	}
+}
+
+// public api
+
+// Consume and yield as many points as we can until one of the streams
+// runs out of points (i.e. the call to HasPoint() returns false)
+func (cme *Merger) Update() (bool, error) {
+	if cme.initializing {
+		return cme.initialize()
+	}
+	return cme.tryYieldNextPoint()
+}
+
+// private api
+
+// initialize the state of the Merger by reading one point from each
+// StreamQuery. If `mergeColumns` is true, the union of the fields
+// received on the first point from each StreamQuery will be the
+// fields of the result Series
+func (cme *Merger) initialize() (bool, error) {
+	for cme.h.Size() != len(cme.s) {
+		stream := cme.s[cme.lastStreamIdx]
+		if !stream.HasPoint() && !stream.Closed() {
+			log4go.Debug("%sMerger: data not ready for stream %d, still initializing", cme.name, cme.lastStreamIdx)
+			return true, nil
+		}
+
+		if stream.HasPoint() {
+			p := stream.Next()
+			cme.h.Add(cme.lastStreamIdx, p)
+			for _, f := range p.Fields {
+				cme.fields[f] = struct{}{}
+			}
+			cme.lastStreamIdx++
+		} else if stream.Closed() {
+			s := len(cme.s)
+			cme.s[cme.lastStreamIdx] = cme.s[s-1]
+			cme.s = cme.s[:s-1]
+		}
+
+	}
+
+	if cme.mergeColumns {
+		// finished initialization
+		cme.resultFields = make([]string, 0, len(cme.fields))
+		for f := range cme.fields {
+			cme.resultFields = append(cme.resultFields, f)
+		}
+	}
+
+	log4go.Debug("%sMerger initialization finished", cme.name)
+	cme.initializing = false
+	cme.size = len(cme.s)
+	return cme.yieldNextPoint()
+}
+
+// Try to get more data points from the StreamQueries and if
+// successful call yieldNextPoint()
+func (cme *Merger) tryYieldNextPoint() (bool, error) {
+	stream := cme.s[cme.lastStreamIdx]
+	// If the stream has a new point, add it to the heap
+	if stream.HasPoint() {
+		cme.h.Add(cme.lastStreamIdx, stream.Next())
+	} else if stream.Closed() {
+		cme.size--
+	}
+
+	// If all streams have yielded a point, we can get the next
+	// point with the smallest (or largest) timestamp and yield it to
+	// the next processor.
+	if cme.h.Size() != cme.size {
+		return true, nil
+	}
+
+	return cme.yieldNextPoint()
+}
+
+// yield as many points as we can to the Processor `n`
+func (cme *Merger) yieldNextPoint() (bool, error) {
+	// If we consumed all the input data points, return
+	// immediately. This can be the case for example if we finished
+	// initialization and the first call to yieldNextPoint() consumed
+	// all the data points. Without this check the call to the heap's
+	// Next() method will cause a panic
+	if cme.size == 0 {
+		return true, nil
+	}
+
+	for {
+		var s *protocol.Series
+		cme.lastStreamIdx, s = cme.h.Next()
+		log4go.Debug("cme.lastStreamIdx: %d, s: %s", cme.lastStreamIdx, s)
+		cme.fixFields(s)
+		log4go.Debug("%sMerger yielding to %s: %s", cme.name, cme.n.Name(), s)
+		ok, err := cme.n.Yield(s)
+		if !ok || err != nil {
+			return ok, err
+		}
+
+		stream := cme.s[cme.lastStreamIdx]
+		if stream.HasPoint() {
+			s := stream.Next()
+			log4go.Debug("%sMerger received %s from %d", cme.name, s, cme.lastStreamIdx)
+			cme.h.Add(cme.lastStreamIdx, s)
+			continue
+		} else if stream.Closed() {
+			cme.size--
+			if cme.size != 0 {
+				continue
+			}
+		}
+
+		return true, nil
+	}
+}
+
+// modify the series to have the union of the columns from all
+// StreamQueries
+func (cme *Merger) fixFields(s *protocol.Series) {
+	if !cme.mergeColumns {
+		return
+	}
+
+	idx := cme.lastStreamIdx
+	mapping := cme.resultFieldsPerStream[idx]
+	if mapping == nil {
+		for _, f := range cme.resultFields {
+			index := -1
+			for i, sf := range s.Fields {
+				if sf == f {
+					index = i
+					break
+				}
+			}
+			mapping = append(mapping, index)
+			cme.resultFieldsPerStream[idx] = mapping
+		}
+	}
+
+	s.Fields = cme.resultFields
+	p := s.Points[0]
+	originalValues := p.Values
+	p.Values = nil
+	for _, i := range mapping {
+		if i == -1 {
+			p.Values = append(p.Values, nil)
+			continue
+		}
+		p.Values = append(p.Values, originalValues[i])
+	}
+}
diff --git a/engine/series_heap.go b/engine/series_heap.go
new file mode 100644
index 00000000000..9850677d6d0
--- /dev/null
+++ b/engine/series_heap.go
@@ -0,0 +1,105 @@
+package engine
+
+import (
+	"container/heap"
+
+	"github.com/influxdb/influxdb/protocol"
+)
+
+type Value struct {
+	streamId int
+	s        *protocol.Series
+}
+
+type MinValueSlice struct {
+	values []Value
+}
+
+func (mvs *MinValueSlice) Len() int {
+	return len(mvs.values)
+}
+
+func (mvs *MinValueSlice) Less(i, j int) bool {
+	return mvs.values[i].s.Points[0].GetTimestamp() < mvs.values[j].s.Points[0].GetTimestamp()
+}
+
+func (mvs *MinValueSlice) Swap(i, j int) {
+	mvs.values[i], mvs.values[j] = mvs.values[j], mvs.values[i]
+}
+
+func (mvs *MinValueSlice) Push(x interface{}) {
+	mvs.values = append(mvs.values, x.(Value))
+}
+
+func (mvs *MinValueSlice) Pop() interface{} {
+	l := len(mvs.values)
+	var v interface{}
+	v, mvs.values = mvs.values[l-1], mvs.values[:l-1]
+	return v
+}
+
+type MaxValueSlice struct {
+	*MinValueSlice
+}
+
+func (mvs MaxValueSlice) Less(i, j int) bool {
+	return mvs.values[i].s.Points[0].GetTimestamp() >
+		mvs.values[j].s.Points[0].GetTimestamp()
+}
+
+// A heap that holds one-point series (series that have one
+// point only) and their stream ids. See http://en.wikipedia.org/wiki/Heap_(data_structure)
+// for more info on heaps. The heap is used by the Merger to emit
+// points from multiple streams in monotonic order.
+type SeriesHeap struct {
+	Ascending bool
+	values    *MinValueSlice
+}
+
+func NewSeriesHeap(asc bool) SeriesHeap {
+	return SeriesHeap{
+		Ascending: asc,
+		values:    &MinValueSlice{},
+	}
+}
+
+// returns the number of values in the heap so far
+func (sh SeriesHeap) Size() int {
+	return sh.values.Len()
+}
+
+// Add another one-point series with the given stream id. TODO: this
+// is a slightly inefficient way to construct the initial value slice;
+// if we had the values up front we could construct the heap in O(n)
+// instead of O(n log n), which is required if we construct the heap
+// using multiple calls to Add()
+func (sh SeriesHeap) Add(streamId int, s *protocol.Series) {
+	var vs heap.Interface
+	if sh.Ascending {
+		vs = sh.values
+	} else {
+		vs = MaxValueSlice{sh.values}
+	}
+	heap.Push(vs, Value{
+		s:        s,
+		streamId: streamId,
+	})
+}
+
+// Get and remove the next one-point series that has the smallest (or
+// largest) timestamp, according to the Ascending field. TODO: this is
+// slightly inefficient since we remove a value from the values slice
+// and do a BubbleDown, which is inefficient since the next value from
+// the stream will be added immediately after and will cause a
+// BubbleUp. In big O() notation this step doesn't change much; it
+// only adds a constant to the upper bound.
+func (sh SeriesHeap) Next() (int, *protocol.Series) {
+	var vs heap.Interface
+	if sh.Ascending {
+		vs = sh.values
+	} else {
+		vs = MaxValueSlice{sh.values}
+	}
+	v := heap.Remove(vs, 0).(Value)
+	return v.streamId, v.s
+}
diff --git a/engine/stream.go b/engine/stream.go
new file mode 100644
index 00000000000..4154d215eba
--- /dev/null
+++ b/engine/stream.go
@@ -0,0 +1,68 @@
+package engine
+
+import "github.com/influxdb/influxdb/protocol"
+
+// See documentation of Stream
+type StreamQuery interface {
+	// methods that query the state
+	HasPoint() bool
+	// Returns a series with one point only
+	Next() *protocol.Series
+	Closed() bool
+}
+
+// See documentation of Stream
+type StreamUpdate interface {
+	// methods that update the state
+	Close()
+	Yield(*protocol.Series)
+}
+
+// A Stream keeps track of the state of a stream of points. A stream is
+// defined as an ordered stream of points that have the same set of
+// fields. For simplicity this will be the stream of points for one
+// time series. This will be extended to include streams of points from
+// multiple series that are merged at the shard level, with multiple
+// streams merged together again at the coordinator level.
+type Stream struct {
+	s      *protocol.Series
+	closed bool
+}
+
+func NewStream() *Stream {
+	return &Stream{closed: false}
+}
+
+func (stream *Stream) Yield(s *protocol.Series) {
+	if stream.s != nil {
+		stream.s.Points = append(stream.s.Points, s.Points...)
+ return + } + stream.s = s +} + +func (stream *Stream) Close() { + stream.closed = true +} + +func (stream *Stream) HasPoint() bool { + return stream.s != nil && len(stream.s.Points) > 0 +} + +func (stream *Stream) Next() *protocol.Series { + var p []*protocol.Point + p, stream.s.Points = stream.s.Points[:1:1], stream.s.Points[1:] + s := &protocol.Series{ + Name: stream.s.Name, + Fields: stream.s.Fields, + Points: p, + } + if len(stream.s.Points) == 0 { + stream.s = nil + } + return s +} + +func (stream *Stream) Closed() bool { + return stream.closed +} diff --git a/integration/data_test.go b/integration/data_test.go index 8c7a32a3460..cdce02a9465 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -267,6 +267,53 @@ func (self *DataTestSuite) TestModeWithNils(c *C) { c.Assert(maps[0]["m2"], Equals, nil) } +func (self *DataTestSuite) TestMergeRegex(c *C) { + data := ` +[ + { + "name": "test_merge_1", + "columns": ["time", "value"], + "points": [ + [1401321600000, "m11"], + [1401321800000, "m12"] + ] + }, + { + "name": "test_merge_2", + "columns": ["time", "value"], + "points": [ + [1401321700000, "m21"], + [1401321900000, "m22"] + ] + }, + { + "name": "test_merge_3", + "columns": ["time", "value"], + "points": [ + [1401321500000, "m31"], + [1401322000000, "m32"] + ] + } + ]` + self.client.WriteJsonData(data, c, influxdb.Millisecond) + serieses := self.client.RunQuery("select * from merge(/.*/)", c, "m") + c.Assert(serieses, HasLen, 1) + maps := ToMap(serieses[0]) + c.Assert(maps, HasLen, 6) + t := make([]float64, len(maps)) + for i, m := range maps { + t[i] = m["time"].(float64) + } + c.Assert(t, DeepEquals, []float64{ + 1401322000000, + 1401321900000, + 1401321800000, + 1401321700000, + 1401321600000, + 1401321500000, + }) +} + func (self *DataTestSuite) TestMergingOldData(c *C) { data := ` [ @@ -1135,6 +1182,24 @@ func (self *DataTestSuite) TestWhereConditionWithExpression(c *C) { c.Assert(maps[0]["b"], Equals, 1.0) } +func (self *DataTestSuite) JoinedWithSelf(c *C) (Fun, Fun) { + return func(client Client) { + client.WriteJsonData(` +[ + { + "name": "t", + "columns": ["time", "value"], + "points":[ + [1381346706000, 3], + [1381346701000, 1] + ] + } +]`, c, influxdb.Millisecond) + }, func(client Client) { + client.RunInvalidQuery("select * from t as foo inner join t as bar", c, "m") + } +} + // issue #740 and #781 func (self *DataTestSuite) TestJoiningDifferentFields(c *C) { // TODO: why do we get a different error if we remove all but the first values diff --git a/integration/legacy_data_test.go b/integration/legacy_data_test.go index 0eab7b0afd3..06f866ab977 100644 --- a/integration/legacy_data_test.go +++ b/integration/legacy_data_test.go @@ -1649,32 +1649,6 @@ func (self *DataTestSuite) QueryWithJoinedTablesWithWhereClause(c *C) (Fun, Fun) } } -func (self *DataTestSuite) JoinedWithSelf(c *C) (Fun, Fun) { - return func(client Client) { - createEngine(client, c, `[ - { - "points": [ - { "values": [{ "int64_value": 3 }], "timestamp": 1381346706000000 }, - { "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000 } - ], - "name": "t", - "fields": ["value"] - } - ]`) - }, func(client Client) { - runQuery(client, "select * from t as foo inner join t as bar", c, `[ - { - "points": [ - { "values": [{ "int64_value": 3 }, { "int64_value": 3 }], "timestamp": 1381346706000000 }, - { "values": [{ "int64_value": 1 }, { "int64_value": 1 }], "timestamp": 1381346701000000 } - ], - "name": "foo_join_bar", - "fields": ["foo.value", "bar.value"] - } - ]`) - } -} - func (self 
*DataTestSuite) QueryWithMergedTablesWithPointsAppend(c *C) (Fun, Fun) { return func(client Client) { createEngine(client, c, `[ diff --git a/integration/test_missing_points.go b/integration/missing_points_test.go similarity index 100% rename from integration/test_missing_points.go rename to integration/missing_points_test.go diff --git a/parser/frees.c b/parser/frees.c index be992917a2a..51ca0a70559 100644 --- a/parser/frees.c +++ b/parser/frees.c @@ -29,7 +29,10 @@ free_table_name_array(table_name_array *array) void free_from_clause(from_clause *f) { - free_table_name_array(f->names); + if (f->names) + free_table_name_array(f->names); + if (f->regex_value) + free_value(f->regex_value); free(f); } diff --git a/parser/from_clause.go b/parser/from_clause.go index 41c88507b08..c058abf7cbc 100644 --- a/parser/from_clause.go +++ b/parser/from_clause.go @@ -5,6 +5,7 @@ package parser import "C" import ( "bytes" + "regexp" "strings" ) import "fmt" @@ -15,6 +16,7 @@ const ( FromClauseArray FromClauseType = C.FROM_ARRAY FromClauseMerge FromClauseType = C.FROM_MERGE FromClauseInnerJoin FromClauseType = C.FROM_INNER_JOIN + FromClauseMergeFun FromClauseType = C.FROM_MERGE_FUNCTION ) func (self *TableName) GetAlias() string { @@ -39,6 +41,7 @@ type TableName struct { type FromClause struct { Type FromClauseType Names []*TableName + Regex *regexp.Regexp } func (self *FromClause) GetString() string { diff --git a/parser/parser.go b/parser/parser.go index 41761e88d80..200b9da9e5c 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -468,11 +468,28 @@ func GetTableNameArray(array *C.table_name_array) ([]*TableName, error) { } func GetFromClause(fromClause *C.from_clause) (*FromClause, error) { - arr, err := GetTableNameArray(fromClause.names) - if err != nil { - return nil, err + t := FromClauseType(fromClause.from_clause_type) + var arr []*TableName + var regex *regexp.Regexp + + switch t { + case FromClauseMergeFun: + val, err := GetValue(fromClause.regex_value) + if err != nil { + return nil, err + } + if val.Type != ValueRegex { + return nil, fmt.Errorf("merge() accepts regex only") + } + regex = val.compiledRegex + default: + var err error + arr, err = GetTableNameArray(fromClause.names) + if err != nil { + return nil, err + } } - return &FromClause{FromClauseType(fromClause.from_clause_type), arr}, nil + return &FromClause{t, arr, regex}, nil } func GetIntoClause(intoClause *C.into_clause) (*IntoClause, error) { diff --git a/parser/parser_test.go b/parser/parser_test.go index 6086bc160ac..adf72016b5c 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -262,6 +262,14 @@ func (self *QueryParserSuite) TestParseFromWithMergedTable(c *C) { c.Assert(fromClause.Names[1].Name.Name, Equals, "user.signups") } +func (self *QueryParserSuite) TestParseFromWithMergeRegex(c *C) { + q, err := ParseSelectQuery("select count(*) from merge(/.*/) where time>now()-1d;") + c.Assert(err, IsNil) + fromClause := q.GetFromClause() + c.Assert(fromClause.Type, Equals, FromClauseMergeFun) + c.Assert(fromClause.Regex, NotNil) +} + func (self *QueryParserSuite) TestMultipleAggregateFunctions(c *C) { q, err := ParseSelectQuery("select first(bar), last(bar) from foo") c.Assert(err, IsNil) diff --git a/parser/query.yacc b/parser/query.yacc index 6e149b55734..fe2a086ce77 100644 --- a/parser/query.yacc +++ b/parser/query.yacc @@ -379,7 +379,7 @@ ALIAS_CLAUSE: FROM_CLAUSE: FROM TABLE_VALUE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = malloc(sizeof(table_name_array)); 
$$->names->elems = malloc(sizeof(table_name*)); $$->names->size = 1; @@ -391,14 +391,14 @@ FROM_CLAUSE: | FROM SIMPLE_TABLE_VALUES { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = $2; $$->from_clause_type = FROM_ARRAY; } | FROM SIMPLE_TABLE_VALUE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = malloc(sizeof(table_name_array)); $$->names->elems = malloc(sizeof(table_name*)); $$->names->size = 1; @@ -410,7 +410,7 @@ FROM_CLAUSE: | FROM SIMPLE_TABLE_VALUE MERGE SIMPLE_TABLE_VALUE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); $$->names = malloc(sizeof(table_name_array)); $$->names->elems = malloc(2 * sizeof(table_name*)); $$->names->size = 2; @@ -423,9 +423,17 @@ FROM_CLAUSE: $$->from_clause_type = FROM_MERGE; } | + FROM MERGE '(' REGEX_VALUE ')' + { + $$ = calloc(1, sizeof(from_clause)); + $$->from_clause_type = FROM_MERGE_FUNCTION; + $$->regex_value = $4; + } + | FROM SIMPLE_TABLE_VALUE ALIAS_CLAUSE INNER JOIN SIMPLE_TABLE_VALUE ALIAS_CLAUSE { - $$ = malloc(sizeof(from_clause)); + $$ = calloc(1, sizeof(from_clause)); + $$->regex_value = NULL; $$->names = malloc(sizeof(table_name_array)); $$->names->elems = malloc(2 * sizeof(value*)); $$->names->size = 2; diff --git a/parser/query_types.h b/parser/query_types.h index adc15f10c50..85a5eb03b00 100644 --- a/parser/query_types.h +++ b/parser/query_types.h @@ -72,11 +72,13 @@ typedef struct { enum { FROM_ARRAY, FROM_MERGE, - FROM_INNER_JOIN + FROM_INNER_JOIN, + FROM_MERGE_FUNCTION } from_clause_type; // in case of merge or join, it's guaranteed that the names array // will have two table names only and they aren't regex. table_name_array *names; + value *regex_value; /* regex merge */ } from_clause; typedef struct { diff --git a/parser/test_memory_leaks.sh b/parser/test_memory_leaks.sh index 58d6a2989b7..a3b87545726 100755 --- a/parser/test_memory_leaks.sh +++ b/parser/test_memory_leaks.sh @@ -14,6 +14,9 @@ int main(int argc, char **argv) { q = parse_query("select * from foo where time < -1s"); close_queries(&q); + q = parse_query("select * from merge(/.*/) where time < -1s"); + close_queries(&q); + // test partial regex q = parse_query("list series /"); close_queries(&q); diff --git a/protocol/protocol.proto b/protocol/protocol.proto index 5ea92c19ec1..8c8de9d94ce 100644 --- a/protocol/protocol.proto +++ b/protocol/protocol.proto @@ -19,6 +19,7 @@ message Series { required string name = 2; repeated string fields = 3; repeated uint64 fieldIds = 4; + optional uint32 shard_id = 5; } message QueryResponseChunk {