Skip to content

Commit

Permalink
Mimir query engine: finish reorganisation started in #8230 (#8247)
Browse files Browse the repository at this point in the history
* Move interface definitions to `types` package

* Move operators to `operators` package

* Add changelog entry
  • Loading branch information
charleskorn authored Jun 4, 2024
1 parent 960f224 commit 006ee9c
Show file tree
Hide file tree
Showing 17 changed files with 63 additions and 65 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ flowchart TB
max --> output
```

Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./operator/operator.go).
Each of these operators satisfies the `InstantVectorOperator` interface, defined [here](./types/operator.go).
The two key methods of this interface are `SeriesMetadata()` and `NextSeries()`:

`SeriesMetadata()` returns the list of all series' labels that will be returned by the operator[^2].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -22,7 +22,7 @@ import (
)

type Aggregation struct {
Inner InstantVectorOperator
Inner types.InstantVectorOperator
Start time.Time
End time.Time
Interval time.Duration
Expand Down Expand Up @@ -51,7 +51,7 @@ type group struct {
present []bool
}

var _ InstantVectorOperator = &Aggregation{}
var _ types.InstantVectorOperator = &Aggregation{}

var groupPool = zeropool.New(func() *group {
return &group{}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (a *Aggregation) labelsForGroup(m labels.Labels, lb *labels.Builder) labels
func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
if len(a.remainingGroups) == 0 {
// No more groups left.
return types.InstantVectorSeriesData{}, EOS
return types.InstantVectorSeriesData{}, types.EOS
}

start := timestamp.FromTime(a.Start)
Expand All @@ -142,7 +142,7 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries
s, err := a.Inner.NextSeries(ctx)

if err != nil {
if errors.Is(err, EOS) {
if errors.Is(err, types.EOS) {
return types.InstantVectorSeriesData{}, fmt.Errorf("exhausted series before all groups were completed: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -25,8 +25,8 @@ import (

// BinaryOperation represents a binary operation between instant vectors such as "<expr> + <expr>" or "<expr> - <expr>".
type BinaryOperation struct {
Left InstantVectorOperator
Right InstantVectorOperator
Left types.InstantVectorOperator
Right types.InstantVectorOperator
Op parser.ItemType
Pool *pooling.LimitingPool

Expand All @@ -45,7 +45,7 @@ type BinaryOperation struct {
opFunc binaryOperationFunc
}

var _ InstantVectorOperator = &BinaryOperation{}
var _ types.InstantVectorOperator = &BinaryOperation{}

type binaryOperationOutputSeries struct {
leftSeriesIndices []int
Expand All @@ -66,7 +66,7 @@ func (s binaryOperationOutputSeries) latestRightSeries() int {
return s.rightSeriesIndices[len(s.rightSeriesIndices)-1]
}

func NewBinaryOperation(left InstantVectorOperator, right InstantVectorOperator, vectorMatching parser.VectorMatching, op parser.ItemType, pool *pooling.LimitingPool) (*BinaryOperation, error) {
func NewBinaryOperation(left types.InstantVectorOperator, right types.InstantVectorOperator, vectorMatching parser.VectorMatching, op parser.ItemType, pool *pooling.LimitingPool) (*BinaryOperation, error) {
opFunc := arithmeticOperationFuncs[op]
if opFunc == nil {
return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op))
Expand Down Expand Up @@ -331,7 +331,7 @@ func (b *BinaryOperation) labelsFunc() func(labels.Labels) labels.Labels {

func (b *BinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
if len(b.remainingSeries) == 0 {
return types.InstantVectorSeriesData{}, EOS
return types.InstantVectorSeriesData{}, types.EOS
}

thisSeries := b.remainingSeries[0]
Expand Down Expand Up @@ -545,7 +545,7 @@ func (b *BinaryOperation) Close() {
// binary operation are in order B, A, C, binaryOperationSeriesBuffer will buffer the data for series A while series B is
// produced, then return series A when needed.
type binaryOperationSeriesBuffer struct {
source InstantVectorOperator
source types.InstantVectorOperator
nextIndexToRead int

// If seriesUsed[i] == true, then the series at index i is needed for this operation and should be buffered if not used immediately.
Expand All @@ -562,7 +562,7 @@ type binaryOperationSeriesBuffer struct {
output []types.InstantVectorSeriesData
}

func newBinaryOperationSeriesBuffer(source InstantVectorOperator, seriesUsed []bool, pool *pooling.LimitingPool) *binaryOperationSeriesBuffer {
func newBinaryOperationSeriesBuffer(source types.InstantVectorOperator, seriesUsed []bool, pool *pooling.LimitingPool) *binaryOperationSeriesBuffer {
return &binaryOperationSeriesBuffer{
source: source,
seriesUsed: seriesUsed,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -29,7 +29,7 @@ type InstantVectorSelector struct {
memoizedIterator *storage.MemoizedSeriesIterator
}

var _ InstantVectorOperator = &InstantVectorSelector{}
var _ types.InstantVectorOperator = &InstantVectorSelector{}

func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
// Compute value we need on every call to NextSeries() once, here.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand All @@ -22,7 +22,7 @@ func (t *testOperator) SeriesMetadata(_ context.Context) ([]types.SeriesMetadata

func (t *testOperator) NextSeries(_ context.Context) (types.InstantVectorSeriesData, error) {
if len(t.data) == 0 {
return types.InstantVectorSeriesData{}, EOS
return types.InstantVectorSeriesData{}, types.EOS
}

d := t.data[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -18,15 +18,15 @@ import (

// RangeVectorFunction performs a rate calculation over a range vector.
type RangeVectorFunction struct {
Inner RangeVectorOperator
Inner types.RangeVectorOperator
Pool *pooling.LimitingPool

numSteps int
rangeSeconds float64
buffer *RingBuffer
buffer *types.RingBuffer
}

var _ InstantVectorOperator = &RangeVectorFunction{}
var _ types.InstantVectorOperator = &RangeVectorFunction{}

func (m *RangeVectorFunction) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
metadata, err := m.Inner.SeriesMetadata(ctx)
Expand Down Expand Up @@ -57,7 +57,7 @@ func (m *RangeVectorFunction) NextSeries(ctx context.Context) (types.InstantVect
}

if m.buffer == nil {
m.buffer = NewRingBuffer(m.Pool)
m.buffer = types.NewRingBuffer(m.Pool)
}

m.buffer.Reset()
Expand All @@ -75,7 +75,7 @@ func (m *RangeVectorFunction) NextSeries(ctx context.Context) (types.InstantVect
step, err := m.Inner.NextStepSamples(m.buffer)

// nolint:errorlint // errors.Is introduces a performance overhead, and NextStepSamples is guaranteed to return exactly EOS, never a wrapped error.
if err == EOS {
if err == types.EOS {
return data, nil
} else if err != nil {
return types.InstantVectorSeriesData{}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operator
package operators

import (
"context"
Expand All @@ -27,7 +27,7 @@ type RangeVectorSelector struct {
nextT int64
}

var _ RangeVectorOperator = &RangeVectorSelector{}
var _ types.RangeVectorOperator = &RangeVectorSelector{}

func (m *RangeVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
// Compute value we need on every call to NextSeries() once, here.
Expand Down Expand Up @@ -56,9 +56,9 @@ func (m *RangeVectorSelector) NextSeries(ctx context.Context) error {
return nil
}

func (m *RangeVectorSelector) NextStepSamples(floats *RingBuffer) (types.RangeVectorStepData, error) {
func (m *RangeVectorSelector) NextStepSamples(floats *types.RingBuffer) (types.RangeVectorStepData, error) {
if m.nextT > m.Selector.End {
return types.RangeVectorStepData{}, EOS
return types.RangeVectorStepData{}, types.EOS
}

stepT := m.nextT
Expand All @@ -84,7 +84,7 @@ func (m *RangeVectorSelector) NextStepSamples(floats *RingBuffer) (types.RangeVe
}, nil
}

func (m *RangeVectorSelector) fillBuffer(floats *RingBuffer, rangeStart, rangeEnd int64) error {
func (m *RangeVectorSelector) fillBuffer(floats *types.RingBuffer, rangeStart, rangeEnd int64) error {
// Keep filling the buffer until we reach the end of the range or the end of the iterator.
for {
valueType := m.chunkIterator.Next()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"context"
Expand Down Expand Up @@ -90,7 +90,7 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata,

func (s *Selector) Next(ctx context.Context, existing chunkenc.Iterator) (chunkenc.Iterator, error) {
if s.series.Len() == 0 {
return nil, EOS
return nil, types.EOS
}

s.seriesIdx++
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operator
package operators

func stepCount(start, end, interval int64) int {
return int((end-start)/interval) + 1
Expand Down
Loading

0 comments on commit 006ee9c

Please sign in to comment.