Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Plugin initiated workflow #1526

Merged
merged 4 commits into from
Mar 3, 2017
Merged

Plugin initiated workflow #1526

merged 4 commits into from
Mar 3, 2017

Conversation

IRCody
Copy link
Contributor

@IRCody IRCody commented Feb 16, 2017

Implementation of #1522

Summary of changes:
Adds Streaming capability to Snap by utilizing grpc stream. Creates a
new rpc type for the grpc stream, and allows for plugins of this type
(streamcollectors) to send metrics to snap on a plugin initiated basis
instead of on Snap's collection interval.

Testing done:

  • Manual

Work still to be done:

  • Test

To try it out requires this PR from the plugin-lib-go.

Currently to run a streaming task you need to start the task via curl since the logic around streaming schedule hasn't been added to snaptel. You can use something like:

curl -vX POST localhost:8181/v1/tasks -d @multi-file.json --header "Content-Type: application/json"

where multi-file.json is:

{
  "version": 1,
  "schedule": {
    "type": "streaming"
  },
  "start": true,
  "max-failures": 10,
  "max-collect-duration": "1m",
  "max-metrics-buffer": 100,
  "workflow": {
    "collect": {
      "metrics": {
        "/random/integer": {},
        "/random/string": {},
        "/random/float": {}
      },
      "publish": [
        {
          "plugin_name": "mock-file",
          "config": {
            "file": "/tmp/stream.log"
          }
        }
      ]
    }
  }
}

@IRCody
Copy link
Contributor Author

IRCody commented Feb 23, 2017

Currently this relies on the plugin-lib-go PR being merged so that it can effectively be tested (since it relies on commits from that one that do not exist in the vendor dir).

return nil, nil, serr
}

cli, ok := p.(*availablePlugin).client.(client.PluginStreamCollectorClient)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guessing that only collector is supported currently? Directly casting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is in StreamMetrics which is only implemented/intended for collectors.

We do verify that the assertion was successful.

return nil, nil, errs
}

