Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunked processing back into v2 client #7989

Merged
merged 1 commit into from
Feb 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#7776](https://github.com/influxdata/influxdb/issues/7776): Add system information to /debug/vars.
- [#7948](https://github.com/influxdata/influxdb/pull/7948): Reduce memory allocations by reusing gzip.Writers across requests
- [#7553](https://github.com/influxdata/influxdb/issues/7553): Add modulo operator to the query language.
- [#7977](https://github.com/influxdata/influxdb/issues/7977): Add chunked request processing back into the Go client v2

## v1.2.1 [unreleased]

Expand Down
105 changes: 94 additions & 11 deletions client/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb/models"
Expand Down Expand Up @@ -405,6 +408,8 @@ type Query struct {
Command string
Database string
Precision string
Chunked bool
ChunkSize int
Parameters map[string]interface{}
}

Expand Down Expand Up @@ -491,6 +496,12 @@ func (c *client) Query(q Query) (*Response, error) {
params.Set("q", q.Command)
params.Set("db", q.Database)
params.Set("params", string(jsonParameters))
if q.Chunked {
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}

if q.Precision != "" {
params.Set("epoch", q.Precision)
Expand All @@ -504,17 +515,38 @@ func (c *client) Query(q Query) (*Response, error) {
defer resp.Body.Close()

var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)

// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}

if r == nil {
break
}

response.Results = append(response.Results, r.Results...)
if r.Err != "" {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)

// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
}
}
// If we don't have an error in our json response, and didn't get statusOK
// then send back an error
Expand All @@ -524,3 +556,54 @@ func (c *client) Query(q Query) (*Response, error) {
}
return &response, nil
}

// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.Reader
w io.Writer
}

func (r *duplexReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err == nil {
r.w.Write(p[:n])
}
return n, err
}

// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
duplex *duplexReader
buf bytes.Buffer
}

// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: r, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
}

// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response

if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
// entirety of the connection to get any remaining error text.
io.Copy(ioutil.Discard, r.duplex)
return nil, errors.New(strings.TrimSpace(r.buf.String()))
}

r.buf.Reset()
return &response, nil
}
23 changes: 23 additions & 0 deletions client/v2/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,29 @@ func TestClient_Query(t *testing.T) {
}
}

func TestClient_ChunkedQuery(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
_ = enc.Encode(data)
_ = enc.Encode(data)
}))
defer ts.Close()

config := HTTPConfig{Addr: ts.URL}
c, err := NewHTTPClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}

query := Query{Chunked: true}
_, err = c.Query(query)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
}

func TestClient_BoundParameters(t *testing.T) {
var parameterString string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down