-
Notifications
You must be signed in to change notification settings - Fork 492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: tricklenode #2530
feat: tricklenode #2530
Conversation
pipeline/tick/tick_test.go
Outdated
@@ -8,6 +8,8 @@ import ( | |||
"testing" | |||
"time" | |||
|
|||
"github.com/google/go-cmp/cmp" | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs reformatted
|
||
func (n *TrickleNode) runTrickle(_ []byte) error { | ||
consumer := edge.NewConsumerWithReceiver( | ||
n.ins[0], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we enforce anywhere that len(ins) == 1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, in the attachment, in the tick code.
|
||
// BatchPoint forwards a PointMessage | ||
func (n *TrickleNode) BatchPoint(bp edge.BatchPointMessage) error { | ||
return n.outs[0].Collect(edge.NewPointMessage( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question with len(outs)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, in the attachment, in the tick code. if len is not 1, we will have errors way before it gets to here.
data | ||
|trickle() | ||
|window().period(10s) | ||
|httpOut('TestBatch_Trickle')` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking my understanding: this test is showing that trickle()
followed by window
is a way to re-batch data? i.e. in this case, re-batching from 4s windows into 10s windows?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also that the results of trickle are unbached as window cannot accept batch data, but the inputs to trickle are batched.
* feat: tricklenode * chore: update changelog * test: fix TestTrickle ast test * chore: cleanup imports
This creates a node that converts from a batch to a stream, it could be thought of as the inverse of windownode.