Skip to content

Commit

Permalink
[otelarrowreceiver] Avoid one unnecessary span error at stream EOF (#…
Browse files Browse the repository at this point in the history
…34175)

**Description:** We have been using the OTel-Arrow components in
production and begun to monitor the component's health using span data.
There are spans coming from otelgrpc instrumentation and there are
explicit spans in both components.

This adds a new test that exercises multiple streams between the
exporter and receiver components. At the end of the test, it checks for
no span errors, with one exception discussed in
open-telemetry/opentelemetry-go-contrib#2644.
We will not modify the logic of these components to avoid stream
cancelation, we will lobby for stream cancelation not to register as
span errors.

The receiver is modified to avoid setting Span errors for EOF and
Cancelation following its existing logic for setting log severity for
the same condition. As a result, the `otel_arrow_stream_inflight` span
will not show errors for the final `Recv()` as a stream shuts down.

**Link to tracking Issue:** Part of
open-telemetry/otel-arrow#227.

**Testing:** A new test is added.
  • Loading branch information
jmacd authored Jul 22, 2024
1 parent b5f8197 commit 8f6e9c6
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 19 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-spanerrors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Eliminate one spurious span error.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34175]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion internal/otelarrow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/stretchr/testify v1.9.0
github.com/wk8/go-ordered-map/v2 v2.1.8
go.opentelemetry.io/collector/component v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/config/configgrpc v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/config/configtelemetry v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/consumer v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/exporter v0.105.1-0.20240717163034-43ed6184f9fe
Expand Down Expand Up @@ -72,7 +73,6 @@ require (
go.opentelemetry.io/collector v0.105.1-0.20240717163034-43ed6184f9fe // indirect
go.opentelemetry.io/collector/config/configauth v0.105.1-0.20240717163034-43ed6184f9fe // indirect
go.opentelemetry.io/collector/config/configcompression v1.12.1-0.20240716231837-5753a58f712b // indirect
go.opentelemetry.io/collector/config/configgrpc v0.105.1-0.20240717163034-43ed6184f9fe // indirect
go.opentelemetry.io/collector/config/confignet v0.105.1-0.20240717163034-43ed6184f9fe // indirect
go.opentelemetry.io/collector/config/configopaque v1.12.1-0.20240716231837-5753a58f712b // indirect
go.opentelemetry.io/collector/config/configretry v1.12.1-0.20240716231837-5753a58f712b // indirect
Expand Down
126 changes: 119 additions & 7 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand All @@ -27,6 +28,9 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -57,6 +61,9 @@ type testConsumer struct {
sink consumertest.TracesSink
recvLogs *observer.ObservedLogs
expLogs *observer.ObservedLogs

recvSpans *tracetest.InMemoryExporter
expSpans *tracetest.InMemoryExporter
}

var _ consumer.Traces = &testConsumer{}
Expand All @@ -66,7 +73,7 @@ type RecvConfig = otelarrowreceiver.Config
type CfgFunc func(*ExpConfig, *RecvConfig)
type GenFunc func(int) ptrace.Traces
type MkGen func() GenFunc
type EndFunc func(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces)
type EndFunc func(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) (rops, eops map[string]int)
type ConsumerErrFunc func(t *testing.T, err error)

func (*testConsumer) Capabilities() consumer.Capabilities {
Expand All @@ -78,18 +85,21 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
return tc.sink.ConsumeTraces(ctx, td)
}

func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs) {
func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
tset := componenttest.NewNopTelemetrySettings()

core, obslogs := observer.New(zapcore.InfoLevel)

exp := tracetest.NewInMemoryExporter()

// Note: if you want to see these logs in development, use:
// tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
// Also see failureMemoryLimitEnding() for explicit tests based on the
// logs observer.
tset.Logger = zap.New(core)
tset.TracerProvider = trace.NewTracerProvider(trace.WithSyncer(exp))

return tset, obslogs
return tset, obslogs, exp
}

