diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index e113dd582f4d..1e9295baa352 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -29,6 +29,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Move light modules to OSS. {pull}14369[14369] - Deprecate test flags, `generate` and `update_expected`, in favor of `data`. {pull}15292[15292] - Python 3 is required now to run python tests and tools. {pull}14798[14798] +- The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667] ==== Bugfixes diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index ebb14bf28ead..965c13da6577 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -172,7 +172,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) } queueFactory := func(e queue.Eventer) (queue.Queue, error) { - return memqueue.NewBroker(log, + return memqueue.NewQueue(log, memqueue.Settings{ Eventer: e, Events: 20, diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 5c79dab38db6..e8126733de4d 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -23,7 +23,7 @@ package memqueue // broker event loop. // Producer ACKs are run in the ackLoop go-routine. type ackLoop struct { - broker *Broker + broker *broker sig chan batchAckMsg lst chanList @@ -36,7 +36,7 @@ type ackLoop struct { processACK func(chanList, int) } -func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop { +func newACKLoop(b *broker, processACK func(chanList, int)) *ackLoop { l := &ackLoop{broker: b} l.processACK = processACK return l diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index de0b54caba23..443ff3542f42 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -36,7 +36,7 @@ var Feature = queue.Feature("mem", feature.Stable), ) -type Broker struct { +type broker struct { done chan struct{} logger logger @@ -97,7 +97,7 @@ func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (que logger = logp.L() } - return NewBroker(logger, Settings{ + return NewQueue(logger, Settings{ Eventer: eventer, Events: config.Events, FlushMinEvents: config.FlushMinEvents, @@ -105,13 +105,13 @@ func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (que }), nil } -// NewBroker creates a new broker based in-memory queue holding up to sz number of events. +// NewQueue creates a new broker based in-memory queue holding up to sz number of events. // If waitOnClose is set to true, the broker will block on Close, until all internal // workers handling incoming messages and ACKs have been shut down. -func NewBroker( +func NewQueue( logger logger, settings Settings, -) *Broker { +) queue.Queue { // define internal channel size for producer/client requests // to the broker chanSize := 20 @@ -137,7 +137,7 @@ func NewBroker( logger = logp.NewLogger("memqueue") } - b := &Broker{ + b := &broker{ done: make(chan struct{}), logger: logger, @@ -182,7 +182,7 @@ func NewBroker( return b } -func (b *Broker) Close() error { +func (b *broker) Close() error { close(b.done) if b.waitOnClose { b.wg.Wait() @@ -190,17 +190,17 @@ func (b *Broker) Close() error { return nil } -func (b *Broker) BufferConfig() queue.BufferConfig { +func (b *broker) BufferConfig() queue.BufferConfig { return queue.BufferConfig{ Events: b.bufSize, } } -func (b *Broker) Producer(cfg queue.ProducerConfig) queue.Producer { +func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer { return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel) } -func (b *Broker) Consumer() queue.Consumer { +func (b *broker) Consumer() queue.Consumer { return newConsumer(b) } diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go index 013642eadc8f..f225fc130066 100644 --- a/libbeat/publisher/queue/memqueue/consume.go +++ b/libbeat/publisher/queue/memqueue/consume.go @@ -27,7 +27,7 @@ import ( ) type consumer struct { - broker *Broker + broker *broker resp chan getResponse done chan struct{} @@ -49,7 +49,7 @@ const ( batchACK ) -func newConsumer(b *Broker) *consumer { +func newConsumer(b *broker) *consumer { return &consumer{ broker: b, resp: make(chan getResponse), diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index 79769da504dd..83ab4fb7b5b1 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -26,7 +26,7 @@ import ( // directEventLoop implements the broker main event loop. It buffers events, // but tries to forward events as early as possible. type directEventLoop struct { - broker *Broker + broker *broker buf ringBuffer @@ -45,7 +45,7 @@ type directEventLoop struct { // bufferingEventLoop implements the broker main event loop. // Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout. type bufferingEventLoop struct { - broker *Broker + broker *broker buf *batchBuffer flushList flushList @@ -77,7 +77,7 @@ type flushList struct { count int } -func newDirectEventLoop(b *Broker, size int) *directEventLoop { +func newDirectEventLoop(b *broker, size int) *directEventLoop { l := &directEventLoop{ broker: b, events: b.events, @@ -285,7 +285,7 @@ func (l *directEventLoop) processACK(lst chanList, N int) { } } -func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop { +func newBufferingEventLoop(b *broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop { l := &bufferingEventLoop{ broker: b, maxEvents: size, diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 1409ea9941f5..67c0c49f62ab 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -25,12 +25,12 @@ import ( ) type forgetfulProducer struct { - broker *Broker + broker *broker openState openState } type ackProducer struct { - broker *Broker + broker *broker cancel bool seq uint32 state produceState @@ -53,7 +53,7 @@ type produceState struct { type ackHandler func(count int) -func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer { +func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer { openState := openState{ log: b.logger, isOpen: atomic.MakeBool(true), diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index b65ad96ea26a..eca1f1918caf 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -74,7 +74,7 @@ func TestProducerCancelRemovesEvents(t *testing.T) { func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory { return func(_ *testing.T) queue.Queue { - return NewBroker(nil, Settings{ + return NewQueue(nil, Settings{ Events: sz, FlushMinEvents: minEvents, FlushTimeout: flushTimeout,