Skip to content

Commit

Permalink
support str concat
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed May 18, 2016
1 parent f7bced7 commit f50ef69
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 79 deletions.
83 changes: 45 additions & 38 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,19 @@ func (e *EvalNode) runEval(snapshot []byte) error {
e.statMap.Set(statsEvalErrors, e.evalErrors)
switch e.Provides() {
case pipeline.StreamEdge:
var err error
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
e.timer.Start()
p.Fields = e.eval(p.Time, p.Fields, p.Tags)
p.Fields, err = e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
e.timer.Stop()
// Skip bad point
continue
}
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectPoint(p)
Expand All @@ -67,10 +77,22 @@ func (e *EvalNode) runEval(snapshot []byte) error {
}
}
case pipeline.BatchEdge:
var err error
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
for i, p := range b.Points {
b.Points[i].Fields = e.eval(p.Time, p.Fields, p.Tags)
for i := 0; i < len(b.Points); {
p := b.Points[i]
b.Points[i].Fields, err = e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
// Remove bad point
b.Points = append(b.Points[:i], b.Points[i+1:]...)
} else {
i++
}
}
e.timer.Stop()
for _, child := range e.outs {
Expand All @@ -84,69 +106,54 @@ func (e *EvalNode) runEval(snapshot []byte) error {
return nil
}

func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) models.Fields {
func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) (models.Fields, error) {
vars := e.scopePool.Get()
defer e.scopePool.Put(vars)
err := fillScope(vars, e.scopePool.ReferenceVariables(), now, fields, tags)
if err != nil {
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
return nil
return nil, err
}
for i, expr := range e.expressions {
if v, err := expr.EvalNum(vars); err == nil {
name := e.e.AsList[i]
vars.Set(name, v)
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := expr.Eval(vars)
if err != nil {
return nil, err
}
name := e.e.AsList[i]
vars.Set(name, v)
}
var newFields models.Fields
if e.e.KeepFlag {
if l := len(e.e.KeepList); l != 0 {
newFields = make(models.Fields, l)
for _, f := range e.e.KeepList {
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := vars.Get(f)
if err != nil {
return nil, err
}
newFields[f] = v
}
} else {
newFields = make(models.Fields, len(fields)+len(e.e.AsList))
for f, v := range fields {
newFields[f] = v
}
for _, f := range e.e.AsList {
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := vars.Get(f)
if err != nil {
return nil, err
}
newFields[f] = v
}
}
} else {
newFields = make(models.Fields, len(e.e.AsList))
for _, f := range e.e.AsList {
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := vars.Get(f)
if err != nil {
return nil, err
}
newFields[f] = v
}
}
return newFields
return newFields, nil
}
3 changes: 3 additions & 0 deletions integrations/data/TestStream_EvalAllTypes.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dbname
rpname
types str="bob",bool=false,int=5i,float=42 0000000001
29 changes: 29 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,35 @@ stream
testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_EvalAllTypes(t *testing.T) {
var script = `
stream
|from()
.measurement('types')
|eval(lambda: "str" + 'suffix', lambda: !"bool", lambda: "int" + 14, lambda: "float" * 2.0)
.as( 'str', 'bool', 'int', 'float')
|httpOut('TestStream_EvalAllTypes')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "types",
Tags: nil,
Columns: []string{"time", "bool", "float", "int", "str"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
true,
84.0,
19.0,
"bobsuffix",
}},
},
},
}

testStreamerWithOutput(t, "TestStream_EvalAllTypes", script, 2*time.Second, er, nil, false)
}

func TestStream_Default(t *testing.T) {
var script = `
stream
Expand Down
16 changes: 14 additions & 2 deletions tick/stateful/eval_binary_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type resultContainer struct {

IsFloat64Value bool
Float64Value float64

IsStringValue bool
StringValue string
}

// this function shouldn't be used! only for throwing details error messages!
Expand Down Expand Up @@ -110,8 +113,17 @@ func (n *EvalBinaryNode) EvalRegex(scope *tick.Scope, executionState ExecutionSt
return nil, ErrTypeGuardFailed{RequestedType: TRegex, ActualType: n.returnType}
}

func (n *EvalBinaryNode) EvalString(scope *tick.Scope, executionState ExecutionState) (string, error) {
return "", ErrTypeGuardFailed{RequestedType: TString, ActualType: n.returnType}
func (e *EvalBinaryNode) EvalString(scope *tick.Scope, executionState ExecutionState) (string, error) {
result, err := e.eval(scope, executionState)
if err != nil {
return "", err.error
}

if result.IsStringValue {
return result.StringValue, nil
}

return "", fmt.Errorf("expression returned unexpected type %T", result.value())
}

// EvalBool executes the expression based on eval bool
Expand Down
19 changes: 19 additions & 0 deletions tick/stateful/evaluation_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,4 +786,23 @@ var evaluationFuncs = map[operationKey]evaluationFn{

return resultContainer{Int64Value: left % right, IsInt64Value: true}, nil
},

// -----------------------------------------
// String concatenation func

operationKey{operator: tick.TokenPlus, leftType: TString, rightType: TString}: func(scope *tick.Scope, executionState ExecutionState, leftNode, rightNode NodeEvaluator) (resultContainer, *ErrSide) {
var left string
var right string
var err error

if left, err = leftNode.EvalString(scope, executionState); err != nil {
return emptyResultContainer, &ErrSide{error: err, IsLeftSide: true}
}

if right, err = rightNode.EvalString(scope, executionState); err != nil {
return emptyResultContainer, &ErrSide{error: err, IsRightSide: true}
}

return resultContainer{StringValue: left + right, IsStringValue: true}, nil
},
}
28 changes: 17 additions & 11 deletions tick/stateful/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Expression interface {
EvalString(scope *tick.Scope) (string, error)
EvalBool(scope *tick.Scope) (bool, error)

EvalNum(scope *tick.Scope) (interface{}, error)
Eval(scope *tick.Scope) (interface{}, error)
}

type expression struct {
Expand Down Expand Up @@ -62,33 +62,39 @@ func (se *expression) EvalString(scope *tick.Scope) (string, error) {
return se.nodeEvaluator.EvalString(scope, se.executionState)
}

func (se *expression) EvalNum(scope *tick.Scope) (interface{}, error) {
returnType, err := se.nodeEvaluator.Type(scope, CreateExecutionState())
func (se *expression) Eval(scope *tick.Scope) (interface{}, error) {
typ, err := se.nodeEvaluator.Type(scope, CreateExecutionState())
if err != nil {
return nil, err
}

switch returnType {
switch typ {
case TInt64:
result, err := se.EvalInt(scope)
if err != nil {
// Making sure we return consistently nil on errors, and not zero values.
return nil, err
}

return result, err

case TFloat64:
result, err := se.EvalFloat(scope)
if err != nil {
// Making sure we return consistently nil on errors, and not zero values.
return nil, err
}

return result, err

case TString:
result, err := se.EvalString(scope)
if err != nil {
return nil, err
}
return result, err
case TBool:
result, err := se.EvalBool(scope)
if err != nil {
return nil, err
}
return result, err
default:
return nil, fmt.Errorf("expression returned unexpected type %s", returnType)
return nil, fmt.Errorf("expression returned unexpected type %s", typ)
}
}

Expand Down
2 changes: 1 addition & 1 deletion tick/stateful/expr_dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func runDynamicTestCase(t *testing.T, tc testCase) {

if expectation.IsEvalNum {
evaluationType = "EvalNum"
result, err = se.EvalNum(scope)
result, err = se.Eval(scope)
}

if err != nil {
Expand Down
Loading

0 comments on commit f50ef69

Please sign in to comment.