Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
adds ability to turn encryption on and off per plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
pittma committed Sep 28, 2015
1 parent 3e035d0 commit b29baa4
Show file tree
Hide file tree
Showing 16 changed files with 171 additions and 200 deletions.
15 changes: 7 additions & 8 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package control

import (
"crypto/rsa"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -68,7 +67,7 @@ type availablePlugin struct {

// newAvailablePlugin returns an availablePlugin with information from a
// plugin.Response
func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter gomit.Emitter, ep executablePlugin) (*availablePlugin, error) {
func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executablePlugin) (*availablePlugin, error) {
if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType {
return nil, ErrBadType
}
Expand All @@ -90,13 +89,13 @@ func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter
case plugin.CollectorPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewCollectorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey)
c, e := client.NewCollectorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey)
c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand All @@ -105,13 +104,13 @@ func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter
case plugin.PublisherPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewPublisherHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey)
c, e := client.NewPublisherHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewPublisherNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey)
c, e := client.NewPublisherNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand All @@ -120,13 +119,13 @@ func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter
case plugin.ProcessorPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewProcessorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey)
c, e := client.NewProcessorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewProcessorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey)
c, e := client.NewProcessorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand Down
14 changes: 3 additions & 11 deletions control/available_plugin_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package control

