(otelarrowreceiver): Use a single call to BoundedQueue.Acquire (open-telemetry#36082)

#### Description

Simplifies the admission control logic for OTAP payloads. We now call
Acquire() once, after uncompressing the data, instead of once with the
compressed size and again with the difference.
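
As a rough sketch of the simplified flow (the `boundedQueue` interface and the `admitOnce`/`decode` names below are illustrative stand-ins, not the receiver's actual types): decode the request first so the true uncompressed size is known, acquire that amount in a single call, and release the same amount when the request completes.

```go
package example

import (
	"context"
	"fmt"
)

// boundedQueue models only the two admission-queue methods used here.
type boundedQueue interface {
	Acquire(ctx context.Context, bytes int64) error
	Release(bytes int64) error
}

// admitOnce sketches the new flow: decode first, then make exactly one
// Acquire call for the uncompressed size. decode stands in for the
// receiver's consumeBatch step.
func admitOnce(ctx context.Context, bq boundedQueue, compressed []byte,
	decode func([]byte) (numItems int, uncompSize int64, err error)) error {

	numItems, uncompSize, err := decode(compressed)
	if err != nil {
		return err
	}
	if err := bq.Acquire(ctx, uncompSize); err != nil {
		return fmt.Errorf("otel-arrow bounded queue: %w", err)
	}
	// Release exactly what was acquired once the request finishes; there is
	// no compressed-size estimate to reconcile.
	defer func() { _ = bq.Release(uncompSize) }()

	_ = numItems // hand the decoded payload off to the pipeline here
	return nil
}
```

Because the acquired amount always equals the uncompressed size, the previous release-and-reacquire reconciliation (`acquireAdditionalBytes`) is no longer needed.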

#### Link to tracking issue

Part of open-telemetry#36074.

#### Testing

One test is replaced with a table-driven test that verifies BoundedQueue admit and reject behavior directly.
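
For illustration only (this is a sketch, not the test added in this commit), an admit/reject check against the bounded queue could be structured as below. It assumes the `noopTelemetry` helper and imports already present in `arrow_test.go`, and it assumes `Acquire` returns an error, rather than blocking, when a request exceeds the configured limit.

```go
// Assumes the imports already present in arrow_test.go
// (context, testing, testify/require, and the admission package).
func TestBoundedQueueAdmitReject(t *testing.T) {
	// payloadSize stands in for the uncompressed request size the receiver
	// computes after decoding.
	const payloadSize = int64(1000)

	tests := []struct {
		name       string
		admitLimit int64
		expectErr  bool
	}{
		{name: "admit request", admitLimit: payloadSize * 2, expectErr: false},
		{name: "reject request", admitLimit: payloadSize / 2, expectErr: true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// 10 is the waiter limit, matching the value used in this file.
			bq := admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 10)
			err := bq.Acquire(context.Background(), payloadSize)
			if tt.expectErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
			require.NoError(t, bq.Release(payloadSize))
		})
	}
}
```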

~Note: the OTel-Arrow test suite will not pass with this PR until it
merges with open-telemetry#36078.~ Originally developed in open-telemetry#36033.

#### Documentation

Not user-visible.
jmacd authored and ArthurSens committed Nov 4, 2024
1 parent f6208bc commit 20bae43
Showing 4 changed files with 62 additions and 143 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-arrow-single-acquire.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Simplify receiver admission control logic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion receiver/otelarrowreceiver/go.mod
@@ -34,7 +34,6 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/net v0.30.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
)

require (
@@ -90,6 +89,7 @@ require (
golang.org/x/tools v0.22.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

96 changes: 13 additions & 83 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
@@ -8,9 +8,7 @@ import (
"errors"
"fmt"
"io"
"net"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -39,7 +37,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
@@ -454,9 +451,8 @@ type inFlightData struct {
// consumeAndRespond() function.
refs atomic.Int32

numAcquired int64 // how many bytes held in the semaphore
numItems int // how many items
uncompSize int64 // uncompressed data size
numItems int // how many items
uncompSize int64 // uncompressed data size == how many bytes held in the semaphore
}

func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) {
@@ -505,8 +501,8 @@ func (id *inFlightData) anyDone(ctx context.Context) {

id.span.End()

if id.numAcquired != 0 {
if err := id.boundedQueue.Release(id.numAcquired); err != nil {
if id.uncompSize != 0 {
if err := id.boundedQueue.Release(id.uncompSize); err != nil {
id.telemetry.Logger.Error("release error", zap.Error(err))
}
}
@@ -606,19 +602,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
}
}

var prevAcquiredBytes int64
uncompSizeHeaderStr, uncompSizeHeaderFound := authHdrs["otlp-pdata-size"]
if !uncompSizeHeaderFound || len(uncompSizeHeaderStr) == 0 {
// This is a compressed size so make sure to acquire the difference when request is decompressed.
prevAcquiredBytes = int64(proto.Size(req))
} else {
var parseErr error
prevAcquiredBytes, parseErr = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64)
if parseErr != nil {
return status.Errorf(codes.Internal, "failed to convert string to request size: %v", parseErr)
}
}

var callerCancel context.CancelFunc
if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 {
if timeout, decodeErr := grpcutil.DecodeTimeout(encodedTimeout[0]); decodeErr != nil {
@@ -638,17 +621,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
}
}

// Use the bounded queue to memory limit based on incoming
// uncompressed request size and waiters. Acquire will fail
// immediately if there are too many waiters, or will
// otherwise block until timeout or enough memory becomes
// available.
acquireErr := r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes)
if acquireErr != nil {
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", acquireErr)
}
flight.numAcquired = prevAcquiredBytes

data, numItems, uncompSize, consumeErr := r.consumeBatch(ac, req)

if consumeErr != nil {
@@ -658,19 +630,21 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
return status.Errorf(codes.Internal, "otel-arrow decode: %v", consumeErr)
}

// Use the bounded queue to memory limit based on incoming
// uncompressed request size and waiters. Acquire will fail
// immediately if there are too many waiters, or will
// otherwise block until timeout or enough memory becomes
// available.
acquireErr := r.boundedQueue.Acquire(inflightCtx, uncompSize)
if acquireErr != nil {
return acquireErr
}
flight.uncompSize = uncompSize
flight.numItems = numItems

r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize)
r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems))

numAcquired, secondAcquireErr := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)

flight.numAcquired = numAcquired
if secondAcquireErr != nil {
return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", secondAcquireErr)
}

// Recognize that the request is still in-flight via consumeAndRespond()
flight.refs.Add(1)

@@ -901,47 +875,3 @@ func (r *Receiver) consumeData(ctx context.Context, data any, flight *inFlightDa
}
return retErr
}

func (r *Receiver) acquireAdditionalBytes(ctx context.Context, prevAcquired, uncompSize int64, addr net.Addr, uncompSizeHeaderFound bool) (int64, error) {
diff := uncompSize - prevAcquired

if diff == 0 {
return uncompSize, nil
}

if uncompSizeHeaderFound {
var clientAddr string
if addr != nil {
clientAddr = addr.String()
}
// a mismatch between header set by exporter and the uncompSize just calculated.
r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header",
zap.String("client-address", clientAddr),
zap.Int("uncompsize", int(uncompSize)),
zap.Int("otlp-pdata-size", int(prevAcquired)),
)
} else if diff < 0 {
// proto.Size() on compressed request was greater than pdata uncompressed size.
r.telemetry.Logger.Debug("uncompressed size is less than compressed size",
zap.Int("uncompressed", int(uncompSize)),
zap.Int("compressed", int(prevAcquired)),
)
}

if diff < 0 {
// If the difference is negative, release the overage.
if err := r.boundedQueue.Release(-diff); err != nil {
return 0, err
}
} else {
// Release previously acquired bytes to prevent deadlock and
// reacquire the uncompressed size we just calculated.
if err := r.boundedQueue.Release(prevAcquired); err != nil {
return 0, err
}
if err := r.boundedQueue.Acquire(ctx, uncompSize); err != nil {
return 0, err
}
}
return uncompSize, nil
}
80 changes: 21 additions & 59 deletions receiver/otelarrowreceiver/internal/arrow/arrow_test.go
@@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"testing"
@@ -408,98 +407,61 @@ func requireExhaustedStatus(t *testing.T, err error) {
requireStatus(t, codes.ResourceExhausted, err)
}

func requireInvalidArgumentStatus(t *testing.T, err error) {
requireStatus(t, codes.InvalidArgument, err)
}

func requireStatus(t *testing.T, code codes.Code, err error) {
require.Error(t, err)
status, ok := status.FromError(err)
require.True(t, ok, "is status-wrapped %v", err)
require.Equal(t, code, status.Code())
}

func TestBoundedQueueWithPdataHeaders(t *testing.T) {
func TestBoundedQueueLimits(t *testing.T) {
var sizer ptrace.ProtoMarshaler
stdTesting := otelAssert.NewStdUnitTest(t)
pdataSizeTenTraces := sizer.TracesSize(testdata.GenerateTraces(10))
defaultBoundedQueueLimit := int64(100000)
td := testdata.GenerateTraces(10)
tdSize := int64(sizer.TracesSize(td))

tests := []struct {
name string
numTraces int
includePdataHeader bool
pdataSize string
rejected bool
name string
admitLimit int64
expectErr bool
}{
{
name: "no header compressed greater than uncompressed",
numTraces: 10,
},
{
name: "no header compressed less than uncompressed",
numTraces: 100,
},
{
name: "pdata header less than uncompressedSize",
numTraces: 10,
pdataSize: strconv.Itoa(pdataSizeTenTraces / 2),
includePdataHeader: true,
},
{
name: "pdata header equal uncompressedSize",
numTraces: 10,
pdataSize: strconv.Itoa(pdataSizeTenTraces),
includePdataHeader: true,
name: "admit request",
admitLimit: tdSize * 2,
expectErr: false,
},
{
name: "pdata header greater than uncompressedSize",
numTraces: 10,
pdataSize: strconv.Itoa(pdataSizeTenTraces * 2),
includePdataHeader: true,
},
{
name: "no header compressed accepted uncompressed rejected",
numTraces: 100,
rejected: true,
},
{
name: "pdata header accepted uncompressed rejected",
numTraces: 100,
rejected: true,
pdataSize: strconv.Itoa(pdataSizeTenTraces),
includePdataHeader: true,
name: "reject request",
admitLimit: tdSize / 2,
expectErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tc := newHealthyTestChannel(t)
ctc := newCommonTestCase(t, tc)

td := testdata.GenerateTraces(tt.numTraces)
batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td)
require.NoError(t, err)
if tt.includePdataHeader {
var hpb bytes.Buffer
hpe := hpack.NewEncoder(&hpb)
err = hpe.WriteField(hpack.HeaderField{
Name: "otlp-pdata-size",
Value: tt.pdataSize,
})
assert.NoError(t, err)
batch.Headers = make([]byte, hpb.Len())
copy(batch.Headers, hpb.Bytes())
}

var bq admission.Queue
if tt.rejected {
if tt.expectErr {
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0)
bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10)
} else {
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
bq = admission.NewBoundedQueue(noopTelemetry, defaultBoundedQueueLimit, 10)
bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 10)
}

ctc.start(ctc.newRealConsumer, bq)
ctc.putBatch(batch, nil)

if tt.rejected {
requireExhaustedStatus(t, ctc.wait())
if tt.expectErr {
requireInvalidArgumentStatus(t, ctc.wait())
} else {
data := <-ctc.consume
actualTD := data.Data.(ptrace.Traces)
