Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds control grpc server files
Browse files Browse the repository at this point in the history
Adds control grpc server files to control/

Adds control grpc use on startup along with configuration info
(addr, port) needed for this.

Moves control flags to control/flags.go

Adds grpc configuration options to snapd option parsing (ListenAddr and
port for control grpc server).
  • Loading branch information
IRCody committed Jun 10, 2016
1 parent 4123af2 commit ebdd1fb
Show file tree
Hide file tree
Showing 7 changed files with 655 additions and 26 deletions.
21 changes: 21 additions & 0 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (

// default configuration values
const (
defaultListenAddr string = "127.0.0.1"
defaultListenPort int = 8082
defaultMaxRunningPlugins int = 3
defaultPluginTrust int = 1
defaultAutoDiscoverPath string = ""
Expand Down Expand Up @@ -73,6 +75,8 @@ type Config struct {
KeyringPaths string `json:"keyring_paths"yaml:"keyring_paths"`
CacheExpiration jsonutil.Duration `json:"cache_expiration"yaml:"cache_expiration"`
Plugins *pluginConfig `json:"plugins"yaml:"plugins"`
ListenAddr string `json:"listen_addr,omitempty"yaml:"listen_addr"`
ListenPort int `json:"listen_port,omitempty"yaml:"listen_port"`
}

const (
Expand Down Expand Up @@ -102,6 +106,12 @@ const (
"type": ["object", "null"],
"properties" : {},
"additionalProperties": true
},
"listen_addr": {
"type": "string"
},
"listen_port": {
"type": "integer"
}
},
"additionalProperties": false
Expand All @@ -112,6 +122,8 @@ const (
// get the default snapd configuration
func GetDefaultConfig() *Config {
return &Config{
ListenAddr: defaultListenAddr,
ListenPort: defaultListenPort,
MaxRunningPlugins: defaultMaxRunningPlugins,
PluginTrust: defaultPluginTrust,
AutoDiscoverPath: defaultAutoDiscoverPath,
Expand Down Expand Up @@ -159,6 +171,15 @@ func (c *Config) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(v, c.Plugins); err != nil {
return err
}
case "listen_addr":
if err := json.Unmarshal(v, &(c.ListenAddr)); err != nil {
return err
}
case "listen_port":
if err := json.Unmarshal(v, &(c.ListenPort)); err != nil {
return err
}

default:
return fmt.Errorf("Unrecognized key '%v' in global config file while parsing 'control'", k)
}
Expand Down
18 changes: 18 additions & 0 deletions control/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ func TestControlConfigJSON(t *testing.T) {
Convey("MaxRunningPlugins should be set to 1", func() {
So(cfg.MaxRunningPlugins, ShouldEqual, 1)
})
Convey("ListenAddr should be set to 0.0.0.0", func() {
So(cfg.ListenAddr, ShouldEqual, "0.0.0.0")
})
Convey("ListenPort should be set to 10082", func() {
So(cfg.ListenPort, ShouldEqual, 10082)
})
Convey("KeyringPaths should be set to /some/path/with/keyring/files", func() {
So(cfg.KeyringPaths, ShouldEqual, "/some/path/with/keyring/files")
})
Expand Down Expand Up @@ -225,6 +231,12 @@ func TestControlConfigYaml(t *testing.T) {
Convey("MaxRunningPlugins should be set to 1", func() {
So(cfg.MaxRunningPlugins, ShouldEqual, 1)
})
Convey("ListenAddr should be set to 0.0.0.0", func() {
So(cfg.ListenAddr, ShouldEqual, "0.0.0.0")
})
Convey("ListenPort should be set to 10082", func() {
So(cfg.ListenPort, ShouldEqual, 10082)
})
Convey("KeyringPaths should be set to /some/path/with/keyring/files", func() {
So(cfg.KeyringPaths, ShouldEqual, "/some/path/with/keyring/files")
})
Expand Down Expand Up @@ -277,6 +289,12 @@ func TestControlDefaultConfig(t *testing.T) {
Convey("MaxRunningPlugins should equal 3", func() {
So(cfg.MaxRunningPlugins, ShouldEqual, 3)
})
Convey("ListenAddr should be set to 127.0.0.1", func() {
So(cfg.ListenAddr, ShouldEqual, "127.0.0.1")
})
Convey("ListenPort should be set to 8082", func() {
So(cfg.ListenPort, ShouldEqual, 8082)
})
Convey("KeyringPaths should be empty", func() {
So(cfg.KeyringPaths, ShouldEqual, "")
})
Expand Down
85 changes: 62 additions & 23 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"

"google.golang.org/grpc"

log "github.com/Sirupsen/logrus"

"github.com/intelsdi-x/gomit"
Expand All @@ -42,6 +45,7 @@ import (
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/grpc/controlproxy/rpc"
"github.com/intelsdi-x/snap/pkg/aci"
"github.com/intelsdi-x/snap/pkg/psigning"
)
Expand All @@ -64,7 +68,7 @@ var (
ErrLoadedPluginNotFound = errors.New("Loaded plugin not found")

// ErrControllerNotStarted - error message when the Controller was not started
ErrControllerNotStarted = errors.New("Must start Controller before calling Load()")
ErrControllerNotStarted = errors.New("Must start Controller before use")
)

type executablePlugins []plugin.ExecutablePlugin
Expand Down Expand Up @@ -236,6 +240,7 @@ func (p *pluginControl) Start() error {
controlLogger.WithFields(log.Fields{
"_block": "start",
}).Info("control started")

//Autodiscover
if p.Config.AutoDiscoverPath != "" {
controlLogger.WithFields(log.Fields{
Expand Down Expand Up @@ -323,6 +328,23 @@ func (p *pluginControl) Start() error {
"_block": "start",
}).Info("auto discover path is disabled")
}

lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v", p.Config.ListenAddr, p.Config.ListenPort))
if err != nil {
controlLogger.WithField("error", err.Error()).Error("Failed to start control grpc listener")
return err
}

opts := []grpc.ServerOption{}
grpcServer := grpc.NewServer(opts...)
rpc.RegisterMetricManagerServer(grpcServer, &ControlGRPCServer{p})
go func() {
err := grpcServer.Serve(lis)
if err != nil {
controlLogger.Fatal(err)
}
}()

return nil
}

Expand Down Expand Up @@ -731,33 +753,35 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, [

func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
var serrs []serror.SnapError
collectors, errs := p.gatherCollectors(mts)
if len(errs) > 0 {
serrs = append(serrs, errs...)
}

for _, gc := range collectors {
pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", gc.plugin.TypeName(), gc.plugin.Name(), gc.plugin.Version()))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
if len(mts) != 0 {
collectors, errs := p.gatherCollectors(mts)
if len(errs) > 0 {
serrs = append(serrs, errs...)
}
pool.Subscribe(taskID, gc.subscriptionType)
if pool.Eligible() {
err = p.verifyPlugin(gc.plugin.(*loadedPlugin))

for _, gc := range collectors {
pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", gc.plugin.TypeName(), gc.plugin.Name(), gc.plugin.Version()))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
err = p.pluginRunner.runPlugin(gc.plugin.(*loadedPlugin).Details)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
pool.Subscribe(taskID, gc.subscriptionType)
if pool.Eligible() {
err = p.verifyPlugin(gc.plugin.(*loadedPlugin))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
err = p.pluginRunner.runPlugin(gc.plugin.(*loadedPlugin).Details)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
}
serr := p.sendPluginSubscriptionEvent(taskID, gc.plugin)
if serr != nil {
serrs = append(serrs, serr)
}
}
serr := p.sendPluginSubscriptionEvent(taskID, gc.plugin)
if serr != nil {
serrs = append(serrs, serr)
}
}
for _, sub := range plugins {
Expand Down Expand Up @@ -819,7 +843,6 @@ func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins
serrs = append(serrs, serr)
}
}

return serrs
}

Expand Down Expand Up @@ -988,6 +1011,12 @@ func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool {
// of metrics and errors. If an error is encountered no metrics will be
// returned.
func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string, allTags map[string]map[string]string) (metrics []core.Metric, errs []error) {
// If control is not started we don't want tasks to be able to
// go through a workflow.
if !p.Started {
return nil, []error{ErrControllerNotStarted}
}

for ns, nsTags := range allTags {
for k, v := range nsTags {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -1064,6 +1093,11 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.

// PublishMetrics
func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
// If control is not started we don't want tasks to be able to
// go through a workflow.
if !p.Started {
return []error{ErrControllerNotStarted}
}
// merge global plugin config into the config for this request
// without over-writing the task specific config
cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, pluginName, pluginVersion).Table()
Expand All @@ -1080,6 +1114,11 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi

// ProcessMetrics
func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) {
// If control is not started we don't want tasks to be able to
// go through a workflow.
if !p.Started {
return "", nil, []error{ErrControllerNotStarted}
}
// merge global plugin config into the config for this request
// without over-writing the task specific config
cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, pluginName, pluginVersion).Table()
Expand Down
Loading

0 comments on commit ebdd1fb

Please sign in to comment.