Skip to content

Commit

Permalink
(otelarrowreceiver): Fix incorrect OTLP admission control behavior (o…
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored and sbylica-splunk committed Dec 17, 2024
1 parent 621bacd commit 1488419
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 231 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-otlp-admit-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Avoid breaking telemetry when admission control fails in OTLP handlers.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
12 changes: 8 additions & 4 deletions internal/otelarrow/admission/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import (

"github.com/google/uuid"
orderedmap "github.com/wk8/go-ordered-map/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")
var ErrTooManyWaiters = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data")
var ErrRequestTooLarge = status.Error(grpccodes.InvalidArgument, "rejecting request, request is too large")

type BoundedQueue struct {
maxLimitBytes int64
Expand All @@ -33,12 +37,12 @@ type waiter struct {
ID uuid.UUID
}

func NewBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
return &BoundedQueue{
maxLimitBytes: maxLimitBytes,
maxLimitWaiters: maxLimitWaiters,
waiters: orderedmap.New[uuid.UUID, waiter](),
tracer: tp.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"),
tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"),
}
}

Expand All @@ -47,7 +51,7 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
defer bq.lock.Unlock()

if pendingBytes > bq.maxLimitBytes { // will never succeed
return false, fmt.Errorf("rejecting request, request size larger than configured limit")
return false, ErrRequestTooLarge
}

if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
Expand Down
12 changes: 7 additions & 5 deletions internal/otelarrow/admission/boundedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/multierr"
)

Expand All @@ -39,15 +39,15 @@ func abs(x int64) int64 {
return x
}

var noopTraces = noop.NewTracerProvider()
var noopTelemetry = componenttest.NewNopTelemetrySettings()

func TestAcquireSimpleNoWaiters(t *testing.T) {
maxLimitBytes := 1000
maxLimitWaiters := 10
numRequests := 40
requestSize := 21

bq := NewBoundedQueue(noopTraces, int64(maxLimitBytes), int64(maxLimitWaiters))
bq := NewBoundedQueue(noopTelemetry, int64(maxLimitBytes), int64(maxLimitWaiters))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestAcquireBoundedWithWaiters(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bq := NewBoundedQueue(noopTraces, tt.maxLimitBytes, tt.maxLimitWaiters)
bq := NewBoundedQueue(noopTelemetry, tt.maxLimitBytes, tt.maxLimitWaiters)
var blockedRequests int64
numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize
requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked)
Expand Down Expand Up @@ -160,8 +160,10 @@ func TestAcquireContextCanceled(t *testing.T) {

exp := tracetest.NewInMemoryExporter()
tp := trace.NewTracerProvider(trace.WithSyncer(exp))
ts := noopTelemetry
ts.TracerProvider = tp

bq := NewBoundedQueue(tp, int64(maxLimitBytes), int64(maxLimitWaiters))
bq := NewBoundedQueue(ts, int64(maxLimitBytes), int64(maxLimitWaiters))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var errs error
Expand Down
2 changes: 1 addition & 1 deletion receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
go.opentelemetry.io/collector/receiver v0.112.0
go.opentelemetry.io/otel v1.31.0
go.opentelemetry.io/otel/metric v1.31.0
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.uber.org/goleak v1.3.0
Expand Down Expand Up @@ -81,7 +82,6 @@ require (
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
9 changes: 4 additions & 5 deletions receiver/otelarrowreceiver/internal/arrow/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"
"golang.org/x/net/http2/hpack"
Expand All @@ -50,10 +49,10 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock"
)

var noopTraces = noop.NewTracerProvider()
var noopTelemetry = componenttest.NewNopTelemetrySettings()

func defaultBQ() *admission.BoundedQueue {
return admission.NewBoundedQueue(noopTraces, int64(100000), int64(10))
return admission.NewBoundedQueue(noopTelemetry, int64(100000), int64(10))
}

type compareJSONTraces struct{ ptrace.Traces }
Expand Down Expand Up @@ -490,10 +489,10 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) {
var bq *admission.BoundedQueue
if tt.rejected {
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0)
bq = admission.NewBoundedQueue(noopTraces, int64(sizer.TracesSize(td)-100), 10)
bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10)
} else {
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
bq = admission.NewBoundedQueue(noopTraces, defaultBoundedQueueLimit, 10)
bq = admission.NewBoundedQueue(noopTelemetry, defaultBoundedQueueLimit, 10)
}

ctc.start(ctc.newRealConsumer, bq)
Expand Down
22 changes: 10 additions & 12 deletions receiver/otelarrowreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,24 @@ func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper
// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) {
ld := req.Logs()
numSpans := ld.LogRecordCount()
if numSpans == 0 {
numRecords := ld.LogRecordCount()
if numRecords == 0 {
return plogotlp.NewExportResponse(), nil
}

ctx = r.obsrecv.StartLogsOp(ctx)

var err error
sizeBytes := int64(r.sizer.LogsSize(req.Logs()))
err := r.boundedQueue.Acquire(ctx, sizeBytes)
if err != nil {
return plogotlp.NewExportResponse(), err
if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
err = r.nextConsumer.ConsumeLogs(ctx, ld)
// Release() is not checked, see #36074.
_ = r.boundedQueue.Release(sizeBytes) // immediate release
} else {
err = acqErr
}
defer func() {
if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil {
r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr))
}
}()

err = r.nextConsumer.ConsumeLogs(ctx, ld)
r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err)
r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numRecords, err)

return plogotlp.NewExportResponse(), err
}
Expand Down
Loading

0 comments on commit 1488419

Please sign in to comment.