diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index ebce1b138..fbad0703d 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -29,7 +29,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf // indirect + github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect @@ -99,7 +99,7 @@ require ( golang.org/x/text v0.4.0 // indirect golang.org/x/tools v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect - google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c // indirect + google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect google.golang.org/grpc v1.51.0 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/cmd/otelcorecol/go.sum b/cmd/otelcorecol/go.sum index fc010f859..faaabd786 100644 --- a/cmd/otelcorecol/go.sum +++ b/cmd/otelcorecol/go.sum @@ -20,7 +20,8 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= -cloud.google.com/go/compute/metadata v0.2.0 h1:nBbNSZyDpkNlo3DepaaLKVuO7ClyifSAmNloSCZrHnQ= +cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -110,8 +111,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf h1:YscUclI5muwYyyjre4X/knO91mmNXbwRiWjIvMcUMjQ= -github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf/go.mod h1:K26rqsOr5x+2IHL4Nmc39whwHhmnd1D51R3Cx0k/Vp4= +github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34 h1:bddBC3wltfFKaSEib8ZKwxH8QEqHgCXorl/aIGmATb8= +github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34/go.mod h1:r8GVfuga2F/a3FCY+r4IGOM4TLK5w2TiAIZ1u8KajOY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -607,7 +608,7 @@ golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= -golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c h1:q3gFqPqH7NVofKo3c3yETAP//pPI+G5mvB7qqj1Y5kY= +golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -819,8 +820,8 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c h1:QgY/XxIAIeccR+Ca/rDdKubLIU9rcJ3xfy1DC/Wd2Oo= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+E5bKwmfYo09AXuVN4dD894kIKUFmVbP2/Fo= +google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd h1:OjndDrsik+Gt+e6fs45z9AxiewiKyLKYpA45W5Kpkks= +google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 940922b6f..2a6a5ff9e 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -55,8 +55,7 @@ type baseExporter struct { clientConn *grpc.ClientConn metadata metadata.MD callOptions []grpc.CallOption - - settings component.ExporterCreateSettings + settings exporter.CreateSettings // Default user-agent header. userAgent string diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index d1b202ba2..54e8681cb 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -11,7 +11,7 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0-rc2 go.opentelemetry.io/collector/receiver/otlpreceiver v0.68.0 go.uber.org/zap v1.24.0 - google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c + google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 ) @@ -23,7 +23,7 @@ require ( github.com/apache/thrift v0.16.0 // indirect github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf // indirect + github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index 2e4197d3d..76e92d38a 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -1,7 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y= -cloud.google.com/go/compute/metadata v0.2.0 h1:nBbNSZyDpkNlo3DepaaLKVuO7ClyifSAmNloSCZrHnQ= +cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= contrib.go.opencensus.io/exporter/prometheus v0.4.2 h1:sqfsYl5GIY/L570iT+l93ehxaWJs2/OwXtiWwew3oAg= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= @@ -73,8 +74,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf h1:YscUclI5muwYyyjre4X/knO91mmNXbwRiWjIvMcUMjQ= -github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf/go.mod h1:K26rqsOr5x+2IHL4Nmc39whwHhmnd1D51R3Cx0k/Vp4= +github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34 h1:bddBC3wltfFKaSEib8ZKwxH8QEqHgCXorl/aIGmATb8= +github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34/go.mod h1:r8GVfuga2F/a3FCY+r4IGOM4TLK5w2TiAIZ1u8KajOY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -436,8 +437,8 @@ golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -533,8 +534,8 @@ google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c h1:QgY/XxIAIeccR+Ca/rDdKubLIU9rcJ3xfy1DC/Wd2Oo= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+E5bKwmfYo09AXuVN4dD894kIKUFmVbP2/Fo= +google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd h1:OjndDrsik+Gt+e6fs45z9AxiewiKyLKYpA45W5Kpkks= +google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index af636d13b..987a88973 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -3,7 +3,7 @@ module go.opentelemetry.io/collector/receiver/otlpreceiver go 1.18 require ( - github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf + github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34 github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.8.1 @@ -14,13 +14,14 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0-rc2 go.opentelemetry.io/collector/semconv v0.68.0 go.uber.org/zap v1.24.0 - google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c + golang.org/x/net v0.1.0 + google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 ) require ( - cloud.google.com/go/compute/metadata v0.2.0 // indirect + cloud.google.com/go/compute v1.14.0 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect github.com/andybalholm/brotli v1.0.4 // indirect github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect @@ -76,7 +77,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/mod v0.6.0 // indirect - golang.org/x/net v0.1.0 // indirect + golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect golang.org/x/sys v0.3.0 // indirect golang.org/x/text v0.4.0 // indirect golang.org/x/tools v0.2.0 // indirect diff --git a/receiver/otlpreceiver/go.sum b/receiver/otlpreceiver/go.sum index 6f2ff657c..f2cd550c0 100644 --- a/receiver/otlpreceiver/go.sum +++ b/receiver/otlpreceiver/go.sum @@ -13,14 +13,16 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= +cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= -cloud.google.com/go/compute/metadata v0.2.0 h1:nBbNSZyDpkNlo3DepaaLKVuO7ClyifSAmNloSCZrHnQ= -cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0= +cloud.google.com/go/compute v1.14.0/go.mod h1:YfLtxrj9sU4Yxv+sXzZkyPjEyPBZfXHUvjxega5vAdo= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -108,8 +110,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf h1:YscUclI5muwYyyjre4X/knO91mmNXbwRiWjIvMcUMjQ= -github.com/f5/otel-arrow-adapter v0.0.0-20221209234406-0e3c0d4657bf/go.mod h1:K26rqsOr5x+2IHL4Nmc39whwHhmnd1D51R3Cx0k/Vp4= +github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34 h1:bddBC3wltfFKaSEib8ZKwxH8QEqHgCXorl/aIGmATb8= +github.com/f5/otel-arrow-adapter v0.0.0-20230112193639-b13b25a63e34/go.mod h1:r8GVfuga2F/a3FCY+r4IGOM4TLK5w2TiAIZ1u8KajOY= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -578,7 +580,8 @@ golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= -golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c h1:q3gFqPqH7NVofKo3c3yETAP//pPI+G5mvB7qqj1Y5kY= +golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= +golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -787,8 +790,8 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c h1:QgY/XxIAIeccR+Ca/rDdKubLIU9rcJ3xfy1DC/Wd2Oo= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+E5bKwmfYo09AXuVN4dD894kIKUFmVbP2/Fo= +google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd h1:OjndDrsik+Gt+e6fs45z9AxiewiKyLKYpA45W5Kpkks= +google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= diff --git a/receiver/otlpreceiver/internal/arrow/arrow.go b/receiver/otlpreceiver/internal/arrow/arrow.go index 40b86a1d5..80cf9286c 100644 --- a/receiver/otlpreceiver/internal/arrow/arrow.go +++ b/receiver/otlpreceiver/internal/arrow/arrow.go @@ -21,15 +21,21 @@ import ( arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1" arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" "go.uber.org/zap" + "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/metadata" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/receiver" ) const ( - receiverTransport = "otlp-arrow" + receiverTransport = "otlp-arrow" + hpackMaxDynamicSize = 4096 ) var ( @@ -51,6 +57,7 @@ type Receiver struct { telemetry component.TelemetrySettings obsrecv *obsreport.Receiver + gsettings *configgrpc.GRPCServerSettings newConsumer func() arrowRecord.ConsumerAPI } @@ -58,7 +65,8 @@ type Receiver struct { func New( id component.ID, cs Consumers, - set component.ReceiverCreateSettings, + set receiver.CreateSettings, + gsettings *configgrpc.GRPCServerSettings, newConsumer func() arrowRecord.ConsumerAPI, ) (*Receiver, error) { obs, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ @@ -74,12 +82,101 @@ func New( obsrecv: obs, telemetry: set.TelemetrySettings, newConsumer: newConsumer, + gsettings: gsettings, }, nil } +// headerReceiver contains the state necessary to decode per-request metadata +// from an arrow stream. +type headerReceiver struct { + // decoder maintains state across the stream. + decoder *hpack.Decoder + + // client connection info from the stream context, to be extended + // with per-request metadata. + connInfo client.Info + + // streamHdrs was translated from the incoming context, will be + // merged with per-request metadata. Note that the contents of + // this map are equivalent to connInfo.Metadata, however that + // library does not let us iterate over the map so we recalculate + // this from the gRPC incoming stream context. + streamHdrs map[string][]string + + // tmpHdrs is used by the decoder's emit function during Write. + tmpHdrs map[string][]string +} + +func newHeaderReceiver(streamCtx context.Context, includeMetadata bool) *headerReceiver { + if !includeMetadata { + return nil + } + hr := &headerReceiver{ + connInfo: client.FromContext(streamCtx), + } + + if smd, ok := metadata.FromIncomingContext(streamCtx); ok { + hr.streamHdrs = smd + } + + // Note the hpack decoder supports additional protections, + // such as SetMaxStringLength(), but as we already have limits + // on stream request size, this seems unnecessary. + hr.decoder = hpack.NewDecoder(hpackMaxDynamicSize, hr.tmpHdrsAppend) + + return hr +} + +// combineHeaders calculates per-request Metadata by combining the stream's +// client.Info with additional key:values associated with the arrow batch. +// This is safe to call when h is nil. +func (h *headerReceiver) combineHeaders(ctx context.Context, hdrsBytes []byte) (context.Context, error) { + if h == nil || (len(hdrsBytes) == 0 && len(h.streamHdrs) == 0) { + return ctx, nil + } + + if len(hdrsBytes) == 0 { + return h.newContext(ctx, h.streamHdrs), nil + } + + h.tmpHdrs = map[string][]string{} + + for k, v := range h.streamHdrs { + h.tmpHdrs[k] = v + } + + // Write calls the emitFunc, appending directly into `tmpHrs`. + if _, err := h.decoder.Write(hdrsBytes); err != nil { + return ctx, err + } + + // Release the temporary copy. + newHdrs := h.tmpHdrs + h.tmpHdrs = nil + + return h.newContext(ctx, newHdrs), nil +} + +// tmpHdrsAppend appends to tmpHdrs, from decoder's emit function. +func (h *headerReceiver) tmpHdrsAppend(hf hpack.HeaderField) { + h.tmpHdrs[hf.Name] = append(h.tmpHdrs[hf.Name], hf.Value) +} + +func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]string) context.Context { + // Retain the Addr/Auth of the stream connection, update the + // per-request metadata from the Arrow batch. + return client.NewContext(ctx, client.Info{ + Addr: h.connInfo.Addr, + Auth: h.connInfo.Auth, + Metadata: client.NewMetadata(hdrs), + }) +} + func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStreamServer) error { - ctx := serverStream.Context() + streamCtx := serverStream.Context() ac := r.newConsumer() + hrcv := newHeaderReceiver(serverStream.Context(), r.gsettings.IncludeMetadata) + defer func() { if err := ac.Close(); err != nil { r.telemetry.Logger.Error("arrow stream close", zap.Error(err)) @@ -89,20 +186,28 @@ func (r *Receiver) ArrowStream(serverStream arrowpb.ArrowStreamService_ArrowStre for { // See if the context has been canceled. select { - case <-ctx.Done(): - return ctx.Err() + case <-streamCtx.Done(): + return streamCtx.Err() default: } - // Receive a batch: + // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, + // or plog.Logs item. req, err := serverStream.Recv() if err != nil { return err } + // Check for optional headers and set the incoming context. + thisCtx, err := hrcv.combineHeaders(streamCtx, req.GetHeaders()) + if err != nil { + // Failing to parse the incoming headers breaks the stream. + return err + } + // Process records: an error in this code path does // not necessarily break the stream. - err = r.processRecords(ctx, ac, req) + err = r.processRecords(thisCtx, ac, req) // Note: Statuses can be batched: TODO: should we? resp := &arrowpb.BatchStatus{} diff --git a/receiver/otlpreceiver/internal/arrow/arrow_test.go b/receiver/otlpreceiver/internal/arrow/arrow_test.go index 49f56b542..62757a57f 100644 --- a/receiver/otlpreceiver/internal/arrow/arrow_test.go +++ b/receiver/otlpreceiver/internal/arrow/arrow_test.go @@ -15,6 +15,7 @@ package arrow import ( + "bytes" "context" "encoding/json" "errors" @@ -26,20 +27,24 @@ import ( arrowCollectorMock "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1/mock" arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" + otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/metadata" - otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" - + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/arrow/mock" ) @@ -62,6 +67,11 @@ func (c compareJSONLogs) MarshalJSON() ([]byte, error) { return m.MarshalLogs(c.Logs) } +type consumeResult struct { + Ctx context.Context + Data interface{} +} + type commonTestCase struct { ctrl *gomock.Controller cancel context.CancelFunc @@ -69,7 +79,7 @@ type commonTestCase struct { consumers mockConsumers stream *arrowCollectorMock.MockArrowStreamService_ArrowStreamServer receive chan recvResult - consume chan interface{} + consume chan consumeResult streamErr chan error testProducer *arrowRecord.Producer @@ -139,7 +149,10 @@ func (ctc *commonTestCase) doAndReturnGetBatch(ctx context.Context) func() (*arr func (ctc *commonTestCase) doAndReturnConsumeTraces(tc testChannel) func(ctx context.Context, traces ptrace.Traces) error { return func(ctx context.Context, traces ptrace.Traces) error { select { - case ctc.consume <- traces: + case ctc.consume <- consumeResult{ + Ctx: ctx, + Data: traces, + }: return tc.onConsume() case <-ctx.Done(): return ctx.Err() @@ -150,7 +163,10 @@ func (ctc *commonTestCase) doAndReturnConsumeTraces(tc testChannel) func(ctx con func (ctc *commonTestCase) doAndReturnConsumeMetrics(tc testChannel) func(ctx context.Context, metrics pmetric.Metrics) error { return func(ctx context.Context, metrics pmetric.Metrics) error { select { - case ctc.consume <- metrics: + case ctc.consume <- consumeResult{ + Ctx: ctx, + Data: metrics, + }: return tc.onConsume() case <-ctx.Done(): return ctx.Err() @@ -161,7 +177,10 @@ func (ctc *commonTestCase) doAndReturnConsumeMetrics(tc testChannel) func(ctx co func (ctc *commonTestCase) doAndReturnConsumeLogs(tc testChannel) func(ctx context.Context, logs plog.Logs) error { return func(ctx context.Context, logs plog.Logs) error { select { - case ctc.consume <- logs: + case ctc.consume <- consumeResult{ + Ctx: ctx, + Data: logs, + }: return tc.onConsume() case <-ctx.Done(): return ctx.Err() @@ -211,6 +230,9 @@ func newCommonTestCase(t *testing.T, tc testChannel) *commonTestCase { stream := arrowCollectorMock.NewMockArrowStreamService_ArrowStreamServer(ctrl) ctx, cancel := context.WithCancel(context.Background()) + ctx = metadata.NewIncomingContext(ctx, metadata.MD{ + "stream_ctx": []string{"per-request"}, + }) ctc := &commonTestCase{ ctrl: ctrl, @@ -219,7 +241,7 @@ func newCommonTestCase(t *testing.T, tc testChannel) *commonTestCase { consumers: newMockConsumers(ctrl), stream: stream, receive: make(chan recvResult), - consume: make(chan interface{}), + consume: make(chan consumeResult), streamErr: make(chan error), testProducer: arrowRecord.NewProducer(), ctxCall: stream.EXPECT().Context().Times(0), @@ -303,14 +325,19 @@ func (ctc *commonTestCase) newErrorConsumer() arrowRecord.ConsumerAPI { return cons } -func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI) { +func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, gopts ...func(cfg *configgrpc.GRPCServerSettings)) { + gsettings := &configgrpc.GRPCServerSettings{} + for _, gf := range gopts { + gf(gsettings) + } rcvr, err := New( component.NewID("arrowtest"), ctc.consumers, - component.ReceiverCreateSettings{ + receiver.CreateSettings{ TelemetrySettings: ctc.telset, BuildInfo: component.NewDefaultBuildInfo(), }, + gsettings, newConsumer, ) if err != nil { @@ -336,7 +363,7 @@ func TestReceiverTraces(t *testing.T) { ctc.start(ctc.newRealConsumer) ctc.putBatch(batch, nil) - assert.EqualValues(t, td, <-ctc.consume) + assert.EqualValues(t, td, (<-ctc.consume).Data) err = ctc.cancelAndWait() require.Error(t, err) @@ -356,7 +383,7 @@ func TestReceiverLogs(t *testing.T) { ctc.start(ctc.newRealConsumer) ctc.putBatch(batch, nil) - assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).(plog.Logs)}}) + assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}}) err = ctc.cancelAndWait() require.Error(t, err) @@ -379,7 +406,7 @@ func TestReceiverMetrics(t *testing.T) { otelAssert.Equiv(t, []json.Marshaler{ compareJSONMetrics{md}, }, []json.Marshaler{ - compareJSONMetrics{(<-ctc.consume).(pmetric.Metrics)}, + compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, }) err = ctc.cancelAndWait() @@ -413,7 +440,7 @@ func TestReceiverSendError(t *testing.T) { ctc.start(ctc.newRealConsumer) ctc.putBatch(batch, nil) - assert.EqualValues(t, ld, <-ctc.consume) + assert.EqualValues(t, ld, (<-ctc.consume).Data) err = ctc.wait() require.Error(t, err) @@ -456,19 +483,19 @@ func TestReceiverConsumeError(t *testing.T) { otelAssert.Equiv(t, []json.Marshaler{ compareJSONTraces{input}, }, []json.Marshaler{ - compareJSONTraces{(<-ctc.consume).(ptrace.Traces)}, + compareJSONTraces{(<-ctc.consume).Data.(ptrace.Traces)}, }) case plog.Logs: otelAssert.Equiv(t, []json.Marshaler{ compareJSONLogs{input}, }, []json.Marshaler{ - compareJSONLogs{(<-ctc.consume).(plog.Logs)}, + compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}, }) case pmetric.Metrics: otelAssert.Equiv(t, []json.Marshaler{ compareJSONMetrics{input}, }, []json.Marshaler{ - compareJSONMetrics{(<-ctc.consume).(pmetric.Metrics)}, + compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, }) } @@ -542,7 +569,7 @@ func TestReceiverEOF(t *testing.T) { }() for i := 0; i < times; i++ { - actualData = append(actualData, (<-ctc.consume).(ptrace.Traces)) + actualData = append(actualData, (<-ctc.consume).Data.(ptrace.Traces)) } assert.EqualValues(t, expectData, actualData) @@ -552,6 +579,83 @@ func TestReceiverEOF(t *testing.T) { require.True(t, errors.Is(err, io.EOF)) } +func TestReceiverHeaders(t *testing.T) { + t.Run("include", func(t *testing.T) { testReceiverHeaders(t, true) }) + t.Run("noinclude", func(t *testing.T) { testReceiverHeaders(t, false) }) +} + +func testReceiverHeaders(t *testing.T, includeMeta bool) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + expectData := []map[string][]string{ + {"k1": []string{"v1"}}, + nil, + {"k2": []string{"v2"}, "k3": []string{"v3"}}, + nil, + {"k1": []string{"v5"}}, + {"k1": []string{"v1"}, "k3": []string{"v2", "v3", "v4"}}, + nil, + } + + ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).Return(nil) + + ctc.start(ctc.newRealConsumer, func(gsettings *configgrpc.GRPCServerSettings) { + gsettings.IncludeMetadata = includeMeta + }) + + go func() { + var hpb bytes.Buffer + hpe := hpack.NewEncoder(&hpb) + + for _, md := range expectData { + td := testdata.GenerateTraces(2) + batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + if len(md) != 0 { + hpb.Reset() + for key, vals := range md { + for _, val := range vals { + err := hpe.WriteField(hpack.HeaderField{ + Name: key, + Value: val, + }) + require.NoError(t, err) + } + } + + batch.Headers = make([]byte, hpb.Len()) + copy(batch.Headers, hpb.Bytes()) + } + ctc.putBatch(batch, nil) + } + close(ctc.receive) + }() + + for _, expect := range expectData { + info := client.FromContext((<-ctc.consume).Ctx) + + // The static stream context contains one extra variable. + if expect == nil { + expect = map[string][]string{} + } + expect["stream_ctx"] = []string{"per-request"} + + for key, vals := range expect { + if includeMeta { + require.Equal(t, vals, info.Metadata.Get(key)) + } else { + require.Equal(t, []string(nil), info.Metadata.Get(key)) + } + } + } + + err := ctc.wait() + require.Error(t, err) + require.True(t, errors.Is(err, io.EOF)) +} + func TestReceiverCancel(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) @@ -563,3 +667,127 @@ func TestReceiverCancel(t *testing.T) { require.Error(t, err) require.True(t, errors.Is(err, context.Canceled)) } + +func requireContainsAll(t *testing.T, md client.Metadata, exp map[string][]string) { + for key, vals := range exp { + require.Equal(t, vals, md.Get(key)) + } +} + +func requireContainsNone(t *testing.T, md client.Metadata, exp map[string][]string) { + for key := range exp { + require.Equal(t, []string(nil), md.Get(key)) + } +} + +func TestHeaderReceiverStreamContextOnly(t *testing.T) { + expect := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + } + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(expect)) + + h := newHeaderReceiver(ctx, true) + + for i := 0; i < 3; i++ { + cc, err := h.combineHeaders(ctx, nil) + + require.NoError(t, err) + requireContainsAll(t, client.FromContext(cc).Metadata, expect) + } +} + +func TestHeaderReceiverNoIncludeMetadata(t *testing.T) { + noExpect := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + } + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(noExpect)) + + h := newHeaderReceiver(ctx, false) + + for i := 0; i < 3; i++ { + cc, err := h.combineHeaders(ctx, nil) + + require.NoError(t, err) + requireContainsNone(t, client.FromContext(cc).Metadata, noExpect) + } +} + +func TestHeaderReceiverRequestNoStreamMetadata(t *testing.T) { + expect := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + } + + var hpb bytes.Buffer + + hpe := hpack.NewEncoder(&hpb) + + ctx := context.Background() + + h := newHeaderReceiver(ctx, true) + + for i := 0; i < 3; i++ { + hpb.Reset() + + for key, vals := range expect { + for _, val := range vals { + err := hpe.WriteField(hpack.HeaderField{ + Name: key, + Value: val, + }) + require.NoError(t, err) + } + } + + cc, err := h.combineHeaders(ctx, hpb.Bytes()) + + require.NoError(t, err) + requireContainsAll(t, client.FromContext(cc).Metadata, expect) + } +} + +func TestHeaderReceiverBothMetadata(t *testing.T) { + expectK := map[string][]string{ + "K": {"k1", "k2"}, + } + expectL := map[string][]string{ + "L": {"l1"}, + "M": {"m1", "m2"}, + } + expect := map[string][]string{ + "K": {"k1", "k2"}, + "L": {"l1"}, + "M": {"m1", "m2"}, + } + + var hpb bytes.Buffer + + hpe := hpack.NewEncoder(&hpb) + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD(expectK)) + + h := newHeaderReceiver(ctx, true) + + for i := 0; i < 3; i++ { + hpb.Reset() + + for key, vals := range expectL { + for _, val := range vals { + err := hpe.WriteField(hpack.HeaderField{ + Name: key, + Value: val, + }) + require.NoError(t, err) + } + } + + cc, err := h.combineHeaders(ctx, hpb.Bytes()) + + require.NoError(t, err) + requireContainsAll(t, client.FromContext(cc).Metadata, expect) + } +} diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 4d65656b9..33f888f4a 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -151,7 +151,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error { } if r.cfg.Arrow != nil && r.cfg.Arrow.Enabled { - r.arrowReceiver, err = arrow.New(r.settings.ID, arrow.Consumers(r), r.settings, func() arrowRecord.ConsumerAPI { + r.arrowReceiver, err = arrow.New(r.settings.ID, arrow.Consumers(r), r.settings, r.cfg.GRPC, func() arrowRecord.ConsumerAPI { return arrowRecord.NewConsumer() }) if err != nil { diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index fa308ff57..e794298a4 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -32,13 +32,16 @@ import ( arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/net/http2/hpack" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" @@ -1108,13 +1111,25 @@ func (esc *errOrSinkConsumer) Reset() { } } +type tracesSinkWithMetadata struct { + consumertest.TracesSink + MDs []client.Metadata +} + +func (ts *tracesSinkWithMetadata) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + info := client.FromContext(ctx) + ts.MDs = append(ts.MDs, info.Metadata) + return ts.TracesSink.ConsumeTraces(ctx, td) +} + func TestGRPCArrowReceiver(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - sink := new(consumertest.TracesSink) + sink := new(tracesSinkWithMetadata) factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPC.NetAddr.Endpoint = addr + cfg.GRPC.IncludeMetadata = true cfg.HTTP = nil cfg.Arrow.Enabled = true id := component.NewID("arrow") @@ -1134,15 +1149,39 @@ func TestGRPCArrowReceiver(t *testing.T) { require.NoError(t, err) producer := arrowRecord.NewProducer() + var headerBuf bytes.Buffer + hpd := hpack.NewEncoder(&headerBuf) + var expectTraces []ptrace.Traces - // Repeatedly send traces via arrow + var expectMDs []metadata.MD + + // Repeatedly send traces via arrow. Set the expected traces + // metadata to receive. for i := 0; i < 10; i++ { td := testdata.GenerateTraces(2) expectTraces = append(expectTraces, td) + headerBuf.Reset() + err := hpd.WriteField(hpack.HeaderField{ + Name: "seq", + Value: fmt.Sprint(i), + }) + require.NoError(t, err) + err = hpd.WriteField(hpack.HeaderField{ + Name: "test", + Value: "value", + }) + require.NoError(t, err) + expectMDs = append(expectMDs, metadata.MD{ + "seq": []string{fmt.Sprint(i)}, + "test": []string{"value"}, + }) + batch, err := producer.BatchArrowRecordsFromTraces(td) require.NoError(t, err) + batch.Headers = headerBuf.Bytes() + err = stream.Send(batch) require.NoError(t, err) @@ -1157,4 +1196,13 @@ func TestGRPCArrowReceiver(t *testing.T) { require.NoError(t, ocr.Shutdown(context.Background())) assert.Equal(t, expectTraces, sink.AllTraces()) + + assert.Equal(t, len(expectMDs), len(sink.MDs)) + // gRPC adds its own metadata keys, so we check for only the + // expected ones below: + for idx := range expectMDs { + for key, vals := range expectMDs[idx] { + require.Equal(t, vals, sink.MDs[idx].Get(key), "for key %s", key) + } + } }