Skip to content

Commit

Permalink
Call IPCReader.Err() after reader loop (#215)
Browse files Browse the repository at this point in the history
Part of #210.

This updates memory-limit error handling because, prior to this fix,
there were two fallbacks.

(1) the use of os.Stderr to print the message that we had lost
(2) a test for the expected number of records to match

The first is now removed (i.e. no printing to os.Stderr). The second is
now an internal error.

The consumer is expected to test for the memory limit error explicitly
(e.g., in order to handle it as ResourceExhausted). A new function,
NewLimitErrorFromError, is added to do this; it parses the message using a regexp.
An upstream issue report and PR will be filed with Arrow-Go to overcome
this in the future.

See apache/arrow#41989.
  • Loading branch information
jmacd authored Jun 5, 2024
1 parent 0ed688e commit e3533c2
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 19 deletions.
35 changes: 26 additions & 9 deletions pkg/otel/arrow_record/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package arrow_record
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"math/rand"
Expand Down Expand Up @@ -60,9 +61,12 @@ type ConsumerAPI interface {

var _ ConsumerAPI = &Consumer{}

var ErrConsumerMemoryLimit = fmt.Errorf(
"The number of decoded records is smaller than the number of received payloads. " +
"Please increase the memory limit of the consumer.")
// ErrConsumerMemoryLimit is used by calling code to check
// errors.Is(err, ErrConsumerMemoryLimit). It is never returned.
var ErrConsumerMemoryLimit error = common.LimitError{}

// errConsumerInternalError is reported by Consume when fewer records
// were decoded than payloads were received.
var errConsumerInternalError = errors.New(
"internal error: number of decoded records is smaller than the number of received payloads")

// Consumer is a BatchArrowRecords consumer.
type Consumer struct {
Expand Down Expand Up @@ -311,12 +315,15 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces,

// Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage.
// Note: the records wrapped in the RecordMessage must be released after use by the caller.
func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) {
func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) (ibes []*record_message.RecordMessage, retErr error) {
ctx := context.Background()

var ibes []*record_message.RecordMessage
defer func() {
c.recordsCounter.Add(ctx, int64(len(ibes)), c.metricOpts()...)
if retErr != nil {
releaseRecords(ibes)
ibes = nil
}
}()

// Transform each individual OtlpArrowPayload into RecordMessage
Expand Down Expand Up @@ -356,8 +363,7 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R
ipc.WithZstd(),
)
if err != nil {
releaseRecords(ibes)
return nil, werror.Wrap(err)
return ibes, werror.Wrap(distinguishMemoryError(err))
}
sc.ipcReader = ipcReader
}
Expand All @@ -370,16 +376,27 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R
rec.Retain()
ibes = append(ibes, record_message.NewRecordMessage(bar.BatchId, payload.GetType(), rec))
}

if err := sc.ipcReader.Err(); err != nil {
return ibes, werror.Wrap(distinguishMemoryError(err))
}
}

if len(ibes) < len(bar.ArrowPayloads) {
releaseRecords(ibes)
return nil, ErrConsumerMemoryLimit
return ibes, werror.Wrap(errConsumerInternalError)
}

return ibes, nil
}

// distinguishMemoryError substitutes the structured limit error for err
// when common.NewLimitErrorFromError recognizes it; any other error is
// handed back untouched.
func distinguishMemoryError(err error) error {
	if structured, matched := common.NewLimitErrorFromError(err); matched {
		return structured
	}
	return err
}

type runtimeChecker struct{}

var _ memory.TestingT = &runtimeChecker{}
Expand Down
37 changes: 29 additions & 8 deletions pkg/otel/common/arrow/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ package arrow

import (
"fmt"
"os"
"regexp"
"strconv"

"github.com/apache/arrow/go/v14/arrow/memory"
)
Expand Down Expand Up @@ -44,8 +45,34 @@ type LimitError struct {

var _ error = LimitError{}

// limitRegexp recognizes a "memory limit exceeded" message and captures,
// in order, the requested size, the limit, and the in-use amount.
var limitRegexp = regexp.MustCompile(`memory limit exceeded: requested (\d+) out of (\d+) \(in-use=(\d+)\)`)

// NewLimitErrorFromError extracts a formatted limit error.
//
// Note: the arrow/go package (as of v16) has a panic recovery
// mechanism which formats the error object raised through panic in
// the code below. The formatting uses a "%v" which means we lose the
// error wrapping facility that would let us easily extract the
// object. Therefore, we use a regexp to unpack memory limit errors.
//
// When the text of err contains a memory limit report, the returned
// error is a LimitError carrying the parsed values and the boolean is
// true. Otherwise err is returned unchanged and the boolean is false;
// a nil err yields (nil, false) rather than a panic.
func NewLimitErrorFromError(err error) (error, bool) {
	if err == nil {
		return nil, false
	}

	matches := limitRegexp.FindStringSubmatch(err.Error())
	if len(matches) != 4 {
		return err, false
	}

	// The capture groups are `\d+`, so the only failure ParseUint can
	// report here is a range error, in which case it returns the
	// maximum uint64. That saturated value is acceptable for an error
	// report, so the parse errors are deliberately ignored.
	req, _ := strconv.ParseUint(matches[1], 10, 64)
	lim, _ := strconv.ParseUint(matches[2], 10, 64)
	inuse, _ := strconv.ParseUint(matches[3], 10, 64)

	return LimitError{
		Request: req,
		Inuse:   inuse,
		Limit:   lim,
	}, true
}

// Error implements the error interface by formatting the Request,
// Limit and Inuse fields into a single message.
func (le LimitError) Error() string {
return fmt.Sprintf("allocation size %d exceeds limit %d (in-use=%d)", le.Request, le.Limit, le.Inuse)
return fmt.Sprintf("memory limit exceeded: requested %d out of %d (in-use=%d)", le.Request, le.Limit, le.Inuse)
}

func (_ LimitError) Is(tgt error) bool {
Expand All @@ -65,9 +92,6 @@ func (l *LimitedAllocator) Allocate(size int) []byte {
Inuse: l.inuse,
Limit: l.limit,
}
// Write the error to stderr so that it is visible even if the
// panic is caught.
os.Stderr.WriteString(err.Error() + "\n")
panic(err)
}

Expand All @@ -86,9 +110,6 @@ func (l *LimitedAllocator) Reallocate(size int, b []byte) []byte {
Inuse: l.inuse,
Limit: l.limit,
}
// Write the error to stderr so that it is visible even if the
// panic is caught.
os.Stderr.WriteString(err.Error() + "\n")
panic(err)
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/otel/common/arrow/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package arrow

import (
"errors"
"fmt"
"testing"

"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/stretchr/testify/require"
)

func TestLimitedAllocator(t *testing.T) {
func TestLimitedAllocatorUnformatted(t *testing.T) {
const boundary = 1000000
check := memory.NewCheckedAllocator(memory.NewGoAllocator())
limit := NewLimitedAllocator(check, boundary)
Expand All @@ -47,9 +48,26 @@ func TestLimitedAllocator(t *testing.T) {
}()
require.NotNil(t, capture)
require.True(t, errors.Is(capture.(error), LimitError{}))
require.Equal(t, "allocation size 1 exceeds limit 1000000 (in-use=1000000)", capture.(error).Error())
require.Equal(t, "memory limit exceeded: requested 1 out of 1000000 (in-use=1000000)", capture.(error).Error())

limit.Free(b)

check.AssertSize(t, 0)
}

// TestLimitedAllocatorFormatted checks that a LimitError can be
// recovered from an error that only embeds its formatted text.
func TestLimitedAllocatorFormatted(t *testing.T) {
	// Arrow formats the error instead of wrapping it, so the consumer
	// only ever sees the message surrounded by other text.
	want := LimitError{Request: 1000, Inuse: 9900, Limit: 10000}
	formatted := fmt.Errorf("some sort of prefix %v some sort of suffix", want)

	got, ok := NewLimitErrorFromError(formatted)
	require.Error(t, got)
	require.True(t, ok)
	require.Equal(t, want, got)

	// The recovered value must still satisfy the sentinel comparison.
	require.True(t, errors.Is(got, LimitError{}))
}

0 comments on commit e3533c2

Please sign in to comment.