Skip to content

Commit 6e18d9b

Browse files
authored
Record admission-blocked event as a span & duration (#244)
Alternative to #242 Allow us to monitor when requests are being blocked by admission limits.
1 parent 5ed5ffc commit 6e18d9b

File tree

1 file changed

+31
-15
lines changed

1 file changed

+31
-15
lines changed

collector/admission/boundedqueue.go

+31-15
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,45 @@ import (
77

88
"github.com/google/uuid"
99
orderedmap "github.com/wk8/go-ordered-map/v2"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/codes"
12+
"go.opentelemetry.io/otel/trace"
13+
"go.opentelemetry.io/otel/trace/noop"
1014
)
1115

1216
var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")
1317

1418
type BoundedQueue struct {
15-
maxLimitBytes int64
19+
maxLimitBytes int64
1620
maxLimitWaiters int64
17-
currentBytes int64
18-
currentWaiters int64
19-
lock sync.Mutex
20-
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
21+
currentBytes int64
22+
currentWaiters int64
23+
lock sync.Mutex
24+
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
25+
tracer trace.Tracer
2126
}
2227

2328
type waiter struct {
24-
readyCh chan struct{}
29+
readyCh chan struct{}
2530
pendingBytes int64
26-
ID uuid.UUID
31+
ID uuid.UUID
2732
}
2833

2934
func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
3035
return &BoundedQueue{
31-
maxLimitBytes: maxLimitBytes,
36+
maxLimitBytes: maxLimitBytes,
3237
maxLimitWaiters: maxLimitWaiters,
33-
waiters: orderedmap.New[uuid.UUID, waiter](),
38+
waiters: orderedmap.New[uuid.UUID, waiter](),
39+
tracer: noop.NewTracerProvider().Tracer(""),
3440
}
3541
}
3642

43+
func NewTracedBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
44+
bq := NewBoundedQueue(maxLimitBytes, maxLimitWaiters)
45+
bq.tracer = tp.Tracer("otel-arrow/admission")
46+
return bq
47+
}
48+
3749
func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
3850
bq.lock.Lock()
3951
defer bq.lock.Unlock()
@@ -42,13 +54,13 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
4254
return false, fmt.Errorf("rejecting request, request size larger than configured limit")
4355
}
4456

45-
if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
57+
if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
4658
bq.currentBytes += pendingBytes
4759
return true, nil
4860
}
4961

5062
// since we were unable to admit, check if we can wait.
51-
if bq.currentWaiters + 1 > bq.maxLimitWaiters { // too many waiters
63+
if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters
5264
return false, ErrTooManyWaiters
5365
}
5466

@@ -66,7 +78,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
6678
// otherwise we need to wait for bytes to be released
6779
curWaiter := waiter{
6880
pendingBytes: pendingBytes,
69-
readyCh: make(chan struct{}),
81+
readyCh: make(chan struct{}),
7082
}
7183

7284
bq.lock.Lock()
@@ -84,6 +96,9 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
8496
}
8597

8698
bq.lock.Unlock()
99+
ctx, span := bq.tracer.Start(ctx, "admission_blocked",
100+
trace.WithAttributes(attribute.Int64("pending", pendingBytes)))
101+
defer span.End()
87102

88103
select {
89104
case <-curWaiter.readyCh:
@@ -93,6 +108,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
93108
bq.lock.Lock()
94109
defer bq.lock.Unlock()
95110
err = fmt.Errorf("context canceled: %w ", ctx.Err())
111+
span.SetStatus(codes.Error, "context canceled")
96112

97113
_, found := bq.waiters.Delete(curWaiter.ID)
98114
if !found {
@@ -121,7 +137,7 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error {
121137
next := bq.waiters.Oldest()
122138
nextWaiter := next.Value
123139
nextKey := next.Key
124-
if bq.currentBytes + nextWaiter.pendingBytes <= bq.maxLimitBytes {
140+
if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes {
125141
bq.currentBytes += nextWaiter.pendingBytes
126142
bq.currentWaiters -= 1
127143
close(nextWaiter.readyCh)
@@ -142,9 +158,9 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error {
142158
func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool {
143159
bq.lock.Lock()
144160
defer bq.lock.Unlock()
145-
if bq.currentBytes + pendingBytes <= bq.maxLimitBytes {
161+
if bq.currentBytes+pendingBytes <= bq.maxLimitBytes {
146162
bq.currentBytes += pendingBytes
147163
return true
148164
}
149165
return false
150-
}
166+
}

0 commit comments

Comments
 (0)