From d4b65a5d81ad0c66fae013be2c9f196697acdef6 Mon Sep 17 00:00:00 2001 From: Cody Roseborough Date: Wed, 15 Feb 2017 11:33:01 -0800 Subject: [PATCH] Adds grpc streaming client --- control/plugin/client/client.go | 13 +++ control/plugin/client/grpc.go | 182 ++++++++++++++++++++++++++++++-- 2 files changed, 189 insertions(+), 6 deletions(-) diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index a72259326..512a88440 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -20,6 +20,8 @@ limitations under the License. package client import ( + "time" + "github.com/intelsdi-x/snap/control/plugin" "github.com/intelsdi-x/snap/control/plugin/cpolicy" "github.com/intelsdi-x/snap/core" @@ -41,6 +43,17 @@ type PluginCollectorClient interface { GetMetricTypes(plugin.ConfigType) ([]core.Metric, error) } +type PluginStreamCollectorClient interface { + PluginClient + StreamMetrics([]core.Metric) (chan []core.Metric, chan error, error) + GetMetricTypes(plugin.ConfigType) ([]core.Metric, error) + UpdateCollectedMetrics([]core.Metric) error + UpdatePluginConfig([]byte) error + UpdateMetricsBuffer(int64) error + UpdateCollectDuration(time.Duration) error + Killed() +} + // PluginProcessorClient A client providing processor specific plugin method calls. type PluginProcessorClient interface { PluginClient diff --git a/control/plugin/client/grpc.go b/control/plugin/client/grpc.go index 064514be0..6d4c7e6c6 100644 --- a/control/plugin/client/grpc.go +++ b/control/plugin/client/grpc.go @@ -50,10 +50,16 @@ type pluginClient interface { } type grpcClient struct { - collector rpc.CollectorClient - processor rpc.ProcessorClient - publisher rpc.PublisherClient - plugin pluginClient + collector rpc.CollectorClient + streamCollector rpc.StreamCollectorClient + processor rpc.ProcessorClient + publisher rpc.PublisherClient + plugin pluginClient + + // Channel used to signal death of stream collector to scheduler + killChan chan struct{} + // stream connection to stream collector + stream rpc.StreamCollector_StreamMetricsClient pluginType plugin.PluginType timeout time.Duration @@ -75,6 +81,29 @@ func NewCollectorGrpcClient(address string, timeout time.Duration, pub *rsa.Publ return p, nil } +// NewStreamCollectorGrpcClient returns a stream collector gRPC client +func NewStreamCollectorGrpcClient( + address string, + timeout time.Duration, + _ *rsa.PublicKey, + secure bool) (PluginStreamCollectorClient, error) { + address, port, err := parseAddress(address) + if err != nil { + return nil, err + } + p, err := newGrpcClient( + address, + int(port), + timeout, + plugin.StreamCollectorPluginType) + p.killChan = make(chan struct{}) + if err != nil { + return nil, err + } + + return p, nil +} + // NewProcessorGrpcClient returns a processor gRPC Client. func NewProcessorGrpcClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginProcessorClient, error) { address, port, err := parseAddress(address) @@ -148,6 +177,9 @@ func newGrpcClient(addr string, port int, timeout time.Duration, typ plugin.Plug case plugin.CollectorPluginType: p.collector = rpc.NewCollectorClient(conn) p.plugin = p.collector + case plugin.StreamCollectorPluginType: + p.streamCollector = rpc.NewStreamCollectorClient(conn) + p.plugin = p.streamCollector case plugin.ProcessorPluginType: p.processor = rpc.NewProcessorClient(conn) p.plugin = p.processor @@ -179,7 +211,15 @@ func (g *grpcClient) SetKey() error { return nil } +// Killed closes the killChan for a streaming rpc +func (g *grpcClient) Killed() { + if g.killChan != nil { + close(g.killChan) + } +} + func (g *grpcClient) Kill(reason string) error { + _, err := g.plugin.Kill(getContext(g.timeout), &rpc.KillArg{Reason: reason}) g.conn.Close() if err != nil { @@ -241,12 +281,142 @@ func (g *grpcClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) { return metrics, nil } +func (g *grpcClient) UpdateCollectedMetrics(mts []core.Metric) error { + if g.stream != nil { + arg := &rpc.CollectArg{ + Metrics_Arg: &rpc.MetricsArg{Metrics: NewMetrics(mts)}, + } + err := g.stream.Send(arg) + if err != nil { + return err + } + } + return nil +} +func (g *grpcClient) UpdatePluginConfig(bytes []byte) error { + if g.stream != nil { + arg := &rpc.CollectArg{ + Other: bytes, + } + err := g.stream.Send(arg) + if err != nil { + return err + } + } + return nil +} +func (g *grpcClient) UpdateCollectDuration(maxCollectDuration time.Duration) error { + if g.stream != nil { + arg := &rpc.CollectArg{ + MaxCollectDuration: maxCollectDuration.Nanoseconds(), + } + err := g.stream.Send(arg) + if err != nil { + return err + } + } + return nil +} +func (g *grpcClient) UpdateMetricsBuffer(maxMetricsBuffer int64) error { + if g.stream != nil { + arg := &rpc.CollectArg{ + MaxMetricsBuffer: maxMetricsBuffer, + } + err := g.stream.Send(arg) + if err != nil { + return err + } + } + return nil +} + +func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan error, error) { + arg := &rpc.CollectArg{ + Metrics_Arg: &rpc.MetricsArg{Metrics: NewMetrics(mts)}, + } + if len(mts) == 0 { + return nil, nil, errors.New("No metrics requested to stream") + } + + mt := mts[0] + if cfg := mt.Config(); cfg != nil { + values := cfg.Table() + if values != nil { + maxCollectDuration, ok := values["MaxCollectDuration"] + if ok { + t, ok := maxCollectDuration.(*ctypes.ConfigValueInt) + if ok { + arg.MaxCollectDuration = int64(t.Value) + } + } + maxMetricsBuffer, ok := values["MaxMetricsBuffer"] + if ok { + t, ok := maxMetricsBuffer.(*ctypes.ConfigValueInt) + if ok { + arg.MaxMetricsBuffer = int64(t.Value) + } + } + } + } + + s, err := g.streamCollector.StreamMetrics(context.Background()) + if err != nil { + return nil, nil, err + } + err = s.Send(arg) + if err != nil { + return nil, nil, err + } + metricChan := make(chan []core.Metric) + errChan := make(chan error) + doneChan := make(chan struct{}) + g.killChan = doneChan + g.stream = s + go g.handleInStream(metricChan, errChan) + return metricChan, errChan, nil +} + +func (g *grpcClient) handleInStream( + metricChan chan []core.Metric, + errChan chan error) { + go func() { + done := false + for !done { + in, err := g.stream.Recv() + if err != nil { + errChan <- err + break + } + if in.Metrics_Reply != nil { + mts := ToCoreMetrics(in.Metrics_Reply.Metrics) + if len(mts) == 0 { + // skip empty metrics + continue + } + metricChan <- mts + } else if in.Error != nil { + e := errors.New(in.Error.Error) + errChan <- e + } + } + }() + + <-g.killChan + errChan <- errors.New("connection broken") + +} + func (g *grpcClient) GetMetricTypes(config plugin.ConfigType) ([]core.Metric, error) { arg := &rpc.GetMetricTypesArg{ Config: ToConfigMap(config.Table()), } - reply, err := g.collector.GetMetricTypes(getContext(g.timeout), arg) - + var reply *rpc.MetricsReply + var err error + if g.streamCollector != nil { + reply, err = g.streamCollector.GetMetricTypes(getContext(g.timeout), arg) + } else { + reply, err = g.collector.GetMetricTypes(getContext(g.timeout), arg) + } if err != nil { return nil, err }