Skip to content

Commit

Permalink
Flux fixes (#659)
Browse files Browse the repository at this point in the history
* Parse queries from request body

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Parser now takes step from window or aggregateWindow and fails if there isn't one

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Assign step to timerange query in query handler for InfluxDB

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Flux should not use the OPC for queries that are incompatible with Trickster but not invalid flux

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Fixed failure to parse body of non-JSON request properly

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Fixed missing body value in GetValues

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Fixing issues with request bodies

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Duplicate upstreamRequest, support more time formats in Influx

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Refactor SetExtent and Parse on Flux queries

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Remove extraneous prints

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Fixes to getting queries to resolve some test errors

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* Fix flux operation + tests; use RFC3339 instead of nano, populate template URL

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>

* remove comment

---------

Signed-off-by: jakenichols2719 <jnichols2719@protonmail.com>
  • Loading branch information
jnichols-git authored Dec 3, 2023
1 parent 6eb6269 commit a3d24d2
Show file tree
Hide file tree
Showing 18 changed files with 466 additions and 108 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This docker file is for local dev, the official Dockerfile is at
# https://github.com/trickstercache/trickster-docker-images/

FROM golang:1.19 as builder
FROM golang:1.20 as builder
COPY . /go/src/github.com/trickstercache/trickster
WORKDIR /go/src/github.com/trickstercache/trickster

Expand Down
65 changes: 62 additions & 3 deletions pkg/backends/influxdb/flux/flux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,68 @@

package flux

import "github.com/trickstercache/trickster/v2/pkg/timeseries"
import (
"fmt"
"time"

"github.com/trickstercache/trickster/v2/pkg/timeseries"
)

const (
// ConstStatements are any statements that Trickster doesn't care about modifying.
// This is most statements.
Const = StatementKind(iota)
// RangeStatements are the range(...) contained in the query.
Range
)

type StatementKind int

type Statement interface {
// Kind() returns the StatementKind of the statement.
Kind() StatementKind
// String() returns a string representation of the statement.
String() string
}

type ConstStatement struct {
stmt string
}

func (stmt *ConstStatement) Kind() StatementKind { return Const }
func (stmt *ConstStatement) String() string { return stmt.stmt }

type RangeStatement struct {
ext timeseries.Extent
}

func (stmt *RangeStatement) Kind() StatementKind { return Range }
func (stmt *RangeStatement) String() string {
start := stmt.ext.Start.Format(time.RFC3339)
stop := stmt.ext.End.Format(time.RFC3339)
return fmt.Sprintf("|> range(start: %s, stop: %s)\n", start, stop)
}

type Query struct {
Extent timeseries.Extent
Statement string
stmts []Statement
Extent timeseries.Extent
Step time.Duration
}

func (q *Query) SetExtent(ext timeseries.Extent) {
q.Extent = ext
for _, stmt := range q.stmts {
if rs, ok := stmt.(*RangeStatement); ok {
rs.ext = ext
break
}
}
}

func (q *Query) String() string {
var out string
for _, stmt := range q.stmts {
out += stmt.String()
}
return out
}
89 changes: 70 additions & 19 deletions pkg/backends/influxdb/flux/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,55 @@ func NewParser(reader io.Reader) *Parser {
}
}

