Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds timeout to grpc calls to enable failures
Browse files Browse the repository at this point in the history
Adds timeout to context used with grpc calls to enable proper failing
when call is unable to connect after a set timeout.

Modifies test to utilize this timeout.
  • Loading branch information
IRCody committed Jun 10, 2016
1 parent 54fa222 commit 6061a44
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
26 changes: 16 additions & 10 deletions grpc/controlproxy/controlproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ import (
"github.com/intelsdi-x/snap/pkg/rpcutil"
)

var (
MAX_CONNECTION_TIMEOUT = 10 * time.Second
)

// Implements managesMetrics interface provided by scheduler and
// proxies those calls to the grpc client.
type ControlProxy struct {
Client rpc.MetricManagerClient
ctx context.Context
}

func New(addr string, port int) (ControlProxy, error) {
Expand All @@ -44,14 +49,15 @@ func New(addr string, port int) (ControlProxy, error) {
return ControlProxy{}, err
}
c := rpc.NewMetricManagerClient(conn)
return ControlProxy{Client: c}, nil
cd, _ := context.WithTimeout(context.Background(), MAX_CONNECTION_TIMEOUT)
return ControlProxy{Client: c, ctx: cd}, nil
}

func (c ControlProxy) ExpandWildcards(namespace core.Namespace) ([]core.Namespace, serror.SnapError) {
req := &rpc.ExpandWildcardsRequest{
Namespace: common.ToNamespace(namespace),
}
reply, err := c.Client.ExpandWildcards(context.Background(), req)
reply, err := c.Client.ExpandWildcards(c.ctx, req)
if err != nil {
return nil, serror.New(err)
}
Expand All @@ -63,7 +69,7 @@ func (c ControlProxy) ExpandWildcards(namespace core.Namespace) ([]core.Namespac
}
func (c ControlProxy) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
req := GetPubProcReq(contentType, content, pluginName, pluginVersion, config, taskID)
reply, err := c.Client.PublishMetrics(context.Background(), req)
reply, err := c.Client.PublishMetrics(c.ctx, req)
var errs []error
if err != nil {
errs = append(errs, err)
Expand All @@ -76,7 +82,7 @@ func (c ControlProxy) PublishMetrics(contentType string, content []byte, pluginN

func (c ControlProxy) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) {
req := GetPubProcReq(contentType, content, pluginName, pluginVersion, config, taskID)
reply, err := c.Client.ProcessMetrics(context.Background(), req)
reply, err := c.Client.ProcessMetrics(c.ctx, req)
var errs []error
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -109,7 +115,7 @@ func (c ControlProxy) CollectMetrics(mts []core.Metric, deadline time.Time, task
TaskID: taskID,
AllTags: allTags,
}
reply, err := c.Client.CollectMetrics(context.Background(), req)
reply, err := c.Client.CollectMetrics(c.ctx, req)
var errs []error
if err != nil {
errs = append(errs, err)
Expand All @@ -130,7 +136,7 @@ func (c ControlProxy) GetPluginContentTypes(n string, t core.PluginType, v int)
PluginType: getPluginType(t),
Version: int32(v),
}
reply, err := c.Client.GetPluginContentTypes(context.Background(), req)
reply, err := c.Client.GetPluginContentTypes(c.ctx, req)
if err != nil {
return nil, nil, err
}
Expand All @@ -145,7 +151,7 @@ func (c ControlProxy) ValidateDeps(mts []core.Metric, plugins []core.SubscribedP
Metrics: common.NewMetrics(mts),
Plugins: common.ToSubPluginsMsg(plugins),
}
reply, err := c.Client.ValidateDeps(context.Background(), req)
reply, err := c.Client.ValidateDeps(c.ctx, req)
if err != nil {
return []serror.SnapError{serror.New(err)}
}
Expand All @@ -155,7 +161,7 @@ func (c ControlProxy) ValidateDeps(mts []core.Metric, plugins []core.SubscribedP

func (c ControlProxy) SubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
req := depsRequest(taskID, mts, plugins)
reply, err := c.Client.SubscribeDeps(context.Background(), req)
reply, err := c.Client.SubscribeDeps(c.ctx, req)
if err != nil {
return []serror.SnapError{serror.New(err)}
}
Expand All @@ -165,7 +171,7 @@ func (c ControlProxy) SubscribeDeps(taskID string, mts []core.Metric, plugins []

func (c ControlProxy) UnsubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
req := depsRequest(taskID, mts, plugins)
reply, err := c.Client.UnsubscribeDeps(context.Background(), req)
reply, err := c.Client.UnsubscribeDeps(c.ctx, req)
if err != nil {
return []serror.SnapError{serror.New(err)}
}
Expand All @@ -177,7 +183,7 @@ func (c ControlProxy) MatchQueryToNamespaces(namespace core.Namespace) ([]core.N
req := &rpc.ExpandWildcardsRequest{
Namespace: common.ToNamespace(namespace),
}
reply, err := c.Client.MatchQueryToNamespaces(context.Background(), req)
reply, err := c.Client.MatchQueryToNamespaces(c.ctx, req)
if err != nil {
return nil, serror.New(err)
}
Expand Down
3 changes: 3 additions & 0 deletions scheduler/distributed_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/intelsdi-x/snap/control"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/grpc/controlproxy"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"
. "github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -92,6 +93,7 @@ func TestDistributedWorkflow(t *testing.T) {

Convey("Test task with invalid remote port", func() {
wf := dsWFMap(0)
controlproxy.MAX_CONNECTION_TIMEOUT = 1 * time.Second
t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true)
So(len(errs.Errors()), ShouldEqual, 1)
So(t, ShouldBeNil)
Expand All @@ -108,6 +110,7 @@ func TestDistributedWorkflow(t *testing.T) {

Convey("Test task failing when control is stopped while task is running", func() {
wf := dsWFMap(port1)
controlproxy.MAX_CONNECTION_TIMEOUT = 10 * time.Second
interval := time.Millisecond * 100
t, errs := sch.CreateTask(schedule.NewSimpleSchedule(interval), wf, true)
So(len(errs.Errors()), ShouldEqual, 0)
Expand Down

0 comments on commit 6061a44

Please sign in to comment.