import (
"crypto/rand"
"crypto/rsa"
"errors"
"net"
"testing"
Expand All @@ -14,8 +12,6 @@ import (

func TestAvailablePlugin(t *testing.T) {
Convey("newAvailablePlugin()", t, func() {
key, err := rsa.GenerateKey(rand.Reader, 2048)
So(err, ShouldBeNil)
Convey("returns an availablePlugin", func() {
ln, _ := net.Listen("tcp", ":4000")
defer ln.Close()
Expand All @@ -27,17 +23,15 @@ func TestAvailablePlugin(t *testing.T) {
Type: plugin.CollectorPluginType,
ListenAddress: "127.0.0.1:4000",
}
ap, err := newAvailablePlugin(resp, key, nil, nil)
ap, err := newAvailablePlugin(resp, nil, nil)
So(ap, ShouldHaveSameTypeAs, new(availablePlugin))
So(err, ShouldBeNil)
})
})

Convey("Stop()", t, func() {
Convey("returns nil if plugin successfully stopped", func() {
key, err := rsa.GenerateKey(rand.Reader, 2048)
So(err, ShouldBeNil)
r := newRunner(&routing.RoundRobinStrategy{}, key)
r := newRunner(&routing.RoundRobinStrategy{})
a := plugin.Arg{
PluginLogPath: "/tmp/pulse-test-plugin-stop.log",
}
Expand Down Expand Up @@ -89,8 +83,6 @@ func TestAvailablePlugins(t *testing.T) {
})
})
Convey("it returns an error if client cannot be created", t, func() {
key, err := rsa.GenerateKey(rand.Reader, 2048)
So(err, ShouldBeNil)
resp := &plugin.Response{
Meta: plugin.PluginMeta{
Name: "test",
Expand All @@ -99,7 +91,7 @@ func TestAvailablePlugins(t *testing.T) {
Type: plugin.CollectorPluginType,
ListenAddress: "localhost:",
}
ap, err := newAvailablePlugin(resp, key, nil, nil)
ap, err := newAvailablePlugin(resp, nil, nil)
So(ap, ShouldBeNil)
So(err, ShouldNotBeNil)
})
Expand Down
23 changes: 4 additions & 19 deletions control/control.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package control

import (
"crypto/rand"
"crypto/rsa"
"errors"
"fmt"
"strconv"
Expand All @@ -24,11 +22,7 @@ import (
"github.com/intelsdi-x/pulse/pkg/psigning"
)

// control private key (RSA private key)
// control public key (RSA public key)
// Plugin token = token generated by plugin and passed to control
// Session token = plugin seed encrypted by control private key, verified by plugin using control public key
//

var (
controlLogger = log.WithFields(log.Fields{
Expand All @@ -47,8 +41,6 @@ type pluginControl struct {
Started bool

autodiscoverPaths []string
privKey *rsa.PrivateKey
pubKey *rsa.PublicKey
eventManager *gomit.EventController

pluginManager managesPlugins
Expand Down Expand Up @@ -117,14 +109,7 @@ func CacheExpiration(t time.Duration) controlOpt {
// New returns a new pluginControl instance
func New(opts ...controlOpt) *pluginControl {

key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
panic(err)
}
c := &pluginControl{
pubKey: &key.PublicKey,
privKey: key,
}
c := &pluginControl{}
// Initialize components
//
// Event Manager
Expand All @@ -141,7 +126,7 @@ func New(opts ...controlOpt) *pluginControl {
}).Debug("metric catalog created")

// Plugin Manager
c.pluginManager = newPluginManager(c.pubKey, c.privKey)
c.pluginManager = newPluginManager()
controlLogger.WithFields(log.Fields{
"_block": "new",
}).Debug("plugin manager created")
Expand All @@ -156,7 +141,7 @@ func New(opts ...controlOpt) *pluginControl {

// Plugin Runner
// TODO (danielscottt): handle routing strat changes via events
c.pluginRunner = newRunner(&routing.RoundRobinStrategy{}, c.privKey)
c.pluginRunner = newRunner(&routing.RoundRobinStrategy{})
controlLogger.WithFields(log.Fields{
"_block": "new",
}).Debug("runner created")
Expand All @@ -169,7 +154,7 @@ func New(opts ...controlOpt) *pluginControl {
// Wire event manager

// Start stuff
err = c.pluginRunner.Start()
err := c.pluginRunner.Start()
if err != nil {
panic(err)
}
Expand Down
5 changes: 1 addition & 4 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package control

import (
"bytes"
"crypto/rand"
"crypto/rsa"
"encoding/gob"
"encoding/json"
"errors"
Expand Down Expand Up @@ -363,15 +361,14 @@ func TestStop(t *testing.T) {
}

func TestPluginCatalog(t *testing.T) {
key, _ := rsa.GenerateKey(rand.Reader, 2048)
ts := time.Now()

c := New()

// We need our own plugin manager to drop mock
// loaded plugins into. Aribitrarily adding
// plugins from the pm is no longer supported.
tpm := newPluginManager(&key.PublicKey, key)
tpm := newPluginManager()
c.pluginManager = tpm

lp1 := new(loadedPlugin)
Expand Down
14 changes: 0 additions & 14 deletions control/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,6 @@ import (
. "github.com/smartystreets/goconvey/convey"
)

type mockPluginClient struct{}

func (mp *mockPluginClient) Ping() error {
return nil
}

func (mp *mockPluginClient) Kill(r string) error {
return nil
}

func (mp *mockPluginClient) GetConfigPolicy() error {
return nil
}

func TestMonitor(t *testing.T) {
Convey("monitor", t, func() {
aps := newAvailablePlugins(&routing.RoundRobinStrategy{})
Expand Down
84 changes: 45 additions & 39 deletions control/plugin/client/httpjsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,58 +33,64 @@ type httpJSONRPCClient struct {
}

// NewCollectorHttpJSONRPCClient returns CollectorHttpJSONRPCClient
func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginCollectorClient, error) {
key, err := encrypter.GenerateKey()
if err != nil {
return nil, err
}
e := encrypter.New(pub, priv)
e.Key = key
enc := encoding.NewJsonEncoder()
enc.SetEncrypter(e)
return &httpJSONRPCClient{
func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) {
hjr := &httpJSONRPCClient{
url: u,
timeout: timeout,
pluginType: plugin.CollectorPluginType,
encrypter: e,
encoder: enc,
}, nil
encoder: encoding.NewJsonEncoder(),
}
if secure {
key, err := encrypter.GenerateKey()
if err != nil {
return nil, err
}
e := encrypter.New(pub, nil)
e.Key = key
hjr.encoder.SetEncrypter(e)
hjr.encrypter = e
}
return hjr, nil
}

func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginProcessorClient, error) {
key, err := encrypter.GenerateKey()
if err != nil {
return nil, err
}
e := encrypter.New(pub, priv)
e.Key = key
enc := encoding.NewJsonEncoder()
enc.SetEncrypter(e)
return &httpJSONRPCClient{
func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginProcessorClient, error) {
hjr := &httpJSONRPCClient{
url: u,
timeout: timeout,
pluginType: plugin.ProcessorPluginType,
encrypter: e,
encoder: enc,
}, nil
encoder: encoding.NewJsonEncoder(),
}
if secure {
key, err := encrypter.GenerateKey()
if err != nil {
return nil, err
}
e := encrypter.New(pub, nil)
e.Key = key
hjr.encoder.SetEncrypter(e)
hjr.encrypter = e
}
return hjr, nil
}

func NewPublisherHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginPublisherClient, error) {
key, err := encrypter.GenerateKey()
if err != nil {
return nil, err
}
e := encrypter.New(pub, priv)
e.Key = key
enc := encoding.NewJsonEncoder()
enc.SetEncrypter(e)
return &httpJSONRPCClient{
func NewPublisherHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginPublisherClient, error) {
hjr := &httpJSONRPCClient{
url: u,
timeout: timeout,
pluginType: plugin.PublisherPluginType,
encrypter: e,
encoder: enc,
}, nil
encoder: encoding.NewJsonEncoder(),
}
if secure {
key, err := encrypter.GenerateKey()
if err != nil {
return nil, err
}
e := encrypter.New(pub, nil)
e.Key = key
hjr.encoder.SetEncrypter(e)
hjr.encrypter = e
}
return hjr, nil
}

// Ping
Expand Down
10 changes: 5 additions & 5 deletions control/plugin/client/httpjsonrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

var (
key, _ = rsa.GenerateKey(crand.Reader, 2048)
key, _ = rsa.GenerateKey(crand.Reader, 1024)
symkey, _ = encrypter.GenerateKey()
)

Expand Down Expand Up @@ -122,7 +122,7 @@ func (m *mockSessionStatePluginProxy) Kill(arg []byte, b *[]byte) error {
var httpStarted = false

func startHTTPJSONRPC() (string, *mockSessionStatePluginProxy) {
encr := encrypter.New(&key.PublicKey, key)
encr := encrypter.New(&key.PublicKey, nil)
encr.Key = symkey
ee := encoding.NewJsonEncoder()
ee.SetEncrypter(encr)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestHTTPJSONRPC(t *testing.T) {

Convey("Collector Client", t, func() {
session.c = true
c, err := NewCollectorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, key)
c, err := NewCollectorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, true)
So(err, ShouldBeNil)
So(c, ShouldNotBeNil)
cl := c.(*httpJSONRPCClient)
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestHTTPJSONRPC(t *testing.T) {

Convey("Processor Client", t, func() {
session.c = false
p, _ := NewProcessorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, key)
p, _ := NewProcessorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, true)
cl := p.(*httpJSONRPCClient)
cl.encrypter.Key = symkey
So(p, ShouldNotBeNil)
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestHTTPJSONRPC(t *testing.T) {

Convey("Publisher Client", t, func() {
session.c = false
p, _ := NewPublisherHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, key)
p, _ := NewPublisherHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, true)
cl := p.(*httpJSONRPCClient)
cl.encrypter.Key = symkey
So(p, ShouldNotBeNil)
Expand Down
Loading

0 comments on commit b29baa4

Please sign in to comment.