WIP: Add memory-bound queue #2013
Conversation
Looks promising! @pavolloffay suggested something similar, but his idea was to check the size directly in the linked list, IIRC. This wouldn't require changing the backing buffer, but would require sync access during the enqueue/dequeue operations, I believe.
In any case, would be nice to get his review as well.
pkg/queue/mem_bounded_queue.go (outdated)

    func (q *MemoryBoundedQueue) Enqueue(item Sizeable) bool {
        size := item.Size()
        for q.memSize+size > q.memCapacity {
This deserves a comment, it took me a while to realize that you are dropping items until space is freed.
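To illustrate the comment the reviewer is asking for, here is a hedged sketch of the eviction loop with that explanation spelled out. The struct is a simplified, slice-backed reconstruction for illustration only, not the PR's actual ring-buffer-backed implementation; only the names `Sizeable`, `Enqueue`, `memSize`, `memCapacity`, and `onDroppedItem` come from the PR.

```go
package main

import "fmt"

// Sizeable is an item that can report its own size in bytes (as in the PR).
type Sizeable interface {
	Size() int
}

// memoryBoundedQueue is a simplified, slice-backed stand-in for the PR's
// MemoryBoundedQueue, used only to illustrate the eviction loop.
type memoryBoundedQueue struct {
	items         []Sizeable
	memSize       int // total size of items currently enqueued
	memCapacity   int // memory budget in bytes
	onDroppedItem func(item Sizeable)
}

// Enqueue adds item, evicting from the head until it fits in the budget.
func (q *memoryBoundedQueue) Enqueue(item Sizeable) bool {
	size := item.Size()
	if size > q.memCapacity {
		return false // item can never fit, even in an empty queue
	}
	// Drop the oldest items from the head of the queue until enough
	// memory is freed to accommodate the new item. This loop is
	// eviction, not waiting — the comment the reviewer is asking for.
	for q.memSize+size > q.memCapacity {
		dropped := q.items[0]
		q.items = q.items[1:]
		q.memSize -= dropped.Size()
		if q.onDroppedItem != nil {
			q.onDroppedItem(dropped)
		}
	}
	q.items = append(q.items, item)
	q.memSize += size
	return true
}

type span int

func (s span) Size() int { return int(s) }

func main() {
	q := &memoryBoundedQueue{memCapacity: 10}
	q.Enqueue(span(4))
	q.Enqueue(span(4))
	q.Enqueue(span(4)) // evicts the first item to stay under 10 bytes
	fmt.Println(len(q.items), q.memSize)
}
```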
pkg/queue/mem_bounded_queue.go (outdated)

    }
    // now we're good with memory, but may need to make the queue longer
    if q.queue.Full() {
        q.queue.EnsureCapacity(2 * q.queue.Capacity())
Debug statement?
pkg/queue/mem_bounded_queue.go (outdated)

        Size() int
    }

    type MemoryBoundedQueue struct {
The queue isn't meant for concurrent access and this fact needs to be documented, even though there's an explicit "concurrent" version down there.
    }

    func NewRingBufferQueue(size int) *RingBufferQueue {
        return &RingBufferQueue{
Check the boundaries? The min size of the queue is probably 1.
    package queue

    type RingBufferQueue struct {
        size, head, tail int
Size could be uint
pkg/queue/mem_bounded_queue.go (outdated)

    type MemoryBoundedQueue struct {
        queue       *RingBufferQueue
        memSize     int
        memCapacity int
Both could be uint
pkg/queue/mem_bounded_queue.go (outdated)

    }

    func NewMemoryBoundedQueue(initLength int, memCapacity int, onDroppedItem func(item Sizeable)) *MemoryBoundedQueue {
        return &MemoryBoundedQueue{
Check the boundaries for the given values
pkg/queue/mem_bounded_queue.go (outdated)

        return ok
    }

    func (q *ConcurrentMemoryBoundedQueue) StartConsumers(workers int, consumeFn func(item Sizeable)) {
What happens when `workers` is 0, or negative?
    }{
        {items: []int{1, 2}, newCap: 4, expCap: 4, expItems: []int{1, 2, 0, 0}},
        {items: []int{1, 2, 3, -1}, newCap: 4, expCap: 4, expItems: []int{2, 3, 0, 0}},
        {items: []int{1, 2, 3, -1, -2, 4, 5}, newCap: 4, expCap: 4, expItems: []int{3, 4, 5, 0}},
I'm getting something wrong... This is what I would expect:

    1, 2, 3, 0 (initial state)
    2, 3, 0, 0 (after -1)
    0, 0, 0, 0 (after -2)
    4, 5, 0, 0 (final state)
Unless `-2` doesn't mean take two, but take one, for the second time.
I think `-2` means dequeue and expect the dequeued item to be `2`. Refer to the comment above (`items []int // positive to enqueue, negative to dequeue`) and the assertion below (`assert.Equal(t, -item, itm)`).
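For concreteness, a sketch of how that encoding is replayed by a test driver: positive values enqueue, negative values dequeue and check that the dequeued value equals the negation. The `fifo` type here is a trivial slice-backed stand-in for the PR's `RingBufferQueue`, introduced only to make the sketch self-contained.

```go
package main

import "fmt"

// fifo is a minimal FIFO standing in for the PR's RingBufferQueue,
// used to demonstrate how the test table's encoding is interpreted.
type fifo struct{ buf []int }

func (q *fifo) Enqueue(v int) { q.buf = append(q.buf, v) }

func (q *fifo) Dequeue() (int, bool) {
	if len(q.buf) == 0 {
		return 0, false
	}
	v := q.buf[0]
	q.buf = q.buf[1:]
	return v, true
}

// run replays the encoded ops: positive -> enqueue the value,
// negative -> dequeue and verify the dequeued value equals -item.
func run(items []int) error {
	q := &fifo{}
	for _, item := range items {
		if item > 0 {
			q.Enqueue(item)
			continue
		}
		got, ok := q.Dequeue()
		if !ok || got != -item {
			return fmt.Errorf("dequeue: got %d (ok=%v), want %d", got, ok, -item)
		}
	}
	return nil
}

func main() {
	// {1, 2, 3, -1, -2, 4, 5}: enqueue 1,2,3; dequeue expecting 1, then 2;
	// enqueue 4,5 — leaving 3,4,5 in the buffer, matching expItems above.
	fmt.Println(run([]int{1, 2, 3, -1, -2, 4, 5}))
}
```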
    package queue

    type RingBufferQueue struct {
This also requires the consumer to take care of concurrency. It would also be good to have documentation somewhere showing how the types relate to each other, like: ConcurrentMemoryBoundedQueue wraps MemoryBoundedQueue, which wraps RingBufferQueue.
Or we can make these package-private and export just the concurrent version.
pkg/queue/mem_bounded_queue.go (outdated)

        "github.com/uber/jaeger-lib/metrics"
    )

    type Sizeable interface {
Is there a core API to get the size? Could we use that instead?
pkg/queue/ring_buffer.go (outdated)

    if q.Full() {
        return false
    }
    if q.head == -1 {
A call to `Empty` is more idiomatic.
@jpkrohling this was a quick prototype; if we think it's worth pursuing I can polish it. The basic ring buffer is ready/tested, but the other two types are only written — I never ran them yet.
Is it possible to benchmark it against the bounded queue that was recently changed? If the performance is similar, I think it's well worth pursuing!
The new queue uses a single mutex; those are usually more efficient than Go channels. But the calls to Span.Size(), which are recursive, will introduce extra overhead (although it will apply to both implementations). I am skeptical that the queue itself will be in any way a limiting factor in the collector's throughput, and micro-benchmarks won't reflect that anyway.
Could we have a benchmark for the concurrent version and compare it against the current impl? The POC looks good to me, if there is no perf degradation.
@yurishkuro will you continue working on this one? I closed the PR that adds the flags (#1985), but just realized that this PR here only adds a new queue type. Should I reopen that and work on the review comments, eventually switching to this new queue, or would you rather have me wait for this one here to be merged first?
If someone could take over this PR it would be great, as I will only have time to return to it over the weekend or next week. The other PR you closed is still needed, as it introduces the wiring for the new queue (CLI flags, etc.).
I'll reopen that and address the comments. If this here is merged before that one, I'll adapt that to use this new queue; otherwise, I'll send a follow-up PR once the new queue gets merged.
Signed-off-by: Yuri Shkuro <ys@uber.com>
Force-pushed from 34756bf to a8256f9.
Benchmark results are puzzling: the mutex+cond version is about 3x slower than channels.
I added the last two benchmarks, which are just a raw channel and a raw Cond, without any queueing logic. If I remove the calls to Gosched(), the Cond benchmark becomes slightly faster than the channel one, although the full queue is still slower (a custom data structure is understandably slower than built-in channels).
The CPU profiles are very similar (with yield), spending most time in the same Cond wait, yet the channel is much faster.
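For reference, a minimal sketch of the kind of producer/consumer comparison being discussed (an assumed shape, not the PR's actual benchmark): one producer goroutine handing items to a consumer, once through a buffered channel and once through an unbounded mutex+sync.Cond queue. A real comparison would use `testing.B` benchmark functions; this version self-times with `time.Since` so it can run standalone.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// condQueue is a minimal mutex+Cond handoff queue, the shape discussed in
// this thread — a sketch, not the PR's actual implementation.
type condQueue struct {
	mu   sync.Mutex
	cond *sync.Cond
	buf  []int
}

func newCondQueue() *condQueue {
	q := &condQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *condQueue) put(v int) {
	q.mu.Lock()
	q.buf = append(q.buf, v)
	q.mu.Unlock()
	q.cond.Signal()
}

func (q *condQueue) get() int {
	q.mu.Lock()
	for len(q.buf) == 0 {
		q.cond.Wait() // releases the mutex while waiting
	}
	v := q.buf[0]
	q.buf = q.buf[1:]
	q.mu.Unlock()
	return v
}

func main() {
	const n = 100000

	// Channel version: producer goroutine, consumer in main.
	ch := make(chan int, 1024)
	start := time.Now()
	go func() {
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	for i := 0; i < n; i++ {
		<-ch
	}
	fmt.Println("channel:", time.Since(start))

	// Mutex+Cond version, same workload.
	q := newCondQueue()
	start = time.Now()
	go func() {
		for i := 0; i < n; i++ {
			q.put(i)
		}
	}()
	for i := 0; i < n; i++ {
		q.get()
	}
	fmt.Println("cond:", time.Since(start))
}
```

As the thread notes, results from a loop like this are sensitive to scheduling artifacts (yields, syscalls), so treat any numbers as indicative only.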
Signed-off-by: Yuri Shkuro <ys@uber.com>
Thanks for putting together the benchmarks, that's an interesting insight. Forgive the ignorance in my question: does this mean the existing approach of using the ChannelQueue is preferred, given the benchmark results?
        expCap   int
        expItems []int // content of the buffer after EnsureCapacity
    }{
        {items: []int{1, 2}, newCap: 4, expCap: 4, expItems: []int{1, 2, 0, 0}},
I would use `want` instead of `exp`, which I feel is a lot clearer, as `exp` can be an abbreviation for many words other than expect. Or maybe I'm just being too pedantic :)
        return false
    }
    if q.Empty() {
        // empty queue
Is this comment necessary?
        return nil, false
    }
    var item interface{}
    if q.head == q.tail {
I wonder if this logic could be simplified if we took the approach of `Empty` == `head == tail`, where the `tail` pointer points to the next "empty" index of the buffer. This means the buffer is never "full", but there is always an empty slot. This also implies that `Full` means `tail == ((head - 1 + size) % size)`. I think this should do away with the need for conditional logic, but take all this with a grain of salt as I haven't thought it out thoroughly nor tested it.
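A sketch of the convention the reviewer is suggesting (hypothetical, as the reviewer says they haven't tested it either): `tail` points at the next free slot, `head` at the next item, so `Empty` is `head == tail`, one slot is sacrificed so that full and empty states don't collide, and no `head == -1` sentinel is needed. All names here are illustrative, not the PR's.

```go
package main

import "fmt"

// ring implements the suggested convention: head == tail means empty, and
// one slot is kept unused so that full is (tail+1) % len(buf) == head —
// equivalent to the reviewer's tail == ((head - 1 + size) % size).
type ring struct {
	buf        []interface{}
	head, tail int
}

func newRing(capacity int) *ring {
	// +1 spare slot so the full and empty states are distinguishable
	return &ring{buf: make([]interface{}, capacity+1)}
}

func (q *ring) Empty() bool { return q.head == q.tail }
func (q *ring) Full() bool  { return (q.tail+1)%len(q.buf) == q.head }

func (q *ring) Enqueue(v interface{}) bool {
	if q.Full() {
		return false
	}
	q.buf[q.tail] = v
	q.tail = (q.tail + 1) % len(q.buf)
	return true
}

func (q *ring) Dequeue() (interface{}, bool) {
	if q.Empty() {
		return nil, false
	}
	v := q.buf[q.head]
	q.buf[q.head] = nil // release the slot for GC
	q.head = (q.head + 1) % len(q.buf)
	return v, true
}

func main() {
	q := newRing(2)
	fmt.Println(q.Enqueue(1), q.Enqueue(2), q.Enqueue(3)) // third fails: full
	v, _ := q.Dequeue()
	fmt.Println(v)
}
```

Note that `Enqueue` and `Dequeue` keep their boundary checks; what disappears is the special-casing of the empty state inside the body (the `head == -1` branch quoted above).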
That is precisely the open question. I don't think these benchmarks are conclusive; the methodology is flawed. It makes sense to me that plain Cond is faster than channels, considering that the profile shows channels using the same primitives underneath. I cannot explain why adding a yield makes the Cond test 3 times slower. I used yield as a stand-in for a syscall, which the real collector would be doing in both producer and consumer, but real syscalls are likely to dominate the benchmark.
As an alternative to the channel-based queue in #1985.