Add now built in function #1550

Merged
merged 1 commit into from
Oct 27, 2017

@mbresson mbresson commented Sep 7, 2017

This pull request adds a stateless function now() that could be used in conjunction with the recently merged function unixNano() to compare fields containing time encoded in nanoseconds with the current time.
Relevant feature request: Add stateless function now() to get the current local time

Required for all non-trivial PRs
  • Rebased/mergable
  • Tests pass
  • CHANGELOG.md updated
  • Sign CLA (if not already signed)

(pull request beginner here, please let me know if I've made a mistake in the process, thanks)

@desa desa left a comment

Hey! Thanks for the PR! I have a couple of issues that I think need to be addressed. I don't think we have any other functions that return time.Time values, so I'm not sure whether we'll need a bit of work elsewhere in the PR. More tests that validate its behavior would be great. A good place to put those tests would be integrations/streamer_test.go.

@@ -1354,6 +1355,27 @@ func (year) Signature() map[Domain]ast.ValueType {
return timeFuncSignature
}

type now struct {
Contributor

Would it be possible to get a test that uses the function, just to verify that it's working as expected?

}

func (now) Signature() map[Domain]ast.ValueType {
return timeFuncSignature
@desa desa Sep 7, 2017

I think we want a slightly different function signature here.

The timeFuncSignature is

fn: ast.TTime -> ast.TInt

I think we want

fn: nil -> ast.TTime

for the now function
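A minimal, self-contained sketch of the two signatures follows. The types below are simplified stand-ins defined only for this example (they are assumptions, not the real ast.ValueType or Domain definitions), and nowFuncSignature and returnType are hypothetical names.

```go
package main

import "fmt"

// ValueType is a simplified stand-in for ast.ValueType (assumption).
type ValueType int

const (
	Invalid ValueType = iota // zero value marks an unused argument slot
	TInt
	TTime
)

// maxArgs and Domain are hypothetical simplifications: a Domain lists the
// argument types of a call, padded with Invalid.
const maxArgs = 2

type Domain [maxArgs]ValueType

// timeFuncSignature: fn: TTime -> TInt (the shape used by year, minute, ...).
var timeFuncSignature = map[Domain]ValueType{
	Domain{TTime}: TInt,
}

// nowFuncSignature: fn: nil -> TTime (no arguments, returns a time).
var nowFuncSignature = map[Domain]ValueType{
	Domain{}: TTime,
}

// returnType looks up the return type for a call with the given argument types.
func returnType(sig map[Domain]ValueType, args ...ValueType) (ValueType, bool) {
	if len(args) > maxArgs {
		return Invalid, false
	}
	var d Domain
	copy(d[:], args)
	rt, ok := sig[d]
	return rt, ok
}

func main() {
	rt, ok := returnType(nowFuncSignature)
	fmt.Println(rt == TTime, ok) // true true

	_, ok = returnType(nowFuncSignature, TTime)
	fmt.Println(ok) // false: now() takes no arguments
}
```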

Contributor Author

Ouch, you're right, I didn't pay attention to that bit. I'll rework the PR and get back to you.

@mbresson
Contributor Author

Hi! In case you don't have time to have a look at my recent changes on my PR, I:

  • fixed the signature for the function now()
  • added an integration test
  • made the function now() rely on a package-local variable (timeNowFunction). I needed to do so in order to have deterministic tests by stubbing now(). If you prefer another approach or feel like it doesn't fit well enough in Kapacitor's source code and code style, just let me know, I'm open to other ideas.

Also, I know that my test is failing right now and I'm working on it, however I have an issue with not being able to run the integration tests properly on my machine. If I do

./test.sh

I end up with lots of tests failing (besides the one I wrote) with lots of messages of this form:

Get http://[::]:36319/kapacitor/v1/tasks/TestStream_....: dial tcp [::]:36319: connect: cannot assign requested address

I guess it comes from my personal machine and system configuration, and I know this is not the proper place to ask, but if you have any clue what could cause this problem, I'd be pleased to know.

@desa
Contributor

desa commented Sep 18, 2017

@mbresson (for running tests) if you're using Go 1.9+, I'd recommend just running go test ./... If you're using an older version, you'll need to specify the packages you'd like to test explicitly, e.g. go test . ./alert (...).

@mbresson
Contributor Author

Hi!
Thanks for the tip, it worked. I have fixed the test. Can you have a look and tell me if you are satisfied with my PR? Otherwise I can rework it, add more tests or fix whatever is not right.

@desa desa left a comment

@mbresson Awesome work here! Just a couple more suggestions.


var timeNowFunc func() time.Time = time.Now

func SetTimeNowFunc(newTimeNowFunc func() time.Time) {
Contributor

This function is only for testing, right? In general, I'm not a huge fan of having package-level functions like this. I think we should just get rid of it and use time.Now explicitly in now.Call.

Contributor Author

Yes, it's only for testing. I'm not a fan of this way of doing things either, but without it, the tests will depend on a parameter (time) that's not deterministic. But I understand your point of view and I'll rework my PR.

@@ -9027,6 +9028,59 @@ stream
}
}

func TestStream_LambdaNow(t *testing.T) {
Contributor

This test is great! I'd love to see what happens if we did something like

stream
    |from()
      .measurement('account')
    |eval(lambda: "expiration" < now())
      .as('now')
    |httpOut('TestStream_EvalNow')

I'd also be curious to know what happens when we try to write this to Influx

stream
    |from()
      .measurement('account')
    |eval(lambda: "expiration" < now())
      .as('now')
    |influxDBOut()
      .database('db')
      .retentionPolicy('rp')
      .measurement('m')
      .precision('s')
      .tag('key', 'value')
      .flushInterval(1ms)

See TestStream_InfluxDBOut for an example of this. I suspect there might be issues when we turn the time value into line protocol.

Contributor Author

Thanks. I'll try out adding another test with this and I'll get back to you when I'm done.

Contributor Author

Hi! I don't understand what you want me to do with these tests actually. Because

  • |eval(lambda: "expiration" < now()) is going to output a boolean, right? Not a time value
  • Even if we get a time value, I don't understand the use for this, because in my opinion now() is intended to be used with other functions such as unixnano() or minute() or something else but never alone.

Anyway, I have committed the tests I just wrote, which currently fail, but in order to fix them I'll need to know what kind of output you would expect from them, because it's not entirely clear to me :).

Thanks in advance

@desa desa Sep 27, 2017

Ah! I meant to do this

...
    |eval(lambda: now())
      .as('now')
...

Even if we get a time value, I don't understand the use for this, because in my opinion now() is intended to be used with other functions such as unixnano() or minute() or something else but never alone.

I agree that they'll be used with unixNano, but my concern is what happens if I try to use it as a field value. In particular, what happens when we try to set it as a field and write it to Influx

Contributor Author

I updated the tests. They are failing, but I don't know why. I have read parts of tick/eval.go, but I don't understand much of it.
Here is the output of one of the failed tests:

--- FAIL: TestStream_EvalNowInfluxDBOut (0.00s)
	streamer_test.go:11032: digraph TestStream_EvalNowInfluxDBOut {
		stream0 -> from1;
		from1 -> eval2;
		eval2 -> influxdb_out3;
		}
	streamer_test.go:9171: got  exp db
	streamer_test.go:9174: got  exp rp
	streamer_test.go:9177: got  exp s
	streamer_test.go:9180: got 0 exp 1

Can you give me any pointers on this? I don't have any useful log to understand what's going on. I would have expected the test to at least create the database, but it seems not. It must mean I wrote them wrong.

},
}

var timeNowFunc func() time.Time = func() time.Time {
Contributor

I think this test should work perfectly fine if we rely on the current time.Now(). I don't think we need to mock time.

Contributor Author

The input I use for the test has a field named expiration, with the values 1980, 2100, 1990. If I rely directly on time.Now() it means the test will break after 2100. I know it's far, far away and you and I won't exist anymore (and maybe kapacitor will not exist anymore, who knows) but the test will not be deterministic because of this. Is it ok?

Contributor

Yeah, I think it's okay. I don't think this project will still be around in its current form in 2100. :)

@desa desa left a comment

Added some thoughts. On the surface things looked fine to me.

}
if precision != "s" {
t.Errorf("got %v exp %v", precision, "s")
}
Contributor

I think we can disregard these checks here since they're not what we're really testing.

Contributor Author

Hi. I made the changes and added a lot of logging (nothing committed) to try to understand what was going on, since I'm not familiar with how kapacitor is built. It looks like there is no HTTP request made at all to InfluxDB. And it seems that eval is not even called (tried to log inside NewEvalLambdaNode for testing purposes and got nothing). I'll continue my investigations another day.

if database != "db" {
t.Errorf("got %v exp %v", database, "db")
}
if rp != "rp" {
Contributor

I think we can disregard these checks here since they're not what we're really testing.

}
testStreamerNoOutput(t, "TestStream_EvalNowInfluxDBOut", script, 15*time.Second, tmInit)

if database != "db" {
Contributor

I think we can disregard these checks here since they're not what we're really testing.

w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(data)
//Get request data
database = r.URL.Query().Get("db")
Contributor

we can ignore these

//Respond
var data client.Response
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(data)
Contributor

We should probably check the error here. This might be our issue.

return
}
points, err = imodels.ParsePointsWithPrecision(b, time.Unix(0, 0), precision)
done <- err
Contributor

We might want to wait for this function's send on the done channel before we do all the other checks.

done <- err
return
}
points, err = imodels.ParsePointsWithPrecision(b, time.Unix(0, 0), precision)
Contributor

I think we might be getting an error here and that's causing the issue

@desa
Contributor

desa commented Oct 9, 2017

@mbresson just curious if you've made any more progress on this issue. If the InfluxDBOut stuff is still causing issues, maybe we should just make it so that now returns an integer. This will un-complicate the InfluxDB issues that may come up.

@mbresson
Contributor Author

mbresson commented Oct 10, 2017

Hi, I have made good progress and I'll update the PR soon. I'd prefer not to make now() return an integer and to keep working on it, because having now() return a time.Time is more interesting, especially if it can be used with other functions that take a time as a parameter. I'll keep you updated.

@desa
Contributor

desa commented Oct 11, 2017

@mbresson Sounds good. Just a heads up, we're getting ready to start the 1.4 release process and I'd love to see this PR make it in there!

And I prefer not to make now() return an integer and keep working on it, because having now() returning a time.Time is more interesting, especially if it can be used with other functions taking a time in parameter.

I definitely agree that having a time.Time value would be more useful, but I think that change might be more involved. In fact, I think it might involve a change in InfluxDB. I could be wrong though; maybe we could convert it to an int just before it's written to InfluxDB? Just kind of thinking out loud.

@mbresson
Contributor Author

I should have time to push my updates by tomorrow evening or the day after tomorrow; I just didn't have time to do it these last few days.
No need to change anything in InfluxDB, but there is a choice to make: before writing it to InfluxDB, should we convert it to an int (Unix timestamp) or to a string representation of the time (such as ISO 8601)?
Tell me what you prefer.

@mbresson mbresson force-pushed the add-stateless-time-function-now branch from bbd05c7 to 36fd1ce Compare October 12, 2017 17:03
@mbresson
Contributor Author

Hi, finally, here are my updates:

  • In order to be able to evaluate time.Time values, in tick/ast/node.go, I added a TimeNode type, and I also added the file tick/stateful/eval_time_node.go for the same reasons.
  • In edge/messages's ToRow methods, I convert time.Time values to unix timestamps in nanoseconds (int64). I do the same in influxdb/client.go's Bytes method. I do it so that we can convert the time.Time values we get when now() is evaluated to integral values. Otherwise, they are automatically converted to string, ISO8601 representation of dates, which is not what we want.
  • I have changed my test TestStream_EvalNow because it was impossible to have it pass if I made it output now(), since the expected value could never be exactly the same, nanoseconds-wise, as the value produced. Instead I altered the test to get the current year() and compare it, but I think it makes the test much less useful since initially it was intended to check what would happen if now() was fed to httpOut. I'm not very satisfied for one more reason, it is that if that test happens to run at the very end of a year, it may fail because the expected value for year may be the next year. That's the problem with testing non-deterministic time values :P.
  • I have completed the test TestStream_EvalNowInfluxDBOut, and it checks that the time.Time value resulting from lambda: now() is indeed sent as a unix nanoseconds timestamp, encoded as an int64.

I'm not sure my explanations are clear because, to be blunt, my understanding of Kapacitor's source code is incomplete, so I'd appreciate it if you could double-check the new code that I brought in. Maybe some more tests could be added; if you feel it's needed, I can work on that another day.

Thanks

@mbresson
Contributor Author

It seems that the CI checks have failed. I didn't get any errors when running the tests on my machine, and I cannot access the CI logs (the loader keeps spinning forever). I'll check again tomorrow.

@desa
Contributor

desa commented Oct 12, 2017

@mbresson looks like it's just a flaky test. I'm rerunning the build.

@desa desa left a comment

I really like the direction this is headed. Just a couple suggestions and questions.

edge/messages.go Outdated
row.Values[0][i+1] = v
finalValue := v

switch v.(type) {
Contributor

This could be turned into

val, ok := v.(time.Time)
if ok {
v = val.UnixNano()
}

row.Values[0][i+1] = v

edge/messages.go Outdated
row.Values[i][j+1] = v
finalValue := v

switch v.(type) {
Contributor

Same point as the one above.


for fieldName, fieldValue := range influxDBFields {
switch fieldValue.(type) {
case time.Time:
Contributor

this could be

val, ok := fieldValue.(time.Time)
if ok {
	influxDBFields[fieldName] = val.UnixNano()
}

tick/ast/node.go Outdated
@@ -214,6 +214,36 @@ func (n *DurationNode) Equal(o interface{}) bool {
return false
}

type TimeNode struct {
Contributor

What do we need this for?

@mbresson mbresson force-pushed the add-stateless-time-function-now branch from 495451e to ce7de65 Compare October 18, 2017 16:22
@mbresson
Contributor Author

Hi, thanks for your feedback! I made the changes you suggested, I think your code suggestions are clearer and more concise.
About the TimeNode, you are right, it is not needed, like some other things that I have removed in my latest commit. I added these because I misunderstood some parts of the code in Kapacitor. My fault. :)
I'll be without internet until beginning of next week, so I hurried a little bit for my latest commit. I'm looking forward to your feedback.

@desa
Contributor

desa commented Oct 18, 2017

@mbresson looks good! After a rebase and squash into a single commit I think this should be good to merge.

@mbresson mbresson force-pushed the add-stateless-time-function-now branch from ce7de65 to 860b95c Compare October 19, 2017 00:42
@mbresson
Contributor Author

mbresson commented Oct 19, 2017

Sorry, I've failed my squash & rebase because now it appears that I'm the (co-)committer of commits that are not mine. (8933719, aa33d98, e48f9f0, 81576da, 156349f, 871d796, 77c91f9, ffcb25a, 0dd1d3e, 3a4d43f, 860b95c).
I'm not good at git, do you know how to squash & rebase and retain the original information?
Either way I'll try and clean up my mess when I'm back next week.
(edit: it looks like the tests fail for no reason, like last time)

@desa
Contributor

desa commented Oct 19, 2017

@mbresson The process should be something like

git rebase -i master

that will take you to a screen that looks like this

pick d25510d0 Add example of all nodes in load tasks style dir

# Rebase 6e45b39d..d25510d0 onto 6e45b39d (1 command)
#
# Commands:
# p, pick = use commit
# r, reword = use commit, but edit the commit message
# e, edit = use commit, but stop for amending
# s, squash = use commit, but meld into previous commit
# f, fixup = like "squash", but discard this commit's log message
# x, exec = run command (the rest of the line) using shell
# d, drop = remove commit
#
# These lines can be re-ordered; they are executed from top to bottom.
#
# If you remove a line here THAT COMMIT WILL BE LOST.
#
# However, if you remove everything, the rebase will be aborted.
#
# Note that empty commits are commented out

Change all of the pick commands to squash except for the very first one, and change that one to reword. Then save and quit, and reword the commit message appropriately.

@mbresson
Contributor Author

I actually did git rebase -i HEAD~10 and squashed all my commits into f1f9bb0 and left other people's commits intact.
But I don't understand why, on Github, it looks like I'm the committer of all commits, not only mine:

commits

Anyway, I'll fix that when I'm back.

@mbresson mbresson force-pushed the add-stateless-time-function-now branch from 860b95c to 2ba8c30 Compare October 23, 2017 14:08
@mbresson
Contributor Author

Hi! I'm finished at last. I see the checks have failed but it seems to come from another test than mine:

=== RUN   TestServer_RecordReplayStreamWithPost
--- FAIL: TestServer_RecordReplayStreamWithPost (1.19s)
	server_test.go:3528: failed to finish recording

Maybe it's an issue with the CI system again?

@mbresson
Contributor Author

Hi, is everything ok with my pull request? Please let me know if some rework or enhancement is needed.

@desa
Contributor

desa commented Oct 27, 2017

@mbresson sorry about the delay in my response. After a bit of internal discussion, we think that allowing time.Times to be passed around as fields is a bit more of a substantial change. There are a number of places where we make assumptions that the fields will be an int, float, string, or bool.

We think the now function is still a great addition; the only change from what's here will be that

stream
    |from()
        .measurement('cpu')
    |eval(lambda: now())
        .as('now')

should result in

ts=2017-10-27T12:02:35.313-04:00 lvl=error msg="error evaluating expression" service=kapacitor task_master=main task=now node=log4 err="expression returned unexpected type time"

But

stream
    |from()
        .measurement('cpu')
    |where(lambda: "time" > now())

and

stream
    |from()
        .measurement('cpu')
    |eval(lambda: unixNano(now()))
        .as('now')

are perfectly fine.

My apologies for the last-second changes. I wasn't thinking clearly when I suggested that we allow times to be passed around like values. I'll go through and note the changes necessary to get us to the state suggested above.

@desa desa left a comment

Went through and noted the changes that would prevent time.Times from being set as fields.

Sorry again about leading you down the wrong path.

edge/messages.go Outdated
// force converting time.Time fields (other than the built-in time column)
// to an integer value representing a Unix nanoseconds timestamp
// otherwise, they'll be turned into ISO8601 strings during marshalling
timeValue, isTime := v.(time.Time)
Contributor

we shouldn't need this any more.

edge/messages.go Outdated
// force converting time.Time fields (other than the built-in time column)
// to an integer value representing a Unix nanoseconds timestamp
// otherwise, they'll be turned into ISO8601 strings during marshalling
timeValue, isTime := v.(time.Time)
Contributor

we shouldn't need this any more.

@@ -517,7 +517,22 @@ type Point struct {
// Returns byte array of a line protocol representation of the point
func (p Point) Bytes(precision string) []byte {
key := imodels.MakeKey([]byte(p.Name), imodels.NewTags(p.Tags))
fields := imodels.Fields(p.Fields).MarshalBinary()

Contributor

we shouldn't need this any more.

testStreamerWithOutput(t, "TestStream_EvalNow", script, time.Second, expectedOutput, false, nil)
}

func TestStream_EvalNowInfluxDBOut(t *testing.T) {
Contributor

we shouldn't need this any more.

@@ -19,6 +19,7 @@ type Expression interface {
EvalString(scope *Scope) (string, error)
EvalBool(scope *Scope) (bool, error)
EvalDuration(scope *Scope) (time.Duration, error)
EvalTime(scope *Scope) (time.Time, error)
Contributor

we shouldn't need this any more.

@@ -84,6 +85,10 @@ func (se *expression) EvalDuration(scope *Scope) (time.Duration, error) {
return se.nodeEvaluator.EvalDuration(scope, se.executionState)
}

func (se *expression) EvalTime(scope *Scope) (time.Time, error) {
Contributor

we shouldn't need this any more.

@@ -131,6 +136,12 @@ func (se *expression) Eval(scope *Scope) (interface{}, error) {
return nil, err
}
return result, err
case ast.TTime:
Contributor

we shouldn't need this any more.

Add now() built-in, tick stateless function that returns a time.Time
@mbresson mbresson force-pushed the add-stateless-time-function-now branch from 2ba8c30 to 1342ba3 Compare October 27, 2017 16:37
@mbresson
Contributor Author

mbresson commented Oct 27, 2017

No problem, I understand. I have made the relevant changes.

@desa desa merged commit c0d57c8 into influxdata:master Oct 27, 2017
@mbresson mbresson deleted the add-stateless-time-function-now branch October 28, 2017 02:42