From e3533c2b069178762c92ccea2437892e8f682cf0 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 5 Jun 2024 08:50:24 -0700 Subject: [PATCH] Call IPCReader.Err() after reader loop (#215) Part of #210. This updates memory-limit error handling because, prior to this fix, there were two fallbacks. (1) the use of os.Stderr to print the message that we had lost (2) a test for the expected number of records to match The first is now removed (i.e. no printing to os.Stderr). The second is now an internal error. The consumer is expected to test for the memory limit error explicitly (e.g., and handle it as ResourceExhausted). A new function is added to do this (NewLimitErrorFromError) that parses the message using a regexp. An upstream issue report and PR will be filed with Arrow-Go to overcome this in the future. See https://github.com/apache/arrow/pull/41989. --- pkg/otel/arrow_record/consumer.go | 35 +++++++++++++++++------ pkg/otel/common/arrow/allocator.go | 37 +++++++++++++++++++------ pkg/otel/common/arrow/allocator_test.go | 22 +++++++++++++-- 3 files changed, 75 insertions(+), 19 deletions(-) diff --git a/pkg/otel/arrow_record/consumer.go b/pkg/otel/arrow_record/consumer.go index 8bb67172..62eff883 100644 --- a/pkg/otel/arrow_record/consumer.go +++ b/pkg/otel/arrow_record/consumer.go @@ -17,6 +17,7 @@ package arrow_record import ( "bytes" "context" + "errors" "fmt" "log" "math/rand" @@ -60,9 +61,12 @@ type ConsumerAPI interface { var _ ConsumerAPI = &Consumer{} -var ErrConsumerMemoryLimit = fmt.Errorf( - "The number of decoded records is smaller than the number of received payloads. " + - "Please increase the memory limit of the consumer.") +// ErrConsumerMemoryLimit is used by calling code to check +// errors.Is(err, ErrConsumerMemoryLimit). It is never returned. +var ErrConsumerMemoryLimit error = common.LimitError{} + +var errConsumerInternalError = errors.New( + "internal error: number of decoded records is smaller than the number of received payloads") // Consumer is a BatchArrowRecords consumer. type Consumer struct { @@ -311,12 +315,15 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, // Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage. // Note: the records wrapped in the RecordMessage must be released after use by the caller. -func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) { +func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) (ibes []*record_message.RecordMessage, retErr error) { ctx := context.Background() - var ibes []*record_message.RecordMessage defer func() { c.recordsCounter.Add(ctx, int64(len(ibes)), c.metricOpts()...) + if retErr != nil { + releaseRecords(ibes) + ibes = nil + } }() // Transform each individual OtlpArrowPayload into RecordMessage @@ -356,8 +363,7 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R ipc.WithZstd(), ) if err != nil { - releaseRecords(ibes) - return nil, werror.Wrap(err) + return ibes, werror.Wrap(distinguishMemoryError(err)) } sc.ipcReader = ipcReader } @@ -370,16 +376,27 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R rec.Retain() ibes = append(ibes, record_message.NewRecordMessage(bar.BatchId, payload.GetType(), rec)) } + + if err := sc.ipcReader.Err(); err != nil { + return ibes, werror.Wrap(distinguishMemoryError(err)) + } } if len(ibes) < len(bar.ArrowPayloads) { - releaseRecords(ibes) - return nil, ErrConsumerMemoryLimit + return ibes, werror.Wrap(errConsumerInternalError) } return ibes, nil } +func distinguishMemoryError(err error) error { + limErr, ok := common.NewLimitErrorFromError(err) + if ok { + return limErr + } + return err +} + type runtimeChecker struct{} var _ memory.TestingT = &runtimeChecker{} diff --git a/pkg/otel/common/arrow/allocator.go b/pkg/otel/common/arrow/allocator.go index dac4e71b..6ac7d304 100644 --- a/pkg/otel/common/arrow/allocator.go +++ b/pkg/otel/common/arrow/allocator.go @@ -16,7 +16,8 @@ package arrow import ( "fmt" - "os" + "regexp" + "strconv" "github.com/apache/arrow/go/v14/arrow/memory" ) @@ -44,8 +45,34 @@ type LimitError struct { var _ error = LimitError{} +var limitRegexp = regexp.MustCompile(`memory limit exceeded: requested (\d+) out of (\d+) \(in-use=(\d+)\)`) + +// NewLimitErrorFromError extracts a formatted limit error. +// +// Note: the arrow/go package (as of v16) has a panic recovery +// mechanism which formats the error object raised through panic in +// the code below. The formatting uses a "%v" which means we lose the +// error wrapping facility that would let us easily extract the +// object. Therefore, we use a regexp to unpack memory limit errors. +func NewLimitErrorFromError(err error) (error, bool) { + matches := limitRegexp.FindStringSubmatch(err.Error()) + if len(matches) != 4 { + return err, false + } + + req, _ := strconv.ParseUint(matches[1], 10, 64) + lim, _ := strconv.ParseUint(matches[2], 10, 64) + inuse, _ := strconv.ParseUint(matches[3], 10, 64) + + return LimitError{ + Request: req, + Inuse: inuse, + Limit: lim, + }, true +} + func (le LimitError) Error() string { - return fmt.Sprintf("allocation size %d exceeds limit %d (in-use=%d)", le.Request, le.Limit, le.Inuse) + return fmt.Sprintf("memory limit exceeded: requested %d out of %d (in-use=%d)", le.Request, le.Limit, le.Inuse) } func (_ LimitError) Is(tgt error) bool { @@ -65,9 +92,6 @@ func (l *LimitedAllocator) Allocate(size int) []byte { Inuse: l.inuse, Limit: l.limit, } - // Write the error to stderr so that it is visible even if the - // panic is caught. - os.Stderr.WriteString(err.Error() + "\n") panic(err) } @@ -86,9 +110,6 @@ func (l *LimitedAllocator) Reallocate(size int, b []byte) []byte { Inuse: l.inuse, Limit: l.limit, } - // Write the error to stderr so that it is visible even if the - // panic is caught. - os.Stderr.WriteString(err.Error() + "\n") panic(err) } diff --git a/pkg/otel/common/arrow/allocator_test.go b/pkg/otel/common/arrow/allocator_test.go index cc28564f..4362bf26 100644 --- a/pkg/otel/common/arrow/allocator_test.go +++ b/pkg/otel/common/arrow/allocator_test.go @@ -16,13 +16,14 @@ package arrow import ( "errors" + "fmt" "testing" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/stretchr/testify/require" ) -func TestLimitedAllocator(t *testing.T) { +func TestLimitedAllocatorUnformatted(t *testing.T) { const boundary = 1000000 check := memory.NewCheckedAllocator(memory.NewGoAllocator()) limit := NewLimitedAllocator(check, boundary) @@ -47,9 +48,26 @@ func TestLimitedAllocator(t *testing.T) { }() require.NotNil(t, capture) require.True(t, errors.Is(capture.(error), LimitError{})) - require.Equal(t, "allocation size 1 exceeds limit 1000000 (in-use=1000000)", capture.(error).Error()) + require.Equal(t, "memory limit exceeded: requested 1 out of 1000000 (in-use=1000000)", capture.(error).Error()) limit.Free(b) check.AssertSize(t, 0) } + +func TestLimitedAllocatorFormatted(t *testing.T) { + // Arrow does not wrap the error, so the consumer sees its + // formatted version. + expect := LimitError{ + Request: 1000, + Inuse: 9900, + Limit: 10000, + } + + unwrap, ok := NewLimitErrorFromError(fmt.Errorf("some sort of prefix %v some sort of suffix", expect)) + require.Error(t, unwrap) + require.True(t, ok) + require.Equal(t, expect, unwrap) + + require.True(t, errors.Is(unwrap, LimitError{})) +}