Skip to content

Commit

Permalink
Refactor: Move queue management logic into outputController, clarify …
Browse files Browse the repository at this point in the history
…language around pipeline notifications (#35078)
  • Loading branch information
faec authored and chrisberkhout committed Jun 1, 2023
1 parent d07a802 commit f4c71c5
Show file tree
Hide file tree
Showing 51 changed files with 424 additions and 466 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]
- The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699]
- Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903]
- `queue.ACKListener` has been removed. Queue configurations now accept an explicit callback function for ACK handling. {pull}35078[35078]

==== Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type statelessLogger interface {

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.ACKer {
func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.EventListener {
log := logp.NewLogger("acker")

return acker.EventPrivateReporter(func(_ int, data []interface{}) {
Expand Down
34 changes: 17 additions & 17 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ type countingClient struct {
client beat.Client
}

type countingEventer struct {
type countingClientListener struct {
wgEvents *eventCounter
}

type combinedEventer struct {
a, b beat.ClientEventer
type combinedClientListener struct {
a, b beat.ClientListener
}

func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
Expand Down Expand Up @@ -111,13 +111,13 @@ func (c *eventCounter) Wait() {
// all events published, dropped and ACKed by any active client.
// The type accepted by counter is compatible with sync.WaitGroup.
func withPipelineEventCounter(pipeline beat.PipelineConnector, counter *eventCounter) beat.PipelineConnector {
counterListener := &countingEventer{counter}
counterListener := &countingClientListener{counter}

pipeline = pipetool.WithClientConfigEdit(pipeline, func(config beat.ClientConfig) (beat.ClientConfig, error) {
if evts := config.Events; evts != nil {
config.Events = &combinedEventer{evts, counterListener}
if evts := config.ClientListener; evts != nil {
config.ClientListener = &combinedClientListener{evts, counterListener}
} else {
config.Events = counterListener
config.ClientListener = counterListener
}
return config, nil
})
Expand Down Expand Up @@ -145,36 +145,36 @@ func (c *countingClient) Close() error {
return c.client.Close()
}

func (*countingEventer) Closing() {}
func (*countingEventer) Closed() {}
func (*countingEventer) Published() {}
func (*countingClientListener) Closing() {}
func (*countingClientListener) Closed() {}
func (*countingClientListener) Published() {}

func (c *countingEventer) FilteredOut(_ beat.Event) {}
func (c *countingEventer) DroppedOnPublish(_ beat.Event) {
func (c *countingClientListener) FilteredOut(_ beat.Event) {}
func (c *countingClientListener) DroppedOnPublish(_ beat.Event) {
c.wgEvents.Done()
}

func (c *combinedEventer) Closing() {
func (c *combinedClientListener) Closing() {
c.a.Closing()
c.b.Closing()
}

func (c *combinedEventer) Closed() {
func (c *combinedClientListener) Closed() {
c.a.Closed()
c.b.Closed()
}

func (c *combinedEventer) Published() {
func (c *combinedClientListener) Published() {
c.a.Published()
c.b.Published()
}

func (c *combinedEventer) FilteredOut(event beat.Event) {
func (c *combinedClientListener) FilteredOut(event beat.Event) {
c.a.FilteredOut(event)
c.b.FilteredOut(event)
}

func (c *combinedEventer) DroppedOnPublish(event beat.Event) {
func (c *combinedClientListener) DroppedOnPublish(event beat.Event) {
c.a.DroppedOnPublish(event)
c.b.DroppedOnPublish(event)
}
10 changes: 5 additions & 5 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (s *testInputStore) CleanupInterval() time.Duration {
type mockClient struct {
publishing []beat.Event
published []beat.Event
ackHandler beat.ACKer
ackHandler beat.EventListener
closed bool
mtx sync.Mutex
canceler context.CancelFunc
Expand Down Expand Up @@ -599,15 +599,15 @@ func (pc *mockPipelineConnector) cancelClient(i int) {
pc.clients[i].canceler()
}

func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer {
func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener {
if !blocking {
return config.ACKHandler
return config.EventListener
}

return acker.Combine(blockingACKer(starter), config.ACKHandler)
return acker.Combine(blockingACKer(starter), config.EventListener)
}

func blockingACKer(starter context.Context) beat.ACKer {
func blockingACKer(starter context.Context) beat.EventListener {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
for starter.Err() == nil {
}
Expand Down
33 changes: 16 additions & 17 deletions filebeat/input/filestream/internal/input-logfile/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
Expand Down Expand Up @@ -73,7 +72,7 @@ func newReaderGroupWithLimit(limit uint64) *readerGroup {
// function is nil in that case and must not be called.
//
// The context will be automatically cancelled once the ID is removed from the group. Calling `cancel` is optional.
func (r *readerGroup) newContext(id string, cancelation v2.Canceler) (context.Context, context.CancelFunc, error) {
func (r *readerGroup) newContext(id string, cancelation input.Canceler) (context.Context, context.CancelFunc, error) {
r.mu.Lock()
defer r.mu.Unlock()

Expand Down Expand Up @@ -144,7 +143,7 @@ func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) {
ctx.Logger = ctx.Logger.With("source_file", sourceName)
ctx.Logger.Debug("Starting harvester for file")

hg.tg.Go(startHarvester(ctx, hg, s, false))
_ = hg.tg.Go(startHarvester(ctx, hg, s, false))
}

// Restart starts the Harvester for a Source if a Harvester is already running it waits for it
Expand All @@ -155,7 +154,7 @@ func (hg *defaultHarvesterGroup) Restart(ctx input.Context, s Source) {
ctx.Logger = ctx.Logger.With("source_file", sourceName)
ctx.Logger.Debug("Restarting harvester for file")

hg.tg.Go(startHarvester(ctx, hg, s, true))
_ = hg.tg.Go(startHarvester(ctx, hg, s, true))
}

func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, restart bool) func(context.Context) error {
Expand All @@ -178,25 +177,25 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest

harvesterCtx, cancelHarvester, err := hg.readers.newContext(srcID, canceler)
if err != nil {
return fmt.Errorf("error while adding new reader to the bookkeeper %v", err)
return fmt.Errorf("error while adding new reader to the bookkeeper %w", err)
}
ctx.Cancelation = harvesterCtx
defer cancelHarvester()

resource, err := lock(ctx, hg.store, srcID)
if err != nil {
hg.readers.remove(srcID)
return fmt.Errorf("error while locking resource: %v", err)
return fmt.Errorf("error while locking resource: %w", err)
}
defer releaseResource(resource)

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKHandler: newInputACKHandler(hg.ackCH, ctx.Logger),
CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(hg.ackCH, ctx.Logger),
})
if err != nil {
hg.readers.remove(srcID)
return fmt.Errorf("error while connecting to output with pipeline: %v", err)
return fmt.Errorf("error while connecting to output with pipeline: %w", err)
}
defer client.Close()

Expand All @@ -205,14 +204,14 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest
publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}

err = hg.harvester.Run(ctx, s, cursor, publisher)
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
hg.readers.remove(srcID)
return fmt.Errorf("error while running harvester: %v", err)
return fmt.Errorf("error while running harvester: %w", err)
}
// If the context was not cancelled it means that the Harvester is stopping because of
// some internal decision, not due to outside interaction.
// If it is stopping itself, it must clean up the bookkeeper.
if ctx.Cancelation.Err() != context.Canceled {
if !errors.Is(ctx.Cancelation.Err(), context.Canceled) {
hg.readers.remove(srcID)
}

Expand All @@ -226,18 +225,18 @@ func (hg *defaultHarvesterGroup) Continue(ctx input.Context, previous, next Sour
prevID := hg.identifier.ID(previous)
nextID := hg.identifier.ID(next)

hg.tg.Go(func(canceler context.Context) error {
_ = hg.tg.Go(func(canceler context.Context) error {
previousResource, err := lock(ctx, hg.store, prevID)
if err != nil {
return fmt.Errorf("error while locking previous resource: %v", err)
return fmt.Errorf("error while locking previous resource: %w", err)
}
// mark previous state out of date
// so when reading starts again the offset is set to zero
hg.store.remove(prevID)
_ = hg.store.remove(prevID)

nextResource, err := lock(ctx, hg.store, nextID)
if err != nil {
return fmt.Errorf("error while locking next resource: %v", err)
return fmt.Errorf("error while locking next resource: %w", err)
}
hg.store.UpdateTTL(nextResource, hg.cleanTimeout)

Expand All @@ -252,7 +251,7 @@ func (hg *defaultHarvesterGroup) Continue(ctx input.Context, previous, next Sour

// Stop stops the running Harvester for a given Source.
func (hg *defaultHarvesterGroup) Stop(s Source) {
hg.tg.Go(func(_ context.Context) error {
_ = hg.tg.Go(func(_ context.Context) error {
hg.readers.remove(hg.identifier.ID(s))
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (inp *managedInput) Run(
return nil
}

func newInputACKHandler(ch *updateChan, log *logp.Logger) beat.ACKer {
func newInputACKHandler(ch *updateChan, log *logp.Logger) beat.EventListener {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
var n uint
var last int
Expand Down
10 changes: 5 additions & 5 deletions filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *testInputStore) CleanupInterval() time.Duration {
type mockClient struct {
publishing []beat.Event
published []beat.Event
ackHandler beat.ACKer
ackHandler beat.EventListener
closed bool
mtx sync.Mutex
canceler context.CancelFunc
Expand Down Expand Up @@ -268,15 +268,15 @@ func (pc *mockPipelineConnector) cancelClient(i int) {
pc.clients[i].canceler()
}

func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer {
func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener {
if !blocking {
return config.ACKHandler
return config.EventListener
}

return acker.Combine(blockingACKer(starter), config.ACKHandler)
return acker.Combine(blockingACKer(starter), config.EventListener)
}

func blockingACKer(starter context.Context) beat.ACKer {
func blockingACKer(starter context.Context) beat.EventListener {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
for starter.Err() == nil {
}
Expand Down
11 changes: 6 additions & 5 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
Expand All @@ -30,7 +31,6 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/Shopify/sarama"
"github.com/pkg/errors"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -66,7 +66,7 @@ func configure(cfg *conf.C) (input.Input, error) {

saramaConfig, err := newSaramaConfig(config)
if err != nil {
return nil, errors.Wrap(err, "initializing Sarama config")
return nil, fmt.Errorf("initializing Sarama config: %w", err)
}
return NewInput(config, saramaConfig)
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
log := ctx.Logger.Named("kafka input").With("hosts", input.config.Hosts)

client, err := pipeline.ConnectWith(beat.ClientConfig{
ACKHandler: acker.ConnectionOnly(
EventListener: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
input.runConsumerGroup(log, client, goContext, consumerGroup)
}

if ctx.Cancelation.Err() == context.Canceled {
if errors.Is(ctx.Cancelation.Err(), context.Canceled) {
return nil
} else {
return ctx.Cancelation.Err()
Expand Down Expand Up @@ -254,6 +254,7 @@ func doneChannelContext(ctx input.Context) context.Context {
}

func (c channelCtx) Deadline() (deadline time.Time, ok bool) {
//nolint:nakedret // omitting the return gives a build error
return
}

Expand Down Expand Up @@ -311,7 +312,7 @@ func (h *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s
parser := h.parsers.Create(reader)
for h.session.Context().Err() == nil {
message, err := parser.Next()
if err == io.EOF {
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKHandler: newInputACKHandler(ctx.Logger),
CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
return err
Expand Down Expand Up @@ -175,7 +175,7 @@ func (inp *managedInput) createSourceID(s Source) string {
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}

func newInputACKHandler(log *logp.Logger) beat.ACKer {
func newInputACKHandler(log *logp.Logger) beat.EventListener {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
var n uint
var last int
Expand Down
Loading

0 comments on commit f4c71c5

Please sign in to comment.