Skip to content

Commit

Permalink
Add do.Once protection to beater interface (#33971)
Browse files Browse the repository at this point in the history
* add channel protection to beater interface

* add changelog
  • Loading branch information
fearful-symmetry authored Dec 7, 2022
1 parent ae67121 commit 0c77112
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364]
- Add `add_formatted_index` processor that allows the resulting index for an event to be changed based on content from the event. {pull}33800[33800]
- deps: Updated to github.com/elastic/go-sysinfo v1.9.0. {pull}33864[33864]
- Fix panic due to close of already closed channel during shutdown {pull}33971[33971]

*Auditbeat*

Expand Down
4 changes: 3 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"fmt"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/filebeat/channel"
Expand Down Expand Up @@ -67,6 +68,7 @@ type Filebeat struct {
moduleRegistry *fileset.ModuleRegistry
pluginFactory PluginFactory
done chan struct{}
stopOnce sync.Once // wraps the Stop() method
pipeline beat.PipelineConnector
}

Expand Down Expand Up @@ -431,7 +433,7 @@ func (fb *Filebeat) Stop() {
logp.Info("Stopping filebeat")

// Stop Filebeat
close(fb.done)
fb.stopOnce.Do(func() { close(fb.done) })
}

// Create a new pipeline loader (es client) factory
Expand Down
6 changes: 4 additions & 2 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package beater
import (
"errors"
"fmt"
"sync"

"syscall"
"time"
Expand All @@ -45,7 +46,8 @@ import (

// Heartbeat represents the root datastructure of this beat.
type Heartbeat struct {
done chan struct{}
done chan struct{}
stopOnce sync.Once
// config is used for iterating over elements of the config.
config config.Config
scheduler *scheduler.Scheduler
Expand Down Expand Up @@ -257,7 +259,7 @@ func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover,

// Stop stops the beat.
func (bt *Heartbeat) Stop() {
close(bt.done)
bt.stopOnce.Do(func() { close(bt.done) })
}

func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFrom *config.LocationWithID) error {
Expand Down
4 changes: 3 additions & 1 deletion metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
// Metricbeat implements the Beater interface for metricbeat.
type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown.
stopOnce sync.Once // wraps the Stop() method
runners []module.Runner // Active list of module runners.
config Config
autodiscover *autodiscover.Autodiscover
Expand Down Expand Up @@ -272,7 +273,8 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
// Stop should only be called a single time. Calling it more than once may
// result in undefined behavior.
func (bt *Metricbeat) Stop() {
close(bt.done)
bt.stopOnce.Do(func() { close(bt.done) })

}

// Modules return a list of all configured modules.
Expand Down
10 changes: 6 additions & 4 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beater

import (
"flag"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -77,9 +78,10 @@ func initialConfig() config.Config {

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
config *conf.C
factory *processorFactory
done chan struct{}
config *conf.C
factory *processorFactory
done chan struct{}
stopOnce sync.Once
}

// New returns a new Packetbeat beat.Beater.
Expand Down Expand Up @@ -186,5 +188,5 @@ func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error
// Called by the Beat stop function
func (pb *packetbeat) Stop() {
logp.Info("Packetbeat send stop signal")
close(pb.done)
pb.stopOnce.Do(func() { close(pb.done) })
}

0 comments on commit 0c77112

Please sign in to comment.