From d9b32ea16011de3460d9892aceeb1804e985d2dc Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 31 Mar 2016 15:27:29 -0400 Subject: [PATCH 1/2] Ensure the query is always "killed" after it is finished If the http.CloseNotifier didn't go off for some reason (and it's not guaranteed to go off just because the HTTP connection is closed), the query wouldn't get correctly recycled when chunked output was requested. The query id in the query executor was also not being set correctly. This seems to have been an oversight when merging the point limit monitor. --- cluster/query_executor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cluster/query_executor.go b/cluster/query_executor.go index 7699bbb46b7..f47088d92a7 100644 --- a/cluster/query_executor.go +++ b/cluster/query_executor.go @@ -94,7 +94,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu var qid uint64 if e.QueryManager != nil { var err error - _, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{ + qid, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{ Query: query, Database: database, Timeout: e.QueryTimeout, @@ -105,6 +105,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu results <- &influxql.Result{Err: err} return } + + defer e.QueryManager.KillQuery(qid) } logger := e.logger() From 8752d1b1e3a5b098d425a2fd0263b753114d684c Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Wed, 30 Mar 2016 14:03:03 -0400 Subject: [PATCH 2/2] Support chunked queries in the Go InfluxDB client Modify the CLI to always use chunked queries. --- CHANGELOG.md | 1 + client/influxdb.go | 87 +++++++++++++++++++++++++++++++++++------ client/influxdb_test.go | 49 +++++++++++++++++++++++ cmd/influx/cli/cli.go | 14 ++++++- 4 files changed, 137 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31ca50b1c76..2f758eb7ff2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ - [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading - [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages. - [#6115](https://github.com/influxdata/influxdb/issues/6115): Support chunking query results mid-series. Limit non-chunked output. +- [#6166](https://github.com/influxdata/influxdb/pull/6166): Teach influxdb client how to use chunked queries and use in the CLI. ### Bugfixes diff --git a/client/influxdb.go b/client/influxdb.go index 2f62aacb803..343a4e09d35 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -32,6 +33,18 @@ const ( type Query struct { Command string Database string + + // Chunked tells the server to send back chunked responses. This places + // less load on the server by sending back chunks of the response rather + // than waiting for the entire response all at once. + Chunked bool + + // ChunkSize sets the maximum number of rows that will be returned per + // chunk. Chunks are either divided based on their series or if they hit + // the chunk size limit. + // + // Chunked must be set to true for this option to be used. + ChunkSize int } // ParseConnectionString will parse a string to create a valid connection URL @@ -157,6 +170,12 @@ func (c *Client) Query(q Query) (*Response, error) { values := u.Query() values.Set("q", q.Command) values.Set("db", q.Database) + if q.Chunked { + values.Set("chunked", "true") + if q.ChunkSize > 0 { + values.Set("chunk_size", strconv.Itoa(q.ChunkSize)) + } + } if c.precision != "" { values.Set("epoch", c.precision) } @@ -178,19 +197,38 @@ func (c *Client) Query(q Query) (*Response, error) { defer resp.Body.Close() var response Response - dec := json.NewDecoder(resp.Body) - dec.UseNumber() - decErr := dec.Decode(&response) + if q.Chunked { + cr := NewChunkedResponse(resp.Body) + for { + r, err := cr.NextResponse() + if err != nil { + // If we got an error while decoding the response, send that back. + return nil, err + } - // ignore this error if we got an invalid status code - if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { - decErr = nil - } - // If we got a valid decode error, send that back - if decErr != nil { - return nil, decErr + if r == nil { + break + } + + response.Results = append(response.Results, r.Results...) + if r.Err != nil { + response.Err = r.Err + break + } + } + } else { + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + if err := dec.Decode(&response); err != nil { + // Ignore EOF errors if we got an invalid status code. + if !(err == io.EOF && resp.StatusCode != http.StatusOK) { + return nil, err + } + } } - // If we don't have an error in our json response, and didn't get StatusOK, then send back an error + + // If we don't have an error in our json response, and didn't get StatusOK, + // then send back an error. if resp.StatusCode != http.StatusOK && response.Error() == nil { return &response, fmt.Errorf("received status code %d from server", resp.StatusCode) } @@ -437,7 +475,7 @@ func (r *Response) UnmarshalJSON(b []byte) error { // Error returns the first error from any statement. // Returns nil if no errors occurred on any statements. -func (r Response) Error() error { +func (r *Response) Error() error { if r.Err != nil { return r.Err } @@ -449,6 +487,31 @@ func (r Response) Error() error { return nil } +// ChunkedResponse represents a response from the server that +// uses chunking to stream the output. +type ChunkedResponse struct { + dec *json.Decoder +} + +// NewChunkedResponse reads a stream and produces responses from the stream. +func NewChunkedResponse(r io.Reader) *ChunkedResponse { + dec := json.NewDecoder(r) + dec.UseNumber() + return &ChunkedResponse{dec: dec} +} + +// NextResponse reads the next line of the stream and returns a response. +func (r *ChunkedResponse) NextResponse() (*Response, error) { + var response Response + if err := r.dec.Decode(&response); err != nil { + if err == io.EOF { + return nil, nil + } + return nil, err + } + return &response, nil +} + // Point defines the fields that will be written to the database // Measurement, Time, and Fields are required // Precision can be specified if the time is in epoch format (integer). diff --git a/client/influxdb_test.go b/client/influxdb_test.go index c82eb6dd0a8..2f064ca41a5 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -169,6 +169,30 @@ func TestClient_Query(t *testing.T) { } } +func TestClient_ChunkedQuery(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var data client.Response + w.WriteHeader(http.StatusOK) + enc := json.NewEncoder(w) + _ = enc.Encode(data) + _ = enc.Encode(data) + })) + defer ts.Close() + + u, _ := url.Parse(ts.URL) + config := client.Config{URL: *u} + c, err := client.NewClient(config) + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } + + query := client.Query{Chunked: true} + _, err = c.Query(query) + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } +} + func TestClient_BasicAuth(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { u, p, ok := r.BasicAuth() @@ -741,3 +765,28 @@ war3JNM1mGB3o2iAtuOJlFIKLpI1x+1e8pI= } } } + +func TestChunkedResponse(t *testing.T) { + s := `{"results":[{},{}]}{"results":[{}]}` + r := client.NewChunkedResponse(strings.NewReader(s)) + resp, err := r.NextResponse() + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } else if actual := len(resp.Results); actual != 2 { + t.Fatalf("unexpected number of results. expected %v, actual %v", 2, actual) + } + + resp, err = r.NextResponse() + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } else if actual := len(resp.Results); actual != 1 { + t.Fatalf("unexpected number of results. expected %v, actual %v", 1, actual) + } + + resp, err = r.NextResponse() + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } else if resp != nil { + t.Fatalf("unexpected response. expected %v, actual %v", nil, resp) + } +} diff --git a/cmd/influx/cli/cli.go b/cmd/influx/cli/cli.go index 53499a71273..ea4365234fe 100644 --- a/cmd/influx/cli/cli.go +++ b/cmd/influx/cli/cli.go @@ -56,6 +56,7 @@ type CommandLine struct { PPS int // Controls how many points per second the import will allow via throttling Path string Compressed bool + Chunked bool Quit chan struct{} IgnoreSignals bool // Ignore signals normally caught by this process (used primarily for testing) osSignals chan os.Signal @@ -518,9 +519,18 @@ func (c *CommandLine) Insert(stmt string) error { return nil } +// query creates a query struct to be used with the client. +func (c *CommandLine) query(query string, database string) client.Query { + return client.Query{ + Command: query, + Database: database, + Chunked: true, + } +} + // ExecuteQuery runs any query statement func (c *CommandLine) ExecuteQuery(query string) error { - response, err := c.Client.Query(client.Query{Command: query, Database: c.Database}) + response, err := c.Client.Query(c.query(query, c.Database)) if err != nil { fmt.Printf("ERR: %s\n", err) return err @@ -539,7 +549,7 @@ func (c *CommandLine) ExecuteQuery(query string) error { // DatabaseToken retrieves database token func (c *CommandLine) DatabaseToken() (string, error) { - response, err := c.Client.Query(client.Query{Command: "SHOW DIAGNOSTICS for 'registration'"}) + response, err := c.Client.Query(c.query("SHOW DIAGNOSTICS for 'registration'", "")) if err != nil { return "", err }