From f4d55fdcf40c5902ad65ea2c9aeb5c95ec5f823a Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 29 Nov 2016 14:02:49 -0600 Subject: [PATCH] Add msgpack handler for smarter JSON encoding The msgpack handler correctly retains information about whether the number is a float or an integer, unlike JSON. While the format is not human-readable, it makes a good interchange format for applications that don't necessarily care about a human readable output. This uses `github.com/tinylib/msgp` to precompile the marshaling for `models.Row`. Since we only use this library to marshal the one type, this is a much more efficient method of encoding than using reflection. --- CHANGELOG.md | 4 + Godeps | 2 + LICENSE_OF_DEPENDENCIES.md | 2 + models/encode.go | 271 +++++++++++++++++++++++++ models/encode_test.go | 122 +++++++++++ models/rows.go | 40 ++-- services/httpd/response_writer.go | 54 ++++- services/httpd/response_writer_test.go | 92 +++++++++ 8 files changed, 567 insertions(+), 20 deletions(-) create mode 100644 models/encode.go create mode 100644 models/encode_test.go create mode 100644 services/httpd/response_writer_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 46567a73c86..6ff6b30332c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - [#7066](https://github.com/influxdata/influxdb/issues/7066): Add support for secure transmission via collectd. - [#7554](https://github.com/influxdata/influxdb/pull/7554): update latest dependencies with Godeps. - [#7368](https://github.com/influxdata/influxdb/pull/7368): Introduce syntax for marking a partial response with chunking. +- [#7154](https://github.com/influxdata/influxdb/pull/7154): Add msgpack handler for smart JSON encoding. ### Bugfixes @@ -88,6 +89,9 @@ All Changes: ### Bugfixes - [#7392](https://github.com/influxdata/influxdb/pull/7392): Enable https subscriptions to work with custom CA certificates. 
+ +### Bugfixes + - [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key. - [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement. - [#7177](https://github.com/influxdata/influxdb/issues/7177): Fix base64 encoding issue with /debug/vars stats. diff --git a/Godeps b/Godeps index 36246ff162e..66d4fc1f073 100644 --- a/Godeps +++ b/Godeps @@ -12,6 +12,8 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/jwilder/encoding 4dada27c33277820fe35c7ee71ed34fbc9477d00 github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073 +github.com/philhofer/fwd 98c11a7a6ec829d672b03833c3d69a7fae1ca972 github.com/rakyll/statik e383bbf6b2ec1a2fb8492dfd152d945fb88919b6 github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d +github.com/tinylib/msgp ad0ff2e232ad2e37faf67087fb24bf8d04a8ce20 golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index f35eb5ba081..2458c00e2cc 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -14,8 +14,10 @@ - github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) - github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) +- github.com/philhofer/fwd [MIT LICENSE](https://github.com/philhofer/fwd/blob/master/LICENSE.md) - github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) - github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) +- github.com/tinylib/msgp [BSD 
LICENSE](https://github.com/tinylib/msgp/blob/master/LICENSE) - glyphicons [LICENSE](http://glyphicons.com/license/) - golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) - jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt) diff --git a/models/encode.go b/models/encode.go new file mode 100644 index 00000000000..a2c850174f5 --- /dev/null +++ b/models/encode.go @@ -0,0 +1,271 @@ +package models + +// NOTE: THIS FILE WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import "github.com/tinylib/msgp/msgp" + +// DecodeMsg implements msgp.Decodable +func (z *Row) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zwht uint32 + zwht, err = dc.ReadMapHeader() + if err != nil { + return + } + for zwht > 0 { + zwht-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "name": + z.Name, err = dc.ReadString() + if err != nil { + return + } + case "tags": + var zhct uint32 + zhct, err = dc.ReadMapHeader() + if err != nil { + return + } + if z.Tags == nil && zhct > 0 { + z.Tags = make(map[string]string, zhct) + } else if len(z.Tags) > 0 { + for key, _ := range z.Tags { + delete(z.Tags, key) + } + } + for zhct > 0 { + zhct-- + var zxvk string + var zbzg string + zxvk, err = dc.ReadString() + if err != nil { + return + } + zbzg, err = dc.ReadString() + if err != nil { + return + } + z.Tags[zxvk] = zbzg + } + case "columns": + var zcua uint32 + zcua, err = dc.ReadArrayHeader() + if err != nil { + return + } + if cap(z.Columns) >= int(zcua) { + z.Columns = (z.Columns)[:zcua] + } else { + z.Columns = make([]string, zcua) + } + for zbai := range z.Columns { + z.Columns[zbai], err = dc.ReadString() + if err != nil { + return + } + } + case "values": + var zxhx uint32 + zxhx, err = dc.ReadArrayHeader() + if err != nil { + return + } + if cap(z.Values) >= int(zxhx) { + z.Values = 
(z.Values)[:zxhx] + } else { + z.Values = make([][]interface{}, zxhx) + } + for zcmr := range z.Values { + var zlqf uint32 + zlqf, err = dc.ReadArrayHeader() + if err != nil { + return + } + if cap(z.Values[zcmr]) >= int(zlqf) { + z.Values[zcmr] = (z.Values[zcmr])[:zlqf] + } else { + z.Values[zcmr] = make([]interface{}, zlqf) + } + for zajw := range z.Values[zcmr] { + z.Values[zcmr][zajw], err = dc.ReadIntf() + if err != nil { + return + } + } + } + default: + err = dc.Skip() + if err != nil { + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Row) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "name" + err = en.Append(0x84, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + if err != nil { + return err + } + err = en.WriteString(z.Name) + if err != nil { + return + } + // write "tags" + err = en.Append(0xa4, 0x74, 0x61, 0x67, 0x73) + if err != nil { + return err + } + err = en.WriteMapHeader(uint32(len(z.Tags))) + if err != nil { + return + } + for zxvk, zbzg := range z.Tags { + err = en.WriteString(zxvk) + if err != nil { + return + } + err = en.WriteString(zbzg) + if err != nil { + return + } + } + // write "columns" + err = en.Append(0xa7, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73) + if err != nil { + return err + } + err = en.WriteArrayHeader(uint32(len(z.Columns))) + if err != nil { + return + } + for zbai := range z.Columns { + err = en.WriteString(z.Columns[zbai]) + if err != nil { + return + } + } + // write "values" + err = en.Append(0xa6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73) + if err != nil { + return err + } + err = en.WriteArrayHeader(uint32(len(z.Values))) + if err != nil { + return + } + for zcmr := range z.Values { + err = en.WriteArrayHeader(uint32(len(z.Values[zcmr]))) + if err != nil { + return + } + for zajw := range z.Values[zcmr] { + err = en.WriteIntf(z.Values[zcmr][zajw]) + if err != nil { + return + } + } + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied 
by the serialized message +func (z *Row) Msgsize() (s int) { + s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 5 + msgp.MapHeaderSize + if z.Tags != nil { + for zxvk, zbzg := range z.Tags { + _ = zbzg + s += msgp.StringPrefixSize + len(zxvk) + msgp.StringPrefixSize + len(zbzg) + } + } + s += 8 + msgp.ArrayHeaderSize + for zbai := range z.Columns { + s += msgp.StringPrefixSize + len(z.Columns[zbai]) + } + s += 7 + msgp.ArrayHeaderSize + for zcmr := range z.Values { + s += msgp.ArrayHeaderSize + for zajw := range z.Values[zcmr] { + s += msgp.GuessSize(z.Values[zcmr][zajw]) + } + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *Rows) DecodeMsg(dc *msgp.Reader) (err error) { + var zjfb uint32 + zjfb, err = dc.ReadArrayHeader() + if err != nil { + return + } + if cap((*z)) >= int(zjfb) { + (*z) = (*z)[:zjfb] + } else { + (*z) = make(Rows, zjfb) + } + for zpks := range *z { + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + return + } + (*z)[zpks] = nil + } else { + if (*z)[zpks] == nil { + (*z)[zpks] = new(Row) + } + err = (*z)[zpks].DecodeMsg(dc) + if err != nil { + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z Rows) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteArrayHeader(uint32(len(z))) + if err != nil { + return + } + for zcxo := range z { + if z[zcxo] == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = z[zcxo].EncodeMsg(en) + if err != nil { + return + } + } + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z Rows) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for zcxo := range z { + if z[zcxo] == nil { + s += msgp.NilSize + } else { + s += z[zcxo].Msgsize() + } + } + return +} diff --git a/models/encode_test.go b/models/encode_test.go new file mode 100644 index 00000000000..6b232393595 --- /dev/null +++ b/models/encode_test.go @@ -0,0 +1,122 @@ +package models + +// NOTE: THIS FILE 
WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestEncodeDecodeRow(t *testing.T) { + v := Row{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + } + + vn := Row{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRow(b *testing.B) { + v := Row{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRow(b *testing.B) { + v := Row{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRows(t *testing.T) { + v := Rows{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + } + + vn := Rows{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRows(b *testing.B) { + v := Rows{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRows(b *testing.B) { + v := Rows{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + 
+ b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/models/rows.go b/models/rows.go index 6006cdd19dd..6de8929735d 100644 --- a/models/rows.go +++ b/models/rows.go @@ -4,35 +4,37 @@ import ( "sort" ) +//go:generate msgp -io=true -marshal=false -o=encode.go + // Row represents a single row returned from the execution of a statement. type Row struct { - Name string `json:"name,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Columns []string `json:"columns,omitempty"` - Values [][]interface{} `json:"values,omitempty"` - Partial bool `json:"partial,omitempty"` + Name string `json:"name,omitempty" msg:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty" msg:"tags,omitempty"` + Columns []string `json:"columns,omitempty" msg:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty" msg:"values,omitempty"` + Partial bool `json:"partial,omitempty" msg:"-"` } // SameSeries returns true if r contains values for the same series as o. -func (r *Row) SameSeries(o *Row) bool { - return r.tagsHash() == o.tagsHash() && r.Name == o.Name +func (z *Row) SameSeries(o *Row) bool { - return z.tagsHash() == o.tagsHash() && z.Name == o.Name +func (z *Row) SameSeries(o *Row) bool { + return z.tagsHash() == o.tagsHash() && z.Name == o.Name } // tagsHash returns a hash of tag key/value pairs. -func (r *Row) tagsHash() uint64 { +func (z *Row) tagsHash() uint64 { h := NewInlineFNV64a() - keys := r.tagsKeys() + keys := z.tagsKeys() for _, k := range keys { h.Write([]byte(k)) - h.Write([]byte(r.Tags[k])) + h.Write([]byte(z.Tags[k])) } return h.Sum64() } // tagKeys returns a sorted list of tag keys. 
-func (r *Row) tagsKeys() []string { - a := make([]string, 0, len(r.Tags)) - for k := range r.Tags { +func (z *Row) tagsKeys() []string { + a := make([]string, 0, len(z.Tags)) + for k := range z.Tags { a = append(a, k) } sort.Strings(a) @@ -42,18 +44,18 @@ func (r *Row) tagsKeys() []string { // Rows represents a collection of rows. Rows implements sort.Interface. type Rows []*Row -func (p Rows) Len() int { return len(p) } +func (z Rows) Len() int { return len(z) } -func (p Rows) Less(i, j int) bool { +func (z Rows) Less(i, j int) bool { // Sort by name first. - if p[i].Name != p[j].Name { - return p[i].Name < p[j].Name + if z[i].Name != z[j].Name { + return z[i].Name < z[j].Name } // Sort by tag set hash. Tags don't have a meaningful sort order so we // just compute a hash and sort by that instead. This allows the tests // to receive rows in a predictable order every time. - return p[i].tagsHash() < p[j].tagsHash() + return z[i].tagsHash() < z[j].tagsHash() } -func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (z Rows) Swap(i, j int) { z[i], z[j] = z[j], z[i] } diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go index 9778571874c..45fbbfdd6e7 100644 --- a/services/httpd/response_writer.go +++ b/services/httpd/response_writer.go @@ -9,8 +9,11 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/tinylib/msgp/msgp" ) +//go:generate msgp + // ResponseWriter is an interface for writing a response. type ResponseWriter interface { // WriteResponse writes a response. 
@@ -28,6 +31,9 @@ func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter { case "application/csv", "text/csv": w.Header().Add("Content-Type", "text/csv") rw.formatter = &csvFormatter{statementID: -1, Writer: w} + case "application/x-msgpack": + w.Header().Add("Content-Type", "application/x-msgpack") + rw.formatter = newMsgpackFormatter(w) case "application/json": fallthrough default: @@ -102,7 +108,8 @@ type csvFormatter struct { } func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) { - csv := csv.NewWriter(w) + csv := csv.NewWriter(writer{Writer: w, n: &n}) + defer csv.Flush() for _, result := range resp.Results { if result.StatementID != w.statementID { // If there are no series in the result, skip past this result. @@ -172,3 +179,48 @@ func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) { } return n, nil } + +type msgpackFormatter struct { + enc *msgp.Writer + n int +} + +func newMsgpackFormatter(w io.Writer) *msgpackFormatter { + mf := &msgpackFormatter{} + mf.enc = msgp.NewWriter(writer{Writer: w, n: &mf.n}) + return mf +} + +func (w *msgpackFormatter) WriteResponse(resp Response) (n int, err error) { + w.n = 0 + + w.enc.WriteMapHeader(1) + w.enc.WriteString("results") + + w.enc.WriteArrayHeader(uint32(len(resp.Results))) + for _, result := range resp.Results { + w.enc.WriteMapHeader(1) + w.enc.WriteString("series") + + w.enc.WriteArrayHeader(uint32(len(result.Series))) + for _, row := range result.Series { + err = row.EncodeMsg(w.enc) + if err != nil { + return 0, err + } + } + } + err = w.enc.Flush() + return w.n, err +} + +type writer struct { + io.Writer + n *int +} + +func (w writer) Write(data []byte) (n int, err error) { + n, err = w.Writer.Write(data) + *w.n += n + return n, err +} diff --git a/services/httpd/response_writer_test.go b/services/httpd/response_writer_test.go new file mode 100644 index 00000000000..06f8b3b7042 --- /dev/null +++ b/services/httpd/response_writer_test.go @@ -0,0 
+1,92 @@ +package httpd_test + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/services/httpd" +) + +// discard is an http.ResponseWriter that discards all output. +type discard struct{} + +func (discard) Header() http.Header { return http.Header{} } +func (discard) WriteHeader(int) {} +func (discard) Write(data []byte) (int, error) { return len(data), nil } + +func BenchmarkJSONResponseWriter_1K(b *testing.B) { + benchmarkResponseWriter(b, "application/json", 10, 100) +} +func BenchmarkJSONResponseWriter_100K(b *testing.B) { + benchmarkResponseWriter(b, "application/json", 1000, 100) +} +func BenchmarkJSONResponseWriter_1M(b *testing.B) { + benchmarkResponseWriter(b, "application/json", 10000, 100) +} + +func BenchmarkMsgpackResponseWriter_1K(b *testing.B) { + benchmarkResponseWriter(b, "application/x-msgpack", 10, 100) +} +func BenchmarkMsgpackResponseWriter_100K(b *testing.B) { + benchmarkResponseWriter(b, "application/x-msgpack", 1000, 100) +} +func BenchmarkMsgpackResponseWriter_1M(b *testing.B) { + benchmarkResponseWriter(b, "application/x-msgpack", 10000, 100) +} + +func BenchmarkCSVResponseWriter_1K(b *testing.B) { + benchmarkResponseWriter(b, "text/csv", 10, 100) +} +func BenchmarkCSVResponseWriter_100K(b *testing.B) { + benchmarkResponseWriter(b, "text/csv", 1000, 100) +} +func BenchmarkCSVResponseWriter_1M(b *testing.B) { + benchmarkResponseWriter(b, "text/csv", 10000, 100) +} + +func benchmarkResponseWriter(b *testing.B, contentType string, seriesN, pointsPerSeriesN int) { + r, err := http.NewRequest("POST", "/query", nil) + if err != nil { + b.Fatal(err) + } + r.Header.Set("Accept", contentType) + + // Generate a sample result. 
+ rows := make(models.Rows, 0, seriesN) + for i := 0; i < seriesN; i++ { + row := &models.Row{ + Name: "cpu", + Tags: map[string]string{ + "host": fmt.Sprintf("server-%d", i), + }, + Columns: []string{"time", "value"}, + Values: make([][]interface{}, 0, b.N), + } + + for j := 0; j < pointsPerSeriesN; j++ { + row.Values = append(row.Values, []interface{}{ + time.Unix(int64(10*j), 0), + float64(100), + }) + } + rows = append(rows, row) + } + result := &influxql.Result{Series: rows} + + // Create new ResponseWriter with the underlying ResponseWriter + // being the discard writer so we only benchmark the marshaling. + w := httpd.NewResponseWriter(discard{}, r) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + w.WriteResponse(httpd.Response{ + Results: []*influxql.Result{result}, + }) + } +}