Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct marshalling of remotely-mapped data #4191

Merged
merged 4 commits into from
Sep 22, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
- [#4111](https://github.com/influxdb/influxdb/pull/4111): Update pre-commit hook for go vet composites
- [#4136](https://github.com/influxdb/influxdb/pull/4136): Return an error-on-write if target retention policy does not exist. Thanks for the report @ymettier
- [#4124](https://github.com/influxdb/influxdb/issues/4124): Missing defer/recover/panic idiom in HTTPD service
- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to _internal
- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to internal database.
- [#4118](https://github.com/influxdb/influxdb/issues/4118): Return consistent, correct result for SHOW MEASUREMENTS with multiple AND conditions
- [#4191](https://github.com/influxdb/influxdb/pull/4191): Correctly marshal remote mapper responses. Fixes [#4170](https://github.com/influxdb/influxdb/issues/4170)

## v0.9.4 [2015-09-14]

Expand Down
166 changes: 148 additions & 18 deletions tsdb/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,47 @@ type MapperValue struct {
Tags map[string]string `json:"tags,omitempty"` // Meta tags for results
}

// MapperValueAsJSON is the JSON-encoded representation of MapperValue. Because MapperValue is
// a complex type, custom JSON encoding is required so that none of the types contained within
// a MapperValue are "lost", and so the data are encoded as byte slices where necessary.
type MapperValueAsJSON struct {
Time int64 `json:"time,omitempty"`
RawData []byte `json:"rdata,omitempty"`
AggData [][]byte `json:"adata,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
}

// MarshalJSON returns the JSON-encoded representation of a MapperValue.
func (mv *MapperValue) MarshalJSON() ([]byte, error) {
o := &MapperValueAsJSON{
Time: mv.Time,
AggData: make([][]byte, 0),
Tags: mv.Tags,
}

o.Time = mv.Time
o.Tags = mv.Tags
if values, ok := mv.Value.([]interface{}); ok {
// Value contain a slice of more values. This happens only with
// aggregate output.
for _, v := range values {
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
o.AggData = append(o.AggData, b)
}
} else {
// If must be raw output, so just marshal the single value.
b, err := json.Marshal(mv.Value)
if err != nil {
return nil, err
}
o.RawData = b
}
return json.Marshal(o)
}

type MapperValues []*MapperValue

func (a MapperValues) Len() int { return len(a) }
Expand All @@ -36,6 +77,31 @@ type MapperOutput struct {
cursorKey string // Tagset-based key for the source cursor. Cached for performance reasons.
}

// MapperOutputAsJSON is the JSON-encoded representation of MapperValue. The query data is represented
// as a raw JSON message, so decode can be delayed can performed in a customer manner.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: This comment doesn't read right.

type MapperOutputAsJSON struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Fields []string `json:"fields,omitempty"` // Field names of returned data.
Values json.RawMessage `json:"values,omitempty"`
}

// MarshalJSON returns the JSON-encoded representation of a MapperOutput.
func (mo *MapperOutput) MarshalJSON() ([]byte, error) {
o := &MapperOutputAsJSON{
Name: mo.Name,
Tags: mo.Tags,
Fields: mo.Fields,
}
data, err := json.Marshal(mo.Values)
if err != nil {
return nil, err
}
o.Values = data

return json.Marshal(o)
}

func (mo *MapperOutput) key() string {
return mo.cursorKey
}
Expand All @@ -59,12 +125,13 @@ type SelectMapper struct {

// The following attributes are only used when mappers are for aggregate queries.

queryTMinWindow int64 // Minimum time of the query floored to start of interval.
intervalSize int64 // Size of each interval.
numIntervals int // Maximum number of intervals to return.
currInterval int // Current interval for which data is being fetched.
mapFuncs []mapFunc // The mapping functions.
fieldNames []string // the field name being read for mapping.
queryTMinWindow int64 // Minimum time of the query floored to start of interval.
intervalSize int64 // Size of each interval.
numIntervals int // Maximum number of intervals to return.
currInterval int // Current interval for which data is being fetched.
mapFuncs []mapFunc // The mapping functions.
mapUnmarshallers []unmarshalFunc // Mapping-specific unmarshal functions.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new slice added to the SelectMapper type.

fieldNames []string // the field name being read for mapping.
}

// NewSelectMapper returns a mapper for the given shard, which will return data for the SELECT statement.
Expand Down Expand Up @@ -93,8 +160,18 @@ func (lm *SelectMapper) timeDirection() Direction {
return Forward
}

// Open opens the local mapper.
// Open opens the Mapper for a SELECT Statement
func (lm *SelectMapper) Open() error {
// If in aggregate mode, initialize the map-and-reduce functions. Both local and
// remote mappers need this.
if !lm.rawMode {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved up here because it now needs to be done before the remote mapper is opened.

if err := lm.initializeMapFunctions(); err != nil {
return err
}
}

// If this mapper is actually responsible for accessing remote shard, initialize
// that and return.
if lm.remote != nil {
return lm.remote.Open()
}
Expand Down Expand Up @@ -128,10 +205,6 @@ func (lm *SelectMapper) Open() error {
lm.queryTMin, lm.queryTMax = influxql.TimeRangeAsEpochNano(lm.selectStmt.Condition)

if !lm.rawMode {
if err := lm.initializeMapFunctions(); err != nil {
return err
}

// For GROUP BY time queries, limit the number of data points returned by the limit and offset
d, err := lm.selectStmt.GroupByInterval()
if err != nil {
Expand Down Expand Up @@ -318,13 +391,60 @@ func (lm *SelectMapper) NextChunk() (interface{}, error) {
return nil, nil
}

mo := &MapperOutput{}
if err := json.Unmarshal(b.([]byte), mo); err != nil {
moj := &MapperOutputAsJSON{}
if err := json.Unmarshal(b.([]byte), moj); err != nil {
return nil, err
} else if len(mo.Values) == 0 {
// Mapper on other node sent 0 values so it's done.
return nil, nil
}
mvj := []*MapperValueAsJSON{}
if err := json.Unmarshal(moj.Values, &mvj); err != nil {
return nil, err
}

// Prep the non-JSON version of Mapper output.
mo := &MapperOutput{
Name: moj.Name,
Tags: moj.Tags,
Fields: moj.Fields,
}

if len(mvj) == 1 && len(mvj[0].AggData) > 0 {
// The MapperValue is carrying aggregate data, so run it through the
// custom unmarshallers for the map functions through which the data
// was mapped.
aggValues := []interface{}{}
for i, b := range mvj[0].AggData {
v, err := lm.mapUnmarshallers[i](b)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key fix -- unmarshal the aggregate data coming across the wire into the correct type.

if err != nil {
return nil, err
}
aggValues = append(aggValues, v)
}
mv := &MapperValue{
Value: aggValues,
Tags: mvj[0].Tags,
}
mo.Values = []*MapperValue{mv}
} else {
// Must be raw data instead.
for _, v := range mvj {
var rawValue interface{}
err := json.Unmarshal(v.RawData, &rawValue)
if err != nil {
return nil, err
}
mv := &MapperValue{
Time: v.Time,
Value: rawValue,
Tags: v.Tags,
}
if mo.Values == nil {
mo.Values = []*MapperValue{mv}
} else {
mo.Values = append(mo.Values, mv)
}
}
}

return mo, nil
}

Expand Down Expand Up @@ -495,18 +615,28 @@ func (lm *SelectMapper) nextInterval() (start, end int64) {
}

// initializeMapFunctions initialize the mapping functions for the mapper. This only applies
// to aggregate queries.
// to aggregate queries
func (lm *SelectMapper) initializeMapFunctions() error {
var err error
// Set up each mapping function for this statement.
aggregates := lm.selectStmt.FunctionCalls()
selectStmt, ok := lm.stmt.(*influxql.SelectStatement)
if !ok {
return fmt.Errorf("No map functions for non-SELECT statement: %s", lm.stmt.String())
}

aggregates := selectStmt.FunctionCalls()
lm.mapFuncs = make([]mapFunc, len(aggregates))
lm.mapUnmarshallers = make([]unmarshalFunc, len(aggregates))
lm.fieldNames = make([]string, len(lm.mapFuncs))
for i, c := range aggregates {
lm.mapFuncs[i], err = initializeMapFunc(c)
if err != nil {
return err
}
lm.mapUnmarshallers[i], err = initializeUnmarshaller(c)
if err != nil {
return err
}

// Check for calls like `derivative(lmean(value), 1d)`
var nested *influxql.Call = c
Expand Down
54 changes: 46 additions & 8 deletions tsdb/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,7 @@ func nextRawChunkAsJson(t *testing.T, mapper tsdb.Mapper) string {
if err != nil {
t.Fatalf("failed to get next chunk from mapper: %s", err.Error())
}
b, err := json.Marshal(r)
if err != nil {
t.Fatalf("failed to marshal chunk as JSON: %s", err.Error())
}
return string(b)
return mustMarshalMapperOutput(r)
}

func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.SelectMapper {
Expand All @@ -569,11 +565,53 @@ func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.Sele
func aggIntervalAsJson(t *testing.T, mapper *tsdb.SelectMapper) string {
r, err := mapper.NextChunk()
if err != nil {
t.Fatalf("failed to get chunk from aggregate mapper: %s", err.Error())
t.Fatalf("failed to get next chunk from aggregate mapper: %s", err.Error())
}
return mustMarshalMapperOutput(r)
}

// mustMarshalMapperOutput manually converts a mapper output to JSON, to avoid the
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since production code overrides JSON-encoding, the test code now needs an explicit simpler version.

// built-in encoding.
func mustMarshalMapperOutput(r interface{}) string {
if r == nil {
b, err := json.Marshal(nil)
if err != nil {
panic("failed to marshal nil chunk as JSON")
}
return string(b)
}
mo := r.(*tsdb.MapperOutput)

type v struct {
Time int64 `json:"time,omitempty"`
Value interface{} `json:"value,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
}

values := make([]*v, len(mo.Values))
for i, value := range mo.Values {
values[i] = &v{
Time: value.Time,
Value: value.Value,
Tags: value.Tags,
}
}
b, err := json.Marshal(r)

var o struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Fields []string `json:"fields,omitempty"`
Values []*v `json:"values,omitempty"`
}

o.Name = mo.Name
o.Tags = mo.Tags
o.Fields = mo.Fields
o.Values = values

b, err := json.Marshal(o)
if err != nil {
t.Fatalf("failed to marshal chunk as JSON: %s", err.Error())
panic("failed to marshal MapperOutput")
}
return string(b)
}