Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change default OTLP port number #2104

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
## 💡 Enhancements 💡

- Add --log-format command line option (default to "console") #2177.
- Change default OTLP/gRPC port number to 4317, also continue receiving on legacy port
55680 during transition period (#2104).

## v0.14.0 Beta

Expand Down
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ receivers:

The following settings are configurable:

- `endpoint` (default = 0.0.0.0:55680): host:port to which the receiver is
going to receive data. The valid syntax is described at
https://github.com/grpc/grpc/blob/master/doc/naming.md.
- `endpoint` (default = 0.0.0.0:4317 for grpc protocol, 0.0.0.0:55681 http protocol):
host:port to which the receiver is going to receive data. The valid syntax is
described at https://github.com/grpc/grpc/blob/master/doc/naming.md.

## Advanced Configuration

Expand Down
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
ReadBufferSize: 512 * 1024,
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
MaxRecvMsgSizeMiB: 32,
Expand All @@ -139,7 +139,7 @@ func TestLoadConfig(t *testing.T) {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:4317",
Transport: "tcp",
},
TLSSetting: &configtls.TLSServerSetting{
Expand Down
25 changes: 15 additions & 10 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -37,6 +38,10 @@ const (
protoGRPC = "grpc"
protoHTTP = "http"
protocolsFieldName = "protocols"

defaultGRPCEndpoint = "0.0.0.0:4317"
defaultHTTPEndpoint = "0.0.0.0:55681"
legacyGRPCEndpoint = "0.0.0.0:55680"
)

func NewFactory() component.ReceiverFactory {
Expand All @@ -59,14 +64,14 @@ func createDefaultConfig() configmodels.Receiver {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "0.0.0.0:55680",
Endpoint: defaultGRPCEndpoint,
Transport: "tcp",
},
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
Endpoint: defaultHTTPEndpoint,
},
},
}
Expand Down Expand Up @@ -117,11 +122,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{})
// CreateTracesReceiver creates a trace receiver based on provided config.
func createTraceReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TracesConsumer,
) (component.TracesReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -134,11 +139,11 @@ func createTraceReceiver(
// CreateMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -151,11 +156,11 @@ func createMetricsReceiver(
// CreateLogReceiver creates a log receiver based on provided config.
func createLogReceiver(
ctx context.Context,
_ component.ReceiverCreateParams,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.LogsConsumer,
) (component.LogsReceiver, error) {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, params.Logger)
if err != nil {
return nil, err
}
Expand All @@ -165,7 +170,7 @@ func createLogReceiver(
return r, nil
}

func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) {
rCfg := cfg.(*Config)

// There must be one receiver for both metrics and traces. We maintain a map of
Expand All @@ -176,7 +181,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) {
if !ok {
var err error
// We don't have a receiver, so create one.
receiver, err = newOtlpReceiver(rCfg)
receiver, err = newOtlpReceiver(rCfg, logger)
if err != nil {
return nil, err
}
Expand Down
102 changes: 72 additions & 30 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"sync"

gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
Expand All @@ -49,14 +51,17 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once

logger *zap.Logger
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) {
r := &otlpReceiver{
cfg: cfg,
cfg: cfg,
logger: logger,
}
if cfg.GRPC != nil {
opts, err := cfg.GRPC.ToServerOption()
Expand Down Expand Up @@ -84,6 +89,70 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
return r, nil
}

func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error {
r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint)
var gln net.Listener
gln, err := cfg.ToListener()
if err != nil {
return err
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
return nil
}

func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error {
r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint)
var hln net.Listener
hln, err := r.cfg.HTTP.ToListener()
if err != nil {
return err
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
}
}()
return nil
}

func (r *otlpReceiver) startProtocolServers(host component.Host) error {
var err error
if r.cfg.GRPC != nil {
err = r.startGRPCServer(r.cfg.GRPC, host)
if err != nil {
return err
}
if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint {
r.logger.Info("Setting up a second GRPC listener on legacy endpoint " + legacyGRPCEndpoint)

// Copy the config.
cfgLegacyGRPC := r.cfg.GRPC
// And use the legacy endpoint.
cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint
err = r.startGRPCServer(cfgLegacyGRPC, host)
if err != nil {
return err
}
}
}
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.gatewayMux,
confighttp.WithErrorHandler(errorHandler),
)
err = r.startHTTPServer(r.cfg.HTTP, host)
if err != nil {
return err
}
}

return err
}

// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
Expand All @@ -93,34 +162,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {

var err error
r.startServerOnce.Do(func() {
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
if r.cfg.GRPC != nil {
var gln net.Listener
gln, err = r.cfg.GRPC.ToListener()
if err != nil {
return
}
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
}
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.gatewayMux,
confighttp.WithErrorHandler(errorHandler),
)
var hln net.Listener
hln, err = r.cfg.HTTP.ToListener()
if err != nil {
return
}
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
host.ReportFatalError(errHTTP)
}
}()
}
err = r.startProtocolServers(host)
})
return err
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) {
}

// TLS is resolved during Creation of the receiver for GRPC.
_, err := createReceiver(cfg)
_, err := createReceiver(cfg, zap.NewNop())
assert.EqualError(t, err,
`failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`)
}
Expand Down Expand Up @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer,
}

func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver {
r, err := createReceiver(cfg)
r, err := createReceiver(cfg, zap.NewNop())
require.NoError(t, err)
if tc != nil {
params := component.ReceiverCreateParams{}
Expand Down