-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Correct marshalling of remotely-mapped data #4191
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,47 @@ type MapperValue struct { | |
Tags map[string]string `json:"tags,omitempty"` // Meta tags for results | ||
} | ||
|
||
// MapperValueAsJSON is the JSON-encoded representation of MapperValue. Because MapperValue is | ||
// a complex type, custom JSON encoding is required so that none of the types contained within | ||
// a MapperValue are "lost", and so the data are encoded as byte slices where necessary. | ||
type MapperValueAsJSON struct { | ||
Time int64 `json:"time,omitempty"` | ||
RawData []byte `json:"rdata,omitempty"` | ||
AggData [][]byte `json:"adata,omitempty"` | ||
Tags map[string]string `json:"tags,omitempty"` | ||
} | ||
|
||
// MarshalJSON returns the JSON-encoded representation of a MapperValue. | ||
func (mv *MapperValue) MarshalJSON() ([]byte, error) { | ||
o := &MapperValueAsJSON{ | ||
Time: mv.Time, | ||
AggData: make([][]byte, 0), | ||
Tags: mv.Tags, | ||
} | ||
|
||
o.Time = mv.Time | ||
o.Tags = mv.Tags | ||
if values, ok := mv.Value.([]interface{}); ok { | ||
// Value contain a slice of more values. This happens only with | ||
// aggregate output. | ||
for _, v := range values { | ||
b, err := json.Marshal(v) | ||
if err != nil { | ||
return nil, err | ||
} | ||
o.AggData = append(o.AggData, b) | ||
} | ||
} else { | ||
// If must be raw output, so just marshal the single value. | ||
b, err := json.Marshal(mv.Value) | ||
if err != nil { | ||
return nil, err | ||
} | ||
o.RawData = b | ||
} | ||
return json.Marshal(o) | ||
} | ||
|
||
type MapperValues []*MapperValue | ||
|
||
func (a MapperValues) Len() int { return len(a) } | ||
|
@@ -36,6 +77,31 @@ type MapperOutput struct { | |
cursorKey string // Tagset-based key for the source cursor. Cached for performance reasons. | ||
} | ||
|
||
// MapperOutputAsJSON is the JSON-encoded representation of MapperOutput. The query data is represented | ||
// as a raw JSON message, so decode is delayed, and can proceed in a custom manner. | ||
type MapperOutputAsJSON struct { | ||
Name string `json:"name,omitempty"` | ||
Tags map[string]string `json:"tags,omitempty"` | ||
Fields []string `json:"fields,omitempty"` // Field names of returned data. | ||
Values json.RawMessage `json:"values,omitempty"` | ||
} | ||
|
||
// MarshalJSON returns the JSON-encoded representation of a MapperOutput. | ||
func (mo *MapperOutput) MarshalJSON() ([]byte, error) { | ||
o := &MapperOutputAsJSON{ | ||
Name: mo.Name, | ||
Tags: mo.Tags, | ||
Fields: mo.Fields, | ||
} | ||
data, err := json.Marshal(mo.Values) | ||
if err != nil { | ||
return nil, err | ||
} | ||
o.Values = data | ||
|
||
return json.Marshal(o) | ||
} | ||
|
||
func (mo *MapperOutput) key() string { | ||
return mo.cursorKey | ||
} | ||
|
@@ -59,12 +125,13 @@ type SelectMapper struct { | |
|
||
// The following attributes are only used when mappers are for aggregate queries. | ||
|
||
queryTMinWindow int64 // Minimum time of the query floored to start of interval. | ||
intervalSize int64 // Size of each interval. | ||
numIntervals int // Maximum number of intervals to return. | ||
currInterval int // Current interval for which data is being fetched. | ||
mapFuncs []mapFunc // The mapping functions. | ||
fieldNames []string // the field name being read for mapping. | ||
queryTMinWindow int64 // Minimum time of the query floored to start of interval. | ||
intervalSize int64 // Size of each interval. | ||
numIntervals int // Maximum number of intervals to return. | ||
currInterval int // Current interval for which data is being fetched. | ||
mapFuncs []mapFunc // The mapping functions. | ||
mapUnmarshallers []unmarshalFunc // Mapping-specific unmarshal functions. | ||
fieldNames []string // the field name being read for mapping. | ||
} | ||
|
||
// NewSelectMapper returns a mapper for the given shard, which will return data for the SELECT statement. | ||
|
@@ -93,8 +160,18 @@ func (lm *SelectMapper) timeDirection() Direction { | |
return Forward | ||
} | ||
|
||
// Open opens the local mapper. | ||
// Open opens the Mapper for a SELECT Statement | ||
func (lm *SelectMapper) Open() error { | ||
// If in aggregate mode, initialize the map-and-reduce functions. Both local and | ||
// remote mappers need this. | ||
if !lm.rawMode { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was moved up here because it now needs to be done before the remote mapper is opened. |
||
if err := lm.initializeMapFunctions(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// If this mapper is actually responsible for accessing remote shard, initialize | ||
// that and return. | ||
if lm.remote != nil { | ||
return lm.remote.Open() | ||
} | ||
|
@@ -128,10 +205,6 @@ func (lm *SelectMapper) Open() error { | |
lm.queryTMin, lm.queryTMax = influxql.TimeRangeAsEpochNano(lm.selectStmt.Condition) | ||
|
||
if !lm.rawMode { | ||
if err := lm.initializeMapFunctions(); err != nil { | ||
return err | ||
} | ||
|
||
// For GROUP BY time queries, limit the number of data points returned by the limit and offset | ||
d, err := lm.selectStmt.GroupByInterval() | ||
if err != nil { | ||
|
@@ -318,13 +391,60 @@ func (lm *SelectMapper) NextChunk() (interface{}, error) { | |
return nil, nil | ||
} | ||
|
||
mo := &MapperOutput{} | ||
if err := json.Unmarshal(b.([]byte), mo); err != nil { | ||
moj := &MapperOutputAsJSON{} | ||
if err := json.Unmarshal(b.([]byte), moj); err != nil { | ||
return nil, err | ||
} else if len(mo.Values) == 0 { | ||
// Mapper on other node sent 0 values so it's done. | ||
return nil, nil | ||
} | ||
mvj := []*MapperValueAsJSON{} | ||
if err := json.Unmarshal(moj.Values, &mvj); err != nil { | ||
return nil, err | ||
} | ||
|
||
// Prep the non-JSON version of Mapper output. | ||
mo := &MapperOutput{ | ||
Name: moj.Name, | ||
Tags: moj.Tags, | ||
Fields: moj.Fields, | ||
} | ||
|
||
if len(mvj) == 1 && len(mvj[0].AggData) > 0 { | ||
// The MapperValue is carrying aggregate data, so run it through the | ||
// custom unmarshallers for the map functions through which the data | ||
// was mapped. | ||
aggValues := []interface{}{} | ||
for i, b := range mvj[0].AggData { | ||
v, err := lm.mapUnmarshallers[i](b) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The key fix -- unmarshal the aggregate data coming across the wire into the correct type. |
||
if err != nil { | ||
return nil, err | ||
} | ||
aggValues = append(aggValues, v) | ||
} | ||
mv := &MapperValue{ | ||
Value: aggValues, | ||
Tags: mvj[0].Tags, | ||
} | ||
mo.Values = []*MapperValue{mv} | ||
} else { | ||
// Must be raw data instead. | ||
for _, v := range mvj { | ||
var rawValue interface{} | ||
err := json.Unmarshal(v.RawData, &rawValue) | ||
if err != nil { | ||
return nil, err | ||
} | ||
mv := &MapperValue{ | ||
Time: v.Time, | ||
Value: rawValue, | ||
Tags: v.Tags, | ||
} | ||
if mo.Values == nil { | ||
mo.Values = []*MapperValue{mv} | ||
} else { | ||
mo.Values = append(mo.Values, mv) | ||
} | ||
} | ||
} | ||
|
||
return mo, nil | ||
} | ||
|
||
|
@@ -495,18 +615,28 @@ func (lm *SelectMapper) nextInterval() (start, end int64) { | |
} | ||
|
||
// initializeMapFunctions initialize the mapping functions for the mapper. This only applies | ||
// to aggregate queries. | ||
// to aggregate queries | ||
func (lm *SelectMapper) initializeMapFunctions() error { | ||
var err error | ||
// Set up each mapping function for this statement. | ||
aggregates := lm.selectStmt.FunctionCalls() | ||
selectStmt, ok := lm.stmt.(*influxql.SelectStatement) | ||
if !ok { | ||
return fmt.Errorf("No map functions for non-SELECT statement: %s", lm.stmt.String()) | ||
} | ||
|
||
aggregates := selectStmt.FunctionCalls() | ||
lm.mapFuncs = make([]mapFunc, len(aggregates)) | ||
lm.mapUnmarshallers = make([]unmarshalFunc, len(aggregates)) | ||
lm.fieldNames = make([]string, len(lm.mapFuncs)) | ||
for i, c := range aggregates { | ||
lm.mapFuncs[i], err = initializeMapFunc(c) | ||
if err != nil { | ||
return err | ||
} | ||
lm.mapUnmarshallers[i], err = initializeUnmarshaller(c) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Check for calls like `derivative(lmean(value), 1d)` | ||
var nested *influxql.Call = c | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -550,11 +550,7 @@ func nextRawChunkAsJson(t *testing.T, mapper tsdb.Mapper) string { | |
if err != nil { | ||
t.Fatalf("failed to get next chunk from mapper: %s", err.Error()) | ||
} | ||
b, err := json.Marshal(r) | ||
if err != nil { | ||
t.Fatalf("failed to marshal chunk as JSON: %s", err.Error()) | ||
} | ||
return string(b) | ||
return mustMarshalMapperOutput(r) | ||
} | ||
|
||
func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.SelectMapper { | ||
|
@@ -569,11 +565,53 @@ func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.Sele | |
func aggIntervalAsJson(t *testing.T, mapper *tsdb.SelectMapper) string { | ||
r, err := mapper.NextChunk() | ||
if err != nil { | ||
t.Fatalf("failed to get chunk from aggregate mapper: %s", err.Error()) | ||
t.Fatalf("failed to get next chunk from aggregate mapper: %s", err.Error()) | ||
} | ||
return mustMarshalMapperOutput(r) | ||
} | ||
|
||
// mustMarshalMapperOutput manually converts a mapper output to JSON, to avoid the | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since production code overrides JSON-encoding, the test code now needs an explicit simpler version. |
||
// built-in encoding. | ||
func mustMarshalMapperOutput(r interface{}) string { | ||
if r == nil { | ||
b, err := json.Marshal(nil) | ||
if err != nil { | ||
panic("failed to marshal nil chunk as JSON") | ||
} | ||
return string(b) | ||
} | ||
mo := r.(*tsdb.MapperOutput) | ||
|
||
type v struct { | ||
Time int64 `json:"time,omitempty"` | ||
Value interface{} `json:"value,omitempty"` | ||
Tags map[string]string `json:"tags,omitempty"` | ||
} | ||
|
||
values := make([]*v, len(mo.Values)) | ||
for i, value := range mo.Values { | ||
values[i] = &v{ | ||
Time: value.Time, | ||
Value: value.Value, | ||
Tags: value.Tags, | ||
} | ||
} | ||
b, err := json.Marshal(r) | ||
|
||
var o struct { | ||
Name string `json:"name,omitempty"` | ||
Tags map[string]string `json:"tags,omitempty"` | ||
Fields []string `json:"fields,omitempty"` | ||
Values []*v `json:"values,omitempty"` | ||
} | ||
|
||
o.Name = mo.Name | ||
o.Tags = mo.Tags | ||
o.Fields = mo.Fields | ||
o.Values = values | ||
|
||
b, err := json.Marshal(o) | ||
if err != nil { | ||
t.Fatalf("failed to marshal chunk as JSON: %s", err.Error()) | ||
panic("failed to marshal MapperOutput") | ||
} | ||
return string(b) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the new slice added to the
SelectMapper
type.