diff --git a/receiver/otlpreceiver/internal/arrow/arrow.go b/receiver/otlpreceiver/internal/arrow/arrow.go index 80cf9286c..d917eeefd 100644 --- a/receiver/otlpreceiver/internal/arrow/arrow.go +++ b/receiver/otlpreceiver/internal/arrow/arrow.go @@ -141,15 +141,28 @@ func (h *headerReceiver) combineHeaders(ctx context.Context, hdrsBytes []byte) ( h.tmpHdrs = map[string][]string{} - for k, v := range h.streamHdrs { - h.tmpHdrs[k] = v - } - // Write calls the emitFunc, appending directly into `tmpHrs`. if _, err := h.decoder.Write(hdrsBytes); err != nil { return ctx, err } + // Add streamHdrs that were not carried in the per-request headers. + for k, v := range h.streamHdrs { + // Note: This is done after the per-request metadata is defined + // in recognition of a potential for duplicated values stemming + // from the Arrow exporter's independent call to the Auth + // extension's GetRequestMetadata(). This paired with the + // headersetter's return of empty-string values means, we would + // end up with an empty-string element for any headersetter + // `from_context` rules b/c the stream uses background context. + // This allows static headers through. + // + // See https://github.com/open-telemetry/opentelemetry-collector/issues/6965 + if _, ok := h.tmpHdrs[k]; !ok { + h.tmpHdrs[k] = v + } + } + // Release the temporary copy. newHdrs := h.tmpHdrs h.tmpHdrs = nil diff --git a/receiver/otlpreceiver/internal/arrow/arrow_test.go b/receiver/otlpreceiver/internal/arrow/arrow_test.go index 62757a57f..55438e47f 100644 --- a/receiver/otlpreceiver/internal/arrow/arrow_test.go +++ b/receiver/otlpreceiver/internal/arrow/arrow_test.go @@ -791,3 +791,49 @@ func TestHeaderReceiverBothMetadata(t *testing.T) { requireContainsAll(t, client.FromContext(cc).Metadata, expect) } } + +func TestHeaderReceiverDuplicateMetadata(t *testing.T) { + expectStream := map[string][]string{ + "K": {"k1", "k2"}, + + // "M" value does not appear b/c the same header + // appears in per-request metadata. + "M": {""}, + } + expectRequest := map[string][]string{ + "L": {"l1"}, + "M": {"m1", "m2"}, + } + expectCombined := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + "M": {"m1", "m2"}, + } + + var hpb bytes.Buffer + + hpe := hpack.NewEncoder(&hpb) + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(expectStream)) + + h := newHeaderReceiver(ctx, true) + + for i := 0; i < 3; i++ { + hpb.Reset() + + for key, vals := range expectRequest { + for _, val := range vals { + err := hpe.WriteField(hpack.HeaderField{ + Name: key, + Value: val, + }) + require.NoError(t, err) + } + } + + cc, err := h.combineHeaders(ctx, hpb.Bytes()) + + require.NoError(t, err) + requireContainsAll(t, client.FromContext(cc).Metadata, expectCombined) + } +}