diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 60659cd6ee0..2bde57cdca0 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/spf13/viper" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -37,6 +38,11 @@ const ( protoGRPC = "grpc" protoHTTP = "http" protocolsFieldName = "protocols" + + defaultGRPCEndpoint = "0.0.0.0:4317" + defaultHTTPEndpoint = "0.0.0.0:4318" + legacyGRPCEndpoint = "0.0.0.0:55680" + legacyHTTPEndpoint = "0.0.0.0:55681" ) func NewFactory() component.ReceiverFactory { @@ -59,14 +65,14 @@ func createDefaultConfig() configmodels.Receiver { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: defaultGRPCEndpoint, Transport: "tcp", }, // We almost write 0 bytes, so no need to tune WriteBufferSize. ReadBufferSize: 512 * 1024, }, HTTP: &confighttp.HTTPServerSettings{ - Endpoint: "0.0.0.0:55681", + Endpoint: defaultHTTPEndpoint, }, }, } @@ -117,11 +123,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) // CreateTracesReceiver creates a trace receiver based on provided config. func createTraceReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TracesConsumer, ) (component.TracesReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -134,11 +140,11 @@ func createTraceReceiver( // CreateMetricsReceiver creates a metrics receiver based on provided config. func createMetricsReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -151,11 +157,11 @@ func createMetricsReceiver( // CreateLogReceiver creates a log receiver based on provided config. func createLogReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.LogsConsumer, ) (component.LogsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -165,7 +171,7 @@ func createLogReceiver( return r, nil } -func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { +func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -176,7 +182,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { if !ok { var err error // We don't have a receiver, so create one. - receiver, err = newOtlpReceiver(rCfg) + receiver, err = newOtlpReceiver(rCfg, logger) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 0b8d8e66077..b5322cee25a 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -22,10 +22,12 @@ import ( "sync" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "go.uber.org/zap" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" @@ -49,14 +51,17 @@ type otlpReceiver struct { stopOnce sync.Once startServerOnce sync.Once + + logger *zap.Logger } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) { r := &otlpReceiver{ - cfg: cfg, + cfg: cfg, + logger: logger, } if cfg.GRPC != nil { opts, err := cfg.GRPC.ToServerOption() @@ -84,6 +89,38 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { return r, nil } +func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error { + r.logger.Info("Starting GRPC server", zap.String("endpoint", cfg.NetAddr.Endpoint)) + var gln net.Listener + gln, err := cfg.ToListener() + if err != nil { + r.logger.Error("Failed to setup a listener", zap.Error(err)) + return err + } + go func() { + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + host.ReportFatalError(errGrpc) + } + }() + return nil +} + +func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error { + r.logger.Info("Starting HTTP server", zap.String("endpoint", cfg.Endpoint)) + var hln net.Listener + hln, err := r.cfg.HTTP.ToListener() + if err != nil { + r.logger.Error("Failed to setup a listener", zap.Error(err)) + return err + } + go func() { + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() + return nil +} + // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { @@ -94,32 +131,41 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { var err error r.startServerOnce.Do(func() { if r.cfg.GRPC != nil { - var gln net.Listener - gln, err = r.cfg.GRPC.ToListener() + err = r.startGRPCServer(r.cfg.GRPC, host) if err != nil { return } - go func() { - if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { - host.ReportFatalError(errGrpc) + if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint { + r.logger.Info("Setting up a second GRPC listener on legacy endpoint") + + // Copy the config. + cfgLegacyGRPC := r.cfg.GRPC + // And use the legacy endpoint. + cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint + err = r.startGRPCServer(cfgLegacyGRPC, host) + if err != nil { + return } - }() + } } if r.cfg.HTTP != nil { r.serverHTTP = r.cfg.HTTP.ToServer( r.gatewayMux, confighttp.WithErrorHandler(errorHandler), ) - var hln net.Listener - hln, err = r.cfg.HTTP.ToListener() + err = r.startHTTPServer(r.cfg.HTTP, host) if err != nil { return } - go func() { - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { - host.ReportFatalError(errHTTP) + if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint { + r.logger.Info("Setting up a second HTTP listener on legacy endpoint") + cfgLegacyHTTP := r.cfg.HTTP + cfgLegacyHTTP.Endpoint = legacyHTTPEndpoint + err = r.startHTTPServer(cfgLegacyHTTP, host) + if err != nil { + return } - }() + } } }) return err diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 7f5cc28f217..5fa87419b03 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -30,6 +30,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) { } // TLS is resolved during Creation of the receiver for GRPC. - _, err := createReceiver(cfg) + _, err := createReceiver(cfg, zap.NewNop()) assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer, } func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, zap.NewNop()) require.NoError(t, err) if tc != nil { params := component.ReceiverCreateParams{} diff --git a/service/internal/resources.go b/service/internal/resources.go index 3777769f2b3..9806a206ec6 100644 --- a/service/internal/resources.go +++ b/service/internal/resources.go @@ -1,17 +1,3 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - // Code generated by "esc -pkg internal -o resources.go templates/"; DO NOT EDIT. package internal @@ -227,7 +213,7 @@ var _escData = map[string]*_escFile{ name: "component_header.html", local: "templates/component_header.html", size: 156, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/1SMsQqDMBRFd7/iIq7q5lBiltKt9B8CPklQX6R1e9x/L6ZQ2vXcc65ZE3AZ0V3ztmcV PW467TnpQVZmzZp0Kfs96VJQizTjw1uyAgAXB+8C4lPmsT4fydqbdY+wCen64F0fB19iWV/yF/54X0en @@ -239,7 +225,7 @@ U3kHAAD//zT+SdCcAAAA name: "extensions_table.html", local: "templates/extensions_table.html", size: 353, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/2SQwU7DMBBE7/2KlemRNJwjxxwQHDnwB248DRbOOnK2tGD531HTQIvqk1fzZjU7Wuw2 gCb5CmjVNiaHVE2j7Tz3DT0osyIiynltqWlp8xSHMTJYntmN0bOUsgDJcg9ap3jw7HC8n7+z5y0epgU7 @@ -252,7 +238,7 @@ oxX5HeETfMGv9NPTkv4i2e6jT3HPrqE7AEui8yaECbdWkzPYUXWlaHFkg++5VR1YkJTRlt4Tdq06HVfK name: "footer.html", local: "templates/footer.html", size: 15, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA `, @@ -262,7 +248,7 @@ H4sIAAAAAAAC/7LRT8pPqbTjstHPKMnNsQMEAAD//wEFevAPAAAA name: "header.html", local: "templates/header.html", size: 467, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/5TRMU8sIRAH8P4+BY/25eC9szGGxUItLIwW11giO7uMB8wG5rxsLvfdDdnTxNhoBeFP fpnM3/y5fbzZPj/dicAp2pVph4guj52ELK0J4Hq7EkIIk4Cd8MGVCtzJPQ/rS3mOGDmCPR7Vtl1OJ6OX @@ -276,7 +262,7 @@ vuDEoocBiqjF/5RszGuV1uhFsCujl0bMC/Vz62vzZe1hY98DAAD//7qRGmLTAQAA name: "pipelines_table.html", local: "templates/pipelines_table.html", size: 1946, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/7SVwXLTMBCG7zyFxnRyIjVcU1scSpnhAMN0eAFZ2gRNlZVmJbdujd+dsWyrTp0LtL5k rOjX/tlv/8hFEJUB5sOjgTKrLCmgrXdCajzs2MeMv2OMsSLQ8DAsFJPWeCew/MSE0QcsDewDLyr+tTbm @@ -293,7 +279,7 @@ QeMmXNC4hCvdNKvQgsYtacFoGWFFxSvCNl+lu3HQFXl8JfO/AQAA//9We3KLmgcAAA== name: "properties_table.html", local: "templates/properties_table.html", size: 420, - modtime: 1594178791, + modtime: 1603991049, compressed: ` H4sIAAAAAAAC/2SRwW7DIBBE7/6KVRr1VMc5u5gfqFT11Ds2U8sqWVuwqRoR/r1yTCpb4YAEO48ZDarV MR7ezQkp1apqdaHEtA4U5OLQ7NrRW/gyTKYbuK/puNMFEVGMtB/Y4pfqho6UUr71hnvk0Qvt4XACyyw6 diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index 14af5c48cad..2bee5f6552f 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -317,6 +317,15 @@ func (ote *OTLPHTTPTraceDataSender) Start() error { return exp.Start(context.Background(), ote) } +func (ote *OTLPHTTPTraceDataSender) GenConfigYAMLStr() string { + // Note that this generates a receiver config for agent. + return ` + otlp: + protocols: + http: + endpoint: "0.0.0.0:4318"` +} + // OTLPHTTPMetricsDataSender implements MetricDataSender for OTLP/HTTP metrics exporterType. type OTLPHTTPMetricsDataSender struct { otlpHTTPDataSender @@ -447,6 +456,15 @@ func (ote *OTLPTraceDataSender) Start() error { return exp.Start(context.Background(), ote) } +func (ote *OTLPTraceDataSender) GenConfigYAMLStr() string { + // Note that this generates a receiver config for agent. + return ` + otlp: + protocols: + grpc: + endpoint: "0.0.0.0:4317"` +} + // OTLPMetricsDataSender implements MetricDataSender for OTLP metrics exporterType. type OTLPMetricsDataSender struct { otlpDataSender diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 7424d78dd3e..11449a503c3 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -67,7 +67,8 @@ func TestTrace10kSPS(t *testing.T) { }, { "OTLP", - testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + // testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPTraceDataSender(testbed.DefaultHost, 55680), testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 20, @@ -76,7 +77,7 @@ func TestTrace10kSPS(t *testing.T) { }, { "OTLP-HTTP", - testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, 55681), testbed.NewOTLPHTTPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 20,