diff --git a/CHANGELOG.md b/CHANGELOG.md index ee7494a3d6c..d81b0dac67e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#7099](https://github.com/influxdata/influxdb/pull/7099): Implement text/csv content encoding for the response writer. - [#6992](https://github.com/influxdata/influxdb/issues/6992): Support tools for running async queries. - [#7136](https://github.com/influxdata/influxdb/pull/7136): Update jwt-go dependency to version 3. +- [#7154](https://github.com/influxdata/influxdb/pull/7154): Add msgpack handler for smart JSON encoding. ### Bugfixes diff --git a/Godeps b/Godeps index f5604307196..60bceb0a558 100644 --- a/Godeps +++ b/Godeps @@ -15,4 +15,5 @@ github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073 github.com/rakyll/statik 274df120e9065bdd08eb1120e0375e3dc1ae8465 github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d +github.com/ugorji/go 4a1cb5252a6951f715a85d0e4be334c2a2dbf2a2 golang.org/x/crypto c197bcf24cde29d3f73c7b4ac6fd41f4384e8af6 diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index 14bbfa9535f..d23b31951f0 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -17,6 +17,7 @@ - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) - github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) - github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) +- github.com/ugorji/go/codec [MIT LICENSE](https://github.com/ugorji/go/blob/master/LICENSE) - glyphicons [LICENSE](http://glyphicons.com/license/) - golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) - jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt) diff --git a/influxql/result.go b/influxql/result.go index 0069208fb85..b745fe3b869 100644 --- a/influxql/result.go +++ b/influxql/result.go @@ -58,10 +58,10 @@ func ReadOnlyWarning(stmt string) *Message { type Result struct { // StatementID is just the statement's position in the query. It's used // to combine statement results if they're being buffered in memory. - StatementID int `json:"-"` - Series models.Rows - Messages []*Message - Err error + StatementID int `json:"-"` + Series models.Rows `codec:"series,omitempty"` + Messages []*Message `codec:"messages,omitempty"` + Err error `codec:"error,omitempty"` } // MarshalJSON encodes the result into JSON. diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 2a1be276745..64b69f9c56b 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -1106,8 +1106,8 @@ func (h *Handler) recovery(inner http.Handler, name string) http.Handler { // Response represents a list of statement results. type Response struct { - Results []*influxql.Result - Err error + Results []*influxql.Result `codec:"results,omitempty"` + Err error `codec:"error,omitempty"` } // MarshalJSON encodes a Response struct into JSON. diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go index 9778571874c..683e4b5eb86 100644 --- a/services/httpd/response_writer.go +++ b/services/httpd/response_writer.go @@ -1,14 +1,18 @@ package httpd import ( + "bytes" "encoding/csv" "encoding/json" + "fmt" "io" "net/http" + "reflect" "strconv" "time" "github.com/influxdata/influxdb/models" + "github.com/ugorji/go/codec" ) // ResponseWriter is an interface for writing a response. @@ -28,6 +32,9 @@ func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter { case "application/csv", "text/csv": w.Header().Add("Content-Type", "text/csv") rw.formatter = &csvFormatter{statementID: -1, Writer: w} + case "application/x-msgpack": + w.Header().Add("Content-Type", "application/x-msgpack") + rw.formatter = newMsgpackFormatter(w) case "application/json": fallthrough default: @@ -102,7 +109,8 @@ type csvFormatter struct { } func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) { - csv := csv.NewWriter(w) + csv := csv.NewWriter(writer{Writer: w, n: &n}) + defer csv.Flush() for _, result := range resp.Results { if result.StatementID != w.statementID { // If there are no series in the result, skip past this result. @@ -172,3 +180,73 @@ func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) { } return n, nil } + +type msgpackTimeExt struct { + enc *codec.Encoder + buf bytes.Buffer +} + +func newMsgpackTimeExt(h *codec.MsgpackHandle) *msgpackTimeExt { + ext := &msgpackTimeExt{} + ext.enc = codec.NewEncoder(&ext.buf, h) + return ext +} + +func (x *msgpackTimeExt) WriteExt(v interface{}) (data []byte) { + var t time.Time + switch v := v.(type) { + case time.Time: + t = v + case *time.Time: + t = *v + default: + panic(fmt.Sprintf("unsupported format for time conversion: expecting time.Time, got %T", v)) + } + + // The codec library does not expose encoding to a byte string directly so + // we use our own encoder to encode an int. We reuse the internal buffer to + // reduce the number of allocations. This makes this extension + // non-threadsafe, but we make a new extension writer every time we create + // an encoder so this shouldn't matter. + x.buf.Reset() + x.enc.MustEncode(t.UnixNano()) + return x.buf.Bytes() +} + +func (x *msgpackTimeExt) ReadExt(dst interface{}, src []byte) { panic("unsupported") } + +type msgpackFormatter struct { + io.Writer + enc *codec.Encoder + w bytes.Buffer +} + +func newMsgpackFormatter(w io.Writer) *msgpackFormatter { + var mh codec.MsgpackHandle + mh.WriteExt = true + mh.SetBytesExt(reflect.TypeOf(time.Time{}), 1, newMsgpackTimeExt(&mh)) + + mf := &msgpackFormatter{Writer: w} + mf.enc = codec.NewEncoder(&mf.w, &mh) + return mf +} + +func (w *msgpackFormatter) WriteResponse(resp Response) (n int, err error) { + if err := w.enc.Encode(resp); err != nil { + return 0, err + } + n, err = w.Writer.Write(w.w.Bytes()) + w.w.Reset() + return +} + +type writer struct { + io.Writer + n *int +} + +func (w writer) Write(data []byte) (n int, err error) { + n, err = w.Writer.Write(data) + *w.n += n + return n, err +} diff --git a/services/httpd/response_writer_test.go b/services/httpd/response_writer_test.go new file mode 100644 index 00000000000..06f8b3b7042 --- /dev/null +++ b/services/httpd/response_writer_test.go @@ -0,0 +1,92 @@ +package httpd_test + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/services/httpd" +) + +// discard is an http.ResponseWriter that discards all output. +type discard struct{} + +func (discard) Header() http.Header { return http.Header{} } +func (discard) WriteHeader(int) {} +func (discard) Write(data []byte) (int, error) { return len(data), nil } + +func BenchmarkJSONResponseWriter_1K(b *testing.B) { + benchmarkResponseWriter(b, "application/json", 10, 100) +} +func BenchmarkJSONResponseWriter_100K(b *testing.B) { + benchmarkResponseWriter(b, "application/json", 1000, 100) +} +func BenchmarkJSONResponseWriter_1M(b *testing.B) { + benchmarkResponseWriter(b, "application/json", 10000, 100) +} + +func BenchmarkMsgpackResponseWriter_1K(b *testing.B) { + benchmarkResponseWriter(b, "application/x-msgpack", 10, 100) +} +func BenchmarkMsgpackResponseWriter_100K(b *testing.B) { + benchmarkResponseWriter(b, "application/x-msgpack", 1000, 100) +} +func BenchmarkMsgpackResponseWriter_1M(b *testing.B) { + benchmarkResponseWriter(b, "application/x-msgpack", 10000, 100) +} + +func BenchmarkCSVResponseWriter_1K(b *testing.B) { + benchmarkResponseWriter(b, "text/csv", 10, 100) +} +func BenchmarkCSVResponseWriter_100K(b *testing.B) { + benchmarkResponseWriter(b, "text/csv", 1000, 100) +} +func BenchmarkCSVResponseWriter_1M(b *testing.B) { + benchmarkResponseWriter(b, "text/csv", 10000, 100) +} + +func benchmarkResponseWriter(b *testing.B, contentType string, seriesN, pointsPerSeriesN int) { + r, err := http.NewRequest("POST", "/query", nil) + if err != nil { + b.Fatal(err) + } + r.Header.Set("Accept", contentType) + + // Generate a sample result. + rows := make(models.Rows, 0, seriesN) + for i := 0; i < seriesN; i++ { + row := &models.Row{ + Name: "cpu", + Tags: map[string]string{ + "host": fmt.Sprintf("server-%d", i), + }, + Columns: []string{"time", "value"}, + Values: make([][]interface{}, 0, b.N), + } + + for j := 0; j < pointsPerSeriesN; j++ { + row.Values = append(row.Values, []interface{}{ + time.Unix(int64(10*j), 0), + float64(100), + }) + } + rows = append(rows, row) + } + result := &influxql.Result{Series: rows} + + // Create new ResponseWriter with the underlying ResponseWriter + // being the discard writer so we only benchmark the marshaling. + w := httpd.NewResponseWriter(discard{}, r) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + w.WriteResponse(httpd.Response{ + Results: []*influxql.Result{result}, + }) + } +}