diff --git a/CHANGELOG.md b/CHANGELOG.md index 53b60097f2a..bc6f3b2229e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,9 @@ - [#4111](https://github.com/influxdb/influxdb/pull/4111): Update pre-commit hook for go vet composites - [#4136](https://github.com/influxdb/influxdb/pull/4136): Return an error-on-write if target retention policy does not exist. Thanks for the report @ymettier - [#4124](https://github.com/influxdb/influxdb/issues/4124): Missing defer/recover/panic idiom in HTTPD service -- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to _internal +- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to internal database. - [#4118](https://github.com/influxdb/influxdb/issues/4118): Return consistent, correct result for SHOW MEASUREMENTS with multiple AND conditions +- [#4191](https://github.com/influxdb/influxdb/pull/4191): Correctly marshal remote mapper responses. Fixes [#4170](https://github.com/influxdb/influxdb/issues/4170) ## v0.9.4 [2015-09-14] diff --git a/tsdb/mapper.go b/tsdb/mapper.go index d6cfc9a1eae..11e6f52bc33 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -22,6 +22,47 @@ type MapperValue struct { Tags map[string]string `json:"tags,omitempty"` // Meta tags for results } +// MapperValueAsJSON is the JSON-encoded representation of MapperValue. Because MapperValue is +// a complex type, custom JSON encoding is required so that none of the types contained within +// a MapperValue are "lost", and so the data are encoded as byte slices where necessary. +type MapperValueAsJSON struct { + Time int64 `json:"time,omitempty"` + RawData []byte `json:"rdata,omitempty"` + AggData [][]byte `json:"adata,omitempty"` + Tags map[string]string `json:"tags,omitempty"` +} + +// MarshalJSON returns the JSON-encoded representation of a MapperValue. +func (mv *MapperValue) MarshalJSON() ([]byte, error) { + o := &MapperValueAsJSON{ + Time: mv.Time, + AggData: make([][]byte, 0), + Tags: mv.Tags, + } + + o.Time = mv.Time + o.Tags = mv.Tags + if values, ok := mv.Value.([]interface{}); ok { + // Value contain a slice of more values. This happens only with + // aggregate output. + for _, v := range values { + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + o.AggData = append(o.AggData, b) + } + } else { + // If must be raw output, so just marshal the single value. + b, err := json.Marshal(mv.Value) + if err != nil { + return nil, err + } + o.RawData = b + } + return json.Marshal(o) +} + type MapperValues []*MapperValue func (a MapperValues) Len() int { return len(a) } @@ -36,6 +77,31 @@ type MapperOutput struct { cursorKey string // Tagset-based key for the source cursor. Cached for performance reasons. } +// MapperOutputAsJSON is the JSON-encoded representation of MapperOutput. The query data is represented +// as a raw JSON message, so decode is delayed, and can proceed in a custom manner. +type MapperOutputAsJSON struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Fields []string `json:"fields,omitempty"` // Field names of returned data. + Values json.RawMessage `json:"values,omitempty"` +} + +// MarshalJSON returns the JSON-encoded representation of a MapperOutput. +func (mo *MapperOutput) MarshalJSON() ([]byte, error) { + o := &MapperOutputAsJSON{ + Name: mo.Name, + Tags: mo.Tags, + Fields: mo.Fields, + } + data, err := json.Marshal(mo.Values) + if err != nil { + return nil, err + } + o.Values = data + + return json.Marshal(o) +} + func (mo *MapperOutput) key() string { return mo.cursorKey } @@ -59,12 +125,13 @@ type SelectMapper struct { // The following attributes are only used when mappers are for aggregate queries. - queryTMinWindow int64 // Minimum time of the query floored to start of interval. - intervalSize int64 // Size of each interval. - numIntervals int // Maximum number of intervals to return. - currInterval int // Current interval for which data is being fetched. - mapFuncs []mapFunc // The mapping functions. - fieldNames []string // the field name being read for mapping. + queryTMinWindow int64 // Minimum time of the query floored to start of interval. + intervalSize int64 // Size of each interval. + numIntervals int // Maximum number of intervals to return. + currInterval int // Current interval for which data is being fetched. + mapFuncs []mapFunc // The mapping functions. + mapUnmarshallers []unmarshalFunc // Mapping-specific unmarshal functions. + fieldNames []string // the field name being read for mapping. } // NewSelectMapper returns a mapper for the given shard, which will return data for the SELECT statement. @@ -93,8 +160,18 @@ func (lm *SelectMapper) timeDirection() Direction { return Forward } -// Open opens the local mapper. +// Open opens the Mapper for a SELECT Statement func (lm *SelectMapper) Open() error { + // If in aggregate mode, initialize the map-and-reduce functions. Both local and + // remote mappers need this. + if !lm.rawMode { + if err := lm.initializeMapFunctions(); err != nil { + return err + } + } + + // If this mapper is actually responsible for accessing remote shard, initialize + // that and return. if lm.remote != nil { return lm.remote.Open() } @@ -128,10 +205,6 @@ func (lm *SelectMapper) Open() error { lm.queryTMin, lm.queryTMax = influxql.TimeRangeAsEpochNano(lm.selectStmt.Condition) if !lm.rawMode { - if err := lm.initializeMapFunctions(); err != nil { - return err - } - // For GROUP BY time queries, limit the number of data points returned by the limit and offset d, err := lm.selectStmt.GroupByInterval() if err != nil { @@ -318,13 +391,60 @@ func (lm *SelectMapper) NextChunk() (interface{}, error) { return nil, nil } - mo := &MapperOutput{} - if err := json.Unmarshal(b.([]byte), mo); err != nil { + moj := &MapperOutputAsJSON{} + if err := json.Unmarshal(b.([]byte), moj); err != nil { return nil, err - } else if len(mo.Values) == 0 { - // Mapper on other node sent 0 values so it's done. - return nil, nil } + mvj := []*MapperValueAsJSON{} + if err := json.Unmarshal(moj.Values, &mvj); err != nil { + return nil, err + } + + // Prep the non-JSON version of Mapper output. + mo := &MapperOutput{ + Name: moj.Name, + Tags: moj.Tags, + Fields: moj.Fields, + } + + if len(mvj) == 1 && len(mvj[0].AggData) > 0 { + // The MapperValue is carrying aggregate data, so run it through the + // custom unmarshallers for the map functions through which the data + // was mapped. + aggValues := []interface{}{} + for i, b := range mvj[0].AggData { + v, err := lm.mapUnmarshallers[i](b) + if err != nil { + return nil, err + } + aggValues = append(aggValues, v) + } + mv := &MapperValue{ + Value: aggValues, + Tags: mvj[0].Tags, + } + mo.Values = []*MapperValue{mv} + } else { + // Must be raw data instead. + for _, v := range mvj { + var rawValue interface{} + err := json.Unmarshal(v.RawData, &rawValue) + if err != nil { + return nil, err + } + mv := &MapperValue{ + Time: v.Time, + Value: rawValue, + Tags: v.Tags, + } + if mo.Values == nil { + mo.Values = []*MapperValue{mv} + } else { + mo.Values = append(mo.Values, mv) + } + } + } + return mo, nil } @@ -495,18 +615,28 @@ func (lm *SelectMapper) nextInterval() (start, end int64) { } // initializeMapFunctions initialize the mapping functions for the mapper. This only applies -// to aggregate queries. +// to aggregate queries func (lm *SelectMapper) initializeMapFunctions() error { var err error // Set up each mapping function for this statement. - aggregates := lm.selectStmt.FunctionCalls() + selectStmt, ok := lm.stmt.(*influxql.SelectStatement) + if !ok { + return fmt.Errorf("No map functions for non-SELECT statement: %s", lm.stmt.String()) + } + + aggregates := selectStmt.FunctionCalls() lm.mapFuncs = make([]mapFunc, len(aggregates)) + lm.mapUnmarshallers = make([]unmarshalFunc, len(aggregates)) lm.fieldNames = make([]string, len(lm.mapFuncs)) for i, c := range aggregates { lm.mapFuncs[i], err = initializeMapFunc(c) if err != nil { return err } + lm.mapUnmarshallers[i], err = initializeUnmarshaller(c) + if err != nil { + return err + } // Check for calls like `derivative(lmean(value), 1d)` var nested *influxql.Call = c diff --git a/tsdb/mapper_test.go b/tsdb/mapper_test.go index a4e1259ae93..d3c6bc2297d 100644 --- a/tsdb/mapper_test.go +++ b/tsdb/mapper_test.go @@ -550,11 +550,7 @@ func nextRawChunkAsJson(t *testing.T, mapper tsdb.Mapper) string { if err != nil { t.Fatalf("failed to get next chunk from mapper: %s", err.Error()) } - b, err := json.Marshal(r) - if err != nil { - t.Fatalf("failed to marshal chunk as JSON: %s", err.Error()) - } - return string(b) + return mustMarshalMapperOutput(r) } func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.SelectMapper { @@ -569,11 +565,53 @@ func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.Sele func aggIntervalAsJson(t *testing.T, mapper *tsdb.SelectMapper) string { r, err := mapper.NextChunk() if err != nil { - t.Fatalf("failed to get chunk from aggregate mapper: %s", err.Error()) + t.Fatalf("failed to get next chunk from aggregate mapper: %s", err.Error()) + } + return mustMarshalMapperOutput(r) +} + +// mustMarshalMapperOutput manually converts a mapper output to JSON, to avoid the +// built-in encoding. +func mustMarshalMapperOutput(r interface{}) string { + if r == nil { + b, err := json.Marshal(nil) + if err != nil { + panic("failed to marshal nil chunk as JSON") + } + return string(b) + } + mo := r.(*tsdb.MapperOutput) + + type v struct { + Time int64 `json:"time,omitempty"` + Value interface{} `json:"value,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + } + + values := make([]*v, len(mo.Values)) + for i, value := range mo.Values { + values[i] = &v{ + Time: value.Time, + Value: value.Value, + Tags: value.Tags, + } } - b, err := json.Marshal(r) + + var o struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Fields []string `json:"fields,omitempty"` + Values []*v `json:"values,omitempty"` + } + + o.Name = mo.Name + o.Tags = mo.Tags + o.Fields = mo.Fields + o.Values = values + + b, err := json.Marshal(o) if err != nil { - t.Fatalf("failed to marshal chunk as JSON: %s", err.Error()) + panic("failed to marshal MapperOutput") } return string(b) }