Skip to content

Commit

Permalink
Correctly merge rows for identical series
Browse files Browse the repository at this point in the history
If no chunking was requested by the user, the co-ordinating node buffers all
results in RAM before emitting a single result. However buffering was not
merging results for rows which had data for the same series. This change fixes this.

Fixes issue #3242.
  • Loading branch information
otoolep committed Aug 17, 2015
1 parent 6bdfa47 commit 487c336
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 0 deletions.
51 changes: 51 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,57 @@ func TestServer_Query_Fill(t *testing.T) {
}
}

func TestServer_Query_Chunk(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}

writes := make([]string, 10001) // 10,000 is the default chunking size, even when no chunking requested.
expectedValues := make([]string, len(writes))
for i := 0; i < len(writes); i++ {
writes[i] = fmt.Sprintf(`cpu value=%d %d`, i, time.Unix(0, int64(i)).UnixNano())
expectedValues[i] = fmt.Sprintf(`["%s",%d]`, time.Unix(0, int64(i)).UTC().Format(time.RFC3339Nano), i)
}
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[%s]}]}]}`, strings.Join(expectedValues, ","))

test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")

test.addQueries([]*Query{
&Query{
name: "SELECT all values, no chunking",
command: `SELECT value FROM cpu`,
exp: expected,
params: url.Values{"db": []string{"db0"}},
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}

}

func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
Expand Down
5 changes: 5 additions & 0 deletions influxql/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type Row struct {
Err error `json:"err,omitempty"`
}

// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}

// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := fnv.New64a()
Expand Down
15 changes: 15 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,21 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
resp.Results = append(resp.Results, r)
} else if resp.Results[l-1].StatementID == r.StatementID {
cr := resp.Results[l-1]
lastSeries := cr.Series[len(cr.Series)-1]
rowsMerged := 0

for _, row := range r.Series {
if !lastSeries.SameSeries(row) {
// Next row is for a different series than last.
break
}
// Values are for the same series, so append them.
lastSeries.Values = append(lastSeries.Values, row.Values...)
rowsMerged++
}

// Append remaining rows as new rows.
r.Series = r.Series[rowsMerged:]
cr.Series = append(cr.Series, r.Series...)
} else {
resp.Results = append(resp.Results, r)
Expand Down

0 comments on commit 487c336

Please sign in to comment.