Skip to content

Commit

Permalink
Add msgpack handler for smarter JSON encoding
Browse files Browse the repository at this point in the history
The msgpack handler correctly retains information about whether the
number is a float or an integer, unlike JSON. While the format is not
human-readable, it makes a good interchange format for applications that
don't necessarily care about a human readable output.
  • Loading branch information
jsternberg committed Aug 25, 2016
1 parent a30f9b6 commit f6717ae
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [#7099](https://github.com/influxdata/influxdb/pull/7099): Implement text/csv content encoding for the response writer.
- [#6992](https://github.com/influxdata/influxdb/issues/6992): Support tools for running async queries.
- [#7136](https://github.com/influxdata/influxdb/pull/7136): Update jwt-go dependency to version 3.
- [#7154](https://github.com/influxdata/influxdb/pull/7154): Add msgpack handler for smart JSON encoding.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073
github.com/rakyll/statik 274df120e9065bdd08eb1120e0375e3dc1ae8465
github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
github.com/ugorji/go 4a1cb5252a6951f715a85d0e4be334c2a2dbf2a2
golang.org/x/crypto c197bcf24cde29d3f73c7b4ac6fd41f4384e8af6
1 change: 1 addition & 0 deletions LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE)
- github.com/ugorji/go/codec [MIT LICENSE](https://github.com/ugorji/go/blob/master/LICENSE)
- glyphicons [LICENSE](http://glyphicons.com/license/)
- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)
Expand Down
8 changes: 4 additions & 4 deletions influxql/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func ReadOnlyWarning(stmt string) *Message {
type Result struct {
// StatementID is just the statement's position in the query. It's used
// to combine statement results if they're being buffered in memory.
StatementID int `json:"-"`
Series models.Rows
Messages []*Message
Err error
StatementID int `json:"-"`
Series models.Rows `codec:"series,omitempty"`
Messages []*Message `codec:"messages,omitempty"`
Err error `codec:"error,omitempty"`
}

// MarshalJSON encodes the result into JSON.
Expand Down
4 changes: 2 additions & 2 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,8 +1106,8 @@ func (h *Handler) recovery(inner http.Handler, name string) http.Handler {

// Response represents a list of statement results.
type Response struct {
Results []*influxql.Result
Err error
Results []*influxql.Result `codec:"results,omitempty"`
Err error `codec:"error,omitempty"`
}

// MarshalJSON encodes a Response struct into JSON.
Expand Down
80 changes: 79 additions & 1 deletion services/httpd/response_writer.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package httpd

import (
"bytes"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"strconv"
"time"

"github.com/influxdata/influxdb/models"
"github.com/ugorji/go/codec"
)

// ResponseWriter is an interface for writing a response.
Expand All @@ -28,6 +32,9 @@ func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter {
case "application/csv", "text/csv":
w.Header().Add("Content-Type", "text/csv")
rw.formatter = &csvFormatter{statementID: -1, Writer: w}
case "application/x-msgpack":
w.Header().Add("Content-Type", "application/x-msgpack")
rw.formatter = newMsgpackFormatter(w)
case "application/json":
fallthrough
default:
Expand Down Expand Up @@ -102,7 +109,8 @@ type csvFormatter struct {
}

func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) {
csv := csv.NewWriter(w)
csv := csv.NewWriter(writer{Writer: w, n: &n})
defer csv.Flush()
for _, result := range resp.Results {
if result.StatementID != w.statementID {
// If there are no series in the result, skip past this result.
Expand Down Expand Up @@ -172,3 +180,73 @@ func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) {
}
return n, nil
}

type msgpackTimeExt struct {
enc *codec.Encoder
buf bytes.Buffer
}

func newMsgpackTimeExt(h *codec.MsgpackHandle) *msgpackTimeExt {
ext := &msgpackTimeExt{}
ext.enc = codec.NewEncoder(&ext.buf, h)
return ext
}

func (x *msgpackTimeExt) WriteExt(v interface{}) (data []byte) {
var t time.Time
switch v := v.(type) {
case time.Time:
t = v
case *time.Time:
t = *v
default:
panic(fmt.Sprintf("unsupported format for time conversion: expecting time.Time, got %T", v))
}

// The codec library does not expose encoding to a byte string directly so
// we use our own encoder to encode an int. We reuse the internal buffer to
// reduce the number of allocations. This makes this extension
// non-threadsafe, but we make a new extension writer every time we create
// an encoder so this shouldn't matter.
x.buf.Reset()
x.enc.MustEncode(t.UnixNano())
return x.buf.Bytes()
}

func (x *msgpackTimeExt) ReadExt(dst interface{}, src []byte) { panic("unsupported") }

type msgpackFormatter struct {
io.Writer
enc *codec.Encoder
w bytes.Buffer
}

func newMsgpackFormatter(w io.Writer) *msgpackFormatter {
var mh codec.MsgpackHandle
mh.WriteExt = true
mh.SetBytesExt(reflect.TypeOf(time.Time{}), 1, newMsgpackTimeExt(&mh))

mf := &msgpackFormatter{Writer: w}
mf.enc = codec.NewEncoder(&mf.w, &mh)
return mf
}

func (w *msgpackFormatter) WriteResponse(resp Response) (n int, err error) {
if err := w.enc.Encode(resp); err != nil {
return 0, err
}
n, err = w.Writer.Write(w.w.Bytes())
w.w.Reset()
return
}

type writer struct {
io.Writer
n *int
}

func (w writer) Write(data []byte) (n int, err error) {
n, err = w.Writer.Write(data)
*w.n += n
return n, err
}
92 changes: 92 additions & 0 deletions services/httpd/response_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package httpd_test

import (
"fmt"
"net/http"
"testing"
"time"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/httpd"
)

// discard is an http.ResponseWriter that discards all output.
type discard struct{}

func (discard) Header() http.Header { return http.Header{} }
func (discard) WriteHeader(int) {}
func (discard) Write(data []byte) (int, error) { return len(data), nil }

func BenchmarkJSONResponseWriter_1K(b *testing.B) {
benchmarkResponseWriter(b, "application/json", 10, 100)
}
func BenchmarkJSONResponseWriter_100K(b *testing.B) {
benchmarkResponseWriter(b, "application/json", 1000, 100)
}
func BenchmarkJSONResponseWriter_1M(b *testing.B) {
benchmarkResponseWriter(b, "application/json", 10000, 100)
}

func BenchmarkMsgpackResponseWriter_1K(b *testing.B) {
benchmarkResponseWriter(b, "application/x-msgpack", 10, 100)
}
func BenchmarkMsgpackResponseWriter_100K(b *testing.B) {
benchmarkResponseWriter(b, "application/x-msgpack", 1000, 100)
}
func BenchmarkMsgpackResponseWriter_1M(b *testing.B) {
benchmarkResponseWriter(b, "application/x-msgpack", 10000, 100)
}

func BenchmarkCSVResponseWriter_1K(b *testing.B) {
benchmarkResponseWriter(b, "text/csv", 10, 100)
}
func BenchmarkCSVResponseWriter_100K(b *testing.B) {
benchmarkResponseWriter(b, "text/csv", 1000, 100)
}
func BenchmarkCSVResponseWriter_1M(b *testing.B) {
benchmarkResponseWriter(b, "text/csv", 10000, 100)
}

func benchmarkResponseWriter(b *testing.B, contentType string, seriesN, pointsPerSeriesN int) {
r, err := http.NewRequest("POST", "/query", nil)
if err != nil {
b.Fatal(err)
}
r.Header.Set("Accept", contentType)

// Generate a sample result.
rows := make(models.Rows, 0, seriesN)
for i := 0; i < seriesN; i++ {
row := &models.Row{
Name: "cpu",
Tags: map[string]string{
"host": fmt.Sprintf("server-%d", i),
},
Columns: []string{"time", "value"},
Values: make([][]interface{}, 0, b.N),
}

for j := 0; j < pointsPerSeriesN; j++ {
row.Values = append(row.Values, []interface{}{
time.Unix(int64(10*j), 0),
float64(100),
})
}
rows = append(rows, row)
}
result := &influxql.Result{Series: rows}

// Create new ResponseWriter with the underlying ResponseWriter
// being the discard writer so we only benchmark the marshaling.
w := httpd.NewResponseWriter(discard{}, r)

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
w.WriteResponse(httpd.Response{
Results: []*influxql.Result{result},
})
}
}

0 comments on commit f6717ae

Please sign in to comment.