From 487c3365713c512365690bd53b0437da5057e287 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 17 Aug 2015 11:11:55 -0700 Subject: [PATCH 1/2] Correctly merge rows for identical series If no chunking was requested by the user, the co-ordinating node buffers all results in RAM before emitting a single result. However buffering was not merging results for rows which had data for the same series. This change fixes this. Fixes issue #3242. --- cmd/influxd/run/server_test.go | 51 ++++++++++++++++++++++++++++++++++ influxql/result.go | 5 ++++ services/httpd/handler.go | 15 ++++++++++ 3 files changed, 71 insertions(+) diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index cc15eb6f841..86d8b36a1a6 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -3007,6 +3007,57 @@ func TestServer_Query_Fill(t *testing.T) { } } +func TestServer_Query_Chunk(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig(), "") + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + writes := make([]string, 10001) // 10,000 is the default chunking size, even when no chunking requested. + expectedValues := make([]string, len(writes)) + for i := 0; i < len(writes); i++ { + writes[i] = fmt.Sprintf(`cpu value=%d %d`, i, time.Unix(0, int64(i)).UnixNano()) + expectedValues[i] = fmt.Sprintf(`["%s",%d]`, time.Unix(0, int64(i)).UTC().Format(time.RFC3339Nano), i) + } + expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[%s]}]}]}`, strings.Join(expectedValues, ",")) + + test := NewTest("db0", "rp0") + test.write = strings.Join(writes, "\n") + + test.addQueries([]*Query{ + &Query{ + name: "SELECT all values, no chunking", + command: `SELECT value FROM cpu`, + exp: expected, + params: url.Values{"db": []string{"db0"}}, + }, + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } + +} + func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) { t.Parallel() s := OpenServer(NewConfig(), "") diff --git a/influxql/result.go b/influxql/result.go index a74ed714a13..a9a8cd561c2 100644 --- a/influxql/result.go +++ b/influxql/result.go @@ -31,6 +31,11 @@ type Row struct { Err error `json:"err,omitempty"` } +// SameSeries returns true if r contains values for the same series as o. +func (r *Row) SameSeries(o *Row) bool { + return r.tagsHash() == o.tagsHash() && r.Name == o.Name +} + // tagsHash returns a hash of tag key/value pairs. func (r *Row) tagsHash() uint64 { h := fnv.New64a() diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 28794063f8b..699a78ad7ae 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -286,6 +286,21 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. resp.Results = append(resp.Results, r) } else if resp.Results[l-1].StatementID == r.StatementID { cr := resp.Results[l-1] + lastSeries := cr.Series[len(cr.Series)-1] + rowsMerged := 0 + + for _, row := range r.Series { + if !lastSeries.SameSeries(row) { + // Next row is for a different series than last. + break + } + // Values are for the same series, so append them. + lastSeries.Values = append(lastSeries.Values, row.Values...) + rowsMerged++ + } + + // Append remaining rows as new rows. + r.Series = r.Series[rowsMerged:] cr.Series = append(cr.Series, r.Series...) } else { resp.Results = append(resp.Results, r) From 836311c2b12861b17b584c4e5d77a2b9c4580a94 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 17 Aug 2015 13:41:07 -0700 Subject: [PATCH 2/2] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2de2f62541e..6d3041f2d6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ There are breaking changes in this release. Please see the *Features* section be - [#3673](https://github.com/influxdb/influxdb/pull/3673): Improve query performance by removing unnecessary tagset sorting. - [#3676](https://github.com/influxdb/influxdb/pull/3676): Improve query performance by memomizing mapper output keys. - [#3687](https://github.com/influxdb/influxdb/issues/3687): Fix panic: runtime error: makeslice: len out of range in hinted handoff +- [#3697](https://github.com/influxdb/influxdb/issues/3697): Correctly merge non-chunked results for same series. Fix issue #3242. ## v0.9.2 [2015-07-24]