Skip to content

Commit

Permalink
(otelarrowreceiver): Support disabling admission control (open-teleme…
Browse files Browse the repository at this point in the history
…try#36081)

#### Description

Adds a no-op implementation of the BoundedQueue used by the OTel-Arrow
receiver for admission control.

#### Link to tracking issue

Part of open-telemetry#36074.

#### Testing

Adds a new end-to-end test to verify admission control with/without
waiters and unbounded. The test was taken from open-telemetry#36033.

#### Documentation

Added: "0" request_limit_mib indicates no admission control.
  • Loading branch information
jmacd authored and sbylica-splunk committed Dec 17, 2024
1 parent 1488419 commit 273847f
Show file tree
Hide file tree
Showing 13 changed files with 281 additions and 42 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-unbounded.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable unlimited admission control when request_limit_mib is set to 0.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion internal/otelarrow/admission/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type waiter struct {
ID uuid.UUID
}

func NewBoundedQueue(ts component.TelemetrySettings, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitBytes, maxLimitWaiters int64) Queue {
return &BoundedQueue{
maxLimitBytes: maxLimitBytes,
maxLimitWaiters: maxLimitWaiters,
Expand Down
10 changes: 7 additions & 3 deletions internal/otelarrow/admission/boundedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@ func abs(x int64) int64 {

var noopTelemetry = componenttest.NewNopTelemetrySettings()

func newTestQueue(limitBytes, limitWaiters int64) *BoundedQueue {
return NewBoundedQueue(noopTelemetry, limitBytes, limitWaiters).(*BoundedQueue)
}

func TestAcquireSimpleNoWaiters(t *testing.T) {
maxLimitBytes := 1000
maxLimitWaiters := 10
numRequests := 40
requestSize := 21

bq := NewBoundedQueue(noopTelemetry, int64(maxLimitBytes), int64(maxLimitWaiters))
bq := newTestQueue(int64(maxLimitBytes), int64(maxLimitWaiters))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -99,7 +103,7 @@ func TestAcquireBoundedWithWaiters(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bq := NewBoundedQueue(noopTelemetry, tt.maxLimitBytes, tt.maxLimitWaiters)
bq := newTestQueue(tt.maxLimitBytes, tt.maxLimitWaiters)
var blockedRequests int64
numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize
requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked)
Expand Down Expand Up @@ -163,7 +167,7 @@ func TestAcquireContextCanceled(t *testing.T) {
ts := noopTelemetry
ts.TracerProvider = tp

bq := NewBoundedQueue(ts, int64(maxLimitBytes), int64(maxLimitWaiters))
bq := NewBoundedQueue(ts, int64(maxLimitBytes), int64(maxLimitWaiters)).(*BoundedQueue)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var errs error
Expand Down
59 changes: 59 additions & 0 deletions internal/otelarrow/admission/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"

import (
"context"
)

// Queue is a weighted admission queue interface.
type Queue interface {
// Acquire asks the controller to admit the caller.
//
// The weight parameter specifies how large of an admission to make.
// This might be used on the bytes of request (for example) to differentiate
// between large and small requests.
//
// Admit will return when one of the following events occurs:
//
// (1) admission is allowed, or
// (2) the provided ctx becomes canceled, or
// (3) there are so many existing waiters that the
// controller decides to reject this caller without
// admitting it.
//
// In case (1), the return value will be a non-nil error. The
// caller must invoke Release(weight) after it is finished
// with the resource being guarded by the admission
// controller.
//
// In case (2), the return value will be a Cancelled or
// DeadlineExceeded error.
//
// In case (3), the return value will be a ResourceExhausted
// error.
Acquire(ctx context.Context, weight int64) error

// Release will be eliminated as part of issue #36074.
Release(weight int64) error
}

type noopController struct{}

var _ Queue = noopController{}

// NewUnboundedQueue returns a no-op implementation of the Queue interface.
func NewUnboundedQueue() Queue {
return noopController{}
}

// Acquire implements Queue.
func (noopController) Acquire(_ context.Context, _ int64) error {
return nil
}

// Acquire implements Queue.
func (noopController) Release(_ int64) error {
return nil
}
Loading

0 comments on commit 273847f

Please sign in to comment.