Skip to content

Commit

Permalink
feat: reject filter queries to /patterns endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed May 23, 2024
1 parent dc620e7 commit f0d6a92
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 40 deletions.
8 changes: 4 additions & 4 deletions pkg/logql/range_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,20 @@ func NewBatchRangeVectorIterator(
end: end,
current: start - step, // first loop iteration will set it to start
offset: offset,
metrics: map[string]labels.Labels{},
window: map[string]*promql.Series{},
metrics: map[string]labels.Labels{},
window: map[string]*promql.Series{},
agg: agg,
}
}

func (r *batchRangeVectorIterator) Next() bool {
// slides the range window to the next position
r.current = r.current + r.step // first current will be 5 min before start
r.current = r.current + r.step
if r.current > r.end {
return false
}
rangeEnd := r.current
rangeStart := rangeEnd - r.selRange // in nanoseconds
rangeStart := rangeEnd - r.selRange
// load samples
r.popBack(rangeStart)
r.load(rangeStart, rangeEnd)
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ type Loki struct {
distributor *distributor.Distributor
Ingester ingester.Interface
PatternIngester *pattern.Ingester
PatternRingClient *pattern.RingClient
PatternRingClient pattern.RingClient
Querier querier.Querier
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
querierAPI *querier.QuerierAPI
Expand Down
28 changes: 18 additions & 10 deletions pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"
"errors"
"math"
"net/http"

"github.com/go-kit/log"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"

Expand All @@ -26,14 +24,14 @@ type IngesterQuerier struct {
cfg Config
logger log.Logger

ringClient *RingClient
ringClient RingClient

registerer prometheus.Registerer
}

func NewIngesterQuerier(
cfg Config,
ringClient *RingClient,
ringClient RingClient,
metricsNamespace string,
registerer prometheus.Registerer,
logger log.Logger,
Expand All @@ -48,21 +46,31 @@ func NewIngesterQuerier(

func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
// validate that a supported query was provided
// TODO(twhitney): validate metric queries don't have filters
var expr syntax.Expr
_, err := syntax.ParseMatchers(req.Query, true)
if err != nil {
expr, err = syntax.ParseSampleExpr(req.Query)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, ErrParseQuery.Error())
return nil, ErrParseQuery
}

var selector syntax.LogSelectorExpr
switch expr.(type) {

Check failure on line 58 in pkg/pattern/ingester_querier.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1034: assigning the result of this type assertion to a variable (switch expr := expr.(type)) could eliminate type assertions in switch cases (gosimple)
case *syntax.VectorAggregationExpr, *syntax.RangeAggregationExpr:
break
case *syntax.VectorAggregationExpr:
selector, err = expr.(*syntax.VectorAggregationExpr).Selector()

Check failure on line 60 in pkg/pattern/ingester_querier.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1034(related information): could eliminate this type assertion (gosimple)
case *syntax.RangeAggregationExpr:
selector, err = expr.(*syntax.RangeAggregationExpr).Selector()

Check failure on line 62 in pkg/pattern/ingester_querier.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1034(related information): could eliminate this type assertion (gosimple)
default:
return nil, ErrParseQuery
}

if err != nil {
return nil, err
}

if selector == nil || selector.HasFilter() {
return nil, ErrParseQuery
}
}

resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.PatternClient) (interface{}, error) {
Expand Down Expand Up @@ -118,7 +126,7 @@ func prunePatterns(

// ForAllIngesters runs f, in parallel, for all ingesters
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
replicationSet, err := q.ringClient.ring.GetReplicationSetForOperation(ring.Read)
replicationSet, err := q.ringClient.Ring().GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}
Expand All @@ -137,7 +145,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
// Nothing here
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (ResponseFromIngesters, error) {
client, err := q.ringClient.pool.GetClientFor(ingester.Addr)
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}
Expand Down
158 changes: 158 additions & 0 deletions pkg/pattern/ingester_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package pattern

import (
"bufio"
"context"
"os"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/loki/v3/pkg/logproto"
)

Expand Down Expand Up @@ -39,3 +45,155 @@ func Test_prunePatterns(t *testing.T) {

require.Equal(t, expectedPatterns, patterns)
}

func Test_Patterns(t *testing.T) {
t.Run("it rejects metric queries with filters", func(t *testing.T) {
q := &IngesterQuerier{
cfg: Config{},
logger: log.NewNopLogger(),
ringClient: &fakeRingClient{},
registerer: nil,
}
for _, query := range []string{
`count_over_time({foo="bar"} |= "baz" [5m])`,
`count_over_time({foo="bar"} != "baz" [5m])`,
`count_over_time({foo="bar"} =~ "baz" [5m])`,
`count_over_time({foo="bar"} !~ "baz" [5m])`,
`count_over_time({foo="bar"} | logfmt | color=blue [5m])`,
`sum(count_over_time({foo="bar"} |= "baz" [5m]))`,
`sum by label(count_over_time({foo="bar"} |= "baz" [5m]))`,
`bytes_over_time({foo="bar"} |= "baz" [5m])`,
} {
_, err := q.Patterns(
context.Background(),
&logproto.QueryPatternsRequest{
Query: query,
},
)
require.Error(t, err, query)
require.ErrorIs(t, err, ErrParseQuery)

}
})

t.Run("accepts log selector queries and count and bytes metric queries", func(t *testing.T) {
q := &IngesterQuerier{
cfg: Config{},
logger: log.NewNopLogger(),
ringClient: &fakeRingClient{},
registerer: nil,
}
for _, query := range []string{
`{foo="bar"}`,
`count_over_time({foo="bar"}[5m])`,
`bytes_over_time({foo="bar"}[5m])`,
`sum(count_over_time({foo="bar"}[5m]))`,
`sum(bytes_over_time({foo="bar"}[5m]))`,
`sum by (level)(count_over_time({foo="bar"}[5m]))`,
`sum by (level)(bytes_over_time({foo="bar"}[5m]))`,
} {
_, err := q.Patterns(
context.Background(),
&logproto.QueryPatternsRequest{
Query: query,
},
)
require.NoError(t, err, query)
}
})
}

type fakeRingClient struct{}

func (f *fakeRingClient) Pool() *ring_client.Pool {
panic("not implemented")
}

func (f *fakeRingClient) StartAsync(ctx context.Context) error {

Check warning on line 112 in pkg/pattern/ingester_querier_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
panic("not implemented")
}

func (f *fakeRingClient) AwaitRunning(ctx context.Context) error {

Check warning on line 116 in pkg/pattern/ingester_querier_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
panic("not implemented")
}

func (f *fakeRingClient) StopAsync() {
panic("not implemented")
}

func (f *fakeRingClient) AwaitTerminated(ctx context.Context) error {
panic("not implemented")
}

func (f *fakeRingClient) FailureCase() error {
panic("not implemented")
}

func (f *fakeRingClient) State() services.State {
panic("not implemented")
}

func (f *fakeRingClient) AddListener(listener services.Listener) {

Check warning on line 136 in pkg/pattern/ingester_querier_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'listener' seems to be unused, consider removing or renaming it as _ (revive)
panic("not implemented")
}

func (f *fakeRingClient) Ring() ring.ReadRing {
return &fakeRing{}
}

type fakeRing struct{}

func (f *fakeRing) Get(
key uint32,

Check warning on line 147 in pkg/pattern/ingester_querier_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'key' seems to be unused, consider removing or renaming it as _ (revive)
op ring.Operation,
bufDescs []ring.InstanceDesc,
bufHosts []string,
bufZones []string,
) (ring.ReplicationSet, error) {
panic("not implemented")
}

func (f *fakeRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) {
panic("not implemented")
}

func (f *fakeRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{}, nil
}

func (f *fakeRing) ReplicationFactor() int {
panic("not implemented")
}

func (f *fakeRing) InstancesCount() int {
panic("not implemented")
}

func (f *fakeRing) ShuffleShard(identifier string, size int) ring.ReadRing {
panic("not implemented")
}

func (f *fakeRing) GetInstanceState(instanceID string) (ring.InstanceState, error) {
panic("not implemented")
}

func (f *fakeRing) ShuffleShardWithLookback(
identifier string,
size int,
lookbackPeriod time.Duration,
now time.Time,
) ring.ReadRing {
panic("not implemented")
}

func (f *fakeRing) HasInstance(instanceID string) bool {
panic("not implemented")
}

func (f *fakeRing) CleanupShuffleShardCache(identifier string) {
panic("not implemented")
}

func (f *fakeRing) GetTokenRangesForInstance(instanceID string) (ring.TokenRanges, error) {
panic("not implemented")
}
4 changes: 2 additions & 2 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestInstancePushQuery(t *testing.T) {

t.Run("test count_over_time samples", func(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger())
inst, err := newInstance("foo", log.NewNopLogger(), nil)
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestInstancePushQuery(t *testing.T) {

t.Run("test bytes_over_time samples", func(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger())
inst, err := newInstance("foo", log.NewNopLogger(), nil)
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestInstance_QuerySample(t *testing.T) {
Step: oneMin,
}

instance, err := newInstance("test", log.NewNopLogger())
instance, err := newInstance("test", log.NewNopLogger(), nil)
require.NoError(t, err)

labels := model.LabelSet{
Expand Down
18 changes: 11 additions & 7 deletions pkg/pattern/metric/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -48,9 +49,9 @@ func ExtractMetricType(expr syntax.SampleExpr) (MetricType, error) {
}

type SampleEvaluatorFactory interface {
// NewStepEvaluator returns a NewStepEvaluator for a given SampleExpr.
// It's explicitly passed another NewStepEvaluator
// in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible
// NewStepEvaluator returns a NewStepEvaluator for a given SampleExpr.
// It's explicitly passed another NewStepEvaluator
// in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible
// NewStepEvaluator implementations which can be composed.
NewStepEvaluator(
ctx context.Context,
Expand Down Expand Up @@ -161,7 +162,7 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator(
}
}

// Need to create our own StepEvaluator since we only support bytes and count over time,
// Need to create our own StepEvaluator since we only support bytes and count over time,
// and always sum to get those values. In order to accomplish this we need control over the
// aggregation operation..
func NewPatternSampleRangeAggEvaluator(
Expand Down Expand Up @@ -199,7 +200,7 @@ func newRangeVectorIterator(
// TODO(twhitney): do I need a streaming aggregator?
// if so the aggregator is going to make this
// a bit of a bad time, as there's currently no
// way to provide a custom one.
// way to provide a custom one.
//
// var overlap bool
// if selRange >= step && start != end {
Expand All @@ -222,7 +223,7 @@ func newRangeVectorIterator(
// }, nil
// }

// always sum
// always sum
aggregator := logql.BatchRangeVectorAggregator(func(samples []promql.FPoint) float64 {
sum := 0.0
for _, v := range samples {
Expand Down Expand Up @@ -263,7 +264,6 @@ func (s *SeriesToSampleIterator) Next() bool {

current, rest := s.floats[0], s.floats[1:]

//Is timestamp the correct unit here?
s.curTs = current.T
s.cur = current.F

Expand Down Expand Up @@ -352,3 +352,7 @@ func (p *paramCompat) GetExpression() syntax.Expr {
func (p *paramCompat) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p *paramCompat) CachingOptions() (res resultscache.CachingOptions) {
return
}
Loading

0 comments on commit f0d6a92

Please sign in to comment.