Skip to content

Commit

Permalink
[chore] [exporterhelper] Do not requeue on shutdown (#9054)
Browse files Browse the repository at this point in the history
This change unblocks adding the `enqueue_on_failure` option to the queue
sender by removing the requeue behavior on the shutdown. If we don't
remove requeue on shutdown, it's possible to run into a situation
described in
#7388.
After the recent refactoring, the chance of running into it is pretty
small, but it's still possible.

The only reason to requeue on shutdown is to make sure there is no data
loss with the persistent queue enabled. The persistent queue captures
all the inflight requests in the persistent storage anyway, so there is
no reason to requeue an inflight request. The only downside is it
potentially can cause sending duplicate data on the collector restart in
case of a partially failed request during shutdown.

Another option would be to rework the memory queue to never close the
channel but still ensure draining.
  • Loading branch information
dmitryax authored Dec 12, 2023
1 parent f8b1c65 commit f0f9391
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 47 deletions.
20 changes: 6 additions & 14 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"sync/atomic"

"go.opentelemetry.io/collector/component"
)
Expand All @@ -17,25 +16,19 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
stopped *atomic.Bool
items chan queueRequest[T]
items chan queueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
return &boundedMemoryQueue[T]{
items: make(chan queueRequest[T], capacity),
stopped: &atomic.Bool{},
items: make(chan queueRequest[T], capacity),
}
}

// Offer is used by the producer to submit new item to the queue.
// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
if q.stopped.Load() {
return ErrQueueIsStopped
}

select {
case q.items <- queueRequest[T]{ctx: ctx, req: req}:
return nil
Expand All @@ -46,8 +39,8 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T)) bool {
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool {
item, ok := <-q.items
if !ok {
return false
Expand All @@ -56,9 +49,8 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T)) bo
return true
}

// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.stopped.Store(true) // disable producer
close(q.items)
return nil
}
Expand Down
24 changes: 11 additions & 13 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ func TestBoundedQueue(t *testing.T) {

consumerState := newConsumerState(t)

consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) {
consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) bool {
consumerState.record(item)
<-waitCh
return true
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -74,7 +75,6 @@ func TestBoundedQueue(t *testing.T) {
}

assert.NoError(t, consumers.Shutdown(context.Background()))
assert.ErrorIs(t, q.Offer(context.Background(), "x"), ErrQueueIsStopped)
}

// In this test we run a queue with many items and a slow consumer.
Expand All @@ -89,9 +89,10 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
consumerState := newConsumerState(t)

waitChan := make(chan struct{})
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) {
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) bool {
<-waitChan
consumerState.record(item)
return true
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

Expand All @@ -106,17 +107,13 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
assert.NoError(t, q.Offer(context.Background(), "i"))
assert.NoError(t, q.Offer(context.Background(), "j"))

// we block the workers and wait for the queue to start rejecting new items to release the lock.
// This ensures that we test that the queue has been called to shutdown while items are still in the queue.
go func() {
require.EventuallyWithT(t, func(c *assert.CollectT) {
// ensure the request is rejected due to closed queue
assert.ErrorIs(t, q.Offer(context.Background(), "x"), ErrQueueIsStopped)
}, 1*time.Second, 10*time.Millisecond)
close(waitChan)
assert.NoError(t, consumers.Shutdown(context.Background()))
}()

assert.NoError(t, consumers.Shutdown(context.Background()))
// wait a bit to ensure shutdown is called and unblock the consumers.
time.Sleep(100 * time.Millisecond)
close(waitChan)

consumerState.assertConsumed(map[string]bool{
"a": true,
Expand Down Expand Up @@ -179,8 +176,9 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
b.ReportAllocs()
for i := 0; i < b.N; i++ {
q := NewBoundedMemoryQueue[string](capacity)
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) {
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) bool {
time.Sleep(1 * time.Millisecond)
return true
})
require.NoError(b, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {
Expand Down Expand Up @@ -238,7 +236,7 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
func TestZeroSizeWithConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0)

consumers := NewQueueConsumers(q, 1, func(context.Context, string) {})
consumers := NewQueueConsumers(q, 1, func(context.Context, string) bool { return true })
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

assert.NoError(t, q.Offer(context.Background(), "a")) // in process
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
type QueueConsumers[T any] struct {
queue Queue[T]
numConsumers int
consumeFunc func(context.Context, T)
consumeFunc func(context.Context, T) bool
stopWG sync.WaitGroup
}

func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T)) *QueueConsumers[T] {
func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) bool) *QueueConsumers[T] {
return &QueueConsumers[T]{
queue: q,
numConsumers: numConsumers,
Expand Down
7 changes: 4 additions & 3 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T)) bool {
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool {
var (
req T
onProcessingFinished func()
Expand All @@ -157,8 +157,9 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T)) bool
}

