Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1192 from IRCody/enforce_timeouts
Browse files Browse the repository at this point in the history
Enforce timeouts in plugin clients
  • Loading branch information
IRCody authored Sep 14, 2016
2 parents 9a05d66 + 372cdb1 commit 4ac7b4d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
2 changes: 1 addition & 1 deletion control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (

const (
// DefaultClientTimeout - default timeout for a client connection attempt
DefaultClientTimeout = time.Second * 3
DefaultClientTimeout = time.Second * 10
// DefaultHealthCheckTimeout - default timeout for a health check
DefaultHealthCheckTimeout = time.Second * 1
// DefaultHealthCheckFailureLimit - how any consecutive health check timeouts must occur to trigger a failure
Expand Down
23 changes: 20 additions & 3 deletions control/plugin/client/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type PluginNativeClient struct {
pluginType plugin.PluginType
encoder encoding.Encoder
encrypter *encrypter.Encrypter
timeout time.Duration
}

func NewCollectorNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) {
Expand Down Expand Up @@ -140,6 +141,15 @@ func decodeMetrics(bts []byte) ([]core.Metric, error) {
return cmetrics, nil
}

func enforceTimeout(p *PluginNativeClient, dl time.Duration, done chan int) {
select {
case <-time.After(dl):
p.Kill("Passed deadline")
case <-done:
return
}
}

func (p *PluginNativeClient) Publish(metrics []core.Metric, config map[string]ctypes.ConfigValue) error {

args := plugin.PublishArgs{
Expand All @@ -152,11 +162,12 @@ func (p *PluginNativeClient) Publish(metrics []core.Metric, config map[string]ct
if err != nil {
return err
}

var reply []byte
done := make(chan int)
go enforceTimeout(p, p.timeout, done)
err = p.connection.Call("Publisher.Publish", out, &reply)
close(done)
return err
return nil
}

func (p *PluginNativeClient) Process(metrics []core.Metric, config map[string]ctypes.ConfigValue) ([]core.Metric, error) {
Expand All @@ -173,7 +184,10 @@ func (p *PluginNativeClient) Process(metrics []core.Metric, config map[string]ct
}

var reply []byte
done := make(chan int)
go enforceTimeout(p, p.timeout, done)
err = p.connection.Call("Processor.Process", out, &reply)
close(done)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -211,14 +225,16 @@ func (p *PluginNativeClient) CollectMetrics(mts []core.Metric) ([]core.Metric, e
}

args := plugin.CollectMetricsArgs{MetricTypes: metricsToCollect}

out, err := p.encoder.Encode(args)
if err != nil {
return nil, err
}

var reply []byte
done := make(chan int)
go enforceTimeout(p, p.timeout, done)
err = p.connection.Call("Collector.CollectMetrics", out, &reply)
close(done)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,6 +319,7 @@ func newNativeClient(address string, timeout time.Duration, t plugin.PluginType,
p := &PluginNativeClient{
connection: r,
pluginType: t,
timeout: timeout,
}

p.encoder = encoding.NewGobEncoder()
Expand Down
1 change: 0 additions & 1 deletion control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ const (
MaxPluginRestartCount = 3
)

// TBD
type executablePlugin interface {
Run(time.Duration) (plugin.Response, error)
Kill() error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

var (
PluginName = "snap-plugin-publisher-mock-file"
PluginName = "snap-plugin-publisher-mock-file-grpc"
PluginType = "publisher"
SnapPath = os.ExpandEnv(os.Getenv("SNAP_PATH"))
PluginPath = path.Join(SnapPath, "plugin", PluginName)
Expand Down

0 comments on commit 4ac7b4d

Please sign in to comment.