if serrs != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mind to add a comment for the differences between errs and serrs? The handling is dramatically different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see what you mean about the handling being different. serrs are added to errs if they exist? What behavior are you referring to (I'm probably just not seeing it).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where there is an err, the code immediately returns. serrs appends errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • errs is a slice of errors where we return any errors that happen during this call.
  • err is the critical error coming from the call to subscriptionGroups.Get() (which is that this subscription doesn't exist). It is a stop this call error b/c if the subscription group doesn't exist we have nothing else we can do.
  • serrs is coming from this and represents past errors with this subscription. I'll have to think about if maybe it doesn't make sense to include these? What do you think? cc: @jcooklin.

errs = append(errs, e)
}
}
if len(pluginToMetricMap) > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you like to check this(#1043) before line 1038?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming you didn't mean to link to the issue # 1043. What do you see that would be better by moving this check prior to 1038?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@IRCody, saving the effort of 1038 block if 1043 does not work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a change that has side effects. Are you trying to make the argument that the serrs returned from the subscriptionGroups.Get() call aren't needed? I'd be interested to hear the reasoning behind it b/c I'm not sure myself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you still want to see previous "serrs", you cannot change it. Otherwise "len(pluginToMetricMap) > 1" is a broken error which should be handled earlier. Anyway, it does not hurt much to keep this way.

if err != nil {
return nil, nil, err
}
err = s.Send(arg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to send arg everytime? will they change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by 'everytime'?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that each send-event, we need to pass the same args. Is it by design?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every call to stream metrics does require the arg to be sent. I'm not sure what you mean by 'send-event'. When establishing the stream connection we do need to send the args.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Snap allows apps changing args on the fly, this is fine. Otherwise, it's redundant to deal with args for every coming stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're misunderstanding how often this is called. StreamMetrics is only called once per task-startup (and potentially again once-per plugin restart if a plugin dies). I don't see how you can start a stream without sending the args.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, if it's only called in starting a stream, it makes sense. Thanks for explaining it.

message CollectArg{
// Request these metrics to be collected on the plugins schedule
MetricsArg Metrics_Arg = 1;
// Set minimum collection duration --duration in ns
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should state 'Set maximum ...'.

Consider:

Set maximum collection duration in ns. The events will be buffered until the max duration is reached and/or the max metric buffer amount is reached. 0 means that events will be sent immediately.

// Set minimum collection duration --duration in ns
int64 MaxCollectDuration = 2;
// Set max number of metrics to buffer before forcing send
// 0 means no forced send
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider:

MaxMetricsBuffer sets the max number of metrics to buffer before sending. Events are sent as soon MaxMetricsBuffer is reached or MaxCollectDuration is exceeded, whichever happens first. If MaxMetricsBuffer is 0 metrics will be immediately sent. If MaxCollectDuration is set to 0 it doesn't matter what MaxMetricsBuffer as events will be sent immediately.

Copy link
Contributor

@candysmurf candysmurf Mar 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A beautiful catch here. Do we need these two MAXs? Will MaxDuration be good enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcooklin: Are you proposing an alternate behavior or trying to discuss possibilities with the current behavior? I was viewing the default state as giving the plugin no restrictions so MaxCollectDuration/MaxMetricsBuffer being set to 0 means the plugin can make it's own determination. Maybe this isn't the intended behavior?

@candysmurf: I think they cover different but partially overlapping use cases.

Copy link
Collaborator

@jcooklin jcooklin Mar 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@IRCody I'm proposing how I think it should work. I feel the plugin author should be given a channel on which they place metrics and the lib deals with dispatching to the framework using these rules on MaxMetricsBuffer and MaxCollectDuration. I don't see plugin authors ever having to deal with either of these configurations as it's something the libs should handle for them. BTW, I'm not suggesting that this functionality exist in the go lib in this first PR.

UpdatePluginConfig([]byte) error
UpdateMetricsBuffer(int64) error
UpdateCollectDuration(time.Duration) error
Killed()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need Killed? Seems like this logic (closing kill chan) could be done in the existing grpc client Kill method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grpc kill method isn't called in all cases. Under some circumstances we will kill the plugin process directly and want to ensure that we notify the client/scheduler that it is happening.

address string,
timeout time.Duration,
_ *rsa.PublicKey,
secure bool) (PluginStreamCollectorClient, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the secure arg for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc it's legacy related to matching args with Go RPC calls. Maybe doesn't make sense.

return nil
}

func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan error, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we shouldn't consider that idea that streaming plugins only expose a single metric request. Across the stream we can expect different metrics (namespaces) but a plugin exposing a single streaming metric might simplify the solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what you mean. Could you give an example so I can wrap my head around it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind... I made an incorrect assumption.

errChan chan error) {
go func() {
done := false
for !done {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can remove done here.

done := false
for !done {
in, err := g.stream.Recv()
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check for err == io.EOF first so we can break without communicating an error? I'm thinking this would be useful when we send a Kill request to the plugin and it's flushing the remaining events it has queued up.

}()

<-g.killChan
errChan <- errors.New("connection broken")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anytime the killChan is closed we are going to expect the error "connection broken". Is this for debugging?

Copy link
Contributor Author

@IRCody IRCody Mar 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's how we signal the scheduler to wait a bit before retrying to give the plugin time to be restarted. See this.

// wait for a second and then try again until either
// the connection is successful or we pass the
// acceptable number of consecutive failures
time.Sleep(time.Second)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider setting a local variable for the amount of time we will wait and use it here and below on line 318. It caught my attention that the amounts were different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good call. We could also have some sort of increasing back off time.

@IRCody IRCody force-pushed the rerestream branch 4 times, most recently from 353fa22 to 4d75359 Compare March 3, 2017 20:03
Copy link
Collaborator

@jcooklin jcooklin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Adds Streaming capability to Snap by utilizing grpc stream. Creates a
new rpc type for the grpc stream, and allows for plugins of this type
(streamcollectors) to send metrics to snap on a plugin initiated basis
instead of on Snap's collection interval.

Adds StreamMetrics test and rand test collector

Update glide lock/plugin dependency
@IRCody IRCody merged commit 2304181 into intelsdi-x:master Mar 3, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants