From 0a30d04881872a17fef708ba725b4e418a263ddb Mon Sep 17 00:00:00 2001 From: Kelly Lyon Date: Fri, 14 Oct 2016 15:16:41 -0700 Subject: [PATCH] Separated Go RPC code from other code: - Identified code used by GoRPC plugins - Created files with deprecated flags to hold code that is used by GoRPC plugins - Added deprecation warning when loading legacy plugin --- control/available_plugin.go | 5 + .../{collector.go => collector_deprecated.go} | 5 + control/plugin/collector_proxy.go | 58 ---- control/plugin/collector_proxy_deprecated.go | 83 +++++ control/plugin/metric.go | 140 -------- control/plugin/metric_deprecated.go | 172 ++++++++++ control/plugin/plugin.go | 280 ---------------- control/plugin/plugin_deprecated.go | 313 ++++++++++++++++++ .../{processor.go => processor_deprecated.go} | 5 + control/plugin/processor_proxy.go | 37 +-- control/plugin/processor_proxy_deprecated.go | 59 ++++ .../{publisher.go => publisher_deprecated.go} | 5 + control/plugin/publisher_proxy.go | 48 +-- control/plugin/publisher_proxy_deprecated.go | 55 +++ control/plugin/session.go | 288 +--------------- control/plugin/session_deprecated.go | 313 ++++++++++++++++++ 16 files changed, 1018 insertions(+), 848 deletions(-) rename control/plugin/{collector.go => collector_deprecated.go} (87%) create mode 100644 control/plugin/collector_proxy_deprecated.go create mode 100644 control/plugin/metric_deprecated.go create mode 100644 control/plugin/plugin_deprecated.go rename control/plugin/{processor.go => processor_deprecated.go} (85%) create mode 100644 control/plugin/processor_proxy_deprecated.go rename control/plugin/{publisher.go => publisher_deprecated.go} (85%) create mode 100644 control/plugin/publisher_proxy_deprecated.go create mode 100644 control/plugin/session_deprecated.go diff --git a/control/available_plugin.go b/control/available_plugin.go index 08935ce84..0e4b8b910 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -107,6 +107,11 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab } ap.client = c case plugin.NativeRPC: + log.WithFields(log.Fields{ + "_module": "control-aplugin", + "_block": "newAvailablePlugin", + "plugin_name": ap.name, + }).Warning("This plugin is using a deprecated RPC protocol. Find more information here: https://github.com/intelsdi-x/snap/issues/1289 ") c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure) if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) diff --git a/control/plugin/collector.go b/control/plugin/collector_deprecated.go similarity index 87% rename from control/plugin/collector.go rename to control/plugin/collector_deprecated.go index e4c264325..d92249c74 100644 --- a/control/plugin/collector.go +++ b/control/plugin/collector_deprecated.go @@ -1,3 +1,8 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + /* http://www.apache.org/licenses/LICENSE-2.0.txt diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go index 4668fb114..e46836a88 100644 --- a/control/plugin/collector_proxy.go +++ b/control/plugin/collector_proxy.go @@ -19,13 +19,6 @@ limitations under the License. package plugin -import ( - "errors" - "fmt" - - "github.com/intelsdi-x/snap/core/cdata" -) - // Arguments passed to CollectMetrics() for a Collector implementation type CollectMetricsArgs struct { MetricTypes []MetricType @@ -45,54 +38,3 @@ type GetMetricTypesArgs struct { type GetMetricTypesReply struct { MetricTypes []MetricType } - -type collectorPluginProxy struct { - Plugin CollectorPlugin - Session Session -} - -func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error { - defer catchPluginPanic(c.Session.Logger()) - - c.Session.Logger().Debugln("GetMetricTypes called") - // Reset heartbeat - c.Session.ResetHeartbeat() - - dargs := &GetMetricTypesArgs{PluginConfig: ConfigType{ConfigDataNode: cdata.NewNode()}} - c.Session.Decode(args, dargs) - - mts, err := c.Plugin.GetMetricTypes(dargs.PluginConfig) - if err != nil { - return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error())) - } - - r := GetMetricTypesReply{MetricTypes: mts} - *reply, err = c.Session.Encode(r) - if err != nil { - return err - } - - return nil -} - -func (c *collectorPluginProxy) CollectMetrics(args []byte, reply *[]byte) error { - defer catchPluginPanic(c.Session.Logger()) - c.Session.Logger().Debugln("CollectMetrics called") - // Reset heartbeat - c.Session.ResetHeartbeat() - - dargs := &CollectMetricsArgs{} - c.Session.Decode(args, dargs) - - ms, err := c.Plugin.CollectMetrics(dargs.MetricTypes) - if err != nil { - return errors.New(fmt.Sprintf("CollectMetrics call error : %s", err.Error())) - } - - r := CollectMetricsReply{PluginMetrics: ms} - *reply, err = c.Session.Encode(r) - if err != nil { - return err - } - return nil -} diff --git a/control/plugin/collector_proxy_deprecated.go b/control/plugin/collector_proxy_deprecated.go new file mode 100644 index 000000000..24532fbf1 --- /dev/null +++ b/control/plugin/collector_proxy_deprecated.go @@ -0,0 +1,83 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "errors" + "fmt" + + "github.com/intelsdi-x/snap/core/cdata" +) + +type collectorPluginProxy struct { + Plugin CollectorPlugin + Session Session +} + +func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error { + defer catchPluginPanic(c.Session.Logger()) + + c.Session.Logger().Debugln("GetMetricTypes called") + // Reset heartbeat + c.Session.ResetHeartbeat() + + dargs := &GetMetricTypesArgs{PluginConfig: ConfigType{ConfigDataNode: cdata.NewNode()}} + c.Session.Decode(args, dargs) + + mts, err := c.Plugin.GetMetricTypes(dargs.PluginConfig) + if err != nil { + return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error())) + } + + r := GetMetricTypesReply{MetricTypes: mts} + *reply, err = c.Session.Encode(r) + if err != nil { + return err + } + + return nil +} + +func (c *collectorPluginProxy) CollectMetrics(args []byte, reply *[]byte) error { + defer catchPluginPanic(c.Session.Logger()) + c.Session.Logger().Debugln("CollectMetrics called") + // Reset heartbeat + c.Session.ResetHeartbeat() + + dargs := &CollectMetricsArgs{} + c.Session.Decode(args, dargs) + + ms, err := c.Plugin.CollectMetrics(dargs.MetricTypes) + if err != nil { + return errors.New(fmt.Sprintf("CollectMetrics call error : %s", err.Error())) + } + + r := CollectMetricsReply{PluginMetrics: ms} + *reply, err = c.Session.Encode(r) + if err != nil { + return err + } + return nil +} diff --git a/control/plugin/metric.go b/control/plugin/metric.go index 8c70c3afc..329e86dfc 100644 --- a/control/plugin/metric.go +++ b/control/plugin/metric.go @@ -23,12 +23,8 @@ import ( "bytes" "encoding/gob" "encoding/json" - "errors" - "fmt" "time" - log "github.com/Sirupsen/logrus" - "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" ) @@ -42,8 +38,6 @@ const ( SnapGOBContentType = "snap.gob" // SnapJSON snap metrics serialized into json SnapJSONContentType = "snap.json" - // SnapProtoBuff snap metrics serialized into protocol buffers - // SnapProtoBuff = "snap.pb" // TO BE IMPLEMENTED ) type ConfigType struct { @@ -81,12 +75,6 @@ func (p *ConfigType) GobDecode(data []byte) error { return nil } -func NewPluginConfigType() ConfigType { - return ConfigType{ - ConfigDataNode: cdata.NewNode(), - } -} - // Represents a metric type. Only used within plugins and across plugin calls. // Converted to core.MetricType before being used within modules. type MetricType struct { @@ -122,18 +110,6 @@ type MetricType struct { Timestamp_ time.Time `json:"timestamp"` } -// NewMetricType returns a Constructor -func NewMetricType(namespace core.Namespace, timestamp time.Time, tags map[string]string, unit string, data interface{}) *MetricType { - return &MetricType{ - Namespace_: namespace, - Tags_: tags, - Data_: data, - Timestamp_: timestamp, - LastAdvertisedTime_: timestamp, - Unit_: unit, - } -} - // Returns the namespace. func (p MetricType) Namespace() core.Namespace { return p.Namespace_ @@ -182,119 +158,3 @@ func (p MetricType) Unit() string { func (p *MetricType) AddData(data interface{}) { p.Data_ = data } - -// MarshalMetricTypes returns a []byte containing a serialized version of []MetricType using the content type provided. -func MarshalMetricTypes(contentType string, metrics []MetricType) ([]byte, string, error) { - // If we have an empty slice we return an error - if len(metrics) == 0 { - es := fmt.Sprintf("attempt to marshall empty slice of metrics: %s", contentType) - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "marshal-content-type", - "error": es, - }).Error("error while marshalling") - return nil, "", errors.New(es) - } - // Switch on content type - switch contentType { - case SnapAllContentType, SnapGOBContentType: - // NOTE: A snap All wildcard will result in GOB - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(metrics) - if err != nil { - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "marshal-content-type", - "error": err.Error(), - }).Error("error while marshalling") - return nil, "", err - } - // contentType := SnapGOBContentType - return buf.Bytes(), SnapGOBContentType, nil - case SnapJSONContentType: - // Serialize into JSON - b, err := json.Marshal(metrics) - if err != nil { - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "marshal-content-type", - "error": err.Error(), - }).Error("error while marshalling") - return nil, "", err - } - return b, SnapJSONContentType, nil - default: - // We don't recognize this content type. Log and return error. - es := fmt.Sprintf("invalid snap content type: %s", contentType) - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "marshal-content-type", - "error": es, - }).Error("error while marshalling") - return nil, "", errors.New(es) - } -} - -// UnmarshallMetricTypes takes a content type and []byte payload and returns a []MetricType -func UnmarshallMetricTypes(contentType string, payload []byte) ([]MetricType, error) { - switch contentType { - case SnapGOBContentType: - var metrics []MetricType - r := bytes.NewBuffer(payload) - err := gob.NewDecoder(r).Decode(&metrics) - if err != nil { - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "unmarshal-content-type", - "error": err.Error(), - }).Error("error while unmarshalling") - return nil, err - } - return metrics, nil - case SnapJSONContentType: - var metrics []MetricType - err := json.Unmarshal(payload, &metrics) - if err != nil { - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "unmarshal-content-type", - "error": err.Error(), - }).Error("error while unmarshalling") - return nil, err - } - return metrics, nil - default: - // We don't recognize this content type as one we can unmarshal. Log and return error. - es := fmt.Sprintf("invalid snap content type for unmarshalling: %s", contentType) - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "unmarshal-content-type", - "error": es, - }).Error("error while unmarshalling") - return nil, errors.New(es) - } -} - -// SwapMetricContentType swaps a payload with one content type to another one. -func SwapMetricContentType(contentType, requestedContentType string, payload []byte) ([]byte, string, error) { - metrics, err1 := UnmarshallMetricTypes(contentType, payload) - if err1 != nil { - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "swap-content-type", - "error": err1.Error(), - }).Error("error while swaping") - return nil, "", err1 - } - newPayload, newContentType, err2 := MarshalMetricTypes(requestedContentType, metrics) - if err2 != nil { - log.WithFields(log.Fields{ - "_module": "control-plugin", - "block": "swap-content-type", - "error": err2.Error(), - }).Error("error while swaping") - return nil, "", err2 - } - return newPayload, newContentType, nil -} diff --git a/control/plugin/metric_deprecated.go b/control/plugin/metric_deprecated.go new file mode 100644 index 000000000..00d3e7d42 --- /dev/null +++ b/control/plugin/metric_deprecated.go @@ -0,0 +1,172 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "bytes" + "encoding/gob" + "encoding/json" + "errors" + "fmt" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/cdata" +) + +func NewPluginConfigType() ConfigType { + return ConfigType{ + ConfigDataNode: cdata.NewNode(), + } +} + +// NewMetricType returns a Constructor +func NewMetricType(namespace core.Namespace, timestamp time.Time, tags map[string]string, unit string, data interface{}) *MetricType { + return &MetricType{ + Namespace_: namespace, + Tags_: tags, + Data_: data, + Timestamp_: timestamp, + LastAdvertisedTime_: timestamp, + Unit_: unit, + } +} + +// MarshalMetricTypes returns a []byte containing a serialized version of []MetricType using the content type provided. +func MarshalMetricTypes(contentType string, metrics []MetricType) ([]byte, string, error) { + // If we have an empty slice we return an error + if len(metrics) == 0 { + es := fmt.Sprintf("attempt to marshall empty slice of metrics: %s", contentType) + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "marshal-content-type", + "error": es, + }).Error("error while marshalling") + return nil, "", errors.New(es) + } + // Switch on content type + switch contentType { + case SnapAllContentType, SnapGOBContentType: + // NOTE: A snap All wildcard will result in GOB + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(metrics) + if err != nil { + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "marshal-content-type", + "error": err.Error(), + }).Error("error while marshalling") + return nil, "", err + } + // contentType := SnapGOBContentType + return buf.Bytes(), SnapGOBContentType, nil + case SnapJSONContentType: + // Serialize into JSON + b, err := json.Marshal(metrics) + if err != nil { + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "marshal-content-type", + "error": err.Error(), + }).Error("error while marshalling") + return nil, "", err + } + return b, SnapJSONContentType, nil + default: + // We don't recognize this content type. Log and return error. + es := fmt.Sprintf("invalid snap content type: %s", contentType) + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "marshal-content-type", + "error": es, + }).Error("error while marshalling") + return nil, "", errors.New(es) + } +} + +// UnmarshallMetricTypes takes a content type and []byte payload and returns a []MetricType +func UnmarshallMetricTypes(contentType string, payload []byte) ([]MetricType, error) { + switch contentType { + case SnapGOBContentType: + var metrics []MetricType + r := bytes.NewBuffer(payload) + err := gob.NewDecoder(r).Decode(&metrics) + if err != nil { + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "unmarshal-content-type", + "error": err.Error(), + }).Error("error while unmarshalling") + return nil, err + } + return metrics, nil + case SnapJSONContentType: + var metrics []MetricType + err := json.Unmarshal(payload, &metrics) + if err != nil { + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "unmarshal-content-type", + "error": err.Error(), + }).Error("error while unmarshalling") + return nil, err + } + return metrics, nil + default: + // We don't recognize this content type as one we can unmarshal. Log and return error. + es := fmt.Sprintf("invalid snap content type for unmarshalling: %s", contentType) + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "unmarshal-content-type", + "error": es, + }).Error("error while unmarshalling") + return nil, errors.New(es) + } +} + +// SwapMetricContentType swaps a payload with one content type to another one. +func SwapMetricContentType(contentType, requestedContentType string, payload []byte) ([]byte, string, error) { + metrics, err1 := UnmarshallMetricTypes(contentType, payload) + if err1 != nil { + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "swap-content-type", + "error": err1.Error(), + }).Error("error while swaping") + return nil, "", err1 + } + newPayload, newContentType, err2 := MarshalMetricTypes(requestedContentType, metrics) + if err2 != nil { + log.WithFields(log.Fields{ + "_module": "control-plugin", + "block": "swap-content-type", + "error": err2.Error(), + }).Error("error while swaping") + return nil, "", err2 + } + return newPayload, newContentType, nil +} diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index 7ce77afe7..fb64db57f 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -21,21 +21,10 @@ package plugin // WARNING! Do not import "fmt" and print from a plugin to stdout! import ( - "bytes" "crypto/rsa" - "encoding/json" - "fmt" - "io" // Don't use "fmt.Print*" - "net" - "net/http" - "net/rpc" - "net/rpc/jsonrpc" - "regexp" - "runtime" "time" log "github.com/Sirupsen/logrus" - "github.com/intelsdi-x/snap/control/plugin/cpolicy" ) @@ -92,8 +81,6 @@ var ( // Timeout settings // How much time must elapse before a lack of Ping results in a timeout PingTimeoutDurationDefault = time.Millisecond * 1500 - // How many successive PingTimeouts must occur to equal a failure. - PingTimeoutLimit = 3 // Array matching plugin type enum to a string // note: in string representation we use lower case @@ -143,87 +130,6 @@ type PluginMeta struct { RoutingStrategy RoutingStrategyType } -type metaOp func(m *PluginMeta) - -// ConcurrencyCount is an option that can be be provided to the func NewPluginMeta. -func ConcurrencyCount(cc int) metaOp { - return func(m *PluginMeta) { - m.ConcurrencyCount = cc - } -} - -// Exclusive is an option that can be be provided to the func NewPluginMeta. -func Exclusive(e bool) metaOp { - return func(m *PluginMeta) { - m.Exclusive = e - } -} - -// Unsecure is an option that can be be provided to the func NewPluginMeta. -func Unsecure(e bool) metaOp { - return func(m *PluginMeta) { - m.Unsecure = e - } -} - -// RoutingStrategy is an option that can be be provided to the func NewPluginMeta. -func RoutingStrategy(r RoutingStrategyType) metaOp { - return func(m *PluginMeta) { - m.RoutingStrategy = r - } -} - -// CacheTTL is an option that can be be provided to the func NewPluginMeta. -func CacheTTL(t time.Duration) metaOp { - return func(m *PluginMeta) { - m.CacheTTL = t - } -} - -// NewPluginMeta constructs and returns a PluginMeta struct -func NewPluginMeta(name string, version int, pluginType PluginType, acceptContentTypes, returnContentTypes []string, opts ...metaOp) *PluginMeta { - // An empty accepted content type default to "snap.*" - if len(acceptContentTypes) == 0 { - acceptContentTypes = append(acceptContentTypes, "snap.*") - } - // Validate content type formats - for _, s := range acceptContentTypes { - b, e := regexp.MatchString(`^[a-z0-9*]+\.[a-z0-9*]+$`, s) - if e != nil { - panic(e) - } - if !b { - panic(fmt.Sprintf("Bad accept content type [%s] for [%d] [%s]", name, version, s)) - } - } - for _, s := range returnContentTypes { - b, e := regexp.MatchString(`^[a-z0-9*]+\.[a-z0-9*]+$`, s) - if e != nil { - panic(e) - } - if !b { - panic(fmt.Sprintf("Bad return content type [%s] for [%d] [%s]", name, version, s)) - } - } - - p := &PluginMeta{ - Name: name, - Version: version, - Type: pluginType, - AcceptedContentTypes: acceptContentTypes, - ReturnedContentTypes: returnContentTypes, - - //set the default for concurrency count to 1 - ConcurrencyCount: 1, - } - - for _, opt := range opts { - opt(p) - } - - return p -} - // Arguments passed to startup of Plugin type Arg struct { // Plugin log level @@ -255,189 +161,3 @@ type Response struct { ErrorMessage string PublicKey *rsa.PublicKey } - -// Start starts a plugin where: -// PluginMeta - base information about plugin -// Plugin - CollectorPlugin, ProcessorPlugin or PublisherPlugin -// requestString - plugins arguments (marshaled json of control/plugin Arg struct) -// returns an error and exitCode (exitCode from SessionState initilization or plugin termination code) -func Start(m *PluginMeta, c Plugin, requestString string) (error, int) { - s, sErr, retCode := NewSessionState(requestString, c, m) - if sErr != nil { - return sErr, retCode - } - - var ( - r *Response - exitCode int = 0 - ) - - switch m.Type { - case CollectorPluginType: - // Create our proxy - proxy := &collectorPluginProxy{ - Plugin: c.(CollectorPlugin), - Session: s, - } - // Register the proxy under the "Collector" namespace - rpc.RegisterName("Collector", proxy) - - r = &Response{ - Type: CollectorPluginType, - State: PluginSuccess, - Meta: *m, - } - if !m.Unsecure { - r.PublicKey = &s.privateKey.PublicKey - } - case PublisherPluginType: - r = &Response{ - Type: PublisherPluginType, - State: PluginSuccess, - Meta: *m, - } - if !m.Unsecure { - r.PublicKey = &s.privateKey.PublicKey - } - // Create our proxy - proxy := &publisherPluginProxy{ - Plugin: c.(PublisherPlugin), - Session: s, - } - - // Register the proxy under the "Publisher" namespace - rpc.RegisterName("Publisher", proxy) - case ProcessorPluginType: - r = &Response{ - Type: ProcessorPluginType, - State: PluginSuccess, - Meta: *m, - } - if !m.Unsecure { - r.PublicKey = &s.privateKey.PublicKey - } - // Create our proxy - proxy := &processorPluginProxy{ - Plugin: c.(ProcessorPlugin), - Session: s, - } - // Register the proxy under the "Publisher" namespace - rpc.RegisterName("Processor", proxy) - } - - // Register common plugin methods used for utility reasons - e := rpc.Register(s) - if e != nil { - if e.Error() != "rpc: service already defined: SessionState" { - s.Logger().Error(e.Error()) - return e, 2 - } - } - - l, err := net.Listen("tcp", "127.0.0.1:"+s.ListenPort()) - if err != nil { - s.Logger().Error(err.Error()) - panic(err) - } - s.SetListenAddress(l.Addr().String()) - s.Logger().Debugf("Listening %s\n", l.Addr()) - s.Logger().Debugf("Session token %s\n", s.Token()) - - switch r.Meta.RPCType { - case JSONRPC: - rpc.HandleHTTP() - http.HandleFunc("/rpc", func(w http.ResponseWriter, req *http.Request) { - defer req.Body.Close() - w.Header().Set("Content-Type", "application/json") - if req.ContentLength == 0 { - encoder := json.NewEncoder(w) - encoder.Encode(&struct { - Id interface{} `json:"id"` - Result interface{} `json:"result"` - Error interface{} `json:"error"` - }{ - Id: nil, - Result: nil, - Error: "rpc: method request ill-formed", - }) - return - } - res := NewRPCRequest(req.Body).Call() - io.Copy(w, res) - }) - go http.Serve(l, nil) - case NativeRPC: - go func() { - for { - conn, err := l.Accept() - if err != nil { - panic(err) - } - go rpc.ServeConn(conn) - } - }() - default: - panic("Unsupported RPC type") - } - - resp := s.generateResponse(r) - // Output response to stdout - fmt.Println(string(resp)) - s.Logger().Println(string(resp)) - go s.heartbeatWatch(s.KillChan()) - - if s.isDaemon() { - exitCode = <-s.KillChan() // Closing of channel kills - } - - return nil, exitCode -} - -// rpcRequest represents a RPC request. -// rpcRequest implements the io.ReadWriteCloser interface. -type rpcRequest struct { - r io.Reader // holds the JSON formated RPC request - rw io.ReadWriter // holds the JSON formated RPC response - done chan bool // signals then end of the RPC request -} - -// NewRPCRequest returns a new rpcRequest. -func NewRPCRequest(r io.Reader) *rpcRequest { - var buf bytes.Buffer - done := make(chan bool) - return &rpcRequest{r, &buf, done} -} - -// Read implements the io.ReadWriteCloser Read method. -func (r *rpcRequest) Read(p []byte) (n int, err error) { - return r.r.Read(p) -} - -// Write implements the io.ReadWriteCloser Write method. -func (r *rpcRequest) Write(p []byte) (n int, err error) { - n, err = r.rw.Write(p) - defer func(done chan bool) { done <- true }(r.done) - return -} - -// Close implements the io.ReadWriteCloser Close method. -func (r *rpcRequest) Close() error { - return nil -} - -// Call invokes the RPC request, waits for it to complete, and returns the results. -func (r *rpcRequest) Call() io.Reader { - go jsonrpc.ServeConn(r) - <-r.done - return r.rw -} - -func catchPluginPanic(l *log.Logger) { - if err := recover(); err != nil { - trace := make([]byte, 4096) - count := runtime.Stack(trace, true) - l.Printf("Recover from panic: %s\n", err) - l.Printf("Stack of %d bytes: %s\n", count, trace) - panic(err) - } -} diff --git a/control/plugin/plugin_deprecated.go b/control/plugin/plugin_deprecated.go new file mode 100644 index 000000000..468d020e3 --- /dev/null +++ b/control/plugin/plugin_deprecated.go @@ -0,0 +1,313 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/rpc" + "net/rpc/jsonrpc" + "regexp" + "runtime" + "time" + + log "github.com/Sirupsen/logrus" +) + +var ( + // How many successive PingTimeouts must occur to equal a failure. + PingTimeoutLimit = 3 +) + +type metaOp func(m *PluginMeta) + +// ConcurrencyCount is an option that can be be provided to the func NewPluginMeta. +func ConcurrencyCount(cc int) metaOp { + return func(m *PluginMeta) { + m.ConcurrencyCount = cc + } +} + +// Exclusive is an option that can be be provided to the func NewPluginMeta. +func Exclusive(e bool) metaOp { + return func(m *PluginMeta) { + m.Exclusive = e + } +} + +// Unsecure is an option that can be be provided to the func NewPluginMeta. +func Unsecure(e bool) metaOp { + return func(m *PluginMeta) { + m.Unsecure = e + } +} + +// RoutingStrategy is an option that can be be provided to the func NewPluginMeta. +func RoutingStrategy(r RoutingStrategyType) metaOp { + return func(m *PluginMeta) { + m.RoutingStrategy = r + } +} + +// CacheTTL is an option that can be be provided to the func NewPluginMeta. +func CacheTTL(t time.Duration) metaOp { + return func(m *PluginMeta) { + m.CacheTTL = t + } +} + +// NewPluginMeta constructs and returns a PluginMeta struct +func NewPluginMeta(name string, version int, pluginType PluginType, acceptContentTypes, returnContentTypes []string, opts ...metaOp) *PluginMeta { + // An empty accepted content type default to "snap.*" + if len(acceptContentTypes) == 0 { + acceptContentTypes = append(acceptContentTypes, "snap.*") + } + // Validate content type formats + for _, s := range acceptContentTypes { + b, e := regexp.MatchString(`^[a-z0-9*]+\.[a-z0-9*]+$`, s) + if e != nil { + panic(e) + } + if !b { + panic(fmt.Sprintf("Bad accept content type [%s] for [%d] [%s]", name, version, s)) + } + } + for _, s := range returnContentTypes { + b, e := regexp.MatchString(`^[a-z0-9*]+\.[a-z0-9*]+$`, s) + if e != nil { + panic(e) + } + if !b { + panic(fmt.Sprintf("Bad return content type [%s] for [%d] [%s]", name, version, s)) + } + } + + p := &PluginMeta{ + Name: name, + Version: version, + Type: pluginType, + AcceptedContentTypes: acceptContentTypes, + ReturnedContentTypes: returnContentTypes, + + //set the default for concurrency count to 1 + ConcurrencyCount: 1, + } + + for _, opt := range opts { + opt(p) + } + + return p +} + +// Start starts a plugin where: +// PluginMeta - base information about plugin +// Plugin - CollectorPlugin, ProcessorPlugin or PublisherPlugin +// requestString - plugins arguments (marshaled json of control/plugin Arg struct) +// returns an error and exitCode (exitCode from SessionState initilization or plugin termination code) +func Start(m *PluginMeta, c Plugin, requestString string) (error, int) { + s, sErr, retCode := NewSessionState(requestString, c, m) + if sErr != nil { + return sErr, retCode + } + + var ( + r *Response + exitCode int = 0 + ) + + switch m.Type { + case CollectorPluginType: + // Create our proxy + proxy := &collectorPluginProxy{ + Plugin: c.(CollectorPlugin), + Session: s, + } + // Register the proxy under the "Collector" namespace + rpc.RegisterName("Collector", proxy) + + r = &Response{ + Type: CollectorPluginType, + State: PluginSuccess, + Meta: *m, + } + if !m.Unsecure { + r.PublicKey = &s.privateKey.PublicKey + } + case PublisherPluginType: + r = &Response{ + Type: PublisherPluginType, + State: PluginSuccess, + Meta: *m, + } + if !m.Unsecure { + r.PublicKey = &s.privateKey.PublicKey + } + // Create our proxy + proxy := &publisherPluginProxy{ + Plugin: c.(PublisherPlugin), + Session: s, + } + + // Register the proxy under the "Publisher" namespace + rpc.RegisterName("Publisher", proxy) + case ProcessorPluginType: + r = &Response{ + Type: ProcessorPluginType, + State: PluginSuccess, + Meta: *m, + } + if !m.Unsecure { + r.PublicKey = &s.privateKey.PublicKey + } + // Create our proxy + proxy := &processorPluginProxy{ + Plugin: c.(ProcessorPlugin), + Session: s, + } + // Register the proxy under the "Publisher" namespace + rpc.RegisterName("Processor", proxy) + } + + // Register common plugin methods used for utility reasons + e := rpc.Register(s) + if e != nil { + if e.Error() != "rpc: service already defined: SessionState" { + s.Logger().Error(e.Error()) + return e, 2 + } + } + + l, err := net.Listen("tcp", "127.0.0.1:"+s.ListenPort()) + if err != nil { + s.Logger().Error(err.Error()) + panic(err) + } + s.SetListenAddress(l.Addr().String()) + s.Logger().Debugf("Listening %s\n", l.Addr()) + s.Logger().Debugf("Session token %s\n", s.Token()) + + switch r.Meta.RPCType { + case JSONRPC: + rpc.HandleHTTP() + http.HandleFunc("/rpc", func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + w.Header().Set("Content-Type", "application/json") + if req.ContentLength == 0 { + encoder := json.NewEncoder(w) + encoder.Encode(&struct { + Id interface{} `json:"id"` + Result interface{} `json:"result"` + Error interface{} `json:"error"` + }{ + Id: nil, + Result: nil, + Error: "rpc: method request ill-formed", + }) + return + } + res := NewRPCRequest(req.Body).Call() + io.Copy(w, res) + }) + go http.Serve(l, nil) + case NativeRPC: + go func() { + for { + conn, err := l.Accept() + if err != nil { + panic(err) + } + go rpc.ServeConn(conn) + } + }() + default: + panic("Unsupported RPC type") + } + + resp := s.generateResponse(r) + // Output response to stdout + fmt.Println(string(resp)) + s.Logger().Println(string(resp)) + go s.heartbeatWatch(s.KillChan()) + + if s.isDaemon() { + exitCode = <-s.KillChan() // Closing of channel kills + } + + return nil, exitCode +} + +// rpcRequest represents a RPC request. +// rpcRequest implements the io.ReadWriteCloser interface. +type rpcRequest struct { + r io.Reader // holds the JSON formated RPC request + rw io.ReadWriter // holds the JSON formated RPC response + done chan bool // signals then end of the RPC request +} + +// NewRPCRequest returns a new rpcRequest. +func NewRPCRequest(r io.Reader) *rpcRequest { + var buf bytes.Buffer + done := make(chan bool) + return &rpcRequest{r, &buf, done} +} + +// Read implements the io.ReadWriteCloser Read method. +func (r *rpcRequest) Read(p []byte) (n int, err error) { + return r.r.Read(p) +} + +// Write implements the io.ReadWriteCloser Write method. +func (r *rpcRequest) Write(p []byte) (n int, err error) { + n, err = r.rw.Write(p) + defer func(done chan bool) { done <- true }(r.done) + return +} + +// Close implements the io.ReadWriteCloser Close method. +func (r *rpcRequest) Close() error { + return nil +} + +// Call invokes the RPC request, waits for it to complete, and returns the results. +func (r *rpcRequest) Call() io.Reader { + go jsonrpc.ServeConn(r) + <-r.done + return r.rw +} + +func catchPluginPanic(l *log.Logger) { + if err := recover(); err != nil { + trace := make([]byte, 4096) + count := runtime.Stack(trace, true) + l.Printf("Recover from panic: %s\n", err) + l.Printf("Stack of %d bytes: %s\n", count, trace) + panic(err) + } +} diff --git a/control/plugin/processor.go b/control/plugin/processor_deprecated.go similarity index 85% rename from control/plugin/processor.go rename to control/plugin/processor_deprecated.go index 6c2cd81d5..cdcd4eba3 100644 --- a/control/plugin/processor.go +++ b/control/plugin/processor_deprecated.go @@ -1,3 +1,8 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + /* http://www.apache.org/licenses/LICENSE-2.0.txt diff --git a/control/plugin/processor_proxy.go b/control/plugin/processor_proxy.go index a6dc57139..2cd44eead 100644 --- a/control/plugin/processor_proxy.go +++ b/control/plugin/processor_proxy.go @@ -19,15 +19,9 @@ limitations under the License. package plugin -import ( - "errors" - "fmt" - - "github.com/intelsdi-x/snap/core/ctypes" -) +import "github.com/intelsdi-x/snap/core/ctypes" type ProcessorArgs struct { - //PluginMetrics []PluginMetric ContentType string Content []byte Config map[string]ctypes.ConfigValue @@ -37,32 +31,3 @@ type ProcessorReply struct { ContentType string Content []byte } - -type processorPluginProxy struct { - Plugin ProcessorPlugin - Session Session -} - -func (p *processorPluginProxy) Process(args []byte, reply *[]byte) error { - defer catchPluginPanic(p.Session.Logger()) - p.Session.ResetHeartbeat() - - dargs := &ProcessorArgs{} - err := p.Session.Decode(args, dargs) - if err != nil { - return err - } - - r := ProcessorReply{} - r.ContentType, r.Content, err = p.Plugin.Process(dargs.ContentType, dargs.Content, dargs.Config) - if err != nil { - return errors.New(fmt.Sprintf("Processor call error: %v", err.Error())) - } - - *reply, err = p.Session.Encode(r) - if err != nil { - return err - } - - return nil -} diff --git a/control/plugin/processor_proxy_deprecated.go b/control/plugin/processor_proxy_deprecated.go new file mode 100644 index 000000000..75298bf6b --- /dev/null +++ b/control/plugin/processor_proxy_deprecated.go @@ -0,0 +1,59 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "errors" + "fmt" +) + +type processorPluginProxy struct { + Plugin ProcessorPlugin + Session Session +} + +func (p *processorPluginProxy) Process(args []byte, reply *[]byte) error { + defer catchPluginPanic(p.Session.Logger()) + p.Session.ResetHeartbeat() + + dargs := &ProcessorArgs{} + err := p.Session.Decode(args, dargs) + if err != nil { + return err + } + + r := ProcessorReply{} + r.ContentType, r.Content, err = p.Plugin.Process(dargs.ContentType, dargs.Content, dargs.Config) + if err != nil { + return errors.New(fmt.Sprintf("Processor call error: %v", err.Error())) + } + + *reply, err = p.Session.Encode(r) + if err != nil { + return err + } + + return nil +} diff --git a/control/plugin/publisher.go b/control/plugin/publisher_deprecated.go similarity index 85% rename from control/plugin/publisher.go rename to control/plugin/publisher_deprecated.go index 826ef19e3..a5009deb2 100644 --- a/control/plugin/publisher.go +++ b/control/plugin/publisher_deprecated.go @@ -1,3 +1,8 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + /* http://www.apache.org/licenses/LICENSE-2.0.txt diff --git a/control/plugin/publisher_proxy.go b/control/plugin/publisher_proxy.go index 4339ce86c..4662aa4f7 100644 --- a/control/plugin/publisher_proxy.go +++ b/control/plugin/publisher_proxy.go @@ -19,56 +19,10 @@ limitations under the License. package plugin -import ( - "errors" - "fmt" - - "github.com/intelsdi-x/snap/core/ctypes" -) +import "github.com/intelsdi-x/snap/core/ctypes" type PublishArgs struct { ContentType string Content []byte Config map[string]ctypes.ConfigValue } - -type PublishReply struct { -} - -type publisherPluginProxy struct { - Plugin PublisherPlugin - Session Session -} - -func (p *publisherPluginProxy) Publish(args []byte, reply *[]byte) error { - defer catchPluginPanic(p.Session.Logger()) - p.Session.ResetHeartbeat() - - dargs := &PublishArgs{} - err := p.Session.Decode(args, dargs) - if err != nil { - return err - } - - err = p.Plugin.Publish(dargs.ContentType, dargs.Content, dargs.Config) - if err != nil { - return errors.New(fmt.Sprintf("Publish call error: %v", err.Error())) - } - return nil -} - -/* -type gRPCPublisherProxy struct { - Plugin PublisherPlugin - Session Session - gRPCPluginProxy -} - -func (p *gRPCPublisherProxy) Publish(ctx context.Context, arg *rpc.PublishArg) (*common.Empty, error) { - defer catchPluginPanic(p.Session.Logger()) - err := p.Plugin.Publish(arg.ContentType, arg.Content, common.ParseConfig(arg.Config)) - if err != nil { - return &common.Empty{}, err - } - return &common.Empty{}, nil -}*/ diff --git a/control/plugin/publisher_proxy_deprecated.go b/control/plugin/publisher_proxy_deprecated.go new file mode 100644 index 000000000..d861bf2fa --- /dev/null +++ b/control/plugin/publisher_proxy_deprecated.go @@ -0,0 +1,55 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "errors" + "fmt" +) + +type PublishReply struct { +} + +type publisherPluginProxy struct { + Plugin PublisherPlugin + Session Session +} + +func (p *publisherPluginProxy) Publish(args []byte, reply *[]byte) error { + defer catchPluginPanic(p.Session.Logger()) + p.Session.ResetHeartbeat() + + dargs := &PublishArgs{} + err := p.Session.Decode(args, dargs) + if err != nil { + return err + } + + err = p.Plugin.Publish(dargs.ContentType, dargs.Content, dargs.Config) + if err != nil { + return errors.New(fmt.Sprintf("Publish call error: %v", err.Error())) + } + return nil +} diff --git a/control/plugin/session.go b/control/plugin/session.go index 9d593af1d..391416d88 100644 --- a/control/plugin/session.go +++ b/control/plugin/session.go @@ -19,302 +19,16 @@ limitations under the License. package plugin -import ( - "bytes" - "crypto/rand" - "crypto/rsa" - "encoding/base64" - "encoding/gob" - "encoding/json" - "errors" - "fmt" - "os" - "time" - - log "github.com/Sirupsen/logrus" - - "github.com/intelsdi-x/snap/control/plugin/cpolicy" - "github.com/intelsdi-x/snap/control/plugin/encoding" - "github.com/intelsdi-x/snap/control/plugin/encrypter" - "github.com/intelsdi-x/snap/core/cdata" - "github.com/intelsdi-x/snap/core/ctypes" -) - -// Session interface -type Session interface { - Ping([]byte, *[]byte) error - Kill([]byte, *[]byte) error - GetConfigPolicy([]byte, *[]byte) error - Logger() *log.Logger - ListenAddress() string - SetListenAddress(string) - ListenPort() string - Token() string - KillChan() chan int - ResetHeartbeat() - - generateResponse(r *Response) []byte - heartbeatWatch(killChan chan int) - isDaemon() bool - - SetKey(SetKeyArgs, *[]byte) error - setKey([]byte) - - Encode(interface{}) ([]byte, error) - Decode([]byte, interface{}) error - - DecryptKey([]byte) ([]byte, error) -} - -// Arguments passed to ping -type PingArgs struct{} +import "github.com/intelsdi-x/snap/control/plugin/cpolicy" type KillArgs struct { Reason string } -// Started plugin session state -type SessionState struct { - *Arg - *encrypter.Encrypter - encoding.Encoder - - LastPing time.Time - - plugin Plugin - token string - listenAddress string - killChan chan int - logger *log.Logger - privateKey *rsa.PrivateKey - encoder encoding.Encoder -} - -type GetConfigPolicyArgs struct{} - type GetConfigPolicyReply struct { Policy *cpolicy.ConfigPolicy } -// GetConfigPolicy returns the plugin's policy -func (s *SessionState) GetConfigPolicy(args []byte, reply *[]byte) error { - defer catchPluginPanic(s.Logger()) - - s.logger.Debug("GetConfigPolicy called") - - policy, err := s.plugin.GetConfigPolicy() - if err != nil { - return errors.New(fmt.Sprintf("GetConfigPolicy call error : %s", err.Error())) - } - - r := GetConfigPolicyReply{Policy: policy} - *reply, err = s.Encode(r) - if err != nil { - return err - } - - return nil -} - -// Ping returns nothing in normal operation -func (s *SessionState) Ping(arg []byte, reply *[]byte) error { - // For now we return nil. We can return an error if we are shutting - // down or otherwise in a state we should signal poor health. - // Reply should contain any context. - s.ResetHeartbeat() - s.logger.Debug("Ping received") - *reply = []byte{} - return nil -} - -// Kill will stop a running plugin -func (s *SessionState) Kill(args []byte, reply *[]byte) error { - a := &KillArgs{} - err := s.Decode(args, a) - if err != nil { - return err - } - s.logger.Debugf("Kill called by agent, reason: %s\n", a.Reason) - go func() { - time.Sleep(time.Second * 2) - s.killChan <- 0 - }() - *reply = []byte{} - return nil -} - -// Logger gets the SessionState logger -func (s *SessionState) Logger() *log.Logger { - return s.logger -} - -// ListenAddress gets the SessionState listen address -func (s *SessionState) ListenAddress() string { - return s.listenAddress -} - -//ListenPort gets the SessionState listen port -func (s *SessionState) ListenPort() string { - return s.listenPort -} - -// SetListenAddress sets SessionState listen address -func (s *SessionState) SetListenAddress(a string) { - s.listenAddress = a -} - -func (s *SessionState) ResetHeartbeat() { - s.LastPing = time.Now() -} - -// Token gets the SessionState token -func (s *SessionState) Token() string { - return s.token -} - -// KillChan gets the SessionState killchan -func (s *SessionState) KillChan() chan int { - return s.killChan -} - -func (s *SessionState) isDaemon() bool { - return !s.NoDaemon -} - type SetKeyArgs struct { Key []byte } - -func (s *SessionState) SetKey(args SetKeyArgs, reply *[]byte) error { - s.logger.Debug("SetKey called") - out, err := s.DecryptKey(args.Key) - if err != nil { - return err - } - s.Key = out - return nil -} - -func (s *SessionState) setKey(key []byte) { - s.Key = key -} - -func (s *SessionState) generateResponse(r *Response) []byte { - // Add common plugin response properties - r.ListenAddress = s.listenAddress - r.Token = s.token - rs, _ := json.Marshal(r) - return rs -} - -func (s *SessionState) heartbeatWatch(killChan chan int) { - s.logger.Debug("Heartbeat started") - count := 0 - for { - if time.Since(s.LastPing) >= s.PingTimeoutDuration { - count++ - s.logger.Infof("Heartbeat timeout %v of %v. (Duration between checks %v)", count, PingTimeoutLimit, s.PingTimeoutDuration) - if count >= PingTimeoutLimit { - s.logger.Error("Heartbeat timeout expired") - defer close(killChan) - return - } - } else { - // Reset count - count = 0 - } - time.Sleep(s.PingTimeoutDuration) - } -} - -// NewSessionState takes the plugin args and returns a SessionState -// returns State or error and returnCode: -// 0 - ok -// 2 - error when unmarshaling pluginArgs -// 3 - cannot open error files -func NewSessionState(pluginArgsMsg string, plugin Plugin, meta *PluginMeta) (*SessionState, error, int) { - pluginArg := &Arg{} - err := json.Unmarshal([]byte(pluginArgsMsg), pluginArg) - if err != nil { - return nil, err, 2 - } - - // If no port was provided we let the OS select a port for us. - // This is safe as address is returned in the Response and keep - // alive prevents unattended plugins. - if pluginArg.listenPort == "" { - pluginArg.listenPort = "0" - } - - // If no PingTimeoutDuration was provided we need to set it - if pluginArg.PingTimeoutDuration == 0 { - pluginArg.PingTimeoutDuration = PingTimeoutDurationDefault - } - - // Generate random token for this session - rb := make([]byte, 32) - rand.Read(rb) - rs := base64.URLEncoding.EncodeToString(rb) - - logger := &log.Logger{ - Out: os.Stderr, - Formatter: &simpleFormatter{}, - Hooks: make(log.LevelHooks), - Level: pluginArg.LogLevel, - } - - var enc encoding.Encoder - switch meta.RPCType { - case JSONRPC: - enc = encoding.NewJsonEncoder() - case NativeRPC: - enc = encoding.NewGobEncoder() - case GRPC: - enc = encoding.NewGobEncoder() - //TODO(CDR): re-think once content-types is settled - } - ss := &SessionState{ - Arg: pluginArg, - Encoder: enc, - - plugin: plugin, - token: rs, - killChan: make(chan int), - logger: logger, - } - - if !meta.Unsecure { - key, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - return nil, err, 2 - } - encrypt := encrypter.New(nil, key) - enc.SetEncrypter(encrypt) - ss.Encrypter = encrypt - ss.privateKey = key - } - return ss, nil, 0 -} - -func init() { - gob.RegisterName("conf_value_string", *(&ctypes.ConfigValueStr{})) - gob.RegisterName("conf_value_int", *(&ctypes.ConfigValueInt{})) - gob.RegisterName("conf_value_float", *(&ctypes.ConfigValueFloat{})) - gob.RegisterName("conf_value_bool", *(&ctypes.ConfigValueBool{})) - - gob.RegisterName("conf_policy_node", cpolicy.NewPolicyNode()) - gob.RegisterName("conf_data_node", &cdata.ConfigDataNode{}) - gob.RegisterName("conf_policy_string", &cpolicy.StringRule{}) - gob.RegisterName("conf_policy_int", &cpolicy.IntRule{}) - gob.RegisterName("conf_policy_float", &cpolicy.FloatRule{}) - gob.RegisterName("conf_policy_bool", &cpolicy.BoolRule{}) -} - -// simpleFormatter is a logrus formatter that includes only the message. -type simpleFormatter struct{} - -func (*simpleFormatter) Format(entry *log.Entry) ([]byte, error) { - b := &bytes.Buffer{} - fmt.Fprintf(b, "%s\n", entry.Message) - return b.Bytes(), nil -} diff --git a/control/plugin/session_deprecated.go b/control/plugin/session_deprecated.go new file mode 100644 index 000000000..b9853909e --- /dev/null +++ b/control/plugin/session_deprecated.go @@ -0,0 +1,313 @@ +/* ** DEPRECATED ** +For more information, see our deprecation notice +on Github: https://github.com/intelsdi-x/snap/issues/1289 +*/ + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "encoding/base64" + "encoding/gob" + "encoding/json" + "errors" + "fmt" + "os" + "time" + + log "github.com/Sirupsen/logrus" + + "github.com/intelsdi-x/snap/control/plugin/cpolicy" + "github.com/intelsdi-x/snap/control/plugin/encoding" + "github.com/intelsdi-x/snap/control/plugin/encrypter" + "github.com/intelsdi-x/snap/core/cdata" + "github.com/intelsdi-x/snap/core/ctypes" +) + +// Started plugin session state +type SessionState struct { + *Arg + *encrypter.Encrypter + encoding.Encoder + + LastPing time.Time + + plugin Plugin + token string + listenAddress string + killChan chan int + logger *log.Logger + privateKey *rsa.PrivateKey + encoder encoding.Encoder +} + +type GetConfigPolicyArgs struct{} + +// Session interface +type Session interface { + Ping([]byte, *[]byte) error + Kill([]byte, *[]byte) error + GetConfigPolicy([]byte, *[]byte) error + Logger() *log.Logger + ListenAddress() string + SetListenAddress(string) + ListenPort() string + Token() string + KillChan() chan int + ResetHeartbeat() + + generateResponse(r *Response) []byte + heartbeatWatch(killChan chan int) + isDaemon() bool + + SetKey(SetKeyArgs, *[]byte) error + setKey([]byte) + + Encode(interface{}) ([]byte, error) + Decode([]byte, interface{}) error + + DecryptKey([]byte) ([]byte, error) +} + +// Arguments passed to ping +type PingArgs struct{} + +// GetConfigPolicy returns the plugin's policy +func (s *SessionState) GetConfigPolicy(args []byte, reply *[]byte) error { + defer catchPluginPanic(s.Logger()) + + s.logger.Debug("GetConfigPolicy called") + + policy, err := s.plugin.GetConfigPolicy() + if err != nil { + return errors.New(fmt.Sprintf("GetConfigPolicy call error : %s", err.Error())) + } + + r := GetConfigPolicyReply{Policy: policy} + *reply, err = s.Encode(r) + if err != nil { + return err + } + + return nil +} + +// Ping returns nothing in normal operation +func (s *SessionState) Ping(arg []byte, reply *[]byte) error { + // For now we return nil. We can return an error if we are shutting + // down or otherwise in a state we should signal poor health. + // Reply should contain any context. + s.ResetHeartbeat() + s.logger.Debug("Ping received") + *reply = []byte{} + return nil +} + +// Kill will stop a running plugin +func (s *SessionState) Kill(args []byte, reply *[]byte) error { + a := &KillArgs{} + err := s.Decode(args, a) + if err != nil { + return err + } + s.logger.Debugf("Kill called by agent, reason: %s\n", a.Reason) + go func() { + time.Sleep(time.Second * 2) + s.killChan <- 0 + }() + *reply = []byte{} + return nil +} + +// Logger gets the SessionState logger +func (s *SessionState) Logger() *log.Logger { + return s.logger +} + +// ListenAddress gets the SessionState listen address +func (s *SessionState) ListenAddress() string { + return s.listenAddress +} + +//ListenPort gets the SessionState listen port +func (s *SessionState) ListenPort() string { + return s.listenPort +} + +// SetListenAddress sets SessionState listen address +func (s *SessionState) SetListenAddress(a string) { + s.listenAddress = a +} + +func (s *SessionState) ResetHeartbeat() { + s.LastPing = time.Now() +} + +// Token gets the SessionState token +func (s *SessionState) Token() string { + return s.token +} + +// KillChan gets the SessionState killchan +func (s *SessionState) KillChan() chan int { + return s.killChan +} + +func (s *SessionState) isDaemon() bool { + return !s.NoDaemon +} + +func (s *SessionState) SetKey(args SetKeyArgs, reply *[]byte) error { + s.logger.Debug("SetKey called") + out, err := s.DecryptKey(args.Key) + if err != nil { + return err + } + s.Key = out + return nil +} + +func (s *SessionState) setKey(key []byte) { + s.Key = key +} + +func (s *SessionState) generateResponse(r *Response) []byte { + // Add common plugin response properties + r.ListenAddress = s.listenAddress + r.Token = s.token + rs, _ := json.Marshal(r) + return rs +} + +func (s *SessionState) heartbeatWatch(killChan chan int) { + s.logger.Debug("Heartbeat started") + count := 0 + for { + if time.Since(s.LastPing) >= s.PingTimeoutDuration { + count++ + s.logger.Infof("Heartbeat timeout %v of %v. (Duration between checks %v)", count, PingTimeoutLimit, s.PingTimeoutDuration) + if count >= PingTimeoutLimit { + s.logger.Error("Heartbeat timeout expired") + defer close(killChan) + return + } + } else { + // Reset count + count = 0 + } + time.Sleep(s.PingTimeoutDuration) + } +} + +// NewSessionState takes the plugin args and returns a SessionState +// returns State or error and returnCode: +// 0 - ok +// 2 - error when unmarshaling pluginArgs +// 3 - cannot open error files +func NewSessionState(pluginArgsMsg string, plugin Plugin, meta *PluginMeta) (*SessionState, error, int) { + pluginArg := &Arg{} + err := json.Unmarshal([]byte(pluginArgsMsg), pluginArg) + if err != nil { + return nil, err, 2 + } + + // If no port was provided we let the OS select a port for us. + // This is safe as address is returned in the Response and keep + // alive prevents unattended plugins. + if pluginArg.listenPort == "" { + pluginArg.listenPort = "0" + } + + // If no PingTimeoutDuration was provided we need to set it + if pluginArg.PingTimeoutDuration == 0 { + pluginArg.PingTimeoutDuration = PingTimeoutDurationDefault + } + + // Generate random token for this session + rb := make([]byte, 32) + rand.Read(rb) + rs := base64.URLEncoding.EncodeToString(rb) + + logger := &log.Logger{ + Out: os.Stderr, + Formatter: &simpleFormatter{}, + Hooks: make(log.LevelHooks), + Level: pluginArg.LogLevel, + } + + var enc encoding.Encoder + switch meta.RPCType { + case JSONRPC: + enc = encoding.NewJsonEncoder() + case NativeRPC: + enc = encoding.NewGobEncoder() + case GRPC: + enc = encoding.NewGobEncoder() + //TODO(CDR): re-think once content-types is settled + } + ss := &SessionState{ + Arg: pluginArg, + Encoder: enc, + + plugin: plugin, + token: rs, + killChan: make(chan int), + logger: logger, + } + + if !meta.Unsecure { + key, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return nil, err, 2 + } + encrypt := encrypter.New(nil, key) + enc.SetEncrypter(encrypt) + ss.Encrypter = encrypt + ss.privateKey = key + } + return ss, nil, 0 +} + +func init() { + gob.RegisterName("conf_value_string", *(&ctypes.ConfigValueStr{})) + gob.RegisterName("conf_value_int", *(&ctypes.ConfigValueInt{})) + gob.RegisterName("conf_value_float", *(&ctypes.ConfigValueFloat{})) + gob.RegisterName("conf_value_bool", *(&ctypes.ConfigValueBool{})) + + gob.RegisterName("conf_policy_node", cpolicy.NewPolicyNode()) + gob.RegisterName("conf_data_node", &cdata.ConfigDataNode{}) + gob.RegisterName("conf_policy_string", &cpolicy.StringRule{}) + gob.RegisterName("conf_policy_int", &cpolicy.IntRule{}) + gob.RegisterName("conf_policy_float", &cpolicy.FloatRule{}) + gob.RegisterName("conf_policy_bool", &cpolicy.BoolRule{}) +} + +// simpleFormatter is a logrus formatter that includes only the message. +type simpleFormatter struct{} + +func (*simpleFormatter) Format(entry *log.Entry) ([]byte, error) { + b := &bytes.Buffer{} + fmt.Fprintf(b, "%s\n", entry.Message) + return b.Bytes(), nil +}