diff --git a/CHANGELOG.md b/CHANGELOG.md index 31ca50b1c76..2f758eb7ff2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ - [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading - [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages. - [#6115](https://github.com/influxdata/influxdb/issues/6115): Support chunking query results mid-series. Limit non-chunked output. +- [#6166](https://github.com/influxdata/influxdb/pull/6166): Teach influxdb client how to use chunked queries and use in the CLI. ### Bugfixes diff --git a/client/influxdb.go b/client/influxdb.go index 2f62aacb803..343a4e09d35 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -32,6 +33,18 @@ const ( type Query struct { Command string Database string + + // Chunked tells the server to send back chunked responses. This places + // less load on the server by sending back chunks of the response rather + // than waiting for the entire response all at once. + Chunked bool + + // ChunkSize sets the maximum number of rows that will be returned per + // chunk. Chunks are either divided based on their series or if they hit + // the chunk size limit. + // + // Chunked must be set to true for this option to be used. + ChunkSize int } // ParseConnectionString will parse a string to create a valid connection URL @@ -157,6 +170,12 @@ func (c *Client) Query(q Query) (*Response, error) { values := u.Query() values.Set("q", q.Command) values.Set("db", q.Database) + if q.Chunked { + values.Set("chunked", "true") + if q.ChunkSize > 0 { + values.Set("chunk_size", strconv.Itoa(q.ChunkSize)) + } + } if c.precision != "" { values.Set("epoch", c.precision) } @@ -178,19 +197,38 @@ func (c *Client) Query(q Query) (*Response, error) { defer resp.Body.Close() var response Response - dec := json.NewDecoder(resp.Body) - dec.UseNumber() - decErr := dec.Decode(&response) + if q.Chunked { + cr := NewChunkedResponse(resp.Body) + for { + r, err := cr.NextResponse() + if err != nil { + // If we got an error while decoding the response, send that back. + return nil, err + } - // ignore this error if we got an invalid status code - if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { - decErr = nil - } - // If we got a valid decode error, send that back - if decErr != nil { - return nil, decErr + if r == nil { + break + } + + response.Results = append(response.Results, r.Results...) + if r.Err != nil { + response.Err = r.Err + break + } + } + } else { + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + if err := dec.Decode(&response); err != nil { + // Ignore EOF errors if we got an invalid status code. + if !(err == io.EOF && resp.StatusCode != http.StatusOK) { + return nil, err + } + } } - // If we don't have an error in our json response, and didn't get StatusOK, then send back an error + + // If we don't have an error in our json response, and didn't get StatusOK, + // then send back an error. if resp.StatusCode != http.StatusOK && response.Error() == nil { return &response, fmt.Errorf("received status code %d from server", resp.StatusCode) } @@ -437,7 +475,7 @@ func (r *Response) UnmarshalJSON(b []byte) error { // Error returns the first error from any statement. // Returns nil if no errors occurred on any statements. -func (r Response) Error() error { +func (r *Response) Error() error { if r.Err != nil { return r.Err } @@ -449,6 +487,31 @@ func (r Response) Error() error { return nil } +// ChunkedResponse represents a response from the server that +// uses chunking to stream the output. +type ChunkedResponse struct { + dec *json.Decoder +} + +// NewChunkedResponse reads a stream and produces responses from the stream. +func NewChunkedResponse(r io.Reader) *ChunkedResponse { + dec := json.NewDecoder(r) + dec.UseNumber() + return &ChunkedResponse{dec: dec} +} + +// NextResponse reads the next line of the stream and returns a response. +func (r *ChunkedResponse) NextResponse() (*Response, error) { + var response Response + if err := r.dec.Decode(&response); err != nil { + if err == io.EOF { + return nil, nil + } + return nil, err + } + return &response, nil +} + // Point defines the fields that will be written to the database // Measurement, Time, and Fields are required // Precision can be specified if the time is in epoch format (integer). diff --git a/client/influxdb_test.go b/client/influxdb_test.go index c82eb6dd0a8..2f064ca41a5 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -169,6 +169,30 @@ func TestClient_Query(t *testing.T) { } } +func TestClient_ChunkedQuery(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var data client.Response + w.WriteHeader(http.StatusOK) + enc := json.NewEncoder(w) + _ = enc.Encode(data) + _ = enc.Encode(data) + })) + defer ts.Close() + + u, _ := url.Parse(ts.URL) + config := client.Config{URL: *u} + c, err := client.NewClient(config) + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } + + query := client.Query{Chunked: true} + _, err = c.Query(query) + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } +} + func TestClient_BasicAuth(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { u, p, ok := r.BasicAuth() @@ -741,3 +765,28 @@ war3JNM1mGB3o2iAtuOJlFIKLpI1x+1e8pI= } } } + +func TestChunkedResponse(t *testing.T) { + s := `{"results":[{},{}]}{"results":[{}]}` + r := client.NewChunkedResponse(strings.NewReader(s)) + resp, err := r.NextResponse() + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } else if actual := len(resp.Results); actual != 2 { + t.Fatalf("unexpected number of results. expected %v, actual %v", 2, actual) + } + + resp, err = r.NextResponse() + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } else if actual := len(resp.Results); actual != 1 { + t.Fatalf("unexpected number of results. expected %v, actual %v", 1, actual) + } + + resp, err = r.NextResponse() + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } else if resp != nil { + t.Fatalf("unexpected response. expected %v, actual %v", nil, resp) + } +} diff --git a/cluster/query_executor.go b/cluster/query_executor.go index 7699bbb46b7..f47088d92a7 100644 --- a/cluster/query_executor.go +++ b/cluster/query_executor.go @@ -94,7 +94,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu var qid uint64 if e.QueryManager != nil { var err error - _, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{ + qid, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{ Query: query, Database: database, Timeout: e.QueryTimeout, @@ -105,6 +105,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu results <- &influxql.Result{Err: err} return } + + defer e.QueryManager.KillQuery(qid) } logger := e.logger() diff --git a/cmd/influx/cli/cli.go b/cmd/influx/cli/cli.go index 53499a71273..ea4365234fe 100644 --- a/cmd/influx/cli/cli.go +++ b/cmd/influx/cli/cli.go @@ -56,6 +56,7 @@ type CommandLine struct { PPS int // Controls how many points per second the import will allow via throttling Path string Compressed bool + Chunked bool Quit chan struct{} IgnoreSignals bool // Ignore signals normally caught by this process (used primarily for testing) osSignals chan os.Signal @@ -518,9 +519,18 @@ func (c *CommandLine) Insert(stmt string) error { return nil } +// query creates a query struct to be used with the client. +func (c *CommandLine) query(query string, database string) client.Query { + return client.Query{ + Command: query, + Database: database, + Chunked: true, + } +} + // ExecuteQuery runs any query statement func (c *CommandLine) ExecuteQuery(query string) error { - response, err := c.Client.Query(client.Query{Command: query, Database: c.Database}) + response, err := c.Client.Query(c.query(query, c.Database)) if err != nil { fmt.Printf("ERR: %s\n", err) return err @@ -539,7 +549,7 @@ func (c *CommandLine) ExecuteQuery(query string) error { // DatabaseToken retrieves database token func (c *CommandLine) DatabaseToken() (string, error) { - response, err := c.Client.Query(client.Query{Command: "SHOW DIAGNOSTICS for 'registration'"}) + response, err := c.Client.Query(c.query("SHOW DIAGNOSTICS for 'registration'", "")) if err != nil { return "", err }