Skip to content

Commit

Permalink
rename queue.Batch.ACK -> queue.Batch.Done (#31903)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored and chrisberkhout committed Jun 1, 2023
1 parent 5446ed3 commit 952e468
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420]
- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]
- The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699]
- Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903]

==== Bugfixes

Expand Down
8 changes: 4 additions & 4 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type retryer interface {
type ttlBatch struct {
// The callback to inform the queue (and possibly the producer)
// that this batch has been acknowledged.
ack func()
done func()

// The internal hook back to the eventConsumer, used to implement the
// publisher.Batch retry interface.
Expand Down Expand Up @@ -61,7 +61,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
}

b := &ttlBatch{
ack: original.ACK,
done: original.Done,
retryer: retryer,
ttl: ttl,
events: events,
Expand All @@ -74,11 +74,11 @@ func (b *ttlBatch) Events() []publisher.Event {
}

func (b *ttlBatch) ACK() {
b.ack()
b.done()
}

func (b *ttlBatch) Drop() {
b.ack()
b.done()
}

func (b *ttlBatch) Retry() {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz
if err != nil {
return err
}
batch.ACK()
batch.Done()
received = received + batch.Count()
if received == num_events {
break
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ func (batch *diskQueueBatch) Event(i int) interface{} {
return batch.frames[i].event
}

func (batch *diskQueueBatch) ACK() {
func (batch *diskQueueBatch) Done() {
batch.queue.acks.addFrames(batch.frames)
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (l *ackLoop) collectAcked() chanList {
for !l.ackChans.empty() && !done {
acks := l.ackChans.front()
select {
case <-acks.ackChan:
case <-acks.doneChan:
lst.append(l.ackChans.pop())

default:
Expand Down
24 changes: 12 additions & 12 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,17 @@ type Settings struct {
}

type batch struct {
queue *broker
entries []queueEntry
ackChan chan batchAckMsg
queue *broker
entries []queueEntry
doneChan chan batchDoneMsg
}

// batchACKState stores the metadata associated with a batch of events sent to
// a consumer. When the consumer ACKs that batch, a batchAckMsg is sent on
// ackChan and received by
type batchACKState struct {
next *batchACKState
ackChan chan batchAckMsg
doneChan chan batchDoneMsg
start, count int // number of events waiting for ACK
entries []queueEntry
}
Expand Down Expand Up @@ -250,9 +250,9 @@ func (b *broker) Get(count int) (queue.Batch, error) {
// if request has been sent, we have to wait for a response
resp := <-responseChan
return &batch{
queue: b,
entries: resp.entries,
ackChan: resp.ackChan,
queue: b,
entries: resp.entries,
doneChan: resp.ackChan,
}, nil
}

Expand All @@ -277,7 +277,7 @@ func (b *broker) Metrics() (queue.Metrics, error) {
var ackChanPool = sync.Pool{
New: func() interface{} {
return &batchACKState{
ackChan: make(chan batchAckMsg, 1),
doneChan: make(chan batchDoneMsg, 1),
}
},
}
Expand Down Expand Up @@ -335,11 +335,11 @@ func (l *chanList) front() *batchACKState {
return l.head
}

func (l *chanList) nextBatchChannel() chan batchAckMsg {
func (l *chanList) nextBatchChannel() chan batchDoneMsg {
if l.head == nil {
return nil
}
return l.head.ackChan
return l.head.doneChan
}

func (l *chanList) pop() *batchACKState {
Expand Down Expand Up @@ -384,6 +384,6 @@ func (b *batch) Event(i int) interface{} {
return b.entries[i].event
}

func (b *batch) ACK() {
b.ackChan <- batchAckMsg{}
func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
}
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (l *directEventLoop) handleGetRequest(req *getRequest) {

ackCH := newBatchACKState(start, count, l.buf.entries)

req.responseChan <- getResponse{ackCH.ackChan, buf}
req.responseChan <- getResponse{ackCH.doneChan, buf}
l.pendingACKs.append(ackCH)
}

Expand Down Expand Up @@ -422,7 +422,7 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) {
entries := buf.entries[:count]
acker := newBatchACKState(0, count, entries)

req.responseChan <- getResponse{acker.ackChan, entries}
req.responseChan <- getResponse{acker.doneChan, entries}
l.pendingACKs.append(acker)

l.unackedEventCount += len(entries)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type getRequest struct {
}

type getResponse struct {
ackChan chan batchAckMsg
ackChan chan batchDoneMsg
entries []queueEntry
}

type batchAckMsg struct{}
type batchDoneMsg struct{}

// Metrics API

Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te
queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))

// Test metrics after ack
batch.ACK()
batch.Done()

queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))

Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ type Producer interface {
Cancel() int
}

// Batch of events to be returned to Consumers. The `ACK` method will send the
// ACK signal to the queue.
// Batch of events to be returned to Consumers. The `Done` method will tell the
// queue that the batch has been consumed and its events can be discarded.
type Batch interface {
Count() int
Event(i int) interface{}
ACK()
Done()
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queuetest/producer_cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
for i := 0; i < batch.Count(); i++ {
events = append(events, batch.Event(i))
}
batch.ACK()
batch.Done()
}

// verify
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queuetest/queuetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory {
for j := 0; j < batch.Count(); j++ {
events.Done()
}
batch.ACK()
batch.Done()
}
}()
}
Expand Down

0 comments on commit 952e468

Please sign in to comment.