Skip to content
This repository has been archived by the owner on Nov 7, 2023. It is now read-only.

Support decoding optional headers in the OTLP-Arrow receiver #34

Merged
merged 9 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions receiver/otlpreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module go.opentelemetry.io/collector/receiver/otlpreceiver
go 1.18

require (
github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf
github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.8.1
Expand All @@ -14,13 +14,14 @@ require (
go.opentelemetry.io/collector/pdata v1.0.0-rc2
go.opentelemetry.io/collector/semconv v0.68.0
go.uber.org/zap v1.24.0
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c
golang.org/x/net v0.1.0
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
)

require (
cloud.google.com/go/compute/metadata v0.2.0 // indirect
cloud.google.com/go/compute v1.14.0 // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
Expand Down Expand Up @@ -76,7 +77,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/tools v0.2.0 // indirect
Expand Down
17 changes: 10 additions & 7 deletions receiver/otlpreceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV
cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/compute/metadata v0.2.0 h1:nBbNSZyDpkNlo3DepaaLKVuO7ClyifSAmNloSCZrHnQ=
cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0=
cloud.google.com/go/compute v1.14.0/go.mod h1:YfLtxrj9sU4Yxv+sXzZkyPjEyPBZfXHUvjxega5vAdo=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
Expand Down Expand Up @@ -108,8 +110,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf h1:YscUclI5muwYyyjre4X/knO91mmNXbwRiWjIvMcUMjQ=
github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf/go.mod h1:K26rqsOr5x+2IHL4Nmc39whwHhmnd1D51R3Cx0k/Vp4=
github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34 h1:bddBC3wltfFKaSEib8ZKwxH8QEqHgCXorl/aIGmATb8=
github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34/go.mod h1:r8GVfuga2F/a3FCY+r4IGOM4TLK5w2TiAIZ1u8KajOY=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
Expand Down Expand Up @@ -578,7 +580,8 @@ golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c h1:q3gFqPqH7NVofKo3c3yETAP//pPI+G5mvB7qqj1Y5kY=
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk=
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -787,8 +790,8 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U=
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c h1:QgY/XxIAIeccR+Ca/rDdKubLIU9rcJ3xfy1DC/Wd2Oo=
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+E5bKwmfYo09AXuVN4dD894kIKUFmVbP2/Fo=
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd h1:OjndDrsik+Gt+e6fs45z9AxiewiKyLKYpA45W5Kpkks=
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
Expand Down
63 changes: 58 additions & 5 deletions receiver/otlpreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ import (
arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record"
"go.uber.org/zap"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/obsreport"
)

const (
receiverTransport = "otlp-arrow"
receiverTransport = "otlp-arrow"
hpackMaxDynamicSize = 4096
hpackMaxStringSize = 4096
)

var (
Expand All @@ -51,6 +57,7 @@ type Receiver struct {

telemetry component.TelemetrySettings
obsrecv *obsreport.Receiver
gsettings *configgrpc.GRPCServerSettings
newConsumer func() arrowRecord.ConsumerAPI
}

Expand All @@ -59,6 +66,7 @@ func New(
id component.ID,
cs Consumers,
set component.ReceiverCreateSettings,
gsettings *configgrpc.GRPCServerSettings,
newConsumer func() arrowRecord.ConsumerAPI,
) (*Receiver, error) {
obs, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
Expand All @@ -74,23 +82,43 @@ func New(
obsrecv: obs,
telemetry: set.TelemetrySettings,
newConsumer: newConsumer,
gsettings: gsettings,
}, nil
}

func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStreamServer) error {
ctx := serverStream.Context()
streamCtx := serverStream.Context()
ac := r.newConsumer()

// hdrs is re-calculated for each request by resetting to a copy
// of the static incoming context.
var hdrs map[string][]string
resetHdrs := func() {
if md, ok := metadata.FromIncomingContext(serverStream.Context()); ok {
hdrs = map[string][]string(md)
} else {
hdrs = map[string][]string{}
}
}

// resetHdrs will be called before hp.Write()
hp := hpack.NewDecoder(hpackMaxDynamicSize, func(hf hpack.HeaderField) {
hdrs[hf.Name] = append(hdrs[hf.Name], hf.Value)
})
hp.SetMaxStringLength(hpackMaxStringSize)
defer func() {
if err := ac.Close(); err != nil {
r.telemetry.Logger.Error("arrow stream close", zap.Error(err))
}
}()

connInfo := client.FromContext(streamCtx)

for {
// See if the context has been canceled.
select {
case <-ctx.Done():
return ctx.Err()
case <-streamCtx.Done():
return streamCtx.Err()
default:
}

Expand All @@ -100,9 +128,34 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre
return err
}

// Check for optional headers and set the incoming context.
thisCtx := streamCtx

// When configured, setup metadata.
if r.gsettings.IncludeMetadata {
// Copy the incoming context. Note we could avoid an allocation
// if req.Headers is empty, as then we would need one copy of the
// map, instead of one per request.
resetHdrs()

if hdrsBytes := req.GetHeaders(); len(hdrsBytes) != 0 {
// Write calls the emitFunc, appending directly into `hdrs`.
hp.Write(hdrsBytes)
}

// Retain the Addr/Auth of the stream
// connection, update the per-request metadata
// from the Arrow batch.
thisCtx = client.NewContext(thisCtx, client.Info{
Addr: connInfo.Addr,
Auth: connInfo.Auth,
Metadata: client.NewMetadata(hdrs),
})
}

// Process records: an error in this code path does
// not necessarily break the stream.
err = r.processRecords(ctx, ac, req)
err = r.processRecords(thisCtx, ac, req)

// Note: Statuses can be batched: TODO: should we?
resp := &arrowpb.BatchStatus{}
Expand Down
Loading