diff --git a/.gitignore b/.gitignore index 21daf20bb..85642792b 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,8 @@ profile.cov gin-bin tags .vscode/ +vendor/ +glide.lock # we don't vendor godep _workspace **/Godeps/_workspace/** diff --git a/control/available_plugin_test.go b/control/available_plugin_test.go index ee216fdb8..35e790e8e 100644 --- a/control/available_plugin_test.go +++ b/control/available_plugin_test.go @@ -54,9 +54,7 @@ func TestAvailablePlugin(t *testing.T) { Convey("returns nil if plugin successfully stopped", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin-stop.log", - } + a := plugin.Arg{} exPlugin, _ := plugin.NewExecutablePlugin(a, fixtures.PluginPath) ap, err := r.startPlugin(exPlugin) diff --git a/control/control.go b/control/control.go index 4a53b0f78..4fdd22c63 100644 --- a/control/control.go +++ b/control/control.go @@ -125,7 +125,7 @@ type managesPlugins interface { LoadPlugin(*pluginDetails, gomit.Emitter) (*loadedPlugin, serror.SnapError) UnloadPlugin(core.Plugin) (*loadedPlugin, serror.SnapError) SetMetricCatalog(catalogsMetrics) - GenerateArgs(pluginPath string) plugin.Arg + GenerateArgs(logLevel int) plugin.Arg SetPluginConfig(*pluginConfig) } diff --git a/control/control_test.go b/control/control_test.go index d39a72f40..6a781ad1c 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -75,7 +75,7 @@ func (m *MockPluginManagerBadSwap) teardown() {} func (m *MockPluginManagerBadSwap) SetPluginConfig(*pluginConfig) {} func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) {} func (m *MockPluginManagerBadSwap) SetEmitter(gomit.Emitter) {} -func (m *MockPluginManagerBadSwap) GenerateArgs(string) plugin.Arg { return plugin.Arg{} } +func (m *MockPluginManagerBadSwap) GenerateArgs(int) plugin.Arg { return plugin.Arg{} } func (m *MockPluginManagerBadSwap) all() map[string]*loadedPlugin { return m.loadedPlugins.table diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go index d2dae131e..4668fb114 100644 --- a/control/plugin/collector_proxy.go +++ b/control/plugin/collector_proxy.go @@ -54,7 +54,7 @@ type collectorPluginProxy struct { func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error { defer catchPluginPanic(c.Session.Logger()) - c.Session.Logger().Println("GetMetricTypes called") + c.Session.Logger().Debugln("GetMetricTypes called") // Reset heartbeat c.Session.ResetHeartbeat() @@ -77,7 +77,7 @@ func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error func (c *collectorPluginProxy) CollectMetrics(args []byte, reply *[]byte) error { defer catchPluginPanic(c.Session.Logger()) - c.Session.Logger().Println("CollectMetrics called") + c.Session.Logger().Debugln("CollectMetrics called") // Reset heartbeat c.Session.ResetHeartbeat() diff --git a/control/plugin/collector_proxy_test.go b/control/plugin/collector_proxy_test.go index 36477722b..b4830fd8f 100644 --- a/control/plugin/collector_proxy_test.go +++ b/control/plugin/collector_proxy_test.go @@ -23,8 +23,6 @@ package plugin import ( "errors" - "log" - "os" "testing" "time" @@ -32,6 +30,7 @@ import ( "github.com/intelsdi-x/snap/control/plugin/encoding" "github.com/intelsdi-x/snap/core" + log "github.com/Sirupsen/logrus" . "github.com/smartystreets/goconvey/convey" ) @@ -98,9 +97,7 @@ func (p *mockErrorPlugin) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { func TestCollectorProxy(t *testing.T) { Convey("Test collector plugin proxy for get metric types ", t, func() { - logger := log.New(os.Stdout, - "test: ", - log.Ldate|log.Ltime|log.Lshortfile) + logger := log.New() mockPlugin := &mockPlugin{} mockSessionState := &MockSessionState{ diff --git a/control/plugin/execution.go b/control/plugin/execution.go index ed53f5ba3..b4bfdb207 100644 --- a/control/plugin/execution.go +++ b/control/plugin/execution.go @@ -124,8 +124,10 @@ func (e *ExecutablePlugin) Run(timeout time.Duration) (Response, error) { respReceived = true close(doneChan) } else { - execLogger.WithField("plugin", path.Base(e.cmd.Path())). - Debug(stdOutScanner.Text()) + execLogger.WithFields(log.Fields{ + "plugin": path.Base(e.cmd.Path()), + "io": "stdout", + }).Debug(stdOutScanner.Text()) } } }() @@ -157,7 +159,9 @@ func (e *ExecutablePlugin) captureStderr() { for stdErrScanner.Scan() { execLogger. WithField("io", "stderr"). - WithField("plugin", path.Base(e.cmd.Path())).Debug(stdErrScanner.Text()) + WithField("plugin", path.Base(e.cmd.Path())). + Debug(stdErrScanner.Text()) + } }() } diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index 0233b77d7..7ce77afe7 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -26,7 +26,6 @@ import ( "encoding/json" "fmt" "io" // Don't use "fmt.Print*" - "log" "net" "net/http" "net/rpc" @@ -35,6 +34,8 @@ import ( "runtime" "time" + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/control/plugin/cpolicy" ) @@ -225,8 +226,8 @@ func NewPluginMeta(name string, version int, pluginType PluginType, acceptConten // Arguments passed to startup of Plugin type Arg struct { - // Plugin file path to binary - PluginLogPath string + // Plugin log level + LogLevel log.Level // Ping timeout duration PingTimeoutDuration time.Duration @@ -235,9 +236,9 @@ type Arg struct { listenPort string } -func NewArg(logpath string) Arg { +func NewArg(logLevel int) Arg { return Arg{ - PluginLogPath: logpath, + LogLevel: log.Level(logLevel), PingTimeoutDuration: PingTimeoutDurationDefault, } } @@ -328,20 +329,19 @@ func Start(m *PluginMeta, c Plugin, requestString string) (error, int) { e := rpc.Register(s) if e != nil { if e.Error() != "rpc: service already defined: SessionState" { - log.Println(e.Error()) - s.Logger().Println(e.Error()) + s.Logger().Error(e.Error()) return e, 2 } } l, err := net.Listen("tcp", "127.0.0.1:"+s.ListenPort()) if err != nil { - s.Logger().Println(err.Error()) + s.Logger().Error(err.Error()) panic(err) } s.SetListenAddress(l.Addr().String()) - s.Logger().Printf("Listening %s\n", l.Addr()) - s.Logger().Printf("Session token %s\n", s.Token()) + s.Logger().Debugf("Listening %s\n", l.Addr()) + s.Logger().Debugf("Session token %s\n", s.Token()) switch r.Meta.RPCType { case JSONRPC: diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index 0ca6f783a..af13b4238 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap/core" . "github.com/smartystreets/goconvey/convey" ) @@ -60,7 +61,7 @@ func TestMetricType(t *testing.T) { func TestArg(t *testing.T) { Convey("NewArg", t, func() { - arg := NewArg("/tmp/snap/plugin.log") + arg := NewArg(int(log.InfoLevel)) So(arg, ShouldNotBeNil) }) } diff --git a/control/plugin/session.go b/control/plugin/session.go index 866a806fd..11a7d61c3 100644 --- a/control/plugin/session.go +++ b/control/plugin/session.go @@ -20,6 +20,7 @@ limitations under the License. package plugin import ( + "bytes" "crypto/rand" "crypto/rsa" "encoding/base64" @@ -27,11 +28,11 @@ import ( "encoding/json" "errors" "fmt" - "log" "os" - "runtime" "time" + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/control/plugin/cpolicy" "github.com/intelsdi-x/snap/control/plugin/encoding" "github.com/intelsdi-x/snap/control/plugin/encrypter" @@ -99,7 +100,7 @@ type GetConfigPolicyReply struct { func (s *SessionState) GetConfigPolicy(args []byte, reply *[]byte) error { defer catchPluginPanic(s.Logger()) - s.logger.Println("GetConfigPolicy called") + s.logger.Debug("GetConfigPolicy called") policy, err := s.plugin.GetConfigPolicy() if err != nil { @@ -121,7 +122,7 @@ func (s *SessionState) Ping(arg []byte, reply *[]byte) error { // down or otherwise in a state we should signal poor health. // Reply should contain any context. s.ResetHeartbeat() - s.logger.Println("Ping received") + s.logger.Debug("Ping received") *reply = []byte{} return nil } @@ -133,7 +134,7 @@ func (s *SessionState) Kill(args []byte, reply *[]byte) error { if err != nil { return err } - s.logger.Printf("Kill called by agent, reason: %s\n", a.Reason) + s.logger.Debug("Kill called by agent, reason: %s\n", a.Reason) go func() { time.Sleep(time.Second * 2) s.killChan <- 0 @@ -185,7 +186,7 @@ type SetKeyArgs struct { } func (s *SessionState) SetKey(args SetKeyArgs, reply *[]byte) error { - s.logger.Println("SetKey called") + s.logger.Debug("SetKey called") out, err := s.DecryptKey(args.Key) if err != nil { return err @@ -207,19 +208,18 @@ func (s *SessionState) generateResponse(r *Response) []byte { } func (s *SessionState) heartbeatWatch(killChan chan int) { - s.logger.Println("Heartbeat started") + s.logger.Debug("Heartbeat started") count := 0 for { if time.Since(s.LastPing) >= s.PingTimeoutDuration { count++ - s.logger.Printf("Heartbeat timeout %v of %v. (Duration between checks %v)", count, PingTimeoutLimit, s.PingTimeoutDuration) + s.logger.Infof("Heartbeat timeout %v of %v. (Duration between checks %v)", count, PingTimeoutLimit, s.PingTimeoutDuration) if count >= PingTimeoutLimit { - s.logger.Println("Heartbeat timeout expired!") + s.logger.Error("Heartbeat timeout expired") defer close(killChan) return } } else { - s.logger.Println("Heartbeat timeout reset") // Reset count count = 0 } @@ -256,22 +256,12 @@ func NewSessionState(pluginArgsMsg string, plugin Plugin, meta *PluginMeta) (*Se rand.Read(rb) rs := base64.URLEncoding.EncodeToString(rb) - // Initialize a logger based on PluginLogPath - truncOrAppend := os.O_TRUNC // truncate log file explicitly given by user - // Empty or /tmp means use default tmp log (needs to be removed post-aAtruncOrAppendpha) - if pluginArg.PluginLogPath == "" || pluginArg.PluginLogPath == "/tmp" { - if runtime.GOOS == "windows" { - pluginArg.PluginLogPath = `c:\TEMP\snap_plugin.log` - } else { - pluginArg.PluginLogPath = "/tmp/snap_plugin.log" - } - truncOrAppend = os.O_APPEND - } - lf, err := os.OpenFile(pluginArg.PluginLogPath, os.O_WRONLY|os.O_CREATE|truncOrAppend, 0666) - if err != nil { - return nil, errors.New(fmt.Sprintf("error opening log file: %v", err)), 3 + logger := &log.Logger{ + Out: os.Stderr, + Formatter: &simpleFormatter{}, + Hooks: make(log.LevelHooks), + Level: pluginArg.LogLevel, } - logger := log.New(lf, ">>>", log.Ldate|log.Ltime) var enc encoding.Encoder switch meta.RPCType { @@ -319,3 +309,12 @@ func init() { gob.RegisterName("conf_policy_float", &cpolicy.FloatRule{}) gob.RegisterName("conf_policy_bool", &cpolicy.BoolRule{}) } + +// simpleFormatter is a logrus formatter that includes only the message. +type simpleFormatter struct{} + +func (*simpleFormatter) Format(entry *log.Entry) ([]byte, error) { + b := &bytes.Buffer{} + fmt.Fprintf(b, "%s\n", entry.Message) + return b.Bytes(), nil +} diff --git a/control/plugin/session_test.go b/control/plugin/session_test.go index 096737b54..55867be26 100644 --- a/control/plugin/session_test.go +++ b/control/plugin/session_test.go @@ -24,11 +24,11 @@ package plugin import ( "encoding/json" "errors" - "log" - "os" "testing" "time" + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/control/plugin/cpolicy" "github.com/intelsdi-x/snap/control/plugin/encoding" . "github.com/smartystreets/goconvey/convey" @@ -127,7 +127,7 @@ func TestSessionState(t *testing.T) { Arg: &Arg{PingTimeoutDuration: 500 * time.Millisecond}, Encoder: encoding.NewJsonEncoder(), } - ss.logger = log.New(os.Stdout, ">>>", log.Ldate|log.Ltime) + ss.logger = log.New() Convey("Ping", func() { ss.Ping([]byte{}, &[]byte{}) @@ -202,9 +202,7 @@ func TestSessionState(t *testing.T) { func TestGetConfigPolicy(t *testing.T) { Convey("Get Config Policy", t, func() { - logger := log.New(os.Stdout, - "test: ", - log.Ldate|log.Ltime|log.Lshortfile) + logger := log.New() mockPlugin := &mockPlugin{} mockSessionState := &MockSessionState{ @@ -227,9 +225,7 @@ func TestGetConfigPolicy(t *testing.T) { So(cpr.Policy, ShouldNotBeNil) }) Convey("Get error in Config Policy ", t, func() { - logger := log.New(os.Stdout, - "test: ", - log.Ldate|log.Ltime|log.Lshortfile) + logger := log.New() errSession := &errSessionState{ &MockSessionState{ Encoder: encoding.NewGobEncoder(), diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 743564014..bf675bd38 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -285,7 +285,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter "_block": "load-plugin", "path": filepath.Base(lPlugin.Details.Exec), }).Info("plugin load called") - ePlugin, err := plugin.NewExecutablePlugin(p.GenerateArgs(lPlugin.Details.Exec), path.Join(lPlugin.Details.ExecPath, lPlugin.Details.Exec)) + ePlugin, err := plugin.NewExecutablePlugin(p.GenerateArgs(int(log.GetLevel())), path.Join(lPlugin.Details.ExecPath, lPlugin.Details.Exec)) if err != nil { pmLogger.WithFields(log.Fields{ "_block": "load-plugin", @@ -559,9 +559,8 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap } // GenerateArgs generates the cli args to send when stating a plugin -func (p *pluginManager) GenerateArgs(pluginPath string) plugin.Arg { - pluginLog := filepath.Join(p.logPath, filepath.Base(pluginPath)) + ".log" - return plugin.NewArg(pluginLog) +func (p *pluginManager) GenerateArgs(logLevel int) plugin.Arg { + return plugin.NewArg(logLevel) } func (p *pluginManager) teardown() { diff --git a/control/runner.go b/control/runner.go index 9fa5f5a79..93008498f 100644 --- a/control/runner.go +++ b/control/runner.go @@ -312,7 +312,7 @@ func (r *runner) runPlugin(details *pluginDetails) error { } details.ExecPath = path.Join(tempPath, "rootfs") } - ePlugin, err := plugin.NewExecutablePlugin(r.pluginManager.GenerateArgs(details.Exec), path.Join(details.ExecPath, details.Exec)) + ePlugin, err := plugin.NewExecutablePlugin(r.pluginManager.GenerateArgs(int(log.GetLevel())), path.Join(details.ExecPath, details.Exec)) if err != nil { runnerLog.WithFields(log.Fields{ "_block": "run-plugin", diff --git a/control/runner_test.go b/control/runner_test.go index 5fb3aa751..105588698 100644 --- a/control/runner_test.go +++ b/control/runner_test.go @@ -40,8 +40,7 @@ type MockController struct { func (p *MockController) GenerateArgs(daemon bool) plugin.Arg { a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin.log", - NoDaemon: daemon, + NoDaemon: daemon, } return a } @@ -353,10 +352,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("should return an AvailablePlugin", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin.log", - // Daemon: true, - } + a := plugin.Arg{} exPlugin, err := newExecutablePlugin(a, fixtures.PluginPath) if err != nil { panic(err) @@ -378,9 +374,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("availablePlugins should include returned availablePlugin", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin.log", - } + a := plugin.Arg{} exPlugin, err := newExecutablePlugin(a, fixtures.PluginPath) if err != nil { panic(err) @@ -398,9 +392,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("healthcheck on healthy plugin does not increment failedHealthChecks", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin.log", - } + a := plugin.Arg{} exPlugin, err := newExecutablePlugin(a, fixtures.PluginPath) if err != nil { panic(err) @@ -417,9 +409,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("healthcheck on unhealthy plugin increments failedHealthChecks", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin.log", - } + a := plugin.Arg{} exPlugin, err := newExecutablePlugin(a, fixtures.PluginPath) if err != nil { panic(err) @@ -436,9 +426,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("successful healthcheck resets failedHealthChecks", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin-foo.log", - } + a := plugin.Arg{} exPlugin, err := newExecutablePlugin(a, fixtures.PluginPath) if err != nil { panic(err) @@ -459,9 +447,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("three consecutive failedHealthChecks disables the plugin", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin.log", - } + a := plugin.Arg{} exPlugin, err := newExecutablePlugin(a, fixtures.PluginPath) if err != nil { panic(err) @@ -494,9 +480,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("should return an AvailablePlugin in a Running state", func() { r := newRunner() r.SetEmitter(new(MockEmitter)) - a := plugin.Arg{ - PluginLogPath: "/tmp/snap-test-plugin-stop.log", - } + a := plugin.Arg{} exPlugin, err := newExecutablePlugin(a, fixtures.PluginPath) if err != nil { panic(err) diff --git a/plugin/collector/snap-plugin-collector-mock2/mock/mock.go b/plugin/collector/snap-plugin-collector-mock2/mock/mock.go index 5a763344c..5cff01f50 100644 --- a/plugin/collector/snap-plugin-collector-mock2/mock/mock.go +++ b/plugin/collector/snap-plugin-collector-mock2/mock/mock.go @@ -53,6 +53,14 @@ func (f *Mock) CollectMetrics(mts []plugin.MetricType) ([]plugin.MetricType, err rand.Seed(time.Now().UTC().UnixNano()) metrics := []plugin.MetricType{} for i := range mts { + if c, ok := mts[i].Config().Table()["long_print"]; ok && c.(ctypes.ConfigValueBool).Value { + letterBytes := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + longLine := []byte{} + for i := 0; i < 8193; i++ { + longLine = append(longLine, letterBytes[rand.Intn(len(letterBytes))]) + } + fmt.Println(string(longLine)) + } if c, ok := mts[i].Config().Table()["panic"]; ok && c.(ctypes.ConfigValueBool).Value { panic("Oops!") } diff --git a/scheduler/workflow.go b/scheduler/workflow.go index ff8773399..76e537350 100644 --- a/scheduler/workflow.go +++ b/scheduler/workflow.go @@ -21,7 +21,6 @@ package scheduler import ( "errors" - "fmt" "sync" log "github.com/Sirupsen/logrus" @@ -241,7 +240,7 @@ func (s *schedulerWorkflow) Start(t *task) { "_block": "workflow-start", "task-id": t.id, "task-name": t.name, - }).Info(fmt.Sprintf("Starting workflow for task (%s\\%s)", t.id, t.name)) + }).Debug("Starting workflow") s.state = WorkflowStarted j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree, t.id, s.tags)