Defer queue creation / activate proxy queue #35118

Merged: 14 commits, Apr 21, 2023
4 changes: 2 additions & 2 deletions libbeat/beat/pipeline.go
@@ -89,7 +89,7 @@ type EventListener interface {
}

// CloseRef allows users to close the client asynchronously.
// A CloseReg implements a subset of function required for context.Context.
// A CloseRef implements a subset of the functions required for context.Context.
type CloseRef interface {
Done() <-chan struct{}
Err() error
@@ -135,7 +135,7 @@ type ClientListener interface {
Closing() // Closing indicates the client is about to be shut down
Closed() // Closed indicates the client has been fully shut down

Published() // event has been successfully forwarded to the publisher pipeline
Published() // event has successfully entered the queue
FilteredOut(Event) // event has been filtered out/dropped by processors
DroppedOnPublish(Event) // event has been dropped while waiting for the queue
}
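The net effect of this hunk is a tighter contract for the listener callbacks: Published() now fires once the event has actually entered the queue, and DroppedOnPublish() covers events dropped while waiting for it. A minimal, hypothetical counting listener sketching those semantics (it covers only the callbacks visible in this hunk; the full beat.ClientListener interface has further methods, so this is illustrative rather than a drop-in implementation):

```go
package sketch

import (
	"sync/atomic"

	"github.com/elastic/beats/v7/libbeat/beat"
)

// countingListener tracks the per-event callbacks shown in the hunk above.
type countingListener struct {
	published int64 // events that made it into the queue
	filtered  int64 // events removed by processors
	dropped   int64 // events dropped while waiting for the queue
}

func (l *countingListener) Closing() {}
func (l *countingListener) Closed()  {}

// Published now means "the event entered the queue", not merely
// "the event was handed to the publisher pipeline".
func (l *countingListener) Published()                  { atomic.AddInt64(&l.published, 1) }
func (l *countingListener) FilteredOut(beat.Event)      { atomic.AddInt64(&l.filtered, 1) }
func (l *countingListener) DroppedOnPublish(beat.Event) { atomic.AddInt64(&l.dropped, 1) }
```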
15 changes: 11 additions & 4 deletions libbeat/outputs/output_reg.go
@@ -21,6 +21,7 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/config"
)

@@ -48,11 +49,17 @@ type IndexSelector interface {
}

// Group configures and combines multiple clients into a load-balanced group of clients
// being managed by the publisher pipeline.
// being managed by the publisher pipeline. If QueueFactory is set then the
// pipeline will use it to create the queue; factories are built from queue
// settings (memqueue.Settings, diskqueue.Settings, or proxyqueue.Settings).
// Currently it is only used to activate the proxy queue when using the Shipper
// output, but it also provides a natural migration path for moving queue
// configuration into the outputs.
type Group struct {
Clients []Client
BatchSize int
Retry int
Clients []Client
BatchSize int
Retry int
QueueFactory queue.QueueFactory
}

// RegisterType registers a new output type.
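The new QueueFactory field is what lets an output defer queue creation to itself. A minimal sketch of the precedence the pipeline can now apply; queueFactoryFor and fromConfig are hypothetical names for illustration, only outputs.Group.QueueFactory and queue.QueueFactory come from this change:

```go
package sketch

import (
	"github.com/elastic/beats/v7/libbeat/outputs"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// queueFactoryFor picks the queue factory for an output group: a factory
// supplied by the output (currently only the shipper's proxy queue) wins
// over the factory derived from the pipeline's own queue configuration.
func queueFactoryFor(group outputs.Group, fromConfig queue.QueueFactory) queue.QueueFactory {
	if group.QueueFactory != nil {
		return group.QueueFactory
	}
	return fromConfig
}
```

For every output other than the shipper, QueueFactory stays nil and the pipeline keeps building the queue from its configured settings, so existing behavior is unchanged.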
9 changes: 8 additions & 1 deletion libbeat/outputs/shipper/shipper.go
@@ -25,6 +25,8 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
proxyqueue "github.com/elastic/beats/v7/libbeat/publisher/queue/proxy"

"github.com/elastic/elastic-agent-shipper-client/pkg/helpers"
sc "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
@@ -106,7 +108,12 @@ func makeShipper(

swb := outputs.WithBackoff(s, config.Backoff.Init, config.Backoff.Max)

return outputs.Success(config.BulkMaxSize, config.MaxRetries, swb)
return outputs.Group{
Clients: []outputs.Client{swb},
Retry: config.MaxRetries,
QueueFactory: proxyqueue.FactoryForSettings(
proxyqueue.Settings{BatchSize: config.BulkMaxSize}),
}, nil
}

// Connect establishes connection to the shipper server and implements `outputs.Connectable`.
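For contrast, here is roughly what the replaced outputs.Success call produced, assuming that helper simply wraps its arguments in a Group (an assumption, not shown in this diff). The interesting part is what is no longer set: BatchSize disappears because batching moves out of the pipeline and into the proxy queue, whose BatchSize is taken from BulkMaxSize above.

```go
package sketch

import "github.com/elastic/beats/v7/libbeat/outputs"

// legacyShipperGroup reconstructs (under the assumption stated above) the
// group the old code returned: same client and retry count, but with
// pipeline-side batching via BatchSize instead of a proxy-queue factory.
func legacyShipperGroup(client outputs.Client, bulkMaxSize, maxRetries int) outputs.Group {
	return outputs.Group{
		Clients:   []outputs.Client{client},
		BatchSize: bulkMaxSize, // batching done by the pipeline, not by a queue
		Retry:     maxRetries,
	}
}
```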
63 changes: 26 additions & 37 deletions libbeat/publisher/pipeline/client.go
@@ -31,22 +31,23 @@ import (

// client connects a beat with the processors and pipeline queue.
type client struct {
pipeline *Pipeline
logger *logp.Logger
processors beat.Processor
producer queue.Producer
mutex sync.Mutex
waiter *clientCloseWaiter

eventFlags publisher.EventFlags
canDrop bool
reportEvents bool
eventFlags publisher.EventFlags
canDrop bool
eventWaitGroup *sync.WaitGroup

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.
done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.

observer observer
eventListener beat.EventListener
clientListener beat.ClientListener
}
@@ -80,7 +81,6 @@ func (c *client) publish(e beat.Event) {
var (
event = &e
publish = true
log = c.pipeline.monitors.Logger
)

c.onNewEvent()
@@ -99,7 +99,7 @@
if err != nil {
// If we introduce a dead-letter queue, this is where we should
// route the event to it.
log.Errorf("Failed to publish event: %v", err)
c.logger.Errorf("Failed to publish event: %v", err)
}
}

@@ -134,8 +134,6 @@
}

func (c *client) Close() error {
log := c.logger()

// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
@@ -144,86 +142,77 @@ func (c *client) Close() error {
c.isOpen.Store(false)
c.onClosing()

log.Debug("client: closing acker")
c.logger.Debug("client: closing acker")
c.waiter.signalClose()
c.waiter.wait()

c.eventListener.ClientClosed()
log.Debug("client: done closing acker")
c.logger.Debug("client: done closing acker")

log.Debug("client: close queue producer")
c.logger.Debug("client: close queue producer")
cancelledEventCount := c.producer.Cancel()
c.onClosed(cancelledEventCount)
log.Debug("client: done producer close")
c.logger.Debug("client: done producer close")

if c.processors != nil {
log.Debug("client: closing processors")
c.logger.Debug("client: closing processors")
err := processors.Close(c.processors)
if err != nil {
log.Errorf("client: error closing processors: %v", err)
c.logger.Errorf("client: error closing processors: %v", err)
}
log.Debug("client: done closing processors")
c.logger.Debug("client: done closing processors")
}
})
return nil
}

func (c *client) logger() *logp.Logger {
return c.pipeline.monitors.Logger
}

func (c *client) onClosing() {
if c.clientListener != nil {
c.clientListener.Closing()
}
}

func (c *client) onClosed(cancelledEventCount int) {
log := c.logger()
log.Debugf("client: cancelled %v events", cancelledEventCount)
c.logger.Debugf("client: cancelled %v events", cancelledEventCount)

if c.reportEvents {
log.Debugf("client: remove client events")
if c.eventWaitGroup != nil {
c.logger.Debugf("client: remove client events")
if cancelledEventCount > 0 {
c.pipeline.waitCloseGroup.Add(-cancelledEventCount)
c.eventWaitGroup.Add(-cancelledEventCount)
}
}

c.pipeline.observer.clientClosed()
c.observer.clientClosed()
if c.clientListener != nil {
c.clientListener.Closed()
}
}

func (c *client) onNewEvent() {
c.pipeline.observer.newEvent()
c.observer.newEvent()
}

func (c *client) onPublished() {
if c.reportEvents {
c.pipeline.waitCloseGroup.Add(1)
if c.eventWaitGroup != nil {
c.eventWaitGroup.Add(1)
}
c.pipeline.observer.publishedEvent()
c.observer.publishedEvent()
if c.clientListener != nil {
c.clientListener.Published()
}
}

func (c *client) onFilteredOut(e beat.Event) {
log := c.logger()

log.Debugf("Pipeline client receives callback 'onFilteredOut' for event: %+v", e)
c.pipeline.observer.filteredEvent()
c.logger.Debugf("Pipeline client receives callback 'onFilteredOut' for event: %+v", e)
c.observer.filteredEvent()
if c.clientListener != nil {
c.clientListener.FilteredOut(e)
}
}

func (c *client) onDroppedOnPublish(e beat.Event) {
log := c.logger()

log.Debugf("Pipeline client receives callback 'onDroppedOnPublish' for event: %+v", e)
c.pipeline.observer.failedPublishEvent()
c.logger.Debugf("Pipeline client receives callback 'onDroppedOnPublish' for event: %+v", e)
c.observer.failedPublishEvent()
if c.clientListener != nil {
c.clientListener.DroppedOnPublish(e)
}
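Most of this diff trades indirection through c.pipeline (monitors.Logger, observer, waitCloseGroup) for fields injected directly into the client, and the reportEvents flag becomes the simpler check "eventWaitGroup != nil". A self-contained sketch of the resulting wait-on-close pattern; trackedClient, onACK, and waitClose are assumed names for illustration, not the client's actual API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// trackedClient mirrors the client's new bookkeeping: a WaitGroup shared
// with the pipeline is incremented when an event enters the queue and
// decremented when the event is ACKed or cancelled on close.
type trackedClient struct {
	eventWaitGroup *sync.WaitGroup // nil when wait-on-close is disabled
}

func (c *trackedClient) onPublished() {
	if c.eventWaitGroup != nil {
		c.eventWaitGroup.Add(1)
	}
}

func (c *trackedClient) onACK(n int) {
	if c.eventWaitGroup != nil {
		c.eventWaitGroup.Add(-n)
	}
}

// waitClose blocks until every tracked event is accounted for or the
// timeout expires, which is what a WaitClose shutdown amounts to.
func waitClose(wg *sync.WaitGroup, timeout time.Duration) {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(timeout):
	}
}

func main() {
	wg := &sync.WaitGroup{}
	c := &trackedClient{eventWaitGroup: wg}
	c.onPublished()
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.onACK(1) // the queue ACKs the event
	}()
	waitClose(wg, time.Second)
	fmt.Println("all in-flight events accounted for")
}
```

The Add(-cancelledEventCount) in onClosed serves the same purpose for events the producer still held when the client was closed.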
8 changes: 3 additions & 5 deletions libbeat/publisher/pipeline/client_test.go
@@ -49,8 +49,7 @@ func TestClient(t *testing.T) {
if err != nil {
panic(err)
}
// Close the built-in queue and replace with the given one.
p.outputController.queue.Close()
// Inject a test queue so the outputController doesn't create one
p.outputController.queue = qu

return p
@@ -137,16 +136,15 @@ func TestClientWaitClose(t *testing.T) {
if err != nil {
panic(err)
}
// Close the built-in queue and replace with the given one.
p.outputController.queue.Close()
// Inject a test queue so the outputController doesn't create one
p.outputController.queue = qu

return p
}
err := logp.TestingSetup()
assert.Nil(t, err)

q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1})
pipeline := makePipeline(Settings{}, q)
defer pipeline.Close()
