Skip to content

Commit

Permalink
Make memqueue.Broker internal (now memqueue.broker) (elastic#16667)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Feb 27, 2020
1 parent 6af4783 commit bb1bcb8
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Move light modules to OSS. {pull}14369[14369]
- Deprecate test flags, `generate` and `update_expected`, in favor of `data`. {pull}15292[15292]
- Python 3 is required now to run python tests and tools. {pull}14798[14798]
- The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667]

==== Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(log,
return memqueue.NewQueue(log,
memqueue.Settings{
Eventer: e,
Events: 20,
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package memqueue
// broker event loop.
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
broker *Broker
broker *broker
sig chan batchAckMsg
lst chanList

Expand All @@ -36,7 +36,7 @@ type ackLoop struct {
processACK func(chanList, int)
}

func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
func newACKLoop(b *broker, processACK func(chanList, int)) *ackLoop {
l := &ackLoop{broker: b}
l.processACK = processACK
return l
Expand Down
20 changes: 10 additions & 10 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var Feature = queue.Feature("mem",
feature.Stable),
)

type Broker struct {
type broker struct {
done chan struct{}

logger logger
Expand Down Expand Up @@ -97,21 +97,21 @@ func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (que
logger = logp.L()
}

return NewBroker(logger, Settings{
return NewQueue(logger, Settings{
Eventer: eventer,
Events: config.Events,
FlushMinEvents: config.FlushMinEvents,
FlushTimeout: config.FlushTimeout,
}), nil
}

// NewBroker creates a new broker based in-memory queue holding up to sz number of events.
// NewQueue creates a new broker based in-memory queue holding up to sz number of events.
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
func NewQueue(
logger logger,
settings Settings,
) *Broker {
) queue.Queue {
// define internal channel size for producer/client requests
// to the broker
chanSize := 20
Expand All @@ -137,7 +137,7 @@ func NewBroker(
logger = logp.NewLogger("memqueue")
}

b := &Broker{
b := &broker{
done: make(chan struct{}),
logger: logger,

Expand Down Expand Up @@ -182,25 +182,25 @@ func NewBroker(
return b
}

func (b *Broker) Close() error {
func (b *broker) Close() error {
close(b.done)
if b.waitOnClose {
b.wg.Wait()
}
return nil
}

func (b *Broker) BufferConfig() queue.BufferConfig {
func (b *broker) BufferConfig() queue.BufferConfig {
return queue.BufferConfig{
Events: b.bufSize,
}
}

func (b *Broker) Producer(cfg queue.ProducerConfig) queue.Producer {
func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel)
}

func (b *Broker) Consumer() queue.Consumer {
func (b *broker) Consumer() queue.Consumer {
return newConsumer(b)
}

Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type consumer struct {
broker *Broker
broker *broker
resp chan getResponse

done chan struct{}
Expand All @@ -49,7 +49,7 @@ const (
batchACK
)

func newConsumer(b *Broker) *consumer {
func newConsumer(b *broker) *consumer {
return &consumer{
broker: b,
resp: make(chan getResponse),
Expand Down
8 changes: 4 additions & 4 deletions libbeat/publisher/queue/memqueue/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// directEventLoop implements the broker main event loop. It buffers events,
// but tries to forward events as early as possible.
type directEventLoop struct {
broker *Broker
broker *broker

buf ringBuffer

Expand All @@ -45,7 +45,7 @@ type directEventLoop struct {
// bufferingEventLoop implements the broker main event loop.
// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout.
type bufferingEventLoop struct {
broker *Broker
broker *broker

buf *batchBuffer
flushList flushList
Expand Down Expand Up @@ -77,7 +77,7 @@ type flushList struct {
count int
}

func newDirectEventLoop(b *Broker, size int) *directEventLoop {
func newDirectEventLoop(b *broker, size int) *directEventLoop {
l := &directEventLoop{
broker: b,
events: b.events,
Expand Down Expand Up @@ -285,7 +285,7 @@ func (l *directEventLoop) processACK(lst chanList, N int) {
}
}

func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
func newBufferingEventLoop(b *broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
l := &bufferingEventLoop{
broker: b,
maxEvents: size,
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
)

type forgetfulProducer struct {
broker *Broker
broker *broker
openState openState
}

type ackProducer struct {
broker *Broker
broker *broker
cancel bool
seq uint32
state produceState
Expand All @@ -53,7 +53,7 @@ type produceState struct {

type ackHandler func(count int)

func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
openState := openState{
log: b.logger,
isOpen: atomic.MakeBool(true),
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestProducerCancelRemovesEvents(t *testing.T) {

func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory {
return func(_ *testing.T) queue.Queue {
return NewBroker(nil, Settings{
return NewQueue(nil, Settings{
Events: sz,
FlushMinEvents: minEvents,
FlushTimeout: flushTimeout,
Expand Down

0 comments on commit bb1bcb8

Please sign in to comment.