func (p *Parser) ParseQuery() (*Query, error) {
// Parse a Flux query.
// Returns the query and an error, plus a bool indicating if the query can use the OPC or not.
// A 'true' value should be taken as the error being for Trickster (no timestep), but not necessarily for InfluxDB.
func (p *Parser) ParseQuery() (*Query, bool, error) {
r := bufio.NewReader(p.reader)
q := &Query{}
if raw, err := io.ReadAll(r); err != nil {
return nil, err
} else {
content := string(raw)
if idx := strings.Index(content, "|> range("); idx == -1 {
return nil, ErrFluxSyntax("range()", "flux timerange query scripts must contain a range() function")
} else {
q.Extent, err = parseRangeFilter(content, idx+len("|> range("))
q := &Query{
stmts: make([]Statement, 0),
}
var hasRange, hasWindow bool
for {
line, err := r.ReadString('\n')
line = strings.TrimSpace(line) + "\n"
var stmt Statement
if err != nil {
if err == io.EOF {
break
} else {
return nil, false, err
}
}
if !hasRange && strings.Contains(line, "range") {
stmt, err = parseRangeFilter(line, 0)
if err != nil {
return nil, err
return nil, false, err
}
q.Extent = stmt.(*RangeStatement).ext
q.stmts = append(q.stmts, stmt)
hasRange = true
} else if !hasWindow && strings.Contains(strings.ToLower(line), "window") {
q.Step, err = parseWindowFunction(line, 0)
if err != nil {
return nil, false, err
}
q.stmts = append(q.stmts, &ConstStatement{line})
hasWindow = true
} else {
q.stmts = append(q.stmts, &ConstStatement{line})
}
}
return q, nil
if !hasRange {
return nil, false, ErrFluxSemantics("flux queries must have a range() filter")
} else if !hasWindow {
return nil, false, ErrFluxSemantics("flux queries in Trickster must have a window() filter to determine timestep")
}
return q, false, nil
}

// Parse a line that is a range filter range(start: $[start], stop: $[stop])
func parseRangeFilter(query string, at int) (timeseries.Extent, error) {
func parseRangeFilter(query string, at int) (Statement, error) {
var start, stop time.Time
var err error
for i := at; i < len(query); {
Expand All @@ -72,12 +100,12 @@ func parseRangeFilter(query string, at int) (timeseries.Extent, error) {
}
timeArgEnd := timeArgStart + strings.IndexAny(query[timeArgStart:], " ,)")
if timeArgEnd == -1 {
return timeseries.Extent{}, ErrFluxSyntax(query[timeArgStart:timeArgStart+10]+"...", "couldn't parse time field from start argument")
return nil, ErrFluxSyntax(query[timeArgStart:timeArgStart+10]+"...", "couldn't parse time field from start argument")
}
// and try to parse that argument as a time field
start, err = tryParseTimeField(query[timeArgStart:timeArgEnd])
if err != nil {
return timeseries.Extent{}, err
return nil, err
}
i = timeArgEnd
continue
Expand All @@ -90,12 +118,12 @@ func parseRangeFilter(query string, at int) (timeseries.Extent, error) {
}
timeArgEnd := timeArgStart + strings.IndexAny(query[timeArgStart:], " )")
if timeArgEnd == -1 {
return timeseries.Extent{}, ErrFluxSyntax(query[timeArgStart:timeArgStart+10]+"...", "couldn't parse time field from stop argument")
return nil, ErrFluxSyntax(query[timeArgStart:timeArgStart+10]+"...", "couldn't parse time field from stop argument")
}
// and try to parse that argument as a time field
stop, err = tryParseTimeField(query[timeArgStart:timeArgEnd])
if err != nil {
return timeseries.Extent{}, err
return nil, err
}
i = timeArgEnd
continue
Expand All @@ -107,9 +135,32 @@ func parseRangeFilter(query string, at int) (timeseries.Extent, error) {
i++
}
if start.IsZero() {
return timeseries.Extent{}, ErrFluxSemantics("range() expressions require a valid start argument")
return nil, ErrFluxSemantics("range() expressions require a valid start argument")
}
if stop.IsZero() {
stop = time.Now()
}
return &RangeStatement{timeseries.Extent{Start: start, End: stop}}, nil
}

func parseWindowFunction(query string, at int) (time.Duration, error) {
for i := at; i < len(query); i++ {
if token := tstrings.Substring(query, i, len("every:")); token == "every:" {
stepArgStart := i + len(token)
if query[stepArgStart] == ' ' {
stepArgStart++
}
stepArgEnd := stepArgStart + strings.IndexAny(query[stepArgStart:], ", )")
if stepArgEnd == -1 {
return 0, ErrFluxSyntax(query[stepArgStart:stepArgStart+10]+"...", "couldn't parse timestep from window function")
}
return timeconv.ParseDuration(query[stepArgStart:stepArgEnd])
}
if query[i] == ')' {
break
}
}
return timeseries.Extent{Start: start, End: stop}, nil
return 0, ErrFluxSyntax("window()", "couldn't find a timestep, make sure argument 'every:' is included")
}

func tryParseTimeField(s string) (time.Time, error) {
Expand Down
52 changes: 47 additions & 5 deletions pkg/backends/influxdb/flux/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,30 @@ import (

var testRelativeDuration string = `from("test-bucket")
|> range(start: -7d, stop: -6d)
|> window(every: 1m)
|> mean()
|> window(every: 10s)
`
var testAbsoluteTime string = `from("test-bucket")
|> range(start: 2023-01-01T00:00:00Z, stop: 2023-01-08T00:00:00Z)
|> window(every: 5m)
|> mean()
|> window(every: 10s)
`
var testUnixTime string = `from("test-bucket"
var testUnixTime string = `from("test-bucket")
|> range(start: 1672531200, stop: 1673136000)
|> aggregateWindow(every: 30s, fn: mean)
`

var testNoRange string = `from("test-bucket")
|> aggregateWindow(every: 30s, fn: mean)
`
var testNoStart string = `from("test-bucket")
|> range(stop: 10)
|> aggregateWindow(every: 30s, fn: mean)
`
var testNoWindow string = `from("test-bucket")
|> range(start: 0, stop: 10)
`

var testsOK map[string]string = map[string]string{
Expand All @@ -40,22 +58,37 @@ var testsOK map[string]string = map[string]string{
"UnixTime": testUnixTime,
}

var testsNotOK map[string]string = map[string]string{
"FailNoRange": testNoRange,
"FailNoStart": testNoStart,
"FailNoWindow": testNoWindow,
}

func TestParserOK(t *testing.T) {
for test, script := range testsOK {
t.Run(test, func(t *testing.T) {
p := NewParser(strings.NewReader(script))
_, err := p.ParseQuery()
_, _, err := p.ParseQuery()
if err != nil {
t.Errorf("failed to parse valid script: %s", err)
}
})
}
for test, script := range testsNotOK {
t.Run(test, func(t *testing.T) {
p := NewParser(strings.NewReader(script))
_, _, err := p.ParseQuery()
if err == nil {
t.Errorf("parsed invalid script")
}
})
}
}

func TestRelativeDuration(t *testing.T) {
p := NewParser(strings.NewReader(testRelativeDuration))
now := time.Now()
q, err := p.ParseQuery()
q, _, err := p.ParseQuery()
if err != nil {
t.Errorf("failed to parse valid script: %s", err)
t.FailNow()
Expand All @@ -70,11 +103,14 @@ func TestRelativeDuration(t *testing.T) {
if !stop.Equal(qStopApprox) {
t.Errorf("query stop time incorrect; got %v, should be %v", qStopApprox, stop)
}
if q.Step != timeconv.Minute {
t.Errorf("query step incorrect; got %v, should be %v", q.Step, timeconv.Minute)
}
}

func TestRFC3999Time(t *testing.T) {
p := NewParser(strings.NewReader(testAbsoluteTime))
q, err := p.ParseQuery()
q, _, err := p.ParseQuery()
if err != nil {
t.Errorf("failed to parse valid script: %s", err)
t.FailNow()
Expand All @@ -87,11 +123,14 @@ func TestRFC3999Time(t *testing.T) {
if !stop.Equal(q.Extent.End) {
t.Errorf("query stop time incorrect; got %v, should be %v", q.Extent.End, stop)
}
if q.Step != 5*timeconv.Minute {
t.Errorf("query step incorrect; got %v, should be %v", q.Step, 5*timeconv.Minute)
}
}

func TestUnixTime(t *testing.T) {
p := NewParser(strings.NewReader(testUnixTime))
q, err := p.ParseQuery()
q, _, err := p.ParseQuery()
if err != nil {
t.Errorf("failed to parse valid script: %s", err)
t.FailNow()
Expand All @@ -104,4 +143,7 @@ func TestUnixTime(t *testing.T) {
if !stop.Equal(q.Extent.End) {
t.Errorf("query stop time incorrect; got %v, should be %v", q.Extent.End, stop)
}
if q.Step != 30*timeconv.Second {
t.Errorf("query step incorrect; got %v, should be %v", q.Step, 30*timeconv.Second)
}
}
Loading

0 comments on commit a3d24d2

Please sign in to comment.