Skip to content

Commit

Permalink
fix #2555: add backreference in CQs
Browse files Browse the repository at this point in the history
Add new query syntax to allow the following in CQs:

INTO "1hPolicy".:MEASUREMENT
  • Loading branch information
dgnorton committed Aug 27, 2015
1 parent 841319e commit 1490f45
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 17 deletions.
4 changes: 4 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,9 @@ func (t *Target) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("INTO ")
_, _ = buf.WriteString(t.Measurement.String())
if t.Measurement.Name == "" {
_, _ = buf.WriteString(":MEASUREMENT")
}

return buf.String()
}
Expand Down Expand Up @@ -2159,6 +2162,7 @@ type Measurement struct {
RetentionPolicy string
Name string
Regex *RegexLiteral
Parent Node
}

// String returns a string representation of the measurement.
Expand Down
37 changes: 26 additions & 11 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,9 @@ func (p *Parser) parseSegmentedIdents() ([]string, error) {
if ch := p.peekRune(); ch == '/' {
// Next segment is a regex so we're done.
break
} else if ch == ':' {
// Next segment is context-specific so let caller handle it.
break
} else if ch == '.' {
// Add an empty identifier.
idents = append(idents, "")
Expand Down Expand Up @@ -716,7 +719,7 @@ func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, e
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
}
if stmt.Sources, err = p.parseSources(); err != nil {
if stmt.Sources, err = p.parseSources(stmt); err != nil {
return nil, err
}

Expand Down Expand Up @@ -799,7 +802,19 @@ func (p *Parser) parseTarget(tr targetRequirement) (*Target, error) {
return nil, err
}

if len(idents) < 3 {
// Check for source measurement reference.
if ch := p.peekRune(); ch == ':' {
if err := p.parseTokens([]Token{COLON, MEASUREMENT}); err != nil {
return nil, err
}
// Append empty measurement name.
idents = append(idents, "")
}
}

t := &Target{Measurement: &Measurement{}}
t.Measurement.Parent = t

switch len(idents) {
case 1:
Expand All @@ -825,7 +840,7 @@ func (p *Parser) parseDeleteStatement() (*DeleteStatement, error) {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
}
source, err := p.parseSource()
source, err := p.parseSource(stmt)
if err != nil {
return nil, err
}
Expand All @@ -849,7 +864,7 @@ func (p *Parser) parseShowSeriesStatement() (*ShowSeriesStatement, error) {

// Parse optional FROM.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
if stmt.Sources, err = p.parseSources(); err != nil {
if stmt.Sources, err = p.parseSources(stmt); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -936,7 +951,7 @@ func (p *Parser) parseShowTagKeysStatement() (*ShowTagKeysStatement, error) {

// Parse optional source.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
if stmt.Sources, err = p.parseSources(); err != nil {
if stmt.Sources, err = p.parseSources(stmt); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -974,7 +989,7 @@ func (p *Parser) parseShowTagValuesStatement() (*ShowTagValuesStatement, error)

// Parse optional source.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
if stmt.Sources, err = p.parseSources(); err != nil {
if stmt.Sources, err = p.parseSources(stmt); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -1064,7 +1079,7 @@ func (p *Parser) parseShowFieldKeysStatement() (*ShowFieldKeysStatement, error)

// Parse optional source.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
if stmt.Sources, err = p.parseSources(); err != nil {
if stmt.Sources, err = p.parseSources(stmt); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -1114,7 +1129,7 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {

if tok == FROM {
// Parse source.
if stmt.Sources, err = p.parseSources(); err != nil {
if stmt.Sources, err = p.parseSources(stmt); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -1539,11 +1554,11 @@ func (p *Parser) parseAlias() (string, error) {
}

// parseSources parses a comma delimited list of sources.
func (p *Parser) parseSources() (Sources, error) {
func (p *Parser) parseSources(parent Node) (Sources, error) {
var sources Sources

for {
s, err := p.parseSource()
s, err := p.parseSource(parent)
if err != nil {
return nil, err
}
Expand All @@ -1568,8 +1583,8 @@ func (p *Parser) peekRune() rune {
return r
}

func (p *Parser) parseSource() (Source, error) {
m := &Measurement{}
func (p *Parser) parseSource(parent Node) (Source, error) {
m := &Measurement{Parent: parent}

// Attempt to parse a regex.
re, err := p.parseRegex()
Expand Down
26 changes: 26 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,32 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// CREATE CONTINUOUS QUERY with backreference measurement name
{
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT mean(value) INTO "policy1".:measurement FROM /^[a-z]+.*/ GROUP BY time(1m) END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Target: &influxql.Target{
Measurement: &influxql.Measurement{RetentionPolicy: "policy1"},
},
Sources: []influxql.Source{&influxql.Measurement{Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`^[a-z]+.*`)}}},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 1 * time.Minute},
},
},
},
},
},
},
},

// CREATE DATABASE statement
{
s: `CREATE DATABASE testdb`,
Expand Down
2 changes: 2 additions & 0 deletions influxql/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
return COMMA, pos, ""
case ';':
return SEMICOLON, pos, ""
case ':':
return COLON, pos, ""
}

return ILLEGAL, pos, string(ch0)
Expand Down
2 changes: 2 additions & 0 deletions influxql/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
LPAREN // (
RPAREN // )
COMMA // ,
COLON // :
SEMICOLON // ;
DOT // .

Expand Down Expand Up @@ -159,6 +160,7 @@ var tokens = [...]string{
LPAREN: "(",
RPAREN: ")",
COMMA: ",",
COLON: ":",
SEMICOLON: ";",
DOT: ".",

Expand Down
13 changes: 11 additions & 2 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,13 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
}

for _, row := range result.Series {
// Get the measurement name for the result.
measurement := cq.intoMeasurement()
if measurement == "" {
measurement = row.Name
}
// Convert the result row to points.
part, err := s.convertRowToPoints(cq.intoMeasurement(), row)
part, err := s.convertRowToPoints(measurement, row)
if err != nil {
log.Println(err)
continue
Expand Down Expand Up @@ -346,7 +351,11 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
}

if s.loggingEnabled {
s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name)
db := cq.intoDB()
if db == "" {
db = cq.Database
}
s.Logger.Printf("wrote %d point(s) to %s.%s", len(points), db, cq.intoRP())
}

return nil
Expand Down
49 changes: 47 additions & 2 deletions services/continuous_querier/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"log"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -61,6 +62,45 @@ func TestExecuteContinuousQuery_HappyPath(t *testing.T) {
}
}

// Test ExecuteContinuousQuery when INTO measurements are taken from the FROM clause.
func TestExecuteContinuousQuery_ReferenceSource(t *testing.T) {
s := NewTestService(t)
dbis, _ := s.MetaStore.Databases()
dbi := dbis[2]
cqi := dbi.ContinuousQueries[0]

rowCnt := 2
pointCnt := 1
qe := s.QueryExecutor.(*QueryExecutor)
qe.Results = []*influxql.Result{genResult(rowCnt, pointCnt)}

pw := s.PointsWriter.(*PointsWriter)
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
if len(p.Points) != pointCnt*rowCnt {
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
}

exp := "cpu,host=server01 value=0"
got := p.Points[0].String()
if !strings.Contains(got, exp) {
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
}

exp = "cpu2,host=server01 value=0"
got = p.Points[1].String()
if !strings.Contains(got, exp) {
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
}

return nil
}

err := s.ExecuteContinuousQuery(&dbi, &cqi)
if err != nil {
t.Error(err)
}
}

// Test the service happy path.
func TestService_HappyPath(t *testing.T) {
s := NewTestService(t)
Expand All @@ -70,7 +110,7 @@ func TestService_HappyPath(t *testing.T) {
qe.Results = []*influxql.Result{genResult(1, pointCnt)}

pw := s.PointsWriter.(*PointsWriter)
ch := make(chan int, 5)
ch := make(chan int, 10)
defer close(ch)
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
ch <- len(p.Points)
Expand All @@ -97,7 +137,7 @@ func TestService_Run(t *testing.T) {
s.Config.RecomputePreviousN = 0

done := make(chan struct{})
expectCallCnt := 2
expectCallCnt := 3
callCnt := 0

// Set a callback for ExecuteQuery.
Expand Down Expand Up @@ -252,6 +292,8 @@ func NewTestService(t *testing.T) *Service {
ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END`)
ms.CreateDatabase("db2", "default")
ms.CreateContinuousQuery("db2", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END`)
ms.CreateDatabase("db3", "default")
ms.CreateContinuousQuery("db3", "cq3", `CREATE CONTINUOUS QUERY cq3 ON db3 BEGIN SELECT mean(value) INTO "1hAverages".:MEASUREMENT FROM /cpu[0-9]?/ GROUP BY time(10s) END`)

return s
}
Expand Down Expand Up @@ -471,6 +513,9 @@ func genResult(rowCnt, valCnt int) *influxql.Result {
Columns: []string{"time", "value"},
Values: vals,
}
if len(rows) > 0 {
row.Name = fmt.Sprintf("cpu%d", len(rows)+1)
}
rows = append(rows, row)
}
return &influxql.Result{
Expand Down
8 changes: 6 additions & 2 deletions tsdb/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,12 @@ func (q *QueryExecutor) normalizeStatement(stmt influxql.Statement, defaultDatab
// normalizeMeasurement inserts the default database or policy into all measurement names,
// if required.
func (q *QueryExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error {
if m.Name == "" && m.Regex == nil {
return errors.New("invalid measurement")
// Targets (measurements in an INTO clause) can have blank names, which means it will be
// the same as the measurement name it came from in the FROM clause.
if _, ok := m.Parent.(*influxql.Target); !ok {
if m.Name == "" && m.Regex == nil {
return errors.New("invalid measurement")
}
}

// Measurement does not have an explicit database? Insert default.
Expand Down

0 comments on commit 1490f45

Please sign in to comment.