func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) {
Expand Down Expand Up @@ -119,12 +129,15 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces
cfgF(exporterCfg, receiverCfg)
}

expTset, expLogs := testLoggerSettings(t)
recvTset, recvLogs := testLoggerSettings(t)
expTset, expLogs, expSpans := testLoggerSettings(t)
recvTset, recvLogs, recvSpans := testLoggerSettings(t)

testCon := &testConsumer{
recvLogs: recvLogs,
expLogs: expLogs,

recvSpans: recvSpans,
expSpans: expSpans,
}

receiver, err := rfact.CreateTracesReceiver(ctx, receiver.Settings{
Expand Down Expand Up @@ -254,7 +267,7 @@ func bulkyGenFunc() MkGen {

}

func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) {
func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [][]ptrace.Traces) (rops, eops map[string]int) {
// Check for matching request count and data
require.Equal(t, tp.requestCount*tp.threadCount, testCon.sink.SpanCount())

Expand All @@ -271,6 +284,28 @@ func standardEnding(t *testing.T, tp testParams, testCon *testConsumer, expect [
}
asserter := assert.NewStdUnitTest(t)
assert.Equiv(asserter, expectJSON, receivedJSON)

rops = map[string]int{}
eops = map[string]int{}

for _, span := range testCon.expSpans.GetSpans() {
eops[fmt.Sprintf("%v/%v", span.Name, span.Status.Code)]++

// This span has a recognized span error which we can't easily fix. See
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/2644
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
continue
}

require.NotEqual(t, otelcodes.Error, span.Status.Code,
"Exporter span has error: %v: %v", span.Name, span.Status.Description)
}
for _, span := range testCon.recvSpans.GetSpans() {
rops[fmt.Sprintf("%v/%v", span.Name, span.Status.Code)]++
require.NotEqual(t, otelcodes.Error, span.Status.Code,
"Receiver span has error: %v: %v", span.Name, span.Status.Description)
}
return rops, eops
}

// logSigs computes a signature of a structured log message emitted by
Expand Down Expand Up @@ -311,7 +346,7 @@ func countMemoryLimitErrors(msgs []string) (cnt int) {
return
}

func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) {
func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) (rops, eops map[string]int) {
require.Equal(t, 0, testCon.sink.SpanCount())

eSigs, eMsgs := logSigs(testCon.expLogs)
Expand All @@ -326,6 +361,8 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer,

require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs)
require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs)

return nil, nil
}

func consumerSuccess(t *testing.T, err error) {
Expand Down Expand Up @@ -376,3 +413,78 @@ func TestIntegrationMemoryLimited(t *testing.T) {
ecfg.TimeoutSettings.Timeout = 5 * time.Second
}, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding)
}

func multiStreamEnding(t *testing.T, p testParams, testCon *testConsumer, td [][]ptrace.Traces) (_, _ map[string]int) {
recvOps, expOps := standardEnding(t, p, testCon, td)

const streamName = "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces"

total := p.threadCount * p.requestCount

// Exporter spans:
//
// This span is the Arrow gRPC client stream. Should have no
// stream errors, > 1 streams.
expStreamsUnset := expOps[streamName+"/Unset"]
expStreamsError := expOps[streamName+"/Error"]
require.Less(t, 1, expStreamsUnset+expStreamsError)
require.Equal(t, 1, expStreamsError)

// Number of export requests: exact match. This is the
// exporterhelper's base span.
require.Equal(t, total, expOps["exporter/otelarrowexporter/traces/Unset"])

// Number of export requests: exact match. This span covers
// handling one request in the Arrow exporter.
require.Equal(t, total, expOps["otel_arrow_stream_send/Unset"])

// Receiver spans
//
// This span is the Arrow gRPC server stream, instrumented by
// otelgrpc. Because of
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/2644
// we expect either an error or unset. There should be > 1
// streams.
recvStreamsUnset := recvOps[streamName+"/Unset"]
recvStreamsError := recvOps[streamName+"/Error"]
require.Equal(t, 0, recvStreamsError)
require.Less(t, 1, recvStreamsUnset+recvStreamsError)

// For each stream, there is one Recv() span at the end that ends
// in cancelation (or EOF). So we expect total to be less than
// this span count.
require.Equal(t, total+recvStreamsUnset+recvStreamsError, recvOps["otel_arrow_stream_inflight/Unset"])

// This is in request context, the Arrow stream handling one request.
require.Equal(t, total, recvOps["otel_arrow_stream_recv/Unset"])

// This is in request context, the receiverhelper's per-request span.
require.Equal(t, total, recvOps["receiver/otelarrowreceiver/TraceDataReceived/Unset"])

// Exporter and Receiver stream span counts match:
require.Equal(t, expStreamsUnset+expStreamsError, recvStreamsUnset+recvStreamsError)

return recvOps, expOps
}

func TestIntegrationSelfTracing(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

params := memoryLimitParams
params.requestCount = 1000
testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) {
rcfg.Arrow.MemoryLimitMiB = 1
rcfg.Protocols.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionAge: time.Second,
MaxConnectionAgeGrace: 5 * time.Second,
},
}

ecfg.Arrow.NumStreams = 1
ecfg.Arrow.MaxStreamLifetime = 2 * time.Second
ecfg.TimeoutSettings.Timeout = 1 * time.Second

}, func() GenFunc { return makeTestTraces }, consumerSuccess, multiStreamEnding)
}
23 changes: 17 additions & 6 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,8 @@ func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]strin
}

// logStreamError decides how to log an error.
func (r *Receiver) logStreamError(err error, where string) {
func (r *Receiver) logStreamError(err error, where string) (occode otelcodes.Code, msg string) {
var code codes.Code
var msg string
// gRPC tends to supply status-wrapped errors, so we always
// unpack them. A wrapped Canceled code indicates intentional
// shutdown, which can be due to normal causes (EOF, e.g.,
Expand All @@ -286,10 +285,14 @@ func (r *Receiver) logStreamError(err error, where string) {
}

if code == codes.Canceled {
occode = otelcodes.Unset
r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where))
} else {
occode = otelcodes.Error
r.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where))
}

return occode, msg
}

func gRPCName(desc grpc.ServiceDesc) string {
Expand Down Expand Up @@ -458,8 +461,8 @@ func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) {

if retErr != nil {
// logStreamError because this response will break the stream.
id.logStreamError(retErr, "recv")
id.span.SetStatus(otelcodes.Error, retErr.Error())
occode, msg := id.logStreamError(retErr, "recv")
id.span.SetStatus(occode, msg)
}

id.anyDone(ctx)
Expand Down Expand Up @@ -550,8 +553,16 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
if err != nil {
if errors.Is(err, io.EOF) {
return err

} else if errors.Is(err, context.Canceled) {
return status.Error(codes.Canceled, "server stream shutdown")
// This is a special case to avoid introducing a span error
// for a canceled operation.
return io.EOF

} else if status, ok := status.FromError(err); ok && status.Code() == codes.Canceled {
// This is a special case to avoid introducing a span error
// for a canceled operation.
return io.EOF
}
// Note: err is directly from gRPC, should already have status.
return err
Expand Down Expand Up @@ -700,7 +711,7 @@ func (r *receiverStream) sendOne(serverStream anyStreamServer, resp batchResp) e

if err := serverStream.Send(bs); err != nil {
// logStreamError because this response will break the stream.
r.logStreamError(err, "send")
_, _ = r.logStreamError(err, "send")
return err
}

Expand Down
6 changes: 1 addition & 5 deletions receiver/otelarrowreceiver/otelarrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,7 @@ func TestOTelArrowShutdown(t *testing.T) {
}
}
}
if cooperative {
assert.Equal(t, "EOF", shutdownCause)
} else {
assert.Equal(t, "context canceled", shutdownCause)
}
assert.Equal(t, "EOF", shutdownCause)
})
}
}
Expand Down

0 comments on commit 8f6e9c6

Please sign in to comment.