Skip to content

Commit

Permalink
Change default OTLP port number
Browse files Browse the repository at this point in the history
This implements specification change open-telemetry/opentelemetry-specification#1221

To make transition to new port numbers less painful OTLP receiver will
also accept data on the legacy port numbers when it is configured to
use the default endpoint. Users who use the default Collector config
can continue sending data to the legacy ports and have a graceful period
to update their senders to start sending to the new ports.

Note that OTLP/HTTP continues using a separate port number from OTLP/gRPC.
There is separate work in progress to use one port for both.
  • Loading branch information
Tigran Najaryan authored and tigrannajaryan committed Nov 10, 2020
1 parent bf818a2 commit c649b27
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 48 deletions.
26 changes: 16 additions & 10 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -37,6 +38,11 @@ const (
protoGRPC = "grpc"
protoHTTP = "http"
protocolsFieldName = "protocols"

defaultGRPCEndpoint = "0.0.0.0:4317"
defaultHTTPEndpoint = "0.0.0.0:4318"
legacyGRPCEndpoint = "0.0.0.0:55680"
legacyHTTPEndpoint = "0.0.0.0:55681"
)

func NewFactory() component.ReceiverFactory {
Expand All @@ -59,14 +65,14 @@ func createDefaultConfig() configmodels.Receiver {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: defaultGRPCEndpoint,
Transport: "tcp",
},
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
Endpoint: defaultHTTPEndpoint,
},
},
}
Expand Down Expand Up @@ -117,11 +123,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{})
// CreateTracesReceiver creates a trace receiver based on provided config.
func createTraceReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TracesConsumer,
) (component.TracesReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -134,11 +140,11 @@ func createTraceReceiver(
// CreateMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -151,11 +157,11 @@ func createMetricsReceiver(
// CreateLogReceiver creates a log receiver based on provided config.
func createLogReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.LogsConsumer,
) (component.LogsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -165,7 +171,7 @@ func createLogReceiver(
return r, nil
}

func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) {
rCfg := cfg.(*Config)

// There must be one receiver for both metrics and traces. We maintain a map of
Expand All @@ -176,7 +182,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
if !ok {
var err error
// We don't have a receiver, so create one.
receiver, err = newOtlpReceiver(rCfg)
receiver, err = newOtlpReceiver(rCfg, logger)
if err != nil {
return nil, err
}
Expand Down
74 changes: 60 additions & 14 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"sync"

gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
Expand All @@ -49,14 +51,17 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once

logger *zap.Logger
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) {
r := &otlpReceiver{
cfg: cfg,
cfg: cfg,
logger: logger,
}
if cfg.GRPC != nil {
opts, err := cfg.GRPC.ToServerOption()
Expand Down Expand Up @@ -84,6 +89,38 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
return r, nil
}

func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error {
r.logger.Info("Starting GRPC server", zap.String("endpoint", cfg.NetAddr.Endpoint))
var gln net.Listener
gln, err := cfg.ToListener()
if err != nil {
r.logger.Error("Failed to setup a listener", zap.Error(err))
return err
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
return nil
}

func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error {
r.logger.Info("Starting HTTP server", zap.String("endpoint", cfg.Endpoint))
var hln net.Listener
hln, err := r.cfg.HTTP.ToListener()
if err != nil {
r.logger.Error("Failed to setup a listener", zap.Error(err))
return err
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
}
}()
return nil
}

// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
Expand All @@ -94,32 +131,41 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
var err error
r.startServerOnce.Do(func() {
if r.cfg.GRPC != nil {
var gln net.Listener
gln, err = r.cfg.GRPC.ToListener()
err = r.startGRPCServer(r.cfg.GRPC, host)
if err != nil {
return
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint {
r.logger.Info("Setting up a second GRPC listener on legacy endpoint")

// Copy the config.
cfgLegacyGRPC := r.cfg.GRPC
// And use the legacy endpoint.
cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint
err = r.startGRPCServer(cfgLegacyGRPC, host)
if err != nil {
return
}
}()
}
}
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.gatewayMux,
confighttp.WithErrorHandler(errorHandler),
)
var hln net.Listener
hln, err = r.cfg.HTTP.ToListener()
err = r.startHTTPServer(r.cfg.HTTP, host)
if err != nil {
return
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint {
r.logger.Info("Setting up a second HTTP listener on legacy endpoint")
cfgLegacyHTTP := r.cfg.HTTP
cfgLegacyHTTP.Endpoint = legacyHTTPEndpoint
err = r.startHTTPServer(cfgLegacyHTTP, host)
if err != nil {
return
}
}()
}
}
})
return err
Expand Down
5 changes: 3 additions & 2 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) {
}

// TLS is resolved during Creation of the receiver for GRPC.
_, err := createReceiver(cfg)
_, err := createReceiver(cfg, zap.NewNop())
assert.EqualError(t, err,
`failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`)
}
Expand Down Expand Up @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer,
}

func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, zap.NewNop())
require.NoError(t, err)
if tc != nil {
params := component.ReceiverCreateParams{}
Expand Down
26 changes: 6 additions & 20 deletions service/internal/resources.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions testbed/testbed/senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,15 @@ func (ote *OTLPHTTPTraceDataSender) Start() error {
return exp.Start(context.Background(), ote)
}

func (ote *OTLPHTTPTraceDataSender) GenConfigYAMLStr() string {
// Note that this generates a receiver config for agent.
return `
otlp:
protocols:
http:
endpoint: "0.0.0.0:4318"`
}

// OTLPHTTPMetricsDataSender implements MetricDataSender for OTLP/HTTP metrics exporterType.
type OTLPHTTPMetricsDataSender struct {
otlpHTTPDataSender
Expand Down Expand Up @@ -447,6 +456,15 @@ func (ote *OTLPTraceDataSender) Start() error {
return exp.Start(context.Background(), ote)
}

func (ote *OTLPTraceDataSender) GenConfigYAMLStr() string {
// Note that this generates a receiver config for agent.
return `
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"`
}

// OTLPMetricsDataSender implements MetricDataSender for OTLP metrics exporterType.
type OTLPMetricsDataSender struct {
otlpDataSender
Expand Down
5 changes: 3 additions & 2 deletions testbed/tests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestTrace10kSPS(t *testing.T) {
},
{
"OTLP",
testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
// testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
testbed.NewOTLPTraceDataSender(testbed.DefaultHost, 55680),
testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
testbed.ResourceSpec{
ExpectedMaxCPU: 20,
Expand All @@ -76,7 +77,7 @@ func TestTrace10kSPS(t *testing.T) {
},
{
"OTLP-HTTP",
testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
testbed.NewOTLPHTTPTraceDataSender(testbed.DefaultHost, 55681),
testbed.NewOTLPHTTPDataReceiver(testbed.GetAvailablePort(t)),
testbed.ResourceSpec{
ExpectedMaxCPU: 20,
Expand Down

0 comments on commit c649b27

Please sign in to comment.