Skip to content

Commit

Permalink
Defer queue creation / activate proxy queue (#35118)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Apr 21, 2023
1 parent 0807bb8 commit 519b5cd
Show file tree
Hide file tree
Showing 21 changed files with 468 additions and 166 deletions.
4 changes: 2 additions & 2 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type EventListener interface {
}

// CloseRef allows users to close the client asynchronously.
// A CloseReg implements a subset of function required for context.Context.
// A CloseRef implements a subset of function required for context.Context.
type CloseRef interface {
Done() <-chan struct{}
Err() error
Expand Down Expand Up @@ -135,7 +135,7 @@ type ClientListener interface {
Closing() // Closing indicates the client is being shutdown next
Closed() // Closed indicates the client being fully shutdown

Published() // event has been successfully forwarded to the publisher pipeline
Published() // event has successfully entered the queue
FilteredOut(Event) // event has been filtered out/dropped by processors
DroppedOnPublish(Event) // event has been dropped, while waiting for the queue
}
Expand Down
15 changes: 11 additions & 4 deletions libbeat/outputs/output_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/config"
)

Expand Down Expand Up @@ -48,11 +49,17 @@ type IndexSelector interface {
}

// Group configures and combines multiple clients into load-balanced group of clients
// being managed by the publisher pipeline.
// being managed by the publisher pipeline. If QueueSettings is set then the
// pipeline will use it to create the queue. QueueSettings must be one of
// memqueue.Settings, diskqueue.Settings, proxyqueue.Settings.
// Currently it is only used to activate the proxy queue when using the Shipper
// output, but it also provides a natural migration path for moving queue
// configuration into the outputs.
type Group struct {
Clients []Client
BatchSize int
Retry int
Clients []Client
BatchSize int
Retry int
QueueFactory queue.QueueFactory
}

// RegisterType registers a new output type.
Expand Down
9 changes: 8 additions & 1 deletion libbeat/outputs/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
proxyqueue "github.com/elastic/beats/v7/libbeat/publisher/queue/proxy"

"github.com/elastic/elastic-agent-shipper-client/pkg/helpers"
sc "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
Expand Down Expand Up @@ -106,7 +108,12 @@ func makeShipper(

swb := outputs.WithBackoff(s, config.Backoff.Init, config.Backoff.Max)

return outputs.Success(config.BulkMaxSize, config.MaxRetries, swb)
return outputs.Group{
Clients: []outputs.Client{swb},
Retry: config.MaxRetries,
QueueFactory: proxyqueue.FactoryForSettings(
proxyqueue.Settings{BatchSize: config.BulkMaxSize}),
}, nil
}

// Connect establishes connection to the shipper server and implements `outputs.Connectable`.
Expand Down
63 changes: 26 additions & 37 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@ import (

// client connects a beat with the processors and pipeline queue.
type client struct {
pipeline *Pipeline
logger *logp.Logger
processors beat.Processor
producer queue.Producer
mutex sync.Mutex
waiter *clientCloseWaiter

eventFlags publisher.EventFlags
canDrop bool
reportEvents bool
eventFlags publisher.EventFlags
canDrop bool
eventWaitGroup *sync.WaitGroup

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.

observer observer
eventListener beat.EventListener
clientListener beat.ClientListener
}
Expand Down Expand Up @@ -80,7 +81,6 @@ func (c *client) publish(e beat.Event) {
var (
event = &e
publish = true
log = c.pipeline.monitors.Logger
)

c.onNewEvent()
Expand All @@ -99,7 +99,7 @@ func (c *client) publish(e beat.Event) {
if err != nil {
// If we introduce a dead-letter queue, this is where we should
// route the event to it.
log.Errorf("Failed to publish event: %v", err)
c.logger.Errorf("Failed to publish event: %v", err)
}
}

Expand Down Expand Up @@ -134,8 +134,6 @@ func (c *client) publish(e beat.Event) {
}

func (c *client) Close() error {
log := c.logger()

// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
Expand All @@ -144,86 +142,77 @@ func (c *client) Close() error {
c.isOpen.Store(false)
c.onClosing()

log.Debug("client: closing acker")
c.logger.Debug("client: closing acker")
c.waiter.signalClose()
c.waiter.wait()

c.eventListener.ClientClosed()
log.Debug("client: done closing acker")
c.logger.Debug("client: done closing acker")

log.Debug("client: close queue producer")
c.logger.Debug("client: close queue producer")
cancelledEventCount := c.producer.Cancel()
c.onClosed(cancelledEventCount)
log.Debug("client: done producer close")
c.logger.Debug("client: done producer close")

if c.processors != nil {
log.Debug("client: closing processors")
c.logger.Debug("client: closing processors")
err := processors.Close(c.processors)
if err != nil {
log.Errorf("client: error closing processors: %v", err)
c.logger.Errorf("client: error closing processors: %v", err)
}
log.Debug("client: done closing processors")
c.logger.Debug("client: done closing processors")
}
})
return nil
}

func (c *client) logger() *logp.Logger {
return c.pipeline.monitors.Logger
}

func (c *client) onClosing() {
if c.clientListener != nil {
c.clientListener.Closing()
}
}

func (c *client) onClosed(cancelledEventCount int) {
log := c.logger()
log.Debugf("client: cancelled %v events", cancelledEventCount)
c.logger.Debugf("client: cancelled %v events", cancelledEventCount)

if c.reportEvents {
log.Debugf("client: remove client events")
if c.eventWaitGroup != nil {
c.logger.Debugf("client: remove client events")
if cancelledEventCount > 0 {
c.pipeline.waitCloseGroup.Add(-cancelledEventCount)
c.eventWaitGroup.Add(-cancelledEventCount)
}
}

c.pipeline.observer.clientClosed()
c.observer.clientClosed()
if c.clientListener != nil {
c.clientListener.Closed()
}
}

func (c *client) onNewEvent() {
c.pipeline.observer.newEvent()
c.observer.newEvent()
}

func (c *client) onPublished() {
if c.reportEvents {
c.pipeline.waitCloseGroup.Add(1)
if c.eventWaitGroup != nil {
c.eventWaitGroup.Add(1)
}
c.pipeline.observer.publishedEvent()
c.observer.publishedEvent()
if c.clientListener != nil {
c.clientListener.Published()
}
}

func (c *client) onFilteredOut(e beat.Event) {
log := c.logger()

log.Debugf("Pipeline client receives callback 'onFilteredOut' for event: %+v", e)
c.pipeline.observer.filteredEvent()
c.logger.Debugf("Pipeline client receives callback 'onFilteredOut' for event: %+v", e)
c.observer.filteredEvent()
if c.clientListener != nil {
c.clientListener.FilteredOut(e)
}
}

func (c *client) onDroppedOnPublish(e beat.Event) {
log := c.logger()

log.Debugf("Pipeline client receives callback 'onDroppedOnPublish' for event: %+v", e)
c.pipeline.observer.failedPublishEvent()
c.logger.Debugf("Pipeline client receives callback 'onDroppedOnPublish' for event: %+v", e)
c.observer.failedPublishEvent()
if c.clientListener != nil {
c.clientListener.DroppedOnPublish(e)
}
Expand Down
8 changes: 3 additions & 5 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func TestClient(t *testing.T) {
if err != nil {
panic(err)
}
// Close the built-in queue and replace with the given one.
p.outputController.queue.Close()
// Inject a test queue so the outputController doesn't create one
p.outputController.queue = qu

return p
Expand Down Expand Up @@ -137,16 +136,15 @@ func TestClientWaitClose(t *testing.T) {
if err != nil {
panic(err)
}
// Close the built-in queue and replace with the given one.
p.outputController.queue.Close()
// Inject a test queue so the outputController doesn't create one
p.outputController.queue = qu

return p
}
err := logp.TestingSetup()
assert.Nil(t, err)

q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1})
pipeline := makePipeline(Settings{}, q)
defer pipeline.Close()

Expand Down
Loading

0 comments on commit 519b5cd

Please sign in to comment.