diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go
index 114e4f0e2cb..6350cc021bd 100644
--- a/libbeat/beat/pipeline.go
+++ b/libbeat/beat/pipeline.go
@@ -89,7 +89,7 @@ type EventListener interface {
 }
 
 // CloseRef allows users to close the client asynchronously.
-// A CloseReg implements a subset of function required for context.Context.
+// A CloseRef implements a subset of the functions required for context.Context.
 type CloseRef interface {
     Done() <-chan struct{}
     Err() error
@@ -135,7 +135,7 @@ type ClientListener interface {
     Closing() // Closing indicates the client is being shutdown next
     Closed()  // Closed indicates the client being fully shutdown
-    Published() // event has been successfully forwarded to the publisher pipeline
+    Published() // event has successfully entered the queue
     FilteredOut(Event)      // event has been filtered out/dropped by processors
     DroppedOnPublish(Event) // event has been dropped, while waiting for the queue
 }
diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go
index 50a9a6adaf3..6a95fd6bc6e 100644
--- a/libbeat/outputs/output_reg.go
+++ b/libbeat/outputs/output_reg.go
@@ -21,6 +21,7 @@ import (
     "fmt"
 
     "github.com/elastic/beats/v7/libbeat/beat"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue"
     "github.com/elastic/elastic-agent-libs/config"
 )
 
@@ -48,11 +49,17 @@ type IndexSelector interface {
 }
 
 // Group configures and combines multiple clients into load-balanced group of clients
-// being managed by the publisher pipeline.
+// being managed by the publisher pipeline. If QueueFactory is set then the
+// pipeline will use it to create the queue; factories for the concrete queue
+// types come from memqueue.FactoryForSettings, diskqueue.FactoryForSettings,
+// and proxyqueue.FactoryForSettings.
+// Currently it is only used to activate the proxy queue when using the Shipper
+// output, but it also provides a natural migration path for moving queue
+// configuration into the outputs.
 type Group struct {
-    Clients   []Client
-    BatchSize int
-    Retry     int
+    Clients      []Client
+    BatchSize    int
+    Retry        int
+    QueueFactory queue.QueueFactory
 }
 
 // RegisterType registers a new output type.
diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go
index 2cfd42ee737..fe19a36b31d 100644
--- a/libbeat/outputs/shipper/shipper.go
+++ b/libbeat/outputs/shipper/shipper.go
@@ -25,6 +25,8 @@ import (
     "github.com/elastic/beats/v7/libbeat/beat"
     "github.com/elastic/beats/v7/libbeat/outputs"
     "github.com/elastic/beats/v7/libbeat/publisher"
+    proxyqueue "github.com/elastic/beats/v7/libbeat/publisher/queue/proxy"
+
     "github.com/elastic/elastic-agent-shipper-client/pkg/helpers"
     sc "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
     "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
@@ -106,7 +108,12 @@ func makeShipper(
 
     swb := outputs.WithBackoff(s, config.Backoff.Init, config.Backoff.Max)
 
-    return outputs.Success(config.BulkMaxSize, config.MaxRetries, swb)
+    return outputs.Group{
+        Clients: []outputs.Client{swb},
+        Retry:   config.MaxRetries,
+        QueueFactory: proxyqueue.FactoryForSettings(
+            proxyqueue.Settings{BatchSize: config.BulkMaxSize}),
+    }, nil
 }
 
 // Connect establishes connection to the shipper server and implements `outputs.Connectable`.
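[Editor's note] The shipper output above is the first consumer of the new Group.QueueFactory field. For illustration, here is a minimal sketch of how any other output's make function could opt in to a custom queue the same way — makeExampleOutput and its parameters are hypothetical; only outputs.Group and memqueue.FactoryForSettings come from this patch:

package example

import (
    "github.com/elastic/beats/v7/libbeat/outputs"
    "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
)

// makeExampleOutput returns an output group that asks the pipeline to build
// a memory queue from explicit settings instead of the global queue config.
func makeExampleOutput(client outputs.Client, maxRetries, bulkMaxSize int) (outputs.Group, error) {
    return outputs.Group{
        Clients: []outputs.Client{client},
        Retry:   maxRetries,
        // Hypothetical sizing choice for the example: hold four batches.
        QueueFactory: memqueue.FactoryForSettings(
            memqueue.Settings{Events: 4 * bulkMaxSize},
        ),
    }, nil
}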
diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go
index 31a21999761..e1d9062a89f 100644
--- a/libbeat/publisher/pipeline/client.go
+++ b/libbeat/publisher/pipeline/client.go
@@ -31,15 +31,15 @@ import (
 
 // client connects a beat with the processors and pipeline queue.
 type client struct {
-    pipeline   *Pipeline
+    logger     *logp.Logger
     processors beat.Processor
     producer   queue.Producer
     mutex      sync.Mutex
     waiter     *clientCloseWaiter
 
-    eventFlags   publisher.EventFlags
-    canDrop      bool
-    reportEvents bool
+    eventFlags     publisher.EventFlags
+    canDrop        bool
+    eventWaitGroup *sync.WaitGroup
 
     // Open state, signaling, and sync primitives for coordinating client Close.
     isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
@@ -47,6 +47,7 @@ type client struct {
     closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
     done     chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.
 
+    observer       observer
     eventListener  beat.EventListener
     clientListener beat.ClientListener
 }
@@ -80,7 +81,6 @@ func (c *client) publish(e beat.Event) {
     var (
         event   = &e
         publish = true
-        log     = c.pipeline.monitors.Logger
     )
 
     c.onNewEvent()
@@ -99,7 +99,7 @@ func (c *client) publish(e beat.Event) {
         if err != nil {
             // If we introduce a dead-letter queue, this is where we should
             // route the event to it.
-            log.Errorf("Failed to publish event: %v", err)
+            c.logger.Errorf("Failed to publish event: %v", err)
         }
     }
@@ -134,8 +134,6 @@ func (c *client) publish(e beat.Event) {
 }
 
 func (c *client) Close() error {
-    log := c.logger()
-
     // first stop ack handling. ACK handler might block on wait (with timeout), waiting
     // for pending events to be ACKed.
     c.closeOnce.Do(func() {
@@ -144,34 +142,30 @@ func (c *client) Close() error {
         c.isOpen.Store(false)
         c.onClosing()
 
-        log.Debug("client: closing acker")
+        c.logger.Debug("client: closing acker")
         c.waiter.signalClose()
         c.waiter.wait()
 
         c.eventListener.ClientClosed()
-        log.Debug("client: done closing acker")
+        c.logger.Debug("client: done closing acker")
 
-        log.Debug("client: close queue producer")
+        c.logger.Debug("client: close queue producer")
         cancelledEventCount := c.producer.Cancel()
         c.onClosed(cancelledEventCount)
-        log.Debug("client: done producer close")
+        c.logger.Debug("client: done producer close")
 
         if c.processors != nil {
-            log.Debug("client: closing processors")
+            c.logger.Debug("client: closing processors")
             err := processors.Close(c.processors)
             if err != nil {
-                log.Errorf("client: error closing processors: %v", err)
+                c.logger.Errorf("client: error closing processors: %v", err)
             }
-            log.Debug("client: done closing processors")
+            c.logger.Debug("client: done closing processors")
         }
     })
     return nil
 }
 
-func (c *client) logger() *logp.Logger {
-    return c.pipeline.monitors.Logger
-}
-
 func (c *client) onClosing() {
     if c.clientListener != nil {
         c.clientListener.Closing()
@@ -179,51 +173,46 @@ func (c *client) onClosing() {
     }
 }
 
 func (c *client) onClosed(cancelledEventCount int) {
-    log := c.logger()
-    log.Debugf("client: cancelled %v events", cancelledEventCount)
+    c.logger.Debugf("client: cancelled %v events", cancelledEventCount)
 
-    if c.reportEvents {
-        log.Debugf("client: remove client events")
+    if c.eventWaitGroup != nil {
+        c.logger.Debugf("client: remove client events")
         if cancelledEventCount > 0 {
-            c.pipeline.waitCloseGroup.Add(-cancelledEventCount)
+            c.eventWaitGroup.Add(-cancelledEventCount)
         }
     }
 
-    c.pipeline.observer.clientClosed()
+    c.observer.clientClosed()
     if c.clientListener != nil {
         c.clientListener.Closed()
     }
 }
 
 func (c *client) onNewEvent() {
-    c.pipeline.observer.newEvent()
+    c.observer.newEvent()
 }
 
 func (c *client) onPublished() {
-    if c.reportEvents {
-        c.pipeline.waitCloseGroup.Add(1)
+    if c.eventWaitGroup != nil {
+        c.eventWaitGroup.Add(1)
     }
-    c.pipeline.observer.publishedEvent()
+    c.observer.publishedEvent()
     if c.clientListener != nil {
         c.clientListener.Published()
     }
 }
 
 func (c *client) onFilteredOut(e beat.Event) {
-    log := c.logger()
-
-    log.Debugf("Pipeline client receives callback 'onFilteredOut' for event: %+v", e)
-    c.pipeline.observer.filteredEvent()
+    c.logger.Debugf("Pipeline client receives callback 'onFilteredOut' for event: %+v", e)
+    c.observer.filteredEvent()
     if c.clientListener != nil {
         c.clientListener.FilteredOut(e)
     }
 }
 
 func (c *client) onDroppedOnPublish(e beat.Event) {
-    log := c.logger()
-
-    log.Debugf("Pipeline client receives callback 'onDroppedOnPublish' for event: %+v", e)
-    c.pipeline.observer.failedPublishEvent()
+    c.logger.Debugf("Pipeline client receives callback 'onDroppedOnPublish' for event: %+v", e)
+    c.observer.failedPublishEvent()
     if c.clientListener != nil {
         c.clientListener.DroppedOnPublish(e)
     }
diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go
index 51a03a63f74..4a212092c7e 100644
--- a/libbeat/publisher/pipeline/client_test.go
+++ b/libbeat/publisher/pipeline/client_test.go
@@ -49,8 +49,7 @@ func TestClient(t *testing.T) {
         if err != nil {
             panic(err)
         }
-        // Close the built-in queue and replace with the given one.
-        p.outputController.queue.Close()
+        // Inject a test queue so the outputController doesn't create one
         p.outputController.queue = qu
 
         return p
@@ -137,8 +136,7 @@ func TestClientWaitClose(t *testing.T) {
         if err != nil {
             panic(err)
         }
-        // Close the built-in queue and replace with the given one.
-        p.outputController.queue.Close()
+        // Inject a test queue so the outputController doesn't create one
         p.outputController.queue = qu
 
         return p
@@ -146,7 +144,7 @@
     err := logp.TestingSetup()
     assert.Nil(t, err)
 
-    q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
+    q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1})
     pipeline := makePipeline(Settings{}, q)
     defer pipeline.Close()
diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go
index 41ab03a6451..bf080677ef4 100644
--- a/libbeat/publisher/pipeline/controller.go
+++ b/libbeat/publisher/pipeline/controller.go
@@ -18,14 +18,13 @@
 package pipeline
 
 import (
-    "fmt"
+    "sync"
 
     "github.com/elastic/beats/v7/libbeat/beat"
     "github.com/elastic/beats/v7/libbeat/common/reload"
     "github.com/elastic/beats/v7/libbeat/outputs"
     "github.com/elastic/beats/v7/libbeat/publisher"
     "github.com/elastic/beats/v7/libbeat/publisher/queue"
-    "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
     "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
     conf "github.com/elastic/elastic-agent-libs/config"
     "github.com/elastic/elastic-agent-libs/logp"
@@ -41,8 +40,23 @@ type outputController struct {
     monitors Monitors
     observer outputObserver
 
-    queueConfig queueConfig
-    queue       queue.Queue
+    // If eventWaitGroup is non-nil, it will be decremented as the queue
+    // reports upstream acknowledgment of published events.
+    eventWaitGroup *sync.WaitGroup
+
+    // The queue is not created until the outputController is assigned a
+    // nonempty outputs.Group, in case the output group requests a proxy
+    // queue. At that time, any prior calls to outputController.queueProducer
+    // from incoming pipeline connections will be unblocked, and future
+    // requests will be handled synchronously.
+    queue           queue.Queue
+    queueLock       sync.Mutex
+    pendingRequests []producerRequest
+
+    // This factory will be used to create the queue when needed, unless
+    // it is overridden by output configuration when outputController.Set
+    // is called.
+    queueFactory queue.QueueFactory
 
     workerChan chan publisher.Batch
 
@@ -50,39 +64,32 @@ type outputController struct {
     workers  []outputWorker
 }
 
+type producerRequest struct {
+    config       queue.ProducerConfig
+    responseChan chan queue.Producer
+}
+
 // outputWorker instances pass events from the shared workQueue to the outputs.Client
 // instances.
 type outputWorker interface {
     Close() error
 }
 
-type queueConfig struct {
-    logger      *logp.Logger
-    queueType   string
-    userConfig  *conf.C
-    ackCallback func(eventCount int)
-    inQueueSize int
-}
-
 func newOutputController(
     beat beat.Info,
     monitors Monitors,
     observer outputObserver,
-    queueConfig queueConfig,
+    eventWaitGroup *sync.WaitGroup,
+    queueFactory queue.QueueFactory,
 ) (*outputController, error) {
-    controller := &outputController{
-        beat:        beat,
-        monitors:    monitors,
-        observer:    observer,
-        queueConfig: queueConfig,
-        workerChan:  make(chan publisher.Batch),
-        consumer:    newEventConsumer(monitors.Logger, observer),
-    }
-
-    err := controller.createQueue()
-    if err != nil {
-        return nil, err
+    controller := &outputController{
+        beat:           beat,
+        monitors:       monitors,
+        observer:       observer,
+        eventWaitGroup: eventWaitGroup,
+        queueFactory:   queueFactory,
+        workerChan:     make(chan publisher.Batch),
+        consumer:       newEventConsumer(monitors.Logger, observer),
     }
 
     return controller, nil
@@ -99,12 +106,30 @@ func (c *outputController) Close() error {
     // Closing the queue stops ACKs from propagating, so we close everything
     // else first to give it a chance to wait for any outstanding events to be
     // acknowledged.
-    c.queue.Close()
+    c.queueLock.Lock()
+    if c.queue != nil {
+        c.queue.Close()
+    }
+    for _, req := range c.pendingRequests {
+        // We can only end up here if there was an attempt to connect to the
+        // pipeline but it was shut down before any output was set.
+        // In this case, return nil and Pipeline.ConnectWith will pass on a
+        // real error to the caller.
+        // NOTE: under the current shutdown process, Pipeline.Close (and hence
+        // outputController.Close) is ~never called. So even if we did have
+        // blocked callers here, in a real shutdown they will never be woken
+        // up. But in hopes of a day when the shutdown process is more robust,
+        // I've decided to do the right thing here anyway.
+        req.responseChan <- nil
+    }
+    c.queueLock.Unlock()
 
     return nil
 }
 
 func (c *outputController) Set(outGrp outputs.Group) {
+    c.createQueueIfNeeded(outGrp)
+
     // Set consumer to empty target to pause it while we reload
     c.consumer.setTarget(consumerTarget{})
@@ -169,45 +194,106 @@ func (c *outputController) Reload(
     return nil
 }
 
+// queueProducer creates a queue producer with the given config, blocking
+// until the queue is created if it does not yet exist.
 func (c *outputController) queueProducer(config queue.ProducerConfig) queue.Producer {
-    return c.queue.Producer(config)
+    if publishDisabled {
+        // If publishDisabled is set ("-N" command line flag), then no output
+        // will ever be set, and no queue will ever be created. In this case,
+        // return a no-op producer, so attempts to connect to the pipeline
+        // don't deadlock the shutdown process because the Beater is blocked
+        // on a (*Pipeline).Connect call that will never return.
+        return emptyProducer{}
+    }
+    c.queueLock.Lock()
+    if c.queue != nil {
+        // We defer the unlock only after the nil check because if the
+        // queue doesn't exist we'll need to block until it does, and
+        // in that case we need to manually unlock before we start waiting.
+        defer c.queueLock.Unlock()
+        return c.queue.Producer(config)
+    }
+    // If there's no queue yet, create a producer request, release the
+    // queue lock, and wait to receive our producer.
+    request := producerRequest{
+        config:       config,
+        responseChan: make(chan queue.Producer),
+    }
+    c.pendingRequests = append(c.pendingRequests, request)
+    c.queueLock.Unlock()
+    return <-request.responseChan
 }
 
-func (c *outputController) createQueue() error {
-    config := c.queueConfig
+// onACK receives event acknowledgment notifications from the queue and
+// forwards them to the metrics observer and the pipeline's global event
+// wait group if one is set.
+func (c *outputController) onACK(eventCount int) {
+    c.observer.queueACKed(eventCount)
+    if c.eventWaitGroup != nil {
+        c.eventWaitGroup.Add(-eventCount)
+    }
+}
 
-    switch config.queueType {
-    case memqueue.QueueType:
-        settings, err := memqueue.SettingsForUserConfig(config.userConfig)
-        if err != nil {
-            return err
-        }
-        // The memory queue has a special override during pipeline
-        // initialization for the size of its API channel buffer.
-        settings.InputQueueSize = config.inQueueSize
-        settings.ACKCallback = config.ackCallback
-        c.queue = memqueue.NewQueue(config.logger, settings)
-    case diskqueue.QueueType:
-        settings, err := diskqueue.SettingsForUserConfig(config.userConfig)
-        if err != nil {
-            return err
-        }
-        settings.WriteToDiskCallback = config.ackCallback
-        queue, err := diskqueue.NewQueue(config.logger, settings)
-        if err != nil {
-            return err
-        }
-        c.queue = queue
-    default:
-        return fmt.Errorf("'%v' is not a valid queue type", config.queueType)
+func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) {
+    logger := c.monitors.Logger
+    if len(outGrp.Clients) == 0 {
+        // If the output group is empty, there's nothing to do
+        return
+    }
+    c.queueLock.Lock()
+    defer c.queueLock.Unlock()
+
+    if c.queue != nil {
+        // Some day we might support hot-swapping of output configurations,
+        // but for now we can only accept a nonempty output group once, and
+        // after that we log it as an error.
+        logger.Errorf("outputController received new output configuration when queue is already active")
+        return
+    }
+
+    // Queue settings from the output take precedence, otherwise fall back
+    // on what we were given during initialization.
+    factory := outGrp.QueueFactory
+    if factory == nil {
+        factory = c.queueFactory
+    }
+
+    queue, err := factory(logger, c.onACK)
+    if err != nil {
+        logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration: %v", err)
+        s, _ := memqueue.SettingsForUserConfig(nil)
+        queue = memqueue.NewQueue(logger, c.onACK, s)
     }
+    c.queue = queue
 
     if c.monitors.Telemetry != nil {
         queueReg := c.monitors.Telemetry.NewRegistry("queue")
-        monitoring.NewString(queueReg, "name").Set(config.queueType)
+        monitoring.NewString(queueReg, "name").Set(c.queue.QueueType())
     }
     maxEvents := c.queue.BufferConfig().MaxEvents
     c.observer.queueMaxEvents(maxEvents)
 
-    return nil
+    // Now that we've created a queue, go through and unblock any callers
+    // that are waiting for a producer.
+    for _, req := range c.pendingRequests {
+        req.responseChan <- c.queue.Producer(req.config)
+    }
+    c.pendingRequests = nil
+}
+
+// emptyProducer is a placeholder queue producer that is used only when
+// publishDisabled is set, so beats don't block forever waiting for
+// a producer for a nonexistent queue.
+type emptyProducer struct{}
+
+func (emptyProducer) Publish(_ interface{}) (queue.EntryID, bool) {
+    return 0, false
+}
+
+func (emptyProducer) TryPublish(_ interface{}) (queue.EntryID, bool) {
+    return 0, false
+}
+
+func (emptyProducer) Cancel() int {
+    return 0
+}
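[Editor's note] The queueProducer/createQueueIfNeeded pair above implements a reusable idiom: callers that arrive before a lazily created resource exists park a request holding a response channel, release the lock before blocking, and are answered by whichever goroutine finally creates the resource. A stripped-down, standalone sketch of the same handshake (the lazyResource type and all names here are hypothetical, not Beats code):

package main

import (
    "fmt"
    "sync"
)

type lazyResource struct {
    mu      sync.Mutex
    value   *string        // nil until created
    pending []chan *string // parked requests
}

// get blocks until the value exists, mirroring outputController.queueProducer.
func (l *lazyResource) get() *string {
    l.mu.Lock()
    if l.value != nil {
        defer l.mu.Unlock()
        return l.value
    }
    // Park a request and release the lock before blocking, exactly like
    // queueProducer does with its producerRequest.
    ch := make(chan *string)
    l.pending = append(l.pending, ch)
    l.mu.Unlock()
    return <-ch
}

// create publishes the value and wakes all parked callers, mirroring
// createQueueIfNeeded draining pendingRequests.
func (l *lazyResource) create(v string) {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.value = &v
    for _, ch := range l.pending {
        ch <- l.value
    }
    l.pending = nil
}

func main() {
    r := &lazyResource{}
    done := make(chan struct{})
    go func() { fmt.Println("got:", *r.get()); close(done) }()
    r.create("queue")
    <-done
}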
diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go
index fdc408fcc30..366f4bff1d9 100644
--- a/libbeat/publisher/pipeline/controller_test.go
+++ b/libbeat/publisher/pipeline/controller_test.go
@@ -29,7 +29,10 @@ import (
     "github.com/elastic/beats/v7/libbeat/internal/testutil"
     "github.com/elastic/beats/v7/libbeat/outputs"
     "github.com/elastic/beats/v7/libbeat/publisher"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
     conf "github.com/elastic/elastic-agent-libs/config"
+    "github.com/elastic/elastic-agent-libs/logp"
 
     //"github.com/elastic/beats/v7/libbeat/tests/resources"
 
@@ -76,13 +79,16 @@ func TestOutputReload(t *testing.T) {
             require.NoError(t, err)
             defer pipeline.Close()
 
-            pipelineClient, err := pipeline.Connect()
-            require.NoError(t, err)
-            defer pipelineClient.Close()
-
             var wg sync.WaitGroup
             wg.Add(1)
             go func() {
+                // Our initial pipeline has no outputs set, so we need
+                // to create the client in a goroutine since any
+                // Connect calls will block until the pipeline has an
+                // output.
+                pipelineClient, err := pipeline.Connect()
+                require.NoError(t, err)
+                defer pipelineClient.Close()
                 for i := uint(0); i < numEventsToPublish; i++ {
                     pipelineClient.Publish(beat.Event{})
                 }
@@ -132,3 +138,108 @@ func TestSetEmptyOutputsSendsNilChannel(t *testing.T) {
     target = <-controller.consumer.targetChan
     assert.Nil(t, target.ch, "consumerTarget should receive a nil channel to block batch assembly")
 }
+
+func TestQueueCreatedOnlyAfterOutputExists(t *testing.T) {
+    controller := outputController{
+        // Set event limit to 1 so we can easily tell if our settings
+        // were used to create the queue.
+        queueFactory: memqueue.FactoryForSettings(
+            memqueue.Settings{Events: 1},
+        ),
+        consumer: &eventConsumer{
+            // We aren't testing the values sent to eventConsumer, we
+            // just need a placeholder here so outputController can
+            // send configuration updates without blocking.
+            targetChan: make(chan consumerTarget, 4),
+        },
+        observer: nilObserver,
+    }
+    // Set to an empty output group. This should not create a queue.
+    controller.Set(outputs.Group{})
+    require.Nil(t, controller.queue, "Queue should be nil after setting empty output")
+
+    controller.Set(outputs.Group{
+        Clients: []outputs.Client{newMockClient(nil)},
+    })
+    require.NotNil(t, controller.queue, "Queue should be created after setting nonempty output")
+    assert.Equal(t, 1, controller.queue.BufferConfig().MaxEvents, "Queue should be created using provided settings")
+}
+
+func TestOutputQueueFactoryTakesPrecedence(t *testing.T) {
+    // If there are queue settings provided by both the pipeline and
+    // the output, the output settings should be used.
+    controller := outputController{
+        queueFactory: memqueue.FactoryForSettings(
+            memqueue.Settings{Events: 1},
+        ),
+        consumer: &eventConsumer{
+            targetChan: make(chan consumerTarget, 4),
+        },
+        observer: nilObserver,
+    }
+    controller.Set(outputs.Group{
+        Clients:      []outputs.Client{newMockClient(nil)},
+        QueueFactory: memqueue.FactoryForSettings(memqueue.Settings{Events: 2}),
+    })
+
+    // The pipeline queue settings have max events 1, the output has
+    // max events 2, so the result should be a queue with max events 2.
+    assert.Equal(t, 2, controller.queue.BufferConfig().MaxEvents, "Queue should be created using settings from the output")
+}
+
+func TestFailedQueueFactoryRevertsToDefault(t *testing.T) {
+    defaultSettings, _ := memqueue.SettingsForUserConfig(nil)
+    failedFactory := func(_ *logp.Logger, _ func(int)) (queue.Queue, error) {
+        return nil, fmt.Errorf("this queue creation intentionally failed")
+    }
+    controller := outputController{
+        queueFactory: failedFactory,
+        consumer: &eventConsumer{
+            targetChan: make(chan consumerTarget, 4),
+        },
+        observer: nilObserver,
+        monitors: Monitors{
+            Logger: logp.NewLogger("tests"),
+        },
+    }
+    controller.Set(outputs.Group{
+        Clients: []outputs.Client{newMockClient(nil)},
+    })
+
+    assert.Equal(t, defaultSettings.Events, controller.queue.BufferConfig().MaxEvents, "Queue should fall back on default settings when input is invalid")
+}
+
+func TestQueueProducerBlocksUntilOutputIsSet(t *testing.T) {
+    controller := outputController{
+        queueFactory: memqueue.FactoryForSettings(memqueue.Settings{Events: 1}),
+        consumer: &eventConsumer{
+            targetChan: make(chan consumerTarget, 4),
+        },
+        observer: nilObserver,
+    }
+    // Send producer requests from different goroutines. They should all
+    // block, because there is no queue, but they should become unblocked
+    // once we set a nonempty output.
+    const producerCount = 10
+    remaining := atomic.MakeInt(producerCount)
+    for i := 0; i < producerCount; i++ {
+        go func() {
+            controller.queueProducer(queue.ProducerConfig{})
+            remaining.Dec()
+        }()
+    }
+    allStarted := waitUntilTrue(time.Second, func() bool {
+        return len(controller.pendingRequests) == producerCount
+    })
+    assert.True(t, allStarted, "All queueProducer requests should be saved as pending requests by outputController")
+    assert.Equal(t, producerCount, remaining.Load(), "No queueProducer request should return before an output is set")
+
+    // Set the output, then ensure that it unblocks all the waiting goroutines.
+    controller.Set(outputs.Group{
+        Clients: []outputs.Client{newMockClient(nil)},
+    })
+    allFinished := waitUntilTrue(time.Second, func() bool {
+        return remaining.Load() == 0
+    })
+    assert.True(t, allFinished, "All queueProducer requests should be unblocked once an output is set")
+}
diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go
index 92cbf914314..934d3c0db3d 100644
--- a/libbeat/publisher/pipeline/module.go
+++ b/libbeat/publisher/pipeline/module.go
@@ -48,7 +48,7 @@ type Monitors struct {
 
-// OutputFactory is used by the publisher pipeline to create an output instance.
-// If the group returned can be empty. The pipeline will accept events, but
-// eventually block.
+// outputFactory is used by the publisher pipeline to create an output instance.
+// The group returned may be empty; the pipeline will then accept events, but
+// eventually block.
-type OutputFactory func(outputs.Observer) (string, outputs.Group, error)
+type outputFactory func(outputs.Observer) (string, outputs.Group, error)
 
 func init() {
     flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing")
@@ -108,7 +108,7 @@ func LoadWithSettings(
 
 func loadOutput(
     monitors Monitors,
-    makeOutput OutputFactory,
+    makeOutput outputFactory,
 ) (outputs.Group, error) {
     if publishDisabled {
         return outputs.Group{}, nil
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index d3c35b63830..209688bb5c2 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -21,6 +21,7 @@
 package pipeline
 
 import (
+    "fmt"
     "reflect"
     "sync"
     "time"
@@ -33,6 +34,8 @@ import (
     "github.com/elastic/beats/v7/libbeat/publisher"
     "github.com/elastic/beats/v7/libbeat/publisher/processing"
     "github.com/elastic/beats/v7/libbeat/publisher/queue"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
     conf "github.com/elastic/elastic-agent-libs/config"
     "github.com/elastic/elastic-agent-libs/logp"
 )
@@ -62,10 +65,12 @@ type Pipeline struct {
 
     observer observer
 
-    // wait close support
-    waitOnClose bool
+    // wait close support. If eventWaitGroup is non-nil, then publishing
+    // an event through this pipeline will increment it and acknowledging
+    // a published event will decrement it, so the pipeline can wait on
+    // the group on shutdown to allow pending events to be acknowledged.
     waitCloseTimeout time.Duration
-    waitCloseGroup   sync.WaitGroup
+    eventWaitGroup   *sync.WaitGroup
 
     // closeRef signal propagation support
     guardStartSigPropagation sync.Once
@@ -128,35 +133,33 @@ func New(
         beatInfo:         beat,
         monitors:         monitors,
         observer:         nilObserver,
-        waitOnClose:      settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0,
         waitCloseTimeout: settings.WaitClose,
         processors:       settings.Processors,
     }
+    if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
+        // If wait-on-close is enabled, give the pipeline a WaitGroup for
+        // events that have been Published but not yet ACKed.
+        p.eventWaitGroup = &sync.WaitGroup{}
+    }
 
     if monitors.Metrics != nil {
         p.observer = newMetricsObserver(monitors.Metrics)
     }
 
-    ackCallback := func(eventCount int) {
-        p.observer.queueACKed(eventCount)
-        if p.waitOnClose {
-            p.waitCloseGroup.Add(-eventCount)
-        }
-    }
-
+    // Convert the raw queue config to a parsed Settings object that will
+    // be used during queue creation. This lets us fail immediately on startup
+    // if there's a configuration problem.
     queueType := defaultQueueType
     if b := userQueueConfig.Name(); b != "" {
         queueType = b
     }
-    queueConfig := queueConfig{
-        logger:      monitors.Logger,
-        queueType:   queueType,
-        userConfig:  userQueueConfig.Config(),
-        ackCallback: ackCallback,
-        inQueueSize: settings.InputQueueSize,
+    queueFactory, err := queueFactoryForUserConfig(
+        queueType, userQueueConfig.Config(), settings.InputQueueSize)
+    if err != nil {
+        return nil, err
     }
 
-    output, err := newOutputController(beat, monitors, p.observer, queueConfig)
+    output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory)
     if err != nil {
         return nil, err
     }
@@ -175,10 +178,10 @@ func (p *Pipeline) Close() error {
 
     log.Debug("close pipeline")
 
-    if p.waitOnClose {
+    if p.eventWaitGroup != nil {
         ch := make(chan struct{})
         go func() {
-            p.waitCloseGroup.Wait()
+            p.eventWaitGroup.Wait()
             ch <- struct{}{}
         }()
 
@@ -229,7 +232,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
     }
 
     waitClose := cfg.WaitClose
-    reportEvents := p.waitOnClose
 
     processors, err := p.createEventProcessing(cfg.Processing, publishDisabled)
     if err != nil {
@@ -237,7 +239,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
     }
 
     client := &client{
-        pipeline:   p,
+        logger:     p.monitors.Logger,
         closeRef:   cfg.CloseRef,
         done:       make(chan struct{}),
         isOpen:     atomic.MakeBool(true),
@@ -245,21 +247,22 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
         processors: processors,
         eventFlags: eventFlags,
         canDrop:    canDrop,
-        reportEvents: reportEvents,
+        eventWaitGroup: p.eventWaitGroup,
+        observer:       p.observer,
     }
 
     ackHandler := cfg.EventListener
 
     producerCfg := queue.ProducerConfig{}
 
-    if reportEvents || cfg.ClientListener != nil {
+    if client.eventWaitGroup != nil || cfg.ClientListener != nil {
         producerCfg.OnDrop = func(event interface{}) {
             publisherEvent, _ := event.(publisher.Event)
             if cfg.ClientListener != nil {
                 cfg.ClientListener.DroppedOnPublish(publisherEvent.Content)
             }
-            if reportEvents {
-                p.waitCloseGroup.Add(-1)
+            if client.eventWaitGroup != nil {
+                client.eventWaitGroup.Add(-1)
             }
         }
     }
@@ -286,6 +289,11 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
     client.eventListener = ackHandler
     client.waiter = waiter
     client.producer = p.outputController.queueProducer(producerCfg)
+    if client.producer == nil {
+        // This can only happen if the pipeline was shut down while clients
+        // were still waiting to connect.
+        return nil, fmt.Errorf("client failed to connect because the pipeline is shutting down")
+    }
 
     p.observer.clientConnected()
 
@@ -386,3 +394,29 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo
 func (p *Pipeline) OutputReloader() OutputReloader {
     return p.outputController
 }
+
+// queueFactoryForUserConfig parses the given config and returns a QueueFactory
+// based on it. This helper exists to frontload config parsing errors: if there
+// is an error in the queue config, we want it to show up as fatal during
+// initialization, even if the queue itself isn't created until later.
+func queueFactoryForUserConfig(queueType string, userConfig *conf.C, inQueueSize int) (queue.QueueFactory, error) {
+    switch queueType {
+    case memqueue.QueueType:
+        settings, err := memqueue.SettingsForUserConfig(userConfig)
+        if err != nil {
+            return nil, err
+        }
+        // The memory queue has a special override during pipeline
+        // initialization for the size of its API channel buffer.
+        settings.InputQueueSize = inQueueSize
+        return memqueue.FactoryForSettings(settings), nil
+    case diskqueue.QueueType:
+        settings, err := diskqueue.SettingsForUserConfig(userConfig)
+        if err != nil {
+            return nil, err
+        }
+        return diskqueue.FactoryForSettings(settings), nil
+    default:
+        return nil, fmt.Errorf("unrecognized queue type '%v'", queueType)
+    }
+}
diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go
index cd6703697f4..1278f5196ab 100644
--- a/libbeat/publisher/pipeline/pipeline_test.go
+++ b/libbeat/publisher/pipeline/pipeline_test.go
@@ -47,6 +47,10 @@ func (q *testQueue) Close() error {
     return nil
 }
 
+func (q *testQueue) QueueType() string {
+    return "test"
+}
+
 func (q *testQueue) BufferConfig() queue.BufferConfig {
     if q.bufferConfig != nil {
         return q.bufferConfig()
diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go
index 3d2cb52535d..ca357646a81 100644
--- a/libbeat/publisher/pipeline/testing.go
+++ b/libbeat/publisher/pipeline/testing.go
@@ -40,7 +40,10 @@ type mockClient struct {
 func (c *mockClient) String() string { return "mock_client" }
 func (c *mockClient) Close() error   { return nil }
 func (c *mockClient) Publish(_ context.Context, batch publisher.Batch) error {
-    return c.publishFn(batch)
+    if c.publishFn != nil {
+        return c.publishFn(batch)
+    }
+    return nil
 }
 
 func newMockNetworkClient(publishFn mockPublishFn) outputs.Client {
diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go
index c14b6272920..8bd2a23276c 100644
--- a/libbeat/publisher/queue/diskqueue/benchmark_test.go
+++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go
@@ -100,7 +100,7 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue
     }
     s.UseCompression = compress
     s.UseProtobuf = protobuf
-    q, err := NewQueue(logp.L(), s)
+    q, err := NewQueue(logp.L(), nil, s)
     if err != nil {
         panic(err)
     }
diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go
index 47d861045af..08d77229dcc 100644
--- a/libbeat/publisher/queue/diskqueue/config.go
+++ b/libbeat/publisher/queue/diskqueue/config.go
@@ -58,10 +58,6 @@ type Settings struct {
     // this limit can keep it from overflowing memory.
     WriteAheadLimit int
 
-    // A callback that is called when an event is successfully
-    // written to disk.
-    WriteToDiskCallback func(eventCount int)
-
     // RetryInterval specifies how long to wait before retrying a fatal error
     // writing to disk. If MaxRetryInterval is nonzero, subsequent retries will
     // use exponential backoff up to the specified limit.
diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go
index ec20e0d3926..2b754890882 100644
--- a/libbeat/publisher/queue/diskqueue/queue.go
+++ b/libbeat/publisher/queue/diskqueue/queue.go
@@ -102,9 +102,25 @@ type metricsRequestResponse struct {
     sizeOnDisk uint64
 }
 
+// FactoryForSettings is a simple wrapper around NewQueue so a concrete
+// Settings object can be wrapped in a queue-agnostic interface for
+// later use by the pipeline.
+func FactoryForSettings(settings Settings) queue.QueueFactory {
+    return func(
+        logger *logp.Logger,
+        ackCallback func(eventCount int),
+    ) (queue.Queue, error) {
+        return NewQueue(logger, ackCallback, settings)
+    }
+}
+
 // NewQueue returns a disk-based queue configured with the given logger
 // and settings, creating it if it doesn't exist.
-func NewQueue(logger *logp.Logger, settings Settings) (*diskQueue, error) {
+func NewQueue(
+    logger *logp.Logger,
+    writeToDiskCallback func(eventCount int),
+    settings Settings,
+) (*diskQueue, error) {
     logger = logger.Named("diskqueue")
     logger.Debugf(
         "Initializing disk queue at path %v", settings.directoryPath())
@@ -209,7 +225,7 @@ func NewQueue(
         acks: newDiskQueueACKs(logger, nextReadPosition, positionFile),
 
         readerLoop:  newReaderLoop(settings),
-        writerLoop:  newWriterLoop(logger, settings),
+        writerLoop:  newWriterLoop(logger, writeToDiskCallback, settings),
         deleterLoop: newDeleterLoop(settings),
 
         producerWriteRequestChan: make(chan producerWriteRequest),
@@ -256,6 +272,10 @@ func (dq *diskQueue) Close() error {
     return nil
 }
 
+func (dq *diskQueue) QueueType() string {
+    return QueueType
+}
+
 func (dq *diskQueue) BufferConfig() queue.BufferConfig {
     return queue.BufferConfig{MaxEvents: 0}
 }
diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go
index eac65fd8518..c0b780ffb38 100644
--- a/libbeat/publisher/queue/diskqueue/queue_test.go
+++ b/libbeat/publisher/queue/diskqueue/queue_test.go
@@ -89,7 +89,7 @@ func TestMetrics(t *testing.T) {
     // lower max segment size so we can get multiple segments
     settings.MaxSegmentSize = 100
 
-    testQueue, err := NewQueue(logp.L(), settings)
+    testQueue, err := NewQueue(logp.L(), nil, settings)
     require.NoError(t, err)
     defer testQueue.Close()
 
@@ -124,7 +124,7 @@ func makeTestQueue() queuetest.QueueFactory {
         }
         settings := DefaultSettings()
         settings.Path = dir
-        queue, _ := NewQueue(logp.L(), settings)
+        queue, _ := NewQueue(logp.L(), nil, settings)
         return testQueue{
             diskQueue: queue,
             teardown: func() {
diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go
index 96e8cd1ac96..c0e7103c41b 100644
--- a/libbeat/publisher/queue/diskqueue/writer_loop.go
+++ b/libbeat/publisher/queue/diskqueue/writer_loop.go
@@ -71,6 +71,10 @@ type writerLoop struct {
     // The logger for the writer loop, assigned when the queue creates it.
     logger *logp.Logger
 
+    // A callback that, if set, should be invoked with an event count when
+    // events are successfully written to disk.
+    writeToDiskCallback func(eventCount int)
+
     // The writer loop listens on requestChan for frames to write, and
     // writes them to disk immediately (all queue capacity checking etc. is
     // done by the core loop before sending it to the writer).
@@ -96,11 +100,16 @@ type writerLoop struct {
     buffer *bytes.Buffer
 }
 
-func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
+func newWriterLoop(
+    logger *logp.Logger,
+    writeToDiskCallback func(eventCount int),
+    settings Settings,
+) *writerLoop {
     buffer := &bytes.Buffer{}
     return &writerLoop{
-        logger:   logger,
-        settings: settings,
+        logger:              logger,
+        writeToDiskCallback: writeToDiskCallback,
+        settings:            settings,
 
         requestChan:  make(chan writerLoopRequest, 1),
         responseChan: make(chan writerLoopResponse),
@@ -235,8 +244,8 @@ outerLoop:
     _ = wl.outputFile.Sync()
 
     // If the queue has an ACK listener, notify it the frames were written.
-    if wl.settings.WriteToDiskCallback != nil {
-        wl.settings.WriteToDiskCallback(totalACKCount)
+    if wl.writeToDiskCallback != nil {
+        wl.writeToDiskCallback(totalACKCount)
     }
 
     // Notify any producers with ACK listeners that their frames were written.
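[Editor's note] With WriteToDiskCallback removed from diskqueue.Settings, the only way to observe disk writes is the new callback parameter on NewQueue, which the pipeline wires to outputController.onACK. A minimal standalone sketch of the same wiring outside the pipeline — the queue path and printed message are made up for the example:

package main

import (
    "fmt"

    "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
    "github.com/elastic/elastic-agent-libs/logp"
)

func main() {
    settings := diskqueue.DefaultSettings()
    settings.Path = "/tmp/example-queue" // hypothetical path for the sketch

    // The second argument replaces the removed Settings.WriteToDiskCallback
    // field: it is invoked with a count each time events reach disk.
    q, err := diskqueue.NewQueue(logp.L(), func(eventCount int) {
        fmt.Printf("%d events written to disk\n", eventCount)
    }, settings)
    if err != nil {
        panic(err)
    }
    defer q.Close()
}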
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index 722fb501b07..0bb3ff9ed8e 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -81,7 +81,6 @@ type broker struct {
 }
 
 type Settings struct {
-    ACKCallback    func(eventCount int)
     Events         int
     FlushMinEvents int
     FlushTimeout   time.Duration
@@ -117,11 +116,24 @@ type chanList struct {
     tail *batchACKState
 }
 
+// FactoryForSettings is a simple wrapper around NewQueue so a concrete
+// Settings object can be wrapped in a queue-agnostic interface for
+// later use by the pipeline.
+func FactoryForSettings(settings Settings) queue.QueueFactory {
+    return func(
+        logger *logp.Logger,
+        ackCallback func(eventCount int),
+    ) (queue.Queue, error) {
+        return NewQueue(logger, ackCallback, settings), nil
+    }
+}
+
 // NewQueue creates a new broker based in-memory queue holding up to sz number of events.
 // If waitOnClose is set to true, the broker will block on Close, until all internal
 // workers handling incoming messages and ACKs have been shut down.
 func NewQueue(
     logger *logp.Logger,
+    ackCallback func(eventCount int),
     settings Settings,
 ) *broker {
     var (
@@ -159,7 +171,7 @@ func NewQueue(
         // internal broker and ACK handler channels
         scheduledACKs: make(chan chanList),
 
-        ackCallback: settings.ACKCallback,
+        ackCallback: ackCallback,
 
         metricChan: make(chan metricsRequest),
     }
@@ -197,6 +209,10 @@ func (b *broker) Close() error {
     return nil
 }
 
+func (b *broker) QueueType() string {
+    return QueueType
+}
+
 func (b *broker) BufferConfig() queue.BufferConfig {
     return queue.BufferConfig{
         MaxEvents: b.bufSize,
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index 56dac92bbb0..ef9ee52a944 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -103,7 +103,7 @@ func TestQueueMetricsBuffer(t *testing.T) {
 }
 
 func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) {
-    testQueue := NewQueue(nil, settings)
+    testQueue := NewQueue(nil, nil, settings)
     defer testQueue.Close()
 
     // Send events to queue
@@ -143,7 +143,7 @@ func TestProducerCancelRemovesEvents(t *testing.T) {
 
 func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory {
     return func(_ *testing.T) queue.Queue {
-        return NewQueue(nil, Settings{
+        return NewQueue(nil, nil, Settings{
             Events:         sz,
             FlushMinEvents: minEvents,
             FlushTimeout:   flushTimeout,
@@ -258,22 +258,22 @@ func TestEntryIDs(t *testing.T) {
     }
 
     t.Run("acking in forward order with directEventLoop reports the right event IDs", func(t *testing.T) {
-        testQueue := NewQueue(nil, Settings{Events: 1000})
+        testQueue := NewQueue(nil, nil, Settings{Events: 1000})
         testForward(testQueue)
     })
 
     t.Run("acking in reverse order with directEventLoop reports the right event IDs", func(t *testing.T) {
-        testQueue := NewQueue(nil, Settings{Events: 1000})
+        testQueue := NewQueue(nil, nil, Settings{Events: 1000})
         testBackward(testQueue)
     })
 
     t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) {
-        testQueue := NewQueue(nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond})
+        testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond})
         testForward(testQueue)
     })
 
     t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) {
-        testQueue := NewQueue(nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond})
+        testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond})
         testBackward(testQueue)
     })
 }
diff --git a/libbeat/publisher/queue/proxy/broker.go b/libbeat/publisher/queue/proxy/broker.go
index 1f0f1412e7c..20400e3ab75 100644
--- a/libbeat/publisher/queue/proxy/broker.go
+++ b/libbeat/publisher/queue/proxy/broker.go
@@ -59,8 +59,7 @@ type broker struct {
 }
 
 type Settings struct {
-    ACKCallback func(eventCount int)
-    BatchSize   int
+    BatchSize int
 }
 
 type queueEntry struct {
@@ -82,11 +81,26 @@ type blockedRequests struct {
     last  *blockedRequest
 }
 
+const QueueType = "proxy"
+
+// FactoryForSettings is a simple wrapper around NewQueue so a concrete
+// Settings object can be wrapped in a queue-agnostic interface for
+// later use by the pipeline.
+func FactoryForSettings(settings Settings) queue.QueueFactory {
+    return func(
+        logger *logp.Logger,
+        ackCallback func(eventCount int),
+    ) (queue.Queue, error) {
+        return NewQueue(logger, ackCallback, settings), nil
+    }
+}
+
 // NewQueue creates a new broker based in-memory queue holding up to sz number of events.
 // If waitOnClose is set to true, the broker will block on Close, until all internal
 // workers handling incoming messages and ACKs have been shut down.
 func NewQueue(
     logger *logp.Logger,
+    ackCallback func(eventCount int),
     settings Settings,
 ) *broker {
     if logger == nil {
@@ -102,7 +116,7 @@ func NewQueue(
         pushChan: make(chan *pushRequest),
         getChan:  make(chan getRequest),
 
-        ackCallback: settings.ACKCallback,
+        ackCallback: ackCallback,
     }
 
     b.wg.Add(1)
@@ -120,6 +134,10 @@ func (b *broker) Close() error {
     return nil
 }
 
+func (b *broker) QueueType() string {
+    return QueueType
+}
+
 func (b *broker) BufferConfig() queue.BufferConfig {
     return queue.BufferConfig{}
 }
diff --git a/libbeat/publisher/queue/proxy/queue_test.go b/libbeat/publisher/queue/proxy/queue_test.go
index 5d1aa42870d..437216e2d7a 100644
--- a/libbeat/publisher/queue/proxy/queue_test.go
+++ b/libbeat/publisher/queue/proxy/queue_test.go
@@ -54,7 +54,7 @@ func TestBasicEventFlow(t *testing.T) {
     logger := logp.NewLogger("proxy-queue-tests")
 
     // Create a proxy queue where each batch is at most 2 events
-    testQueue := NewQueue(logger, Settings{BatchSize: 2})
+    testQueue := NewQueue(logger, nil, Settings{BatchSize: 2})
     defer testQueue.Close()
 
     listener := newTestACKListener()
@@ -84,7 +84,7 @@ func TestBlockedProducers(t *testing.T) {
     logger := logp.NewLogger("proxy-queue-tests")
 
     // Create a proxy queue where each batch is at most 2 events
-    testQueue := NewQueue(logger, Settings{BatchSize: 2})
+    testQueue := NewQueue(logger, nil, Settings{BatchSize: 2})
     defer testQueue.Close()
 
     listener := newTestACKListener()
@@ -125,7 +125,7 @@ func TestOutOfOrderACK(t *testing.T) {
     logger := logp.NewLogger("proxy-queue-tests")
 
     // Create a proxy queue where each batch is at most 2 events
-    testQueue := NewQueue(logger, Settings{BatchSize: 2})
+    testQueue := NewQueue(logger, nil, Settings{BatchSize: 2})
     defer testQueue.Close()
 
     listener := newTestACKListener()
@@ -167,7 +167,7 @@ func TestOutOfOrderACK(t *testing.T) {
 func TestWriteAfterClose(t *testing.T) {
     logger := logp.NewLogger("proxy-queue-tests")
 
-    testQueue := NewQueue(logger, Settings{BatchSize: 2})
+    testQueue := NewQueue(logger, nil, Settings{BatchSize: 2})
     producer := testQueue.Producer(queue.ProducerConfig{})
     testQueue.Close()
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index c985f02f3e9..d0e1c047610 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -21,6 +21,7 @@ import (
     "errors"
 
     "github.com/elastic/beats/v7/libbeat/common"
+    "github.com/elastic/elastic-agent-libs/logp"
     "github.com/elastic/elastic-agent-libs/opt"
 )
 
@@ -61,6 +62,7 @@ var ErrMetricsNotImplemented = errors.New("Queue metrics not implemented")
 type Queue interface {
     Close() error
 
+    QueueType() string
     BufferConfig() BufferConfig
 
     Producer(cfg ProducerConfig) Producer
@@ -72,6 +74,8 @@ type Queue interface {
     Metrics() (Metrics, error)
 }
 
+// QueueFactory creates a queue from the concrete settings captured in the
+// factory's closure, wiring in the logger and ACK callback that the pipeline
+// provides at creation time.
+type QueueFactory func(logger *logp.Logger, ack func(eventCount int)) (Queue, error)
+
 // BufferConfig returns the pipelines buffering settings,
 // for the pipeline to use.
 // In case of the pipeline itself storing events for reporting ACKs to clients,
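[Editor's note] Taken together, the new flow is: parse queue settings once at startup, capture them in a QueueFactory, and invoke the factory only when the first nonempty output group arrives. A condensed, standalone sketch of that lifecycle using only the APIs introduced in this patch (the ACK message is illustrative):

package main

import (
    "fmt"

    "github.com/elastic/beats/v7/libbeat/publisher/queue"
    "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
    "github.com/elastic/elastic-agent-libs/logp"
)

func main() {
    // Startup: settings are validated here, so bad config fails fast
    // even though no queue exists yet.
    var factory queue.QueueFactory = memqueue.FactoryForSettings(
        memqueue.Settings{Events: 1024},
    )

    // Much later, when the first nonempty output group arrives, the queue
    // is actually built, with the ACK callback wired in by the caller
    // (the pipeline passes outputController.onACK here).
    q, err := factory(logp.L(), func(acked int) {
        fmt.Printf("%d events acknowledged\n", acked)
    })
    if err != nil {
        panic(err)
    }
    defer q.Close()
    fmt.Println("created queue of type:", q.QueueType())
}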