Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support post requests in the frontend queryrange handler. #2023

Merged
merged 2 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 71 additions & 45 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package queryrange
import (
"flag"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/cortexproject/cortex/pkg/chunk/cache"
Expand All @@ -17,6 +15,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
)

Expand Down Expand Up @@ -55,60 +54,87 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet
return func(next http.RoundTripper) http.RoundTripper {
metricRT := metricsTripperware(next)
logFilterRT := logFilterTripperware(next)
return frontend.RoundTripFunc(func(req *http.Request) (*http.Response, error) {
if !strings.HasSuffix(req.URL.Path, "/query_range") && !strings.HasSuffix(req.URL.Path, "/prom/query") {
return next.RoundTrip(req)
}
params := req.URL.Query()
query := params.Get("query")
expr, err := logql.ParseExpr(query)
if err != nil {
// weavework server uses httpgrpc errors for status code.
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

if _, ok := expr.(logql.SampleExpr); ok {
return metricRT.RoundTrip(req)
}
if logSelector, ok := expr.(logql.LogSelectorExpr); ok {
if err := validateLimits(req, params, limits); err != nil {
return nil, err
}

// backport the old regexp params into the query params
regexp := params.Get("regexp")
if regexp != "" {
logSelector = logql.NewFilterExpr(logSelector, labels.MatchRegexp, regexp)
params.Set("query", logSelector.String())
req.URL.RawQuery = params.Encode()
}
filter, err := logSelector.Filter()
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
if filter != nil {
return logFilterRT.RoundTrip(req)
}
}
return next.RoundTrip(req)
})
return newRoundTripper(next, logFilterRT, metricRT, limits)
}, cache, nil
}

// validates log entries limits
func validateLimits(req *http.Request, params url.Values, limits Limits) error {
userID, err := user.ExtractOrgID(req.Context())
type roundTripper struct {
next, log, metric http.RoundTripper

limits Limits
}

// newRoundTripper creates a new queryrange roundtripper
func newRoundTripper(next, log, metric http.RoundTripper, limits Limits) roundTripper {
return roundTripper{
log: log,
limits: limits,
metric: metric,
next: next,
}
}

func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if !strings.HasSuffix(req.URL.Path, "/query_range") && !strings.HasSuffix(req.URL.Path, "/prom/query") {
return r.next.RoundTrip(req)
}
err := req.ParseForm()
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
rangeQuery, err := loghttp.ParseRangeQuery(req)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
expr, err := logql.ParseExpr(rangeQuery.Query)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
switch e := expr.(type) {
case logql.SampleExpr:
return r.metric.RoundTrip(req)
case logql.LogSelectorExpr:
filter, err := transformRegexQuery(req, e).Filter()
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil {
return nil, err
}
if filter == nil {
return r.next.RoundTrip(req)
}
return r.log.RoundTrip(req)

reqLimit, err := strconv.Atoi(params.Get("limit"))
default:
return r.next.RoundTrip(req)
}
}

// transformRegexQuery backport the old regexp params into the v1 query format
func transformRegexQuery(req *http.Request, expr logql.LogSelectorExpr) logql.LogSelectorExpr {
regexp := req.Form.Get("regexp")
if regexp != "" {
expr = logql.NewFilterExpr(expr, labels.MatchRegexp, regexp)
params := req.URL.Query()
params.Set("query", expr.String())
req.URL.RawQuery = params.Encode()
// force the form and query to be parsed again.
req.Form = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit scared of setting this to nil here. I'm afraid it exposes us to issues extending this in the future -- it's easy to think the form would be parsed already. That being said, I think we should merge this. It's just something we should think about improving.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't have a solution yet for that. The problem is that ParseForm does nothing if it's already populated and we need to update the querystring.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you remove this a test will fail.

req.PostForm = nil
}
return expr
}

// validates log entries limits
func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error {
userID, err := user.ExtractOrgID(req.Context())
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

maxEntriesLimit := limits.MaxEntriesLimitPerQuery(userID)
if reqLimit > maxEntriesLimit && maxEntriesLimit != 0 {
if int(reqLimit) > maxEntriesLimit && maxEntriesLimit != 0 {
return httpgrpc.Errorf(http.StatusBadRequest,
"max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit)
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package queryrange

import (
"bytes"
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"sync"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -287,6 +291,34 @@ func TestRegexpParamsSupport(t *testing.T) {
require.NoError(t, err)
}

func TestPostQueries(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "/loki/api/v1/query_range", nil)
data := url.Values{
"query": {`{app="foo"} |~ "foo"`},
}
body := bytes.NewBufferString(data.Encode())
req.Body = ioutil.NopCloser(body)
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
req = req.WithContext(user.InjectOrgID(context.Background(), "1"))
require.NoError(t, err)
_, err = newRoundTripper(
frontend.RoundTripFunc(func(*http.Request) (*http.Response, error) {
t.Error("unexpected default roundtripper called")
return nil, nil
}),
frontend.RoundTripFunc(func(*http.Request) (*http.Response, error) {
return nil, nil
}),
frontend.RoundTripFunc(func(*http.Request) (*http.Response, error) {
t.Error("unexpected metric roundtripper called")
return nil, nil
}),
fakeLimits{},
).RoundTrip(req)
require.NoError(t, err)
}

func TestEntriesLimitsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, nil)
if stopper != nil {
Expand Down