Skip to content

Commit

Permalink
Merge pull request #7989 from jgeiger/master
Browse files Browse the repository at this point in the history
Add chunked processing back into v2 client
  • Loading branch information
jsternberg authored Feb 13, 2017
2 parents 8d0f2c3 + 43117a9 commit 19f331a
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#7776](https://github.com/influxdata/influxdb/issues/7776): Add system information to /debug/vars.
- [#7948](https://github.com/influxdata/influxdb/pull/7948): Reduce memory allocations by reusing gzip.Writers across requests
- [#7553](https://github.com/influxdata/influxdb/issues/7553): Add modulo operator to the query language.
- [#7977](https://github.com/influxdata/influxdb/issues/7977): Add chunked request processing back into the Go client v2

## v1.2.1 [unreleased]

Expand Down
105 changes: 94 additions & 11 deletions client/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb/models"
Expand Down Expand Up @@ -405,6 +408,8 @@ type Query struct {
Command string
Database string
Precision string
Chunked bool
ChunkSize int
Parameters map[string]interface{}
}

Expand Down Expand Up @@ -491,6 +496,12 @@ func (c *client) Query(q Query) (*Response, error) {
params.Set("q", q.Command)
params.Set("db", q.Database)
params.Set("params", string(jsonParameters))
if q.Chunked {
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}

if q.Precision != "" {
params.Set("epoch", q.Precision)
Expand All @@ -504,17 +515,38 @@ func (c *client) Query(q Query) (*Response, error) {
defer resp.Body.Close()

var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)

// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}

if r == nil {
break
}

response.Results = append(response.Results, r.Results...)
if r.Err != "" {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)

// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
}
}
// If we don't have an error in our json response, and didn't get statusOK
// then send back an error
Expand All @@ -524,3 +556,54 @@ func (c *client) Query(q Query) (*Response, error) {
}
return &response, nil
}

// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.Reader
w io.Writer
}

func (r *duplexReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err == nil {
r.w.Write(p[:n])
}
return n, err
}

// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
duplex *duplexReader
buf bytes.Buffer
}

// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: r, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
}

// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response

if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
// entirety of the connection to get any remaining error text.
io.Copy(ioutil.Discard, r.duplex)
return nil, errors.New(strings.TrimSpace(r.buf.String()))
}

r.buf.Reset()
return &response, nil
}
23 changes: 23 additions & 0 deletions client/v2/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,29 @@ func TestClient_Query(t *testing.T) {
}
}

func TestClient_ChunkedQuery(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
_ = enc.Encode(data)
_ = enc.Encode(data)
}))
defer ts.Close()

config := HTTPConfig{Addr: ts.URL}
c, err := NewHTTPClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}

query := Query{Chunked: true}
_, err = c.Query(query)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
}

func TestClient_BoundParameters(t *testing.T) {
var parameterString string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down

0 comments on commit 19f331a

Please sign in to comment.