Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly track the response bytes written for queries in all format types #9696

Merged
merged 1 commit into from
Apr 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 66 additions & 62 deletions services/httpd/response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter {
switch r.Header.Get("Accept") {
case "application/csv", "text/csv":
w.Header().Add("Content-Type", "text/csv")
rw.formatter = &csvFormatter{statementID: -1, Writer: w}
rw.formatter = &csvFormatter{statementID: -1}
case "application/x-msgpack":
w.Header().Add("Content-Type", "application/x-msgpack")
rw.formatter = &msgpackFormatter{Writer: w}
rw.formatter = &msgpackFormatter{}
case "application/json":
fallthrough
default:
w.Header().Add("Content-Type", "application/json")
rw.formatter = &jsonFormatter{Pretty: pretty, Writer: w}
rw.formatter = &jsonFormatter{Pretty: pretty}
}
return rw
}
Expand All @@ -49,14 +49,27 @@ func WriteError(w ResponseWriter, err error) (int, error) {
// responseWriter is an implementation of ResponseWriter.
type responseWriter struct {
formatter interface {
WriteResponse(resp Response) (int, error)
WriteResponse(w io.Writer, resp Response) error
}
http.ResponseWriter
}

type bytesCountWriter struct {
w io.Writer
n int
}

func (w *bytesCountWriter) Write(data []byte) (int, error) {
n, err := w.w.Write(data)
w.n += n
return n, err
}

// WriteResponse writes the response using the formatter.
func (w *responseWriter) WriteResponse(resp Response) (int, error) {
return w.formatter.WriteResponse(resp)
writer := bytesCountWriter{w: w.ResponseWriter}
err := w.formatter.WriteResponse(&writer, resp)
return writer.n, err
}

// Flush flushes the ResponseWriter if it has a Flush() method.
Expand All @@ -76,74 +89,69 @@ func (w *responseWriter) CloseNotify() <-chan bool {
}

type jsonFormatter struct {
io.Writer
Pretty bool
}

func (w *jsonFormatter) WriteResponse(resp Response) (n int, err error) {
func (f *jsonFormatter) WriteResponse(w io.Writer, resp Response) (err error) {
var b []byte
if w.Pretty {
if f.Pretty {
b, err = json.MarshalIndent(resp, "", " ")
} else {
b, err = json.Marshal(resp)
}

if err != nil {
n, err = io.WriteString(w, err.Error())
_, err = io.WriteString(w, err.Error())
} else {
n, err = w.Write(b)
_, err = w.Write(b)
}

w.Write([]byte("\n"))
n++
return n, err
return err
}

type csvFormatter struct {
io.Writer
statementID int
columns []string
}

func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) {
func (f *csvFormatter) WriteResponse(w io.Writer, resp Response) (err error) {
csv := csv.NewWriter(w)
if resp.Err != nil {
csv.Write([]string{"error"})
csv.Write([]string{resp.Err.Error()})
csv.Flush()
return n, csv.Error()
return csv.Error()
}

for _, result := range resp.Results {
if result.StatementID != w.statementID {
if result.StatementID != f.statementID {
// If there are no series in the result, skip past this result.
if len(result.Series) == 0 {
continue
}

// Set the statement id and print out a newline if this is not the first statement.
if w.statementID >= 0 {
if f.statementID >= 0 {
// Flush the csv writer and write a newline.
csv.Flush()
if err := csv.Error(); err != nil {
return n, err
return err
}

out, err := io.WriteString(w, "\n")
if err != nil {
return n, err
if _, err := io.WriteString(w, "\n"); err != nil {
return err
}
n += out
}
w.statementID = result.StatementID
f.statementID = result.StatementID

// Print out the column headers from the first series.
w.columns = make([]string, 2+len(result.Series[0].Columns))
w.columns[0] = "name"
w.columns[1] = "tags"
copy(w.columns[2:], result.Series[0].Columns)
if err := csv.Write(w.columns); err != nil {
return n, err
f.columns = make([]string, 2+len(result.Series[0].Columns))
f.columns[0] = "name"
f.columns[1] = "tags"
copy(f.columns[2:], result.Series[0].Columns)
if err := csv.Write(f.columns); err != nil {
return err
}
}

Expand All @@ -152,83 +160,79 @@ func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) {
// The columns have changed. Print a newline and reprint the header.
csv.Flush()
if err := csv.Error(); err != nil {
return n, err
return err
}

out, err := io.WriteString(w, "\n")
if err != nil {
return n, err
if _, err := io.WriteString(w, "\n"); err != nil {
return err
}
n += out

w.columns = make([]string, 2+len(row.Columns))
w.columns[0] = "name"
w.columns[1] = "tags"
copy(w.columns[2:], row.Columns)
if err := csv.Write(w.columns); err != nil {
return n, err

f.columns = make([]string, 2+len(row.Columns))
f.columns[0] = "name"
f.columns[1] = "tags"
copy(f.columns[2:], row.Columns)
if err := csv.Write(f.columns); err != nil {
return err
}
}

w.columns[0] = row.Name
f.columns[0] = row.Name
if len(row.Tags) > 0 {
w.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:])
f.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:])
} else {
w.columns[1] = ""
f.columns[1] = ""
}
for _, values := range row.Values {
for i, value := range values {
if value == nil {
w.columns[i+2] = ""
f.columns[i+2] = ""
continue
}

switch v := value.(type) {
case float64:
w.columns[i+2] = strconv.FormatFloat(v, 'f', -1, 64)
f.columns[i+2] = strconv.FormatFloat(v, 'f', -1, 64)
case int64:
w.columns[i+2] = strconv.FormatInt(v, 10)
f.columns[i+2] = strconv.FormatInt(v, 10)
case uint64:
w.columns[i+2] = strconv.FormatUint(v, 10)
f.columns[i+2] = strconv.FormatUint(v, 10)
case string:
w.columns[i+2] = v
f.columns[i+2] = v
case bool:
if v {
w.columns[i+2] = "true"
f.columns[i+2] = "true"
} else {
w.columns[i+2] = "false"
f.columns[i+2] = "false"
}
case time.Time:
w.columns[i+2] = strconv.FormatInt(v.UnixNano(), 10)
f.columns[i+2] = strconv.FormatInt(v.UnixNano(), 10)
case *float64, *int64, *string, *bool:
w.columns[i+2] = ""
f.columns[i+2] = ""
}
}
csv.Write(w.columns)
csv.Write(f.columns)
}
}
}
csv.Flush()
return n, csv.Error()
return csv.Error()
}

type msgpackFormatter struct {
io.Writer
}
type msgpackFormatter struct{}

func (f *msgpackFormatter) ContentType() string {
return "application/x-msgpack"
}

func (f *msgpackFormatter) WriteResponse(resp Response) (n int, err error) {
enc := msgp.NewWriter(f.Writer)
func (f *msgpackFormatter) WriteResponse(w io.Writer, resp Response) (err error) {
enc := msgp.NewWriter(w)
defer enc.Flush()

enc.WriteMapHeader(1)
if resp.Err != nil {
enc.WriteString("error")
enc.WriteString(resp.Err.Error())
return 0, nil
return nil
} else {
enc.WriteString("results")
enc.WriteArrayHeader(uint32(len(resp.Results)))
Expand Down Expand Up @@ -311,7 +315,7 @@ func (f *msgpackFormatter) WriteResponse(resp Response) (n int, err error) {
}
}
}
return 0, nil
return nil
}

func stringsEqual(a, b []string) bool {
Expand Down
16 changes: 12 additions & 4 deletions services/httpd/response_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestResponseWriter_CSV(t *testing.T) {
w := httptest.NewRecorder()

writer := httpd.NewResponseWriter(w, r)
writer.WriteResponse(httpd.Response{
n, err := writer.WriteResponse(httpd.Response{
Results: []*query.Result{
{
StatementID: 0,
Expand All @@ -54,6 +54,9 @@ func TestResponseWriter_CSV(t *testing.T) {
},
},
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

if got, want := w.Body.String(), `name,tags,time,value
cpu,"host=server01,region=uswest",10,2.5
Expand All @@ -65,6 +68,8 @@ cpu,"host=server01,region=uswest",60,false
cpu,"host=server01,region=uswest",70,9223372036854775808
`; got != want {
t.Errorf("unexpected output:\n\ngot=%v\nwant=%s", got, want)
} else if got, want := n, len(want); got != want {
t.Errorf("unexpected output length: got=%d want=%d", got, want)
}
}

Expand All @@ -78,7 +83,7 @@ func TestResponseWriter_MessagePack(t *testing.T) {
w := httptest.NewRecorder()

writer := httpd.NewResponseWriter(w, r)
writer.WriteResponse(httpd.Response{
_, err := writer.WriteResponse(httpd.Response{
Results: []*query.Result{
{
StatementID: 0,
Expand All @@ -103,6 +108,9 @@ func TestResponseWriter_MessagePack(t *testing.T) {
},
},
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

// The reader always reads times as time.Local so encode the expected response
// as JSON and insert it into the expected values.
Expand All @@ -125,8 +133,8 @@ func TestResponseWriter_MessagePack(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
want := fmt.Sprintf(`{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":%s}]}]}`, string(values))
if have := strings.TrimSpace(buf.String()); have != want {
t.Fatalf("unexpected output: %s != %s", have, want)
if got := strings.TrimSpace(buf.String()); got != want {
t.Fatalf("unexpected output:\n\ngot=%v\nwant=%v", got, want)
}
}

Expand Down