Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove redundant code the from response formatter and fix CloseNotify #7163

Merged
merged 2 commits into from
Aug 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,12 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
resp := Response{Results: make([]*influxql.Result, 0)}

// Status header is OK once this point is reached.
// Attempt to flush the header immediately so the client gets the header information
// and knows the query was accepted.
h.writeHeader(rw, http.StatusOK)
if w, ok := w.(http.Flusher); ok {
w.Flush()
}

// pull all results from the channel
rows := 0
Expand All @@ -460,7 +465,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
n, _ := rw.WriteResponse(Response{
Results: []*influxql.Result{r},
})
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n+1))
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush()
continue
}
Expand Down Expand Up @@ -940,6 +945,9 @@ func (w gzipResponseWriter) Write(b []byte) (int, error) {

func (w gzipResponseWriter) Flush() {
w.Writer.(*gzip.Writer).Flush()
if w, ok := w.ResponseWriter.(http.Flusher); ok {
w.Flush()
}
}

func (w gzipResponseWriter) CloseNotify() <-chan bool {
Expand Down
68 changes: 68 additions & 0 deletions services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"strings"
"testing"
Expand Down Expand Up @@ -365,6 +366,73 @@ func TestHandler_Query_ErrResult(t *testing.T) {
}
}

// Ensure that closing the HTTP connection causes the query to be interrupted.
func TestHandler_Query_CloseNotify(t *testing.T) {
// Avoid leaking a goroutine when this fails.
done := make(chan struct{})
defer close(done)

interrupted := make(chan struct{})
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
select {
case <-ctx.InterruptCh:
case <-done:
}
close(interrupted)
return nil
}

s := httptest.NewServer(h)
defer s.Close()

// Parse the URL and generate a query request.
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}
u.Path = "/query"

values := url.Values{}
values.Set("q", "SELECT * FROM cpu")
values.Set("db", "db0")
values.Set("rp", "rp0")
values.Set("chunked", "true")
u.RawQuery = values.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
t.Fatal(err)
}

// Perform the request and retrieve the response.
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}

// Validate that the interrupted channel has NOT been closed yet.
timer := time.NewTimer(100 * time.Millisecond)
select {
case <-interrupted:
timer.Stop()
t.Fatal("query interrupted unexpectedly")
case <-timer.C:
}

// Close the response body which should abort the query in the handler.
resp.Body.Close()

// The query should abort within 100 milliseconds.
timer.Reset(100 * time.Millisecond)
select {
case <-interrupted:
timer.Stop()
case <-timer.C:
t.Fatal("timeout while waiting for query to abort")
}
}

// Ensure the handler handles ping requests correctly.
// TODO: This should be expanded to verify the MetaClient check in servePing is working correctly
func TestHandler_Ping(t *testing.T) {
Expand Down
46 changes: 35 additions & 11 deletions services/httpd/response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,57 @@ type ResponseWriter interface {
// in the request that wraps the ResponseWriter.
func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter {
pretty := r.URL.Query().Get("pretty") == "true"
rw := &responseWriter{ResponseWriter: w}
switch r.Header.Get("Accept") {
case "application/json":
fallthrough
default:
w.Header().Add("Content-Type", "application/json")
return &jsonResponseWriter{Pretty: pretty, ResponseWriter: w}
rw.formatter = &jsonFormatter{Pretty: pretty, Writer: w}
}
return rw
}

// WriteError is a convenience function for writing an error response to the ResponseWriter.
func WriteError(w ResponseWriter, err error) (int, error) {
return w.WriteResponse(Response{Err: err})
}

type jsonResponseWriter struct {
Pretty bool
// responseWriter is an implementation of ResponseWriter.
type responseWriter struct {
formatter interface {
WriteResponse(resp Response) (int, error)
}
http.ResponseWriter
}

func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) {
// WriteResponse writes the response using the formatter.
func (w *responseWriter) WriteResponse(resp Response) (int, error) {
return w.formatter.WriteResponse(resp)
}

// Flush flushes the ResponseWriter if it has a Flush() method.
func (w *responseWriter) Flush() {
if w, ok := w.ResponseWriter.(http.Flusher); ok {
w.Flush()
}
}

// CloseNotify calls CloseNotify on the underlying http.ResponseWriter if it
// exists. Otherwise, it returns a nil channel that will never notify.
func (w *responseWriter) CloseNotify() <-chan bool {
if notifier, ok := w.ResponseWriter.(http.CloseNotifier); ok {
return notifier.CloseNotify()
}
return nil
}

type jsonFormatter struct {
io.Writer
Pretty bool
}

func (w *jsonFormatter) WriteResponse(resp Response) (n int, err error) {
var b []byte
if w.Pretty {
b, err = json.MarshalIndent(resp, "", " ")
Expand All @@ -55,10 +86,3 @@ func (w *jsonResponseWriter) WriteResponse(resp Response) (n int, err error) {
n++
return n, err
}

// Flush flushes the ResponseWriter if it has a Flush() method.
func (w *jsonResponseWriter) Flush() {
if w, ok := w.ResponseWriter.(http.Flusher); ok {
w.Flush()
}
}