-
Notifications
You must be signed in to change notification settings - Fork 294
Conversation
Currently this relies on the plugin-lib-go PR being merged so that it can effectively be tested (since it relies on commits from that one that do not exist in the vendor dir). |
return nil, nil, serr | ||
} | ||
|
||
cli, ok := p.(*availablePlugin).client.(client.PluginStreamCollectorClient) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
guessing that only collector is supported currently? Directly casting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is in StreamMetrics which is only implemented/intended for collectors.
We do verify that the assertion was successful.
return nil, nil, errs | ||
} | ||
|
||
if serrs != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mind to add a comment for the differences between errs and serrs? The handling is dramatically different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see what you mean about the handling being different. serrs are added to errs if they exist? What behavior are you referring to (I'm probably just not seeing it).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where there is an err, the code immediately returns. serrs appends errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- errs is a slice of errors where we return any errors that happen during this call.
- err is the critical error coming from the call to subscriptionGroups.Get() (which is that this subscription doesn't exist). It is a stop this call error b/c if the subscription group doesn't exist we have nothing else we can do.
- serrs is coming from this and represents past errors with this subscription. I'll have to think about if maybe it doesn't make sense to include these? What do you think? cc: @jcooklin.
errs = append(errs, e) | ||
} | ||
} | ||
if len(pluginToMetricMap) > 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you like to check this(#1043) before line 1038?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming you didn't mean to link to the issue # 1043. What do you see that would be better by moving this check prior to 1038?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@IRCody, saving the effort of 1038 block if 1043 does not work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a change that has side effects. Are you trying to make the argument that the serrs returned from the subscriptionGroups.Get() call aren't needed? I'd be interested to hear the reasoning behind it b/c I'm not sure myself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you still want to see previous "serrs", you cannot change it. Otherwise "len(pluginToMetricMap) > 1" is a broken error which should be handled earlier. Anyway, it does not hurt much to keep this way.
if err != nil { | ||
return nil, nil, err | ||
} | ||
err = s.Send(arg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to send arg everytime? will they change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by 'everytime'?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that each send-event, we need to pass the same args. Is it by design?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every call to stream metrics does require the arg to be sent. I'm not sure what you mean by 'send-event'. When establishing the stream connection we do need to send the args.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If Snap allows apps changing args on the fly, this is fine. Otherwise, it's redundant to deal with args for every coming stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're misunderstanding how often this is called. StreamMetrics is only called once per task-startup (and potentially again once-per plugin restart if a plugin dies). I don't see how you can start a stream without sending the args.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, if it's only called in starting a stream, it makes sense. Thanks for explaining it.
control/plugin/rpc/plugin.proto
Outdated
message CollectArg{ | ||
// Request these metrics to be collected on the plugins schedule | ||
MetricsArg Metrics_Arg = 1; | ||
// Set minimum collection duration --duration in ns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should state 'Set maximum ...'.
Consider:
Set maximum collection duration in ns. The events will be buffered until the max duration is reached and/or the max metric buffer amount is reached. 0 means that events will be sent immediately.
control/plugin/rpc/plugin.proto
Outdated
// Set minimum collection duration --duration in ns | ||
int64 MaxCollectDuration = 2; | ||
// Set max number of metrics to buffer before forcing send | ||
// 0 means no forced send |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider:
MaxMetricsBuffer
sets the max number of metrics to buffer before sending. Events are sent as soon MaxMetricsBuffer
is reached or MaxCollectDuration
is exceeded, whichever happens first. If MaxMetricsBuffer is 0 metrics will be immediately sent. If MaxCollectDuration
is set to 0 it doesn't matter what MaxMetricsBuffer
as events will be sent immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A beautiful catch here. Do we need these two MAXs? Will MaxDuration be good enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jcooklin: Are you proposing an alternate behavior or trying to discuss possibilities with the current behavior? I was viewing the default state as giving the plugin no restrictions so MaxCollectDuration/MaxMetricsBuffer being set to 0 means the plugin can make it's own determination. Maybe this isn't the intended behavior?
@candysmurf: I think they cover different but partially overlapping use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@IRCody I'm proposing how I think it should work. I feel the plugin author should be given a channel on which they place metrics and the lib deals with dispatching to the framework using these rules on MaxMetricsBuffer and MaxCollectDuration. I don't see plugin authors ever having to deal with either of these configurations as it's something the libs should handle for them. BTW, I'm not suggesting that this functionality exist in the go lib in this first PR.
UpdatePluginConfig([]byte) error | ||
UpdateMetricsBuffer(int64) error | ||
UpdateCollectDuration(time.Duration) error | ||
Killed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need Killed? Seems like this logic (closing kill chan) could be done in the existing grpc client Kill method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The grpc kill method isn't called in all cases. Under some circumstances we will kill the plugin process directly and want to ensure that we notify the client/scheduler that it is happening.
address string, | ||
timeout time.Duration, | ||
_ *rsa.PublicKey, | ||
secure bool) (PluginStreamCollectorClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the secure
arg for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iirc it's legacy related to matching args with Go RPC calls. Maybe doesn't make sense.
return nil | ||
} | ||
|
||
func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan error, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we shouldn't consider that idea that streaming plugins only expose a single metric request. Across the stream we can expect different metrics (namespaces) but a plugin exposing a single streaming metric might simplify the solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what you mean. Could you give an example so I can wrap my head around it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind... I made an incorrect assumption.
control/plugin/client/grpc.go
Outdated
errChan chan error) { | ||
go func() { | ||
done := false | ||
for !done { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can remove done
here.
control/plugin/client/grpc.go
Outdated
done := false | ||
for !done { | ||
in, err := g.stream.Recv() | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check for err == io.EOF
first so we can break without communicating an error? I'm thinking this would be useful when we send a Kill
request to the plugin and it's flushing the remaining events it has queued up.
}() | ||
|
||
<-g.killChan | ||
errChan <- errors.New("connection broken") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anytime the killChan is closed we are going to expect the error "connection broken". Is this for debugging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's how we signal the scheduler to wait a bit before retrying to give the plugin time to be restarted. See this.
scheduler/task.go
Outdated
// wait for a second and then try again until either | ||
// the connection is successful or we pass the | ||
// acceptable number of consecutive failures | ||
time.Sleep(time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider setting a local variable for the amount of time we will wait and use it here and below on line 318. It caught my attention that the amounts were different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good call. We could also have some sort of increasing back off time.
353fa22
to
4d75359
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Adds Streaming capability to Snap by utilizing grpc stream. Creates a new rpc type for the grpc stream, and allows for plugins of this type (streamcollectors) to send metrics to snap on a plugin initiated basis instead of on Snap's collection interval. Adds StreamMetrics test and rand test collector Update glide lock/plugin dependency
Implementation of #1522
Summary of changes:
Adds Streaming capability to Snap by utilizing grpc stream. Creates a
new rpc type for the grpc stream, and allows for plugins of this type
(streamcollectors) to send metrics to snap on a plugin initiated basis
instead of on Snap's collection interval.
Testing done:
Work still to be done:
To try it out requires this PR from the plugin-lib-go.
Currently to run a streaming task you need to start the task via curl since the logic around streaming schedule hasn't been added to snaptel. You can use something like:
curl -vX POST localhost:8181/v1/tasks -d @multi-file.json --header "Content-Type: application/json"
where multi-file.json is: