Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds grpc streaming client
Browse files Browse the repository at this point in the history
  • Loading branch information
croseborough committed Mar 3, 2017
1 parent 94c5bfc commit d4b65a5
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 6 deletions.
13 changes: 13 additions & 0 deletions control/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package client

import (
"time"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core"
Expand All @@ -41,6 +43,17 @@ type PluginCollectorClient interface {
GetMetricTypes(plugin.ConfigType) ([]core.Metric, error)
}

type PluginStreamCollectorClient interface {
PluginClient
StreamMetrics([]core.Metric) (chan []core.Metric, chan error, error)
GetMetricTypes(plugin.ConfigType) ([]core.Metric, error)
UpdateCollectedMetrics([]core.Metric) error
UpdatePluginConfig([]byte) error
UpdateMetricsBuffer(int64) error
UpdateCollectDuration(time.Duration) error
Killed()
}

// PluginProcessorClient A client providing processor specific plugin method calls.
type PluginProcessorClient interface {
PluginClient
Expand Down
182 changes: 176 additions & 6 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,16 @@ type pluginClient interface {
}

type grpcClient struct {
collector rpc.CollectorClient
processor rpc.ProcessorClient
publisher rpc.PublisherClient
plugin pluginClient
collector rpc.CollectorClient
streamCollector rpc.StreamCollectorClient
processor rpc.ProcessorClient
publisher rpc.PublisherClient
plugin pluginClient

// Channel used to signal death of stream collector to scheduler
killChan chan struct{}
// stream connection to stream collector
stream rpc.StreamCollector_StreamMetricsClient

pluginType plugin.PluginType
timeout time.Duration
Expand All @@ -75,6 +81,29 @@ func NewCollectorGrpcClient(address string, timeout time.Duration, pub *rsa.Publ
return p, nil
}

// NewStreamCollectorGrpcClient returns a stream collector gRPC client
func NewStreamCollectorGrpcClient(
address string,
timeout time.Duration,
_ *rsa.PublicKey,
secure bool) (PluginStreamCollectorClient, error) {
address, port, err := parseAddress(address)
if err != nil {
return nil, err
}
p, err := newGrpcClient(
address,
int(port),
timeout,
plugin.StreamCollectorPluginType)
p.killChan = make(chan struct{})
if err != nil {
return nil, err
}

return p, nil
}

// NewProcessorGrpcClient returns a processor gRPC Client.
func NewProcessorGrpcClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginProcessorClient, error) {
address, port, err := parseAddress(address)
Expand Down Expand Up @@ -148,6 +177,9 @@ func newGrpcClient(addr string, port int, timeout time.Duration, typ plugin.Plug
case plugin.CollectorPluginType:
p.collector = rpc.NewCollectorClient(conn)
p.plugin = p.collector
case plugin.StreamCollectorPluginType:
p.streamCollector = rpc.NewStreamCollectorClient(conn)
p.plugin = p.streamCollector
case plugin.ProcessorPluginType:
p.processor = rpc.NewProcessorClient(conn)
p.plugin = p.processor
Expand Down Expand Up @@ -179,7 +211,15 @@ func (g *grpcClient) SetKey() error {
return nil
}

// Killed closes the killChan for a streaming rpc
func (g *grpcClient) Killed() {
if g.killChan != nil {
close(g.killChan)
}
}

func (g *grpcClient) Kill(reason string) error {

_, err := g.plugin.Kill(getContext(g.timeout), &rpc.KillArg{Reason: reason})
g.conn.Close()
if err != nil {
Expand Down Expand Up @@ -241,12 +281,142 @@ func (g *grpcClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) {
return metrics, nil
}

func (g *grpcClient) UpdateCollectedMetrics(mts []core.Metric) error {
if g.stream != nil {
arg := &rpc.CollectArg{
Metrics_Arg: &rpc.MetricsArg{Metrics: NewMetrics(mts)},
}
err := g.stream.Send(arg)
if err != nil {
return err
}
}
return nil
}
func (g *grpcClient) UpdatePluginConfig(bytes []byte) error {
if g.stream != nil {
arg := &rpc.CollectArg{
Other: bytes,
}
err := g.stream.Send(arg)
if err != nil {
return err
}
}
return nil
}
func (g *grpcClient) UpdateCollectDuration(maxCollectDuration time.Duration) error {
if g.stream != nil {
arg := &rpc.CollectArg{
MaxCollectDuration: maxCollectDuration.Nanoseconds(),
}
err := g.stream.Send(arg)
if err != nil {
return err
}
}
return nil
}
func (g *grpcClient) UpdateMetricsBuffer(maxMetricsBuffer int64) error {
if g.stream != nil {
arg := &rpc.CollectArg{
MaxMetricsBuffer: maxMetricsBuffer,
}
err := g.stream.Send(arg)
if err != nil {
return err
}
}
return nil
}

func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan error, error) {
arg := &rpc.CollectArg{
Metrics_Arg: &rpc.MetricsArg{Metrics: NewMetrics(mts)},
}
if len(mts) == 0 {
return nil, nil, errors.New("No metrics requested to stream")
}

mt := mts[0]
if cfg := mt.Config(); cfg != nil {
values := cfg.Table()
if values != nil {
maxCollectDuration, ok := values["MaxCollectDuration"]
if ok {
t, ok := maxCollectDuration.(*ctypes.ConfigValueInt)
if ok {
arg.MaxCollectDuration = int64(t.Value)
}
}
maxMetricsBuffer, ok := values["MaxMetricsBuffer"]
if ok {
t, ok := maxMetricsBuffer.(*ctypes.ConfigValueInt)
if ok {
arg.MaxMetricsBuffer = int64(t.Value)
}
}
}
}

s, err := g.streamCollector.StreamMetrics(context.Background())
if err != nil {
return nil, nil, err
}
err = s.Send(arg)
if err != nil {
return nil, nil, err
}
metricChan := make(chan []core.Metric)
errChan := make(chan error)
doneChan := make(chan struct{})
g.killChan = doneChan
g.stream = s
go g.handleInStream(metricChan, errChan)
return metricChan, errChan, nil
}

func (g *grpcClient) handleInStream(
metricChan chan []core.Metric,
errChan chan error) {
go func() {
done := false
for !done {
in, err := g.stream.Recv()
if err != nil {
errChan <- err
break
}
if in.Metrics_Reply != nil {
mts := ToCoreMetrics(in.Metrics_Reply.Metrics)
if len(mts) == 0 {
// skip empty metrics
continue
}
metricChan <- mts
} else if in.Error != nil {
e := errors.New(in.Error.Error)
errChan <- e
}
}
}()

<-g.killChan
errChan <- errors.New("connection broken")

}

func (g *grpcClient) GetMetricTypes(config plugin.ConfigType) ([]core.Metric, error) {
arg := &rpc.GetMetricTypesArg{
Config: ToConfigMap(config.Table()),
}
reply, err := g.collector.GetMetricTypes(getContext(g.timeout), arg)

var reply *rpc.MetricsReply
var err error
if g.streamCollector != nil {
reply, err = g.streamCollector.GetMetricTypes(getContext(g.timeout), arg)
} else {
reply, err = g.collector.GetMetricTypes(getContext(g.timeout), arg)
}
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d4b65a5

Please sign in to comment.