-
Notifications
You must be signed in to change notification settings - Fork 489
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add now built in function #1550
Add now built in function #1550
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey! Thanks for the PR! I have a couple issues that I think need to be addressed. I don't think we have any other functions that return time.Time
values. So I'm not sure if we'll need a bit of work elsewhere in the PR. More tests that validate its behavior would be great. Good place to stick those tests would be in integrations/streamer_test.go
tick/stateful/functions.go
Outdated
@@ -1354,6 +1355,27 @@ func (year) Signature() map[Domain]ast.ValueType { | |||
return timeFuncSignature | |||
} | |||
|
|||
type now struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to get a test that uses the function. Just to verify that it's working as expected.
tick/stateful/functions.go
Outdated
} | ||
|
||
func (now) Signature() map[Domain]ast.ValueType { | ||
return timeFuncSignature |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want a slightly different function signature here.
The timeFuncSignature
is
fn: ast.TTime -> ast.TInt
I think we want
fn: nil -> ast.TTime
for the now
function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ouch, you're right, I didn't pay attention to that bit. I'll rework on the PR and get back to you.
Hi! In case you don't have time to have a look at my recent changes on my PR, I:
Also, I know that my test is failing right now and I'm working on it, however I have an issue with not being able to run the integration tests properly on my machine. If I do
I end up with lots of tests failing (besides of the one I wrote) with lots of messages of this form:
I guess it comes from my personal machine and system configuration, and I know here is not the proper place to ask this, but if you have any clue of what could cause this problem, I'd be pleased to know. |
@mbresson (for running tests) if you're using go 1.9+, I'd recommend just running |
Hi! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mbresson Awesome work here! Just a couple more suggestions.
tick/stateful/functions.go
Outdated
|
||
var timeNowFunc func() time.Time = time.Now | ||
|
||
func SetTimeNowFunc(newTimeNowFunc func() time.Time) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is only for testing right? In general, I'm not a huge fan of having package level functions like this. I think we should just get rid of this and use time.Now
explicitly in now.Call
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's only for testing. I'm not fan of this way of doing things either, but without it, the tests will be reliant on a parameter (time) that's not deterministic. But I understand your point of view and I'll rework my PR.
@@ -9027,6 +9028,59 @@ stream | |||
} | |||
} | |||
|
|||
func TestStream_LambdaNow(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is great! I'd love to see what happens if we did something like
stream
|from()
.measurement('account')
|eval(lambda: "expiration" < now())
.as('now')
|httpOut('TestStream_EvalNow')
I'd also be curious to know what happens when we try to write this to Influx
stream
|from()
.measurement('account')
|eval(lambda: "expiration" < now())
.as('now')
|influxDBOut()
.database('db')
.retentionPolicy('rp')
.measurement('m')
.precision('s')
.tag('key', 'value')
.flushInterval(1ms)
See the TestStream_InfluxDBOut
for an example of this. I suspect there might be issues when we turn the time value into line protocol.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I'll try out adding another test with this and I'll get back to you when I'm done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi! I don't understand what you want me to do with these tests actually. Because
|eval(lambda: "expiration" < now())
is going to output a boolean, right? Not a time value- Even if we get a time value, I don't understand the use for this, because in my opinion now() is intended to be used with other functions such as unixnano() or minute() or something else but never alone.
Anyway, I have committed the tests I just wrote and which fail, but in order to fix them I'll need to know what kind of output you would expect for them because it's not entirely clear for me :).
Thanks in advance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah! meant to do this
...
|eval(lambda: now())
.as('now')
...
Even if we get a time value, I don't understand the use for this, because in my opinion now() is intended to be used with other functions such as unixnano() or minute() or something else but never alone.
I agree that they'll be used with unixNano
, but my concern is what happens if I try to use it as a field value. In particular, what happens when we try to set it as a field and write it to Influx
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the tests. They are failing, but I don't know why. I have read some part of the code tick/eval.go but I don't understand a lot about it.
Here is the output of one of the failed tests:
--- FAIL: TestStream_EvalNowInfluxDBOut (0.00s)
streamer_test.go:11032: digraph TestStream_EvalNowInfluxDBOut {
stream0 -> from1;
from1 -> eval2;
eval2 -> influxdb_out3;
}
streamer_test.go:9171: got exp db
streamer_test.go:9174: got exp rp
streamer_test.go:9177: got exp s
streamer_test.go:9180: got 0 exp 1
Can you give me any pointers on this? I don't have any useful log to understand what's going on. I would have expected the test to at least create the database, but it seems not. It must mean I wrote them wrong.
integrations/streamer_test.go
Outdated
}, | ||
} | ||
|
||
var timeNowFunc func() time.Time = func() time.Time { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this test should work perfectly fine if we rely on the currenttime.Now()
. I don't think we need to mock time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The input I use for the test has a field named expiration, with the values 1980, 2100, 1990. If I rely directly on time.Now() it means the test will break after 2100. I know it's far, far away and you and I won't exist anymore (and maybe kapacitor will not exist anymore, who knows) but the test will not be deterministic because of this. Is it ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think it's okay. I don't think this project is still around in its current form in 2100. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some thoughts. On the surface things looked fine to me.
integrations/streamer_test.go
Outdated
} | ||
if precision != "s" { | ||
t.Errorf("got %v exp %v", precision, "s") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can disregard these checks here since they're not what we're really testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi. I made the changes and added a lot of logging (nothing committed) to try to understand what was going on, since I'm not familiar with how kapacitor is built. It looks like there is no HTTP request made at all to InfluxDB. And it seems that eval is not even called (tried to log inside NewEvalLambdaNode for testing purposes and got nothing). I'll continue my investigations another day.
integrations/streamer_test.go
Outdated
if database != "db" { | ||
t.Errorf("got %v exp %v", database, "db") | ||
} | ||
if rp != "rp" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can disregard these checks here since they're not what we're really testing.
integrations/streamer_test.go
Outdated
} | ||
testStreamerNoOutput(t, "TestStream_EvalNowInfluxDBOut", script, 15*time.Second, tmInit) | ||
|
||
if database != "db" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can disregard these checks here since they're not what we're really testing.
integrations/streamer_test.go
Outdated
w.WriteHeader(http.StatusOK) | ||
_ = json.NewEncoder(w).Encode(data) | ||
//Get request data | ||
database = r.URL.Query().Get("db") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can ignore these
integrations/streamer_test.go
Outdated
//Respond | ||
var data client.Response | ||
w.WriteHeader(http.StatusOK) | ||
_ = json.NewEncoder(w).Encode(data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably check the error here. This might be out issue.
integrations/streamer_test.go
Outdated
return | ||
} | ||
points, err = imodels.ParsePointsWithPrecision(b, time.Unix(0, 0), precision) | ||
done <- err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might want to wait on the channel send output of this function before we do all the other checks.
integrations/streamer_test.go
Outdated
done <- err | ||
return | ||
} | ||
points, err = imodels.ParsePointsWithPrecision(b, time.Unix(0, 0), precision) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we might be getting an error here and that's causing the issue
@mbresson just curious if you've made any more progress on this issue. If the InfluxDBOut stuff is still causing issues, maybe we should just make it so that |
Hi, I have made good progress and I'll update the PR soon. And I prefer not to make now() return an integer and keep working on it, because having now() returning a time.Time is more interesting, especially if it can be used with other functions taking a time in parameter. I'll keep you updated. |
@mbresson Sounds good. Just a heads up, we're getting ready to start the 1.4 release process and I'd love to see this PR make it in there!
I definitely agree that having a |
I should have time to push my updates by tomorrow evening or the day before tomorrow, just didn't have time to do it these last days. |
bbd05c7
to
36fd1ce
Compare
Hi, finally, here are my updates:
I'm not sure my explanations are clear because to be blunt, my understanding of Kapacitor's source code is incomplete, so I'd appreciate if you could double-check the new code that I brought in. Maybe some more tests could be added, if you feel like it's needed, I can work on it another day. Thanks |
It seems that the CI checks have failed. I didn't have any error when running the tests on my machine, and I cannot access the CI logs (the loader keep spinning forever). I'll check again tomorrow. |
@mbresson looks like its just a flakey test. I'm rerunning the build. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like the direction this is headed. Just a couple suggestions and questions.
edge/messages.go
Outdated
row.Values[0][i+1] = v | ||
finalValue := v | ||
|
||
switch v.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be turned into
val, ok := v.(time.Time)
if ok {
v = val.UnixNano()
}
row.Values[0][i+1] = v
edge/messages.go
Outdated
row.Values[i][j+1] = v | ||
finalValue := v | ||
|
||
switch v.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same point as the one above.
influxdb/client.go
Outdated
|
||
for fieldName, fieldValue := range influxDBFields { | ||
switch fieldValue.(type) { | ||
case time.Time: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could be
val, ok := fieldValue.(time.Time)
if ok {
influxDBFields[fieldName] = = val.UnixNano()
}
tick/ast/node.go
Outdated
@@ -214,6 +214,36 @@ func (n *DurationNode) Equal(o interface{}) bool { | |||
return false | |||
} | |||
|
|||
type TimeNode struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do we need this for?
495451e
to
ce7de65
Compare
Hi, thanks for your feedback! I made the changes you suggested, I think your code suggestions are clearer and more concise. |
@mbresson looks good! After a rebase and squash into a single commit I think this should be good to merge. |
ce7de65
to
860b95c
Compare
Sorry, I've failed my squash & rebase because now it appears that I'm the (co-)committer of commits that are not mine. (8933719, aa33d98, e48f9f0, 81576da, 156349f, 871d796, 77c91f9, ffcb25a, 0dd1d3e, 3a4d43f, 860b95c). |
@mbresson The process should be something like
that will take you a screen that looks like this
Change all of the |
I actually did git rebase -i HEAD~10 and squashed all my commits into f1f9bb0 and left other people's commits intact. Anyway, I'll fix that when I'm back. |
860b95c
to
2ba8c30
Compare
Hi! I'm finished at last. I see the checks have failed but it seems to come from another test than mine:
Maybe it's an issue with the CI system again? |
Hi, is everything ok with my pull request? Please let me know if some rework or enhancement is needed. |
@mbresson sorry about the delay in my response. After a bit of internal discussion, we think that allowing We think the
should result in
But
and
are perfectly fine. My apologies the last second changes. I wasn't thinking clearly when I suggested that we allow times be passed around like values. I'll go through and note the changes necessary to get us to the state suggested above. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went through and noted the changes that would prevent time.Time
s from being set as fields.
Sorry again about leading you down the wrong path.
edge/messages.go
Outdated
// force converting time.Time fields (other than the built-in time column) | ||
// to an integer value representing a Unix nanoseconds timestamp | ||
// otherwise, they'll be turned into ISO8601 strings during marshalling | ||
timeValue, isTime := v.(time.Time) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't need this any more.
edge/messages.go
Outdated
// force converting time.Time fields (other than the built-in time column) | ||
// to an integer value representing a Unix nanoseconds timestamp | ||
// otherwise, they'll be turned into ISO8601 strings during marshalling | ||
timeValue, isTime := v.(time.Time) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't need this any more.
influxdb/client.go
Outdated
@@ -517,7 +517,22 @@ type Point struct { | |||
// Returns byte array of a line protocol representation of the point | |||
func (p Point) Bytes(precision string) []byte { | |||
key := imodels.MakeKey([]byte(p.Name), imodels.NewTags(p.Tags)) | |||
fields := imodels.Fields(p.Fields).MarshalBinary() | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't need this any more.
integrations/streamer_test.go
Outdated
testStreamerWithOutput(t, "TestStream_EvalNow", script, time.Second, expectedOutput, false, nil) | ||
} | ||
|
||
func TestStream_EvalNowInfluxDBOut(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't need this any more.
tick/stateful/expr.go
Outdated
@@ -19,6 +19,7 @@ type Expression interface { | |||
EvalString(scope *Scope) (string, error) | |||
EvalBool(scope *Scope) (bool, error) | |||
EvalDuration(scope *Scope) (time.Duration, error) | |||
EvalTime(scope *Scope) (time.Time, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't need this any more.
tick/stateful/expr.go
Outdated
@@ -84,6 +85,10 @@ func (se *expression) EvalDuration(scope *Scope) (time.Duration, error) { | |||
return se.nodeEvaluator.EvalDuration(scope, se.executionState) | |||
} | |||
|
|||
func (se *expression) EvalTime(scope *Scope) (time.Time, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't need this any more.
tick/stateful/expr.go
Outdated
@@ -131,6 +136,12 @@ func (se *expression) Eval(scope *Scope) (interface{}, error) { | |||
return nil, err | |||
} | |||
return result, err | |||
case ast.TTime: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't need this any more.
Add now() built-in, tick stateless function that returns a time.Time
2ba8c30
to
1342ba3
Compare
No problem, I understand. I have made the relevant changes. |
This pull request adds a stateless function now() that could be used in conjunction with the recently merged function unixNano() to compare fields containing time encoded in nanoseconds with the current time.
Relevant feature request: Add stateless function now() to get the current local time
Required for all non-trivial PRs
(pull request beginner here, please let me know if I've made a mistake in the process, thanks)