if consumed {
consumeFunc(context.Background(), req)
onProcessingFinished()
if ok := consumeFunc(context.Background(), req); ok {
onProcessingFinished()
}
return true
}
}
Expand Down
14 changes: 8 additions & 6 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
}

// createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces)) Queue[ptrace.Traces] {
func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces) bool) Queue[ptrace.Traces] {
pq := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces,
unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
Expand Down Expand Up @@ -73,9 +73,10 @@ func createTestPersistentQueue(client storage.Client) *persistentQueue[ptrace.Tr
func TestPersistentQueue_FullCapacity(t *testing.T) {
start := make(chan struct{})
done := make(chan struct{})
pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) {
pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) bool {
<-start
<-done
return true
})
assert.Equal(t, 0, pq.Size())

Expand All @@ -99,7 +100,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {
}

func TestPersistentQueue_Shutdown(t *testing.T) {
pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) {})
pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) bool { return true })
req := newTraces(1, 10)

for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -139,8 +140,9 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
req := newTraces(1, 10)

numMessagesConsumed := &atomic.Int32{}
pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) {
pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) bool {
numMessagesConsumed.Add(int32(1))
return true
})

for i := 0; i < c.numMessagesProduced; i++ {
Expand Down Expand Up @@ -522,7 +524,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) {
}

for i := 0; i < bb.N; i++ {
require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) {}))
require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) bool { return true }))
}
require.NoError(b, ext.Shutdown(context.Background()))
})
Expand Down Expand Up @@ -641,7 +643,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
// Subsequent items succeed, as deleting the first item frees enough space for the state update
reqCount--
for i := reqCount; i > 0; i-- {
require.True(t, ps.Consume(func(context.Context, ptrace.Traces) {}))
require.True(t, ps.Consume(func(context.Context, ptrace.Traces) bool { return true }))
}

// We should be able to put a new item in
Expand Down
3 changes: 2 additions & 1 deletion exporter/exporterhelper/internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type Queue[T any] interface {
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
Consume(func(ctx context.Context, item T)) bool
// The provided callback function returns true if the item was consumed or false if the consumer is stopped.
Consume(func(ctx context.Context, item T) bool) bool
// Size returns the current Size of the queue
Size() int
// Capacity returns the capacity of the queue.
Expand Down
17 changes: 14 additions & 3 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

"go.opencensus.io/metric/metricdata"
Expand Down Expand Up @@ -81,6 +82,7 @@ type queueSender struct {
meter otelmetric.Meter
consumers *internal.QueueConsumers[Request]
requeuingEnabled bool
stopped *atomic.Bool

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
Expand All @@ -105,18 +107,24 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
stopped: &atomic.Bool{},
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
return qs
}

// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
func (qs *queueSender) consume(ctx context.Context, req Request) {
func (qs *queueSender) consume(ctx context.Context, req Request) bool {
err := qs.nextSender.send(ctx, req)

// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
if err == nil || consumererror.IsPermanent(err) {
return
return true
}

// Do not requeue if the queue sender is stopped.
if qs.stopped.Load() {
return false
}

if !qs.requeuingEnabled {
Expand All @@ -125,7 +133,7 @@ func (qs *queueSender) consume(ctx context.Context, req Request) {
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
return
return true
}

if qs.queue.Offer(ctx, extractPartialRequest(req, err)) == nil {
Expand All @@ -140,6 +148,7 @@ func (qs *queueSender) consume(ctx context.Context, req Request) {
zap.Int("dropped_items", req.ItemsCount()),
)
}
return true
}

// Start is invoked during service startup.
Expand Down Expand Up @@ -203,6 +212,8 @@ func (qs *queueSender) recordWithOC() error {

// Shutdown is invoked during service shutdown.
func (qs *queueSender) Shutdown(ctx context.Context) error {
qs.stopped.Store(true)

// Cleanup queue metrics reporting
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
Expand Down
15 changes: 11 additions & 4 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,18 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
return be.queueSender.(*queueSender).queue.Size() == 0
}, time.Second, 1*time.Millisecond)

// shuts down and ensure the item is produced in the queue again
// shuts down the exporter, unsent data should be preserved as in-flight data in the persistent queue.
require.NoError(t, be.Shutdown(context.Background()))
assert.Eventually(t, func() bool {
return be.queueSender.(*queueSender).queue.Size() == 1
}, time.Second, 1*time.Millisecond)

// start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail.
replacedReq := newMockRequest(1, nil)
be, err = newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(replacedReq),
newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), host))

// wait for the item to be consumed from the queue
replacedReq.checkNumRequests(t, 1)
}

func TestQueueSenderNoStartShutdown(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func mockRequestUnmarshaler(mr Request) RequestUnmarshaler {
}

func mockRequestMarshaler(_ Request) ([]byte, error) {
return nil, nil
return []byte("mockRequest"), nil
}

func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
Expand Down

0 comments on commit f0f9391

Please sign in to comment.