Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify logic in boundedMemoryQueue, use channels len/cap #8829

Merged
merged 1 commit into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .chloggen/simplifyqueue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: 'exporterhelper'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Simplify logic in boundedMemoryQueue, use channels len/cap"

# One or more tracking issues or pull requests related to the change
issues: [8829]
24 changes: 4 additions & 20 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,21 @@ import (
// the producer are dropped.
type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
numConsumers int
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
// NewBoundedMemoryQueue constructs the new queue of specified capacity. Capacity cannot be 0.
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
numConsumers: numConsumers,
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// Start starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
var startWG sync.WaitGroup
Expand All @@ -48,7 +43,6 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
startWG.Done()
defer q.stopWG.Done()
for item := range q.items {
q.size.Add(^uint32(0))
set.Callback(item)
}
}()
Expand All @@ -63,20 +57,10 @@ func (q *boundedMemoryQueue) Produce(item Request) bool {
return false
}

// we might have two concurrent backing queues at the moment
// their combined size is stored in q.size, and their combined capacity
// should match the capacity of the new queue
if q.size.Load() >= q.capacity {
return false
}

q.size.Add(1)
select {
case q.items <- item:
return true
default:
// should not happen, as overflows should have been captured earlier
q.size.Add(^uint32(0))
return false
}
}
Expand All @@ -91,11 +75,11 @@ func (q *boundedMemoryQueue) Stop() {

// Size returns the current size of the queue
func (q *boundedMemoryQueue) Size() int {
return int(q.size.Load())
return len(q.items)
}

func (q *boundedMemoryQueue) Capacity() int {
return int(q.capacity)
return cap(q.items)
}

func (q *boundedMemoryQueue) IsPersistent() bool {
Expand Down
15 changes: 14 additions & 1 deletion exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,24 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
assert.Equal(s.t, expected, s.snapshot())
}

func TestZeroSize(t *testing.T) {
func TestZeroSizeWithConsumers(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a small behavior change here if capacity/size is 0 and numConsumers >0. But this is not a valid case in QueueConfig, see Validate so we should not worry.

q := NewBoundedMemoryQueue(0, 1)

err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
assert.NoError(t, err)

assert.True(t, q.Produce(newStringRequest("a"))) // in process

q.Stop()
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue(0, 0)

err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(newStringRequest("a"))) // in process

q.Stop()
}