Skip to content

Commit

Permalink
Refactor beat exit
Browse files Browse the repository at this point in the history
* Introduce Signal function which is called if using CTRL-C or similar
* Run now returns an error and doesn't exist itself anymore
* Fix spooler and crawler shutdown issue
* Update mockbeat to check Run return error.

Thanks to @cyrilleverrier for his contribution here.
  • Loading branch information
ruflin committed Jan 19, 2016
1 parent 9f98444 commit 689d4ef
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]

*Affecting all Beats*
- Some publisher options refactoring in libbeat {pull}684[684]
- Run function to start a beat no returns an error instead of directly exiting. {pull}771[771]

*Packetbeat*

Expand Down
16 changes: 9 additions & 7 deletions filebeat/beat/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ func (s *Spooler) Run() {

case <-s.exit:
break
case event := <-s.Channel:
s.spool = append(s.spool, event)

// Spooler is full -> flush
if len(s.spool) == cap(s.spool) {
logp.Debug("spooler", "Flushing spooler because spooler full. Events flushed: %v", len(s.spool))
s.flush()
case event, ok := <-s.Channel:
if ok {
s.spool = append(s.spool, event)

// Spooler is full -> flush
if len(s.spool) == cap(s.spool) {
logp.Debug("spooler", "Flushing spooler because spooler full. Events flushed: %v", len(s.spool))
s.flush()
}
}
case <-ticker.C:
// Flush periodically
Expand Down
3 changes: 1 addition & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan c
logp.Debug("prospector", "File Configs: %v", prospectorConfig.Paths)

prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan)
c.prospectors = append(c.prospectors, prospector)

if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
c.prospectors = append(c.prospectors, prospector)
}

logp.Info("Loading Prospectors completed. Number of prospectors: %v", len(c.prospectors))
Expand Down
38 changes: 22 additions & 16 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ package beat
import (
"flag"
"fmt"
"os"
"runtime"
"sync"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -63,7 +63,9 @@ type Beat struct {
Events publisher.Client
UUID uuid.UUID

exit chan struct{}
exit chan struct{}
error error
callback sync.Once
}

// Basic configuration of every beat
Expand Down Expand Up @@ -98,7 +100,7 @@ func NewBeat(name string, version string, bt Beater) *Beat {
}

// Initiates and runs a new beat object
func Run(name string, version string, bt Beater) {
func Run(name string, version string, bt Beater) error {

b := NewBeat(name, version, bt)

Expand All @@ -110,7 +112,7 @@ func Run(name string, version string, bt Beater) {
// TODO: detect if logging was already fully setup or not
fmt.Printf("Start error: %v\n", err)
logp.Critical("Start error: %v", err)
os.Exit(1)
b.error = err
}

// If start finishes, exit has to be called. This requires start to be blocking
Expand All @@ -123,7 +125,7 @@ func Run(name string, version string, bt Beater) {
case <-b.exit:
b.Stop()
logp.Info("Exit beat completed")
return
return b.error
}
}

Expand Down Expand Up @@ -248,27 +250,31 @@ func (b *Beat) Run() error {
logp.Critical("Running the beat returned an error: %v", err)
}

return err
}

// Stop calls the beater Stop action.
// It can happen that this function is called more then once.
func (b *Beat) Stop() {
logp.Info("Stopping Beat")
b.BT.Stop()

service.Cleanup()

logp.Info("Cleaning up %s before shutting down.", b.Name)

// Call beater cleanup function
err = b.BT.Cleanup(b)
err := b.BT.Cleanup(b)
if err != nil {
logp.Err("Cleanup returned an error: %v", err)
}
return err
}

// Stop calls the beater Stop action.
// It can happen that this function is called more then once.
func (beat *Beat) Stop() {
logp.Info("Stopping Beat")
beat.BT.Stop()
}

// Exiting beat -> shutdown
func (b *Beat) Exit() {
logp.Info("Start exiting beat")
close(b.exit)

b.callback.Do(func() {
logp.Info("Start exiting beat")
close(b.exit)
})
}
7 changes: 6 additions & 1 deletion libbeat/libbeat.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package main

import (
"os"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/mock"
)

func main() {
beat.Run(mock.Name, mock.Version, &mock.Mockbeat{})
err := beat.Run(mock.Name, mock.Version, &mock.Mockbeat{})
if err != nil {
os.Exit(1)
}
}
6 changes: 4 additions & 2 deletions winlogbeat/beat/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ func (eb *Winlogbeat) Cleanup(b *beat.Beat) error {
}

func (eb *Winlogbeat) Stop() {
logp.Info("Initiating shutdown, please wait.")
close(eb.done)
logp.Info("Stopping Winlogbeat")
if eb.done != nil {
close(eb.done)
}
}

func (eb *Winlogbeat) processEventLog(
Expand Down
7 changes: 6 additions & 1 deletion winlogbeat/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"os"

"github.com/elastic/beats/libbeat/beat"
winlogbeat "github.com/elastic/beats/winlogbeat/beat"
)
Expand All @@ -9,5 +11,8 @@ import (
var Name = "winlogbeat"

func main() {
beat.Run(Name, "", winlogbeat.New())
err := beat.Run(Name, "", winlogbeat.New())
if err != nil {
os.Exit(1)
}
}

0 comments on commit 689d4ef

Please sign in to comment.