Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support chunked queries in the Go InfluxDB client #6166

Merged
merged 2 commits into from
Mar 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading
- [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages.
- [#6115](https://github.com/influxdata/influxdb/issues/6115): Support chunking query results mid-series. Limit non-chunked output.
- [#6166](https://github.com/influxdata/influxdb/pull/6166): Teach influxdb client how to use chunked queries and use in the CLI.

### Bugfixes

Expand Down
87 changes: 75 additions & 12 deletions client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand All @@ -32,6 +33,18 @@ const (
type Query struct {
Command string
Database string

// Chunked tells the server to send back chunked responses. This places
// less load on the server by sending back chunks of the response rather
// than waiting for the entire response all at once.
Chunked bool

// ChunkSize sets the maximum number of rows that will be returned per
// chunk. Chunks are either divided based on their series or if they hit
// the chunk size limit.
//
// Chunked must be set to true for this option to be used.
ChunkSize int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need Chunked? Could we use ChunkSize > 0 to indicate Chunked = true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you just send chunked, it will use the default. If you require the chunk size to be set, you can never use the default and have to set the value. I guess the question is if we care about that or we want the client to have a default value that it uses that is separate from the server's.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok. Makes sense.

}

// ParseConnectionString will parse a string to create a valid connection URL
Expand Down Expand Up @@ -157,6 +170,12 @@ func (c *Client) Query(q Query) (*Response, error) {
values := u.Query()
values.Set("q", q.Command)
values.Set("db", q.Database)
if q.Chunked {
values.Set("chunked", "true")
if q.ChunkSize > 0 {
values.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}
if c.precision != "" {
values.Set("epoch", c.precision)
}
Expand All @@ -178,19 +197,38 @@ func (c *Client) Query(q Query) (*Response, error) {
defer resp.Body.Close()

var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}

// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, decErr
if r == nil {
break
}

response.Results = append(response.Results, r.Results...)
if r.Err != nil {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
if err := dec.Decode(&response); err != nil {
// Ignore EOF errors if we got an invalid status code.
if !(err == io.EOF && resp.StatusCode != http.StatusOK) {
return nil, err
}
}
}
// If we don't have an error in our json response, and didn't get StatusOK, then send back an error

// If we don't have an error in our json response, and didn't get StatusOK,
// then send back an error.
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
}
Expand Down Expand Up @@ -437,7 +475,7 @@ func (r *Response) UnmarshalJSON(b []byte) error {

// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r Response) Error() error {
func (r *Response) Error() error {
if r.Err != nil {
return r.Err
}
Expand All @@ -449,6 +487,31 @@ func (r Response) Error() error {
return nil
}

// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
}

// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
dec := json.NewDecoder(r)
dec.UseNumber()
return &ChunkedResponse{dec: dec}
}

// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
return nil, err
}
return &response, nil
}

// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).
Expand Down
49 changes: 49 additions & 0 deletions client/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,30 @@ func TestClient_Query(t *testing.T) {
}
}

func TestClient_ChunkedQuery(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data client.Response
w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
_ = enc.Encode(data)
_ = enc.Encode(data)
}))
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}

query := client.Query{Chunked: true}
_, err = c.Query(query)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
}

func TestClient_BasicAuth(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u, p, ok := r.BasicAuth()
Expand Down Expand Up @@ -741,3 +765,28 @@ war3JNM1mGB3o2iAtuOJlFIKLpI1x+1e8pI=
}
}
}

func TestChunkedResponse(t *testing.T) {
s := `{"results":[{},{}]}{"results":[{}]}`
r := client.NewChunkedResponse(strings.NewReader(s))
resp, err := r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know what I'm going to say about the order of expected and actual here 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same response I always give :) the rest of the file is one way and I'm going to follow whatever the rest of the file does. I think the double space is weird, but keeping an individual file consistent is more important than my OCD.

} else if actual := len(resp.Results); actual != 2 {
t.Fatalf("unexpected number of results. expected %v, actual %v", 2, actual)
}

resp, err = r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if actual := len(resp.Results); actual != 1 {
t.Fatalf("unexpected number of results. expected %v, actual %v", 1, actual)
}

resp, err = r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if resp != nil {
t.Fatalf("unexpected response. expected %v, actual %v", nil, resp)
}
}
4 changes: 3 additions & 1 deletion cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
var qid uint64
if e.QueryManager != nil {
var err error
_, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
qid, closing, err = e.QueryManager.AttachQuery(&influxql.QueryParams{
Query: query,
Database: database,
Timeout: e.QueryTimeout,
Expand All @@ -105,6 +105,8 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
results <- &influxql.Result{Err: err}
return
}

defer e.QueryManager.KillQuery(qid)
}

logger := e.logger()
Expand Down
14 changes: 12 additions & 2 deletions cmd/influx/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type CommandLine struct {
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
Chunked bool
Quit chan struct{}
IgnoreSignals bool // Ignore signals normally caught by this process (used primarily for testing)
osSignals chan os.Signal
Expand Down Expand Up @@ -518,9 +519,18 @@ func (c *CommandLine) Insert(stmt string) error {
return nil
}

// query creates a query struct to be used with the client.
func (c *CommandLine) query(query string, database string) client.Query {
return client.Query{
Command: query,
Database: database,
Chunked: true,
}
}

// ExecuteQuery runs any query statement
func (c *CommandLine) ExecuteQuery(query string) error {
response, err := c.Client.Query(client.Query{Command: query, Database: c.Database})
response, err := c.Client.Query(c.query(query, c.Database))
if err != nil {
fmt.Printf("ERR: %s\n", err)
return err
Expand All @@ -539,7 +549,7 @@ func (c *CommandLine) ExecuteQuery(query string) error {

// DatabaseToken retrieves database token
func (c *CommandLine) DatabaseToken() (string, error) {
response, err := c.Client.Query(client.Query{Command: "SHOW DIAGNOSTICS for 'registration'"})
response, err := c.Client.Query(c.query("SHOW DIAGNOSTICS for 'registration'", ""))
if err != nil {
return "", err
}
Expand Down