
WIP: Add memory-bound queue #2013

Closed
wants to merge 3 commits

Conversation

yurishkuro (Member)

As an alternative to the channel-based queue in #1985.

@jpkrohling (Contributor) left a comment


Looks promising! @pavolloffay suggested something similar, but his idea was to check the size directly in the linked list, IIRC. That wouldn't require changing the backing buffer, but it would require synchronized access during the enqueue/dequeue operations, I believe.

In any case, it would be nice to get his review as well.
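For context, a hedged sketch of the linked-list alternative being described: the total byte size is tracked alongside a container/list, and a single mutex guards both. All names here (sizedList, Push, Pop, memCap) are illustrative, not from either PR.

```go
package queue

import (
	"container/list"
	"sync"
)

// Sizeable matches the interface defined in this PR.
type Sizeable interface {
	Size() int
}

// sizedList is a hypothetical mutex-guarded list that tracks the total
// size of its items, as an alternative to a ring buffer.
type sizedList struct {
	mu      sync.Mutex
	items   *list.List
	memSize int // running total of item sizes, in bytes
	memCap  int // maximum total size allowed
}

func newSizedList(memCap int) *sizedList {
	return &sizedList{items: list.New(), memCap: memCap}
}

// Push rejects the item when it would exceed the cap; a dropping policy
// like the one in this PR could evict from the front instead.
func (l *sizedList) Push(item Sizeable) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	size := item.Size()
	if l.memSize+size > l.memCap {
		return false
	}
	l.items.PushBack(item)
	l.memSize += size
	return true
}

func (l *sizedList) Pop() (Sizeable, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	front := l.items.Front()
	if front == nil {
		return nil, false
	}
	l.items.Remove(front)
	item := front.Value.(Sizeable)
	l.memSize -= item.Size()
	return item, true
}
```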


func (q *MemoryBoundedQueue) Enqueue(item Sizeable) bool {
	size := item.Size()
	for q.memSize+size > q.memCapacity {
Contributor

This deserves a comment; it took me a while to realize that you are dropping items until space is freed.

	}
	// now we're good with memory, but may need to make the queue longer
	if q.queue.Full() {
		q.queue.EnsureCapacity(2 * q.queue.Capacity())
Contributor

Debug statement?
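Putting both remarks together, a hedged reconstruction of what the commented Enqueue might look like; the eviction details and the onDroppedItem call are inferred from the surrounding snippets, not copied from the PR.

```go
func (q *MemoryBoundedQueue) Enqueue(item Sizeable) bool {
	size := item.Size()
	// Evict items from the head of the queue until the new item fits
	// under the memory cap, reporting each evicted item as dropped.
	for q.memSize+size > q.memCapacity {
		dropped, ok := q.queue.Dequeue()
		if !ok {
			// The item alone is larger than the whole cap.
			return false
		}
		q.memSize -= dropped.(Sizeable).Size()
		q.onDroppedItem(dropped.(Sizeable))
	}
	// Now we're good on memory, but may need to make the queue longer.
	if q.queue.Full() {
		q.queue.EnsureCapacity(2 * q.queue.Capacity())
	}
	q.memSize += size
	return q.queue.Enqueue(item)
}
```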

	Size() int
}

type MemoryBoundedQueue struct {
Contributor

The queue isn't meant for concurrent access, and this fact needs to be documented, even though there's an explicit "concurrent" version below.

}

func NewRingBufferQueue(size int) *RingBufferQueue {
	return &RingBufferQueue{
Contributor

Check the boundaries? The min size of the queue is probably 1.
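A minimal sketch of the boundary check being asked for; clamping to 1 is one option, returning an error is another. The buffer field name and the head == -1 empty sentinel are assumptions based on other snippets in this diff.

```go
func NewRingBufferQueue(size int) *RingBufferQueue {
	if size < 1 {
		size = 1 // the smallest useful ring buffer holds one item
	}
	return &RingBufferQueue{
		size:   size,
		head:   -1, // head == -1 marks an empty queue in this PR
		buffer: make([]interface{}, size),
	}
}
```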

package queue

type RingBufferQueue struct {
	size, head, tail int
Contributor

Size could be uint

type MemoryBoundedQueue struct {
	queue       *RingBufferQueue
	memSize     int
	memCapacity int
Contributor

Both could be uint

}

func NewMemoryBoundedQueue(initLength int, memCapacity int, onDroppedItem func(item Sizeable)) *MemoryBoundedQueue {
	return &MemoryBoundedQueue{
Contributor

Check the boundaries for the given values

	return ok
}

func (q *ConcurrentMemoryBoundedQueue) StartConsumers(workers int, consumeFn func(item Sizeable)) {
Contributor

What happens when workers is 0, or negative?
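A hedged sketch of the guard being requested; whether to panic, return an error, or default to one worker is a design choice, and the blocking Dequeue used by the workers here is hypothetical.

```go
func (q *ConcurrentMemoryBoundedQueue) StartConsumers(workers int, consumeFn func(item Sizeable)) {
	if workers < 1 {
		panic("StartConsumers requires at least one worker")
	}
	for i := 0; i < workers; i++ {
		go func() {
			for {
				item, ok := q.Dequeue() // hypothetical blocking dequeue
				if !ok {
					return // queue was stopped
				}
				consumeFn(item)
			}
		}()
	}
}
```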

}{
	{items: []int{1, 2}, newCap: 4, expCap: 4, expItems: []int{1, 2, 0, 0}},
	{items: []int{1, 2, 3, -1}, newCap: 4, expCap: 4, expItems: []int{2, 3, 0, 0}},
	{items: []int{1, 2, 3, -1, -2, 4, 5}, newCap: 4, expCap: 4, expItems: []int{3, 4, 5, 0}},
Contributor

I must be getting something wrong... This is what I would expect:

1, 2, 3, 0 (initial state)
2, 3, 0, 0 (after -1)
0, 0, 0, 0 (after -2)
4, 5, 0, 0 (final state)

Contributor

Unless -2 doesn't mean "take two", but "take one", for the second time.

Contributor

I think -2 means dequeue and expect the dequeued item 2.

Refer to the comment above: items []int // positive to enqueue, negative to dequeue
and the assertion below: assert.Equal(t, -item, itm)
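In other words, the driver loop of the test presumably looks roughly like this; a paraphrase based on the quoted comment and assertion, not the exact PR code:

```go
for _, item := range tt.items {
	if item > 0 {
		q.Enqueue(item) // positive: enqueue the value
	} else {
		// negative -n: dequeue and expect to get n back
		itm, ok := q.Dequeue()
		require.True(t, ok)
		assert.Equal(t, -item, itm)
	}
}
```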


package queue

type RingBufferQueue struct {
Contributor

This also requires the consumer to take care of concurrency. It would also be good to have documentation somewhere showing how the types relate to each other, like:

ConcurrentMemoryBoundedQueue wraps MemoryBoundedQueue, which wraps RingBufferQueue

Member

Or we can make these package-private and export just the concurrent version.
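A hedged sketch of package documentation that would capture both suggestions, with only the concurrent type exported; the lower-cased type names are hypothetical:

```go
// Package queue provides a memory-bounded queue for spans.
//
// ConcurrentMemoryBoundedQueue is the only exported type and is safe for
// concurrent use. It wraps memoryBoundedQueue, which enforces a byte cap
// by tracking the total size of queued items, which in turn wraps
// ringBufferQueue, a plain growable ring buffer. Neither inner type is
// goroutine-safe.
package queue
```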

"github.com/uber/jaeger-lib/metrics"
)

type Sizeable interface {
Member

Is there a core API to get the size? Could we use that instead?

https://golang.org/src/go/types/sizes.go
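Worth noting: go/types' Sizes API (like unsafe.Sizeof at runtime) reports only the static, shallow size of a value, so it would not account for the bytes behind slices, maps, and strings in a span; hence the need for a recursive Size() method. A small illustration (the span struct here is made up):

```go
package main

import (
	"fmt"
	"unsafe"
)

type span struct {
	operationName string
	tags          []string
}

func main() {
	s := span{operationName: "HTTP GET", tags: make([]string, 1000)}
	// Prints the fixed header size (e.g. 40 on 64-bit: 16-byte string
	// header + 24-byte slice header), no matter how large tags grows.
	fmt.Println(unsafe.Sizeof(s))
}
```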



	if q.Full() {
		return false
	}
	if q.head == -1 {
Member

A call to Empty() is more idiomatic.
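The suggested idiom, assuming head == -1 is the empty sentinel as the snippet above implies:

```go
func (q *RingBufferQueue) Empty() bool {
	return q.head == -1
}
```

The branch then reads if q.Empty() instead of comparing against the sentinel directly.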

@yurishkuro (Member, Author)

@jpkrohling this was a quick prototype; if we think it's worth pursuing, I can polish it. The basic ring buffer is ready and tested, but the other two types are only written; I haven't run them yet.

@jpkrohling (Contributor)

> if we think it's worth pursuing I can polish it

Is it possible to benchmark it against the bounded queue that was recently changed? If the performance is similar, I think it's well worth pursuing!

@yurishkuro (Member, Author)

The new queue uses a single mutex; mutexes are usually more efficient than Go channels. But the calls to Span.Size(), which are recursive, will introduce extra overhead (although it will apply to both implementations). I am skeptical that the queue itself will be in any way a limiting factor in the collector's throughput, and micro-benchmarks won't reflect that anyway.

@pavolloffay (Member)

Could we have a benchmark for the concurrent version that compares it against the current impl?

The POC looks good to me, if there is no perf degradation.

@jpkrohling (Contributor)

@yurishkuro will you continue working on this one? I closed the PR that adds the flags (#1985), but just realized that this PR here only adds a new queue type. Should I reopen that and work on the review comments, eventually switching to this new queue, or would you rather have me wait for this one here to be merged first?

@yurishkuro (Member, Author)

If someone could take over this PR it would be great, as I will only have time to return to it over the weekend or next week.

The other PR you closed is still needed as it introduces the wiring for the new queue (cli flags, etc).

@jpkrohling (Contributor)

> The other PR you closed is still needed as it introduces the wiring for the new queue (cli flags, etc).

I'll reopen that and address the comments. If this one is merged before that one, I'll adapt it to use the new queue; otherwise, I'll send a follow-up PR once the new queue gets merged.

Yuri Shkuro added 2 commits January 19, 2020 20:44
Signed-off-by: Yuri Shkuro <ys@uber.com>
Signed-off-by: Yuri Shkuro <ys@uber.com>
@yurishkuro (Member, Author)

Benchmark results are puzzling: the mutex+cond version is about 3x slower than channels.

BenchmarkChannelQueue-12    	 1227919	       968 ns/op
BenchmarkRingBuffer-12      	  477895	      2522 ns/op
BenchmarkChannel-12         	 1796756	       651 ns/op
BenchmarkCond-12            	  557828	      2170 ns/op

I added the last two benchmarks, which are just a raw channel and a raw Cond, without any queueing logic. If I remove the calls to Gosched(), the Cond benchmark becomes slightly faster than the channel, although the full queue is still slower (a custom data structure is understandably slower than built-in channels).

BenchmarkChannelQueue-12    	14336493	        72.7 ns/op
BenchmarkRingBuffer-12      	 3808519	       318 ns/op
BenchmarkChannel-12         	11800252	        96.9 ns/op
BenchmarkCond-12            	14106967	        84.7 ns/op

The CPU profiles are very similar (with yield), spending most of the time in the same Cond wait, yet the channel is much faster:

channel: [CPU profile screenshot]

Cond: [CPU profile screenshot]

Signed-off-by: Yuri Shkuro <ys@uber.com>
@albertteoh (Contributor) left a comment

Thanks for putting together the benchmarks; that's an interesting insight. Forgive the ignorance in my question: does this mean the existing approach of using the ChannelQueue is preferred, given the benchmark results?


	expCap   int
	expItems []int // content of the buffer after EnsureCapacity
}{
	{items: []int{1, 2}, newCap: 4, expCap: 4, expItems: []int{1, 2, 0, 0}},
Contributor

I would use want instead of exp, which I feel is a lot clearer, since exp can be an abbreviation for many words other than expect. Or maybe I'm just being too pedantic :)

		return false
	}
	if q.Empty() {
		// empty queue
Contributor

is this comment necessary?

		return nil, false
	}
	var item interface{}
	if q.head == q.tail {
Contributor

I wonder if this logic could be simplified if we took the approach where Empty means head == tail, with the tail pointer pointing to the next "empty" index of the buffer. This means the buffer is never completely "full"; there is always one empty slot. It also implies that Full means tail == ((head - 1 + size) % size). I think this would do away with the need for conditional logic, but take all this with a grain of salt, as I haven't thought it out thoroughly nor tested it.
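A sketch of that "one empty slot" convention (equally untested here): head == tail means empty, the buffer is never completely full, and no -1 sentinel is needed.

```go
// ringBuffer sacrifices one slot to distinguish empty from full.
type ringBuffer struct {
	buf        []interface{}
	head, tail int // tail points at the next free slot
}

func newRingBuffer(capacity int) *ringBuffer {
	// One extra slot so the buffer can hold `capacity` items.
	return &ringBuffer{buf: make([]interface{}, capacity+1)}
}

func (r *ringBuffer) Empty() bool { return r.head == r.tail }

func (r *ringBuffer) Full() bool { return (r.tail+1)%len(r.buf) == r.head }

func (r *ringBuffer) Enqueue(item interface{}) bool {
	if r.Full() {
		return false
	}
	r.buf[r.tail] = item
	r.tail = (r.tail + 1) % len(r.buf)
	return true
}

func (r *ringBuffer) Dequeue() (interface{}, bool) {
	if r.Empty() {
		return nil, false
	}
	item := r.buf[r.head]
	r.buf[r.head] = nil // allow GC of the dequeued item
	r.head = (r.head + 1) % len(r.buf)
	return item, true
}
```

The cost is one wasted slot; the gain is that Empty and Full reduce to modular arithmetic with no sentinel values.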

@yurishkuro (Member, Author)

> does this mean the existing approach of using the ChannelQueue is preferred given the benchmark results?

That is precisely the open question. I don't think these benchmarks are conclusive; the methodology is flawed. It makes sense to me that plain Cond is faster than channels, considering that the profile shows channels using the same primitives underneath. I cannot explain why adding a yield makes the Cond test 3 times slower. I used yield as a stand-in for a syscall, which the real collector would be doing in both producer and consumer, but real syscalls are likely to dominate the benchmark.
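For reference, a minimal sketch of the kind of micro-benchmark being discussed (not the PR's actual benchmark, and assuming the usual testing and runtime imports in a _test.go file); the runtime.Gosched() calls are the yield stand-ins whose removal flipped the results:

```go
func BenchmarkChannel(b *testing.B) {
	ch := make(chan int, 1000)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range ch {
			runtime.Gosched() // stand-in for consumer-side syscall work
		}
	}()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch <- i
		runtime.Gosched() // stand-in for producer-side syscall work
	}
	close(ch)
	<-done
}
```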

@yurishkuro closed this Aug 14, 2020
@yurishkuro deleted the mem-bound-queue branch August 14, 2020 17:32