From 8589eee279738eb9f7e58babb3ce5ad08638f489 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 5 Aug 2020 11:10:18 -0700 Subject: [PATCH] Change to private unnecessary exported types and funcs (#1499) Signed-off-by: Bogdan Drutu --- exporter/fileexporter/factory.go | 6 +- exporter/fileexporter/file_exporter.go | 14 +-- exporter/fileexporter/file_exporter_test.go | 8 +- exporter/loggingexporter/factory.go | 18 +--- exporter/loggingexporter/logging_exporter.go | 12 +-- .../loggingexporter/logging_exporter_test.go | 6 +- exporter/zipkinexporter/zipkin.go | 4 +- receiver/{empty.go => doc.go} | 0 receiver/empty_test.go | 15 +++ receiver/end_to_end_test.go | 97 ------------------- receiver/fluentforwardreceiver/factory.go | 2 +- receiver/fluentforwardreceiver/receiver.go | 10 +- .../fluentforwardreceiver/receiver_test.go | 8 +- receiver/jaegerreceiver/factory.go | 6 +- receiver/jaegerreceiver/jaeger_agent_test.go | 24 ++--- receiver/jaegerreceiver/trace_receiver.go | 12 +-- .../jaegerreceiver/trace_receiver_test.go | 30 +++--- receiver/opencensusreceiver/config.go | 8 +- receiver/opencensusreceiver/factory.go | 10 +- receiver/opencensusreceiver/opencensus.go | 32 +++--- .../opencensusreceiver/opencensus_test.go | 30 +++--- receiver/opencensusreceiver/options.go | 22 ++--- receiver/otlpreceiver/factory.go | 8 +- receiver/otlpreceiver/factory_test.go | 2 +- receiver/otlpreceiver/otlp.go | 20 ++-- receiver/otlpreceiver/otlp_test.go | 20 ++-- .../prometheusreceiver/metrics_receiver.go | 19 ++-- .../zipkinreceiver/trace_receiver_test.go | 14 +-- 28 files changed, 178 insertions(+), 279 deletions(-) rename receiver/{empty.go => doc.go} (100%) create mode 100644 receiver/empty_test.go delete mode 100644 receiver/end_to_end_test.go diff --git a/exporter/fileexporter/factory.go b/exporter/fileexporter/factory.go index ef25dfaf8536..419c1181b5d4 100644 --- a/exporter/fileexporter/factory.go +++ b/exporter/fileexporter/factory.go @@ -71,7 +71,7 @@ func createLogsExporter( return createExporter(cfg) } -func createExporter(config configmodels.Exporter) (*Exporter, error) { +func createExporter(config configmodels.Exporter) (*fileExporter, error) { cfg := config.(*Config) // There must be one exporter for metrics, traces, and logs. We maintain a @@ -85,7 +85,7 @@ func createExporter(config configmodels.Exporter) (*Exporter, error) { if err != nil { return nil, err } - exporter = &Exporter{file: file} + exporter = &fileExporter{file: file} // Remember the receiver in the map exporters[cfg] = exporter @@ -97,4 +97,4 @@ func createExporter(config configmodels.Exporter) (*Exporter, error) { // We maintain this map because the Factory is asked trace and metric receivers separately // when it gets CreateTraceReceiver() and CreateMetricsReceiver() but they must not // create separate objects, they must use one Receiver object per configuration. -var exporters = map[*Config]*Exporter{} +var exporters = map[*Config]*fileExporter{} diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index 1fde594b37c2..a1daa43d83a6 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -134,14 +134,14 @@ func exportResourceAndNode(writer *jsonWriter, node *commonpb.Node, resource *re return nil } -// Exporter is the implementation of file exporter that writes telemetry data to a file +// fileExporter is the implementation of file exporter that writes telemetry data to a file // in Protobuf-JSON format. -type Exporter struct { +type fileExporter struct { file io.WriteCloser mutex sync.Mutex } -func (e *Exporter) ConsumeTraces(_ context.Context, td pdata.Traces) error { +func (e *fileExporter) ConsumeTraces(_ context.Context, td pdata.Traces) error { octds := internaldata.TraceDataToOC(td) for _, octd := range octds { // Ensure only one write operation happens at a time. @@ -175,7 +175,7 @@ func (e *Exporter) ConsumeTraces(_ context.Context, td pdata.Traces) error { return nil } -func (e *Exporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { +func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { ocmds := pdatautil.MetricsToMetricsData(md) for _, ocmd := range ocmds { // Ensure only one write operation happens at a time. @@ -209,7 +209,7 @@ func (e *Exporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { return nil } -func (e *Exporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error { +func (e *fileExporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error { // Ensure only one write operation happens at a time. e.mutex.Lock() defer e.mutex.Unlock() @@ -249,11 +249,11 @@ func (e *Exporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error { return nil } -func (e *Exporter) Start(ctx context.Context, host component.Host) error { +func (e *fileExporter) Start(ctx context.Context, host component.Host) error { return nil } // Shutdown stops the exporter and is invoked during shutdown. -func (e *Exporter) Shutdown(context.Context) error { +func (e *fileExporter) Shutdown(context.Context) error { return e.file.Close() } diff --git a/exporter/fileexporter/file_exporter_test.go b/exporter/fileexporter/file_exporter_test.go index 30eb375bf91a..185ba801ac3d 100644 --- a/exporter/fileexporter/file_exporter_test.go +++ b/exporter/fileexporter/file_exporter_test.go @@ -39,7 +39,7 @@ import ( func TestFileTraceExporterNoErrors(t *testing.T) { mf := &testutil.LimitedWriter{} - lte := &Exporter{file: mf} + lte := &fileExporter{file: mf} require.NotNil(t, lte) td := consumerdata.TraceData{ @@ -98,7 +98,7 @@ func TestFileTraceExporterNoErrors(t *testing.T) { func TestFileMetricsExporterNoErrors(t *testing.T) { mf := &testutil.LimitedWriter{} - lme := &Exporter{file: mf} + lme := &fileExporter{file: mf} require.NotNil(t, lme) md := pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{ @@ -160,7 +160,7 @@ func TestFileMetricsExporterNoErrors(t *testing.T) { func TestFileLogsExporterNoErrors(t *testing.T) { mf := &testutil.LimitedWriter{} - exporter := &Exporter{file: mf} + exporter := &fileExporter{file: mf} require.NotNil(t, exporter) now := time.Now() @@ -342,7 +342,7 @@ func TestFileLogsExporterErrors(t *testing.T) { mf := &testutil.LimitedWriter{ MaxLen: maxLen, } - exporter := &Exporter{file: mf} + exporter := &fileExporter{file: mf} require.NotNil(t, exporter) assert.Error(t, exporter.ConsumeLogs(context.Background(), pdata.LogsFromOtlp(ld))) diff --git a/exporter/loggingexporter/factory.go b/exporter/loggingexporter/factory.go index debaa191c128..797e888c73d8 100644 --- a/exporter/loggingexporter/factory.go +++ b/exporter/loggingexporter/factory.go @@ -62,11 +62,7 @@ func createTraceExporter(_ context.Context, _ component.ExporterCreateParams, co return nil, err } - lexp, err := NewTraceExporter(config, cfg.LogLevel, exporterLogger) - if err != nil { - return nil, err - } - return lexp, nil + return newTraceExporter(config, cfg.LogLevel, exporterLogger) } func createMetricsExporter(_ context.Context, _ component.ExporterCreateParams, config configmodels.Exporter) (component.MetricsExporter, error) { @@ -77,11 +73,7 @@ func createMetricsExporter(_ context.Context, _ component.ExporterCreateParams, return nil, err } - lexp, err := NewMetricsExporter(config, cfg.LogLevel, exporterLogger) - if err != nil { - return nil, err - } - return lexp, nil + return newMetricsExporter(config, cfg.LogLevel, exporterLogger) } func createLogsExporter(_ context.Context, _ component.ExporterCreateParams, config configmodels.Exporter) (component.LogsExporter, error) { @@ -92,11 +84,7 @@ func createLogsExporter(_ context.Context, _ component.ExporterCreateParams, con return nil, err } - lexp, err := NewLogsExporter(config, cfg.LogLevel, exporterLogger) - if err != nil { - return nil, err - } - return lexp, nil + return newLogsExporter(config, cfg.LogLevel, exporterLogger) } func createLogger(cfg *Config) (*zap.Logger, error) { diff --git a/exporter/loggingexporter/logging_exporter.go b/exporter/loggingexporter/logging_exporter.go index de04fa1f3485..0d8c19a46378 100644 --- a/exporter/loggingexporter/logging_exporter.go +++ b/exporter/loggingexporter/logging_exporter.go @@ -360,9 +360,9 @@ func (s *loggingExporter) pushMetricsData( return 0, nil } -// NewTraceExporter creates an exporter.TraceExporter that just drops the +// newTraceExporter creates an exporter.TraceExporter that just drops the // received data and logs debugging messages. -func NewTraceExporter(config configmodels.Exporter, level string, logger *zap.Logger) (component.TraceExporter, error) { +func newTraceExporter(config configmodels.Exporter, level string, logger *zap.Logger) (component.TraceExporter, error) { s := &loggingExporter{ debug: level == "debug", logger: logger, @@ -379,9 +379,9 @@ func NewTraceExporter(config configmodels.Exporter, level string, logger *zap.Lo ) } -// NewMetricsExporter creates an exporter.MetricsExporter that just drops the +// newMetricsExporter creates an exporter.MetricsExporter that just drops the // received data and logs debugging messages. -func NewMetricsExporter(config configmodels.Exporter, level string, logger *zap.Logger) (component.MetricsExporter, error) { +func newMetricsExporter(config configmodels.Exporter, level string, logger *zap.Logger) (component.MetricsExporter, error) { s := &loggingExporter{ debug: level == "debug", logger: logger, @@ -398,9 +398,9 @@ func NewMetricsExporter(config configmodels.Exporter, level string, logger *zap. ) } -// NewLogsExporter creates an exporter.LogsExporter that just drops the +// newLogsExporter creates an exporter.LogsExporter that just drops the // received data and logs debugging messages. -func NewLogsExporter(config configmodels.Exporter, level string, logger *zap.Logger) (component.LogsExporter, error) { +func newLogsExporter(config configmodels.Exporter, level string, logger *zap.Logger) (component.LogsExporter, error) { s := &loggingExporter{ debug: level == "debug", logger: logger, diff --git a/exporter/loggingexporter/logging_exporter_test.go b/exporter/loggingexporter/logging_exporter_test.go index 168f60e4bf43..fe2a73512826 100644 --- a/exporter/loggingexporter/logging_exporter_test.go +++ b/exporter/loggingexporter/logging_exporter_test.go @@ -27,7 +27,7 @@ import ( ) func TestLoggingTraceExporterNoErrors(t *testing.T) { - lte, err := NewTraceExporter(&configmodels.ExporterSettings{}, "debug", zap.NewNop()) + lte, err := newTraceExporter(&configmodels.ExporterSettings{}, "debug", zap.NewNop()) require.NotNil(t, lte) assert.NoError(t, err) @@ -41,7 +41,7 @@ func TestLoggingTraceExporterNoErrors(t *testing.T) { } func TestLoggingMetricsExporterNoErrors(t *testing.T) { - lme, err := NewMetricsExporter(&configmodels.ExporterSettings{}, "debug", zap.NewNop()) + lme, err := newMetricsExporter(&configmodels.ExporterSettings{}, "debug", zap.NewNop()) require.NotNil(t, lme) assert.NoError(t, err) @@ -59,7 +59,7 @@ func TestLoggingMetricsExporterNoErrors(t *testing.T) { } func TestLoggingLogsExporterNoErrors(t *testing.T) { - lle, err := NewLogsExporter(&configmodels.ExporterSettings{}, "debug", zap.NewNop()) + lle, err := newLogsExporter(&configmodels.ExporterSettings{}, "debug", zap.NewNop()) require.NotNil(t, lle) assert.NoError(t, err) diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go index 278565a97b6a..a6932c790ae9 100644 --- a/exporter/zipkinexporter/zipkin.go +++ b/exporter/zipkinexporter/zipkin.go @@ -51,7 +51,7 @@ func newTraceExporter(config *Config) (component.TraceExporter, error) { if err != nil { return nil, err } - zexp, err := exporterhelper.NewTraceExporter(config, ze.PushTraceData) + zexp, err := exporterhelper.NewTraceExporter(config, ze.pushTraceData) if err != nil { return nil, err } @@ -83,7 +83,7 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) { return ze, nil } -func (ze *zipkinExporter) PushTraceData(ctx context.Context, td pdata.Traces) (int, error) { +func (ze *zipkinExporter) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) { numSpans := td.SpanCount() octds := internaldata.TraceDataToOC(td) diff --git a/receiver/empty.go b/receiver/doc.go similarity index 100% rename from receiver/empty.go rename to receiver/doc.go diff --git a/receiver/empty_test.go b/receiver/empty_test.go new file mode 100644 index 000000000000..bf313de0b521 --- /dev/null +++ b/receiver/empty_test.go @@ -0,0 +1,15 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receiver diff --git a/receiver/end_to_end_test.go b/receiver/end_to_end_test.go deleted file mode 100644 index 94f1d8e04785..000000000000 --- a/receiver/end_to_end_test.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package receiver_test - -import ( - "context" - "log" - "time" - - "contrib.go.opencensus.io/exporter/ocagent" - "go.opencensus.io/trace" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/consumer/converter" - "go.opentelemetry.io/collector/exporter/loggingexporter" - "go.opentelemetry.io/collector/receiver/opencensusreceiver" -) - -func Example_endToEnd() { - // This is what the cmd/ocagent code would look like this. - // A trace receiver as per the trace receiver - // configs that have been parsed. - lte, err := loggingexporter.NewTraceExporter(&configmodels.ExporterSettings{}, "debug", zap.NewNop()) - if err != nil { - log.Fatalf("Failed to create logging exporter: %v", err) - } - - tr, err := opencensusreceiver.New("opencensus", "tcp", "localhost:55678", converter.NewOCToInternalTraceConverter(lte), nil) - if err != nil { - log.Fatalf("Failed to create trace receiver: %v", err) - } - - // The agent will combine all trace receivers like this. - trl := []component.TraceReceiver{tr} - - // Once we have the span receiver which will connect to the - // various exporter pipeline i.e. *tracepb.Span->OpenCensus.SpanData - for _, tr := range trl { - if err = tr.Start(context.Background(), nil); err != nil { - log.Fatalf("Failed to start trace receiver: %v", err) - } - } - - // Before exiting, stop all the trace receivers - defer func() { - for _, tr := range trl { - _ = tr.Shutdown(context.Background()) - } - }() - log.Println("Done starting the trace receiver") - // We are done with the agent-core - - // Now this code would exist in the client application e.g. client code. - // Create the agent exporter - oce, err := ocagent.NewExporter(ocagent.WithInsecure()) - if err != nil { - log.Fatalf("Failed to create ocagent exporter: %v", err) - } - defer oce.Stop() - - // Register it as a trace exporter - trace.RegisterExporter(oce) - // For demo purposes we are always sampling - trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()}) - - log.Println("Starting loop") - ctx, span := trace.StartSpan(context.Background(), "ClientLibrarySpan") - for i := 0; i < 10; i++ { - _, childSpan := trace.StartSpan(ctx, "ChildSpan") - childSpan.Annotatef([]trace.Attribute{ - trace.StringAttribute("type", "Child"), - trace.Int64Attribute("i", int64(i)), - }, "This is an annotation") - <-time.After(100 * time.Millisecond) - childSpan.End() - oce.Flush() - } - span.End() - - <-time.After(400 * time.Millisecond) - oce.Flush() - <-time.After(5 * time.Second) -} diff --git a/receiver/fluentforwardreceiver/factory.go b/receiver/fluentforwardreceiver/factory.go index 49a0d0856cc6..900f68fb5665 100644 --- a/receiver/fluentforwardreceiver/factory.go +++ b/receiver/fluentforwardreceiver/factory.go @@ -54,5 +54,5 @@ func createLogsReceiver( ) (component.LogsReceiver, error) { rCfg := cfg.(*Config) - return New(ctx, params.Logger, rCfg, consumer) + return newFluentReceiver(ctx, params.Logger, rCfg, consumer) } diff --git a/receiver/fluentforwardreceiver/receiver.go b/receiver/fluentforwardreceiver/receiver.go index e62025e8ba3a..05fba3ee79ba 100644 --- a/receiver/fluentforwardreceiver/receiver.go +++ b/receiver/fluentforwardreceiver/receiver.go @@ -29,7 +29,7 @@ import ( // FluentBit and increase throughput. const eventChannelLength = 100 -type Receiver struct { +type fluentReceiver struct { collector *Collector listener net.Listener conf *Config @@ -38,14 +38,14 @@ type Receiver struct { cancel context.CancelFunc } -func New(ctx context.Context, logger *zap.Logger, conf *Config, next consumer.LogsConsumer) (component.LogsReceiver, error) { +func newFluentReceiver(ctx context.Context, logger *zap.Logger, conf *Config, next consumer.LogsConsumer) (component.LogsReceiver, error) { eventCh := make(chan Event, eventChannelLength) collector := newCollector(eventCh, next, logger) server := newServer(eventCh, logger) - return &Receiver{ + return &fluentReceiver{ collector: collector, server: server, conf: conf, @@ -53,7 +53,7 @@ func New(ctx context.Context, logger *zap.Logger, conf *Config, next consumer.Lo }, nil } -func (r *Receiver) Start(ctx context.Context, _ component.Host) error { +func (r *fluentReceiver) Start(ctx context.Context, _ component.Host) error { receiverCtx, cancel := context.WithCancel(ctx) r.cancel = cancel @@ -88,7 +88,7 @@ func (r *Receiver) Start(ctx context.Context, _ component.Host) error { return nil } -func (r *Receiver) Shutdown(ctx context.Context) error { +func (r *fluentReceiver) Shutdown(ctx context.Context) error { r.cancel() return nil } diff --git a/receiver/fluentforwardreceiver/receiver_test.go b/receiver/fluentforwardreceiver/receiver_test.go index 4b346f19caa7..12d369c5690e 100644 --- a/receiver/fluentforwardreceiver/receiver_test.go +++ b/receiver/fluentforwardreceiver/receiver_test.go @@ -48,12 +48,12 @@ func setupServer(t *testing.T) (func() net.Conn, *exportertest.SinkLogsExporter, ListenAddress: "127.0.0.1:0", } - receiver, err := New(ctx, logger, conf, next) + receiver, err := newFluentReceiver(ctx, logger, conf, next) require.NoError(t, err) require.NoError(t, receiver.Start(ctx, nil)) connect := func() net.Conn { - conn, err := net.Dial("tcp", receiver.(*Receiver).listener.Addr().String()) + conn, err := net.Dial("tcp", receiver.(*fluentReceiver).listener.Addr().String()) require.Nil(t, err) return conn } @@ -370,11 +370,11 @@ func TestUnixEndpoint(t *testing.T) { ListenAddress: "unix://" + filepath.Join(tmpdir, "fluent.sock"), } - receiver, err := New(ctx, zap.NewNop(), conf, next) + receiver, err := newFluentReceiver(ctx, zap.NewNop(), conf, next) require.NoError(t, err) require.NoError(t, receiver.Start(ctx, nil)) - conn, err := net.Dial("unix", receiver.(*Receiver).listener.Addr().String()) + conn, err := net.Dial("unix", receiver.(*fluentReceiver).listener.Addr().String()) require.NoError(t, err) n, err := conn.Write(testdata.ParseHexDump("message-event")) diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index ce03624576c7..46b07e3923fc 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -146,13 +146,13 @@ func createTraceReceiver( nextConsumer consumer.TraceConsumer, ) (component.TraceReceiver, error) { - // Convert settings in the source config to Configuration struct + // Convert settings in the source config to configuration struct // that Jaeger receiver understands. rCfg := cfg.(*Config) remoteSamplingConfig := rCfg.RemoteSampling - config := Configuration{} + config := configuration{} // Set ports if rCfg.Protocols.GRPC != nil { @@ -231,7 +231,7 @@ func createTraceReceiver( } // Create the receiver. - return New(rCfg.Name(), &config, nextConsumer, params) + return newJaegerReceiver(rCfg.Name(), &config, nextConsumer, params) } // extract the port number from string in "address:port" format. If the diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index b86d22c897dc..18774a5c5b7a 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -42,7 +42,7 @@ const jaegerAgent = "jaeger_agent_test" func TestJaegerAgentUDP_ThriftCompact_6831(t *testing.T) { port := 6831 addrForClient := fmt.Sprintf(":%d", port) - testJaegerAgent(t, addrForClient, &Configuration{ + testJaegerAgent(t, addrForClient, &configuration{ AgentCompactThriftPort: port, }) } @@ -50,11 +50,11 @@ func TestJaegerAgentUDP_ThriftCompact_6831(t *testing.T) { func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) { port := 999999 - config := &Configuration{ + config := &configuration{ AgentCompactThriftPort: int(port), } params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerAgent, config, nil, params) + jr, err := newJaegerReceiver(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") err = jr.Start(context.Background(), componenttest.NewNopHost()) @@ -68,7 +68,7 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { port := 6832 addrForClient := fmt.Sprintf(":%d", port) - testJaegerAgent(t, addrForClient, &Configuration{ + testJaegerAgent(t, addrForClient, &configuration{ AgentBinaryThriftPort: port, }) } @@ -77,11 +77,11 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { // This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above. port := testutil.GetAvailablePort(t) - config := &Configuration{ + config := &configuration{ AgentBinaryThriftPort: int(port), } params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerAgent, config, nil, params) + jr, err := newJaegerReceiver(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") err = jr.(*jReceiver).startAgent(componenttest.NewNopHost()) @@ -99,11 +99,11 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) { port := 999999 - config := &Configuration{ + config := &configuration{ AgentBinaryThriftPort: int(port), } params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerAgent, config, nil, params) + jr, err := newJaegerReceiver(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") err = jr.Start(context.Background(), componenttest.NewNopHost()) @@ -138,7 +138,7 @@ func TestJaegerHTTP(t *testing.T) { defer s.GracefulStop() port := testutil.GetAvailablePort(t) - config := &Configuration{ + config := &configuration{ AgentHTTPPort: int(port), RemoteSamplingClientSettings: configgrpc.GRPCClientSettings{ Endpoint: addr.String(), @@ -148,7 +148,7 @@ func TestJaegerHTTP(t *testing.T) { }, } params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerAgent, config, nil, params) + jr, err := newJaegerReceiver(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") defer jr.Shutdown(context.Background()) @@ -181,11 +181,11 @@ func TestJaegerHTTP(t *testing.T) { } } -func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configuration) { +func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) { // 1. Create the Jaeger receiver aka "server" sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerAgent, receiverConfig, sink, params) + jr, err := newJaegerReceiver(jaegerAgent, receiverConfig, sink, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") defer jr.Shutdown(context.Background()) diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 14653774c90d..1744f40d978e 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -53,9 +53,9 @@ import ( jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" ) -// Configuration defines the behavior and the ports that +// configuration defines the behavior and the ports that // the Jaeger receiver will use. -type Configuration struct { +type configuration struct { CollectorThriftPort int CollectorHTTPPort int CollectorGRPCPort int @@ -80,7 +80,7 @@ type jReceiver struct { startOnce sync.Once stopOnce sync.Once - config *Configuration + config *configuration grpc *grpc.Server collectorServer *http.Server @@ -117,11 +117,11 @@ var ( } ) -// New creates a TraceReceiver that receives traffic as a Jaeger collector, and +// newJaegerReceiver creates a TraceReceiver that receives traffic as a Jaeger collector, and // also as a Jaeger agent. -func New( +func newJaegerReceiver( instanceName string, - config *Configuration, + config *configuration, nextConsumer consumer.TraceConsumer, params component.ReceiverCreateParams, ) (component.TraceReceiver, error) { diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index 2874fa1d1102..d9053cf69d8d 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -59,7 +59,7 @@ const jaegerReceiver = "jaeger_receiver_test" func TestTraceSource(t *testing.T) { params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, &Configuration{}, nil, params) + jr, err := newJaegerReceiver(jaegerReceiver, &configuration{}, nil, params) assert.NoError(t, err, "should not have failed to create the Jaeger receiver") require.NotNil(t, jr) } @@ -130,13 +130,13 @@ func TestClientIPDetection(t *testing.T) { func TestReception(t *testing.T) { // 1. Create the Jaeger receiver aka "server" - config := &Configuration{ + config := &configuration{ CollectorHTTPPort: 14268, // that's the only one used by this test } sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, config, sink, params) + jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) defer jr.Shutdown(context.Background()) assert.NoError(t, err, "should not have failed to create the Jaeger received") @@ -179,12 +179,12 @@ func TestReception(t *testing.T) { func TestPortsNotOpen(t *testing.T) { // an empty config should result in no open ports - config := &Configuration{} + config := &configuration{} sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, config, sink, params) + jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -208,13 +208,13 @@ func TestPortsNotOpen(t *testing.T) { func TestGRPCReception(t *testing.T) { // prepare - config := &Configuration{ + config := &configuration{ CollectorGRPCPort: 14250, // that's the only one used by this test } sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, config, sink, params) + jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -265,14 +265,14 @@ func TestGRPCReceptionWithTLS(t *testing.T) { grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsCfg))) port := testutil.GetAvailablePort(t) - config := &Configuration{ + config := &configuration{ CollectorGRPCPort: int(port), CollectorGRPCOptions: grpcServerOptions, } sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, config, sink, params) + jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -448,14 +448,14 @@ func grpcFixture(t1 time.Time, d1, d2 time.Duration) *api_v2.PostSpansRequest { func TestSampling(t *testing.T) { port := testutil.GetAvailablePort(t) - config := &Configuration{ + config := &configuration{ CollectorGRPCPort: int(port), RemoteSamplingStrategyFile: "testdata/strategies.json", } sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, config, sink, params) + jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -502,13 +502,13 @@ func TestSampling(t *testing.T) { func TestSamplingFailsOnNotConfigured(t *testing.T) { port := testutil.GetAvailablePort(t) // prepare - config := &Configuration{ + config := &configuration{ CollectorGRPCPort: int(port), } sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, config, sink, params) + jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -531,14 +531,14 @@ func TestSamplingFailsOnNotConfigured(t *testing.T) { func TestSamplingFailsOnBadFile(t *testing.T) { port := testutil.GetAvailablePort(t) // prepare - config := &Configuration{ + config := &configuration{ CollectorGRPCPort: int(port), RemoteSamplingStrategyFile: "does-not-exist", } sink := new(exportertest.SinkTraceExporter) params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr, err := New(jaegerReceiver, config, sink, params) + jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) assert.Error(t, jr.Start(context.Background(), componenttest.NewNopHost())) diff --git a/receiver/opencensusreceiver/config.go b/receiver/opencensusreceiver/config.go index 16706e805eeb..4fc2d2f6efcd 100644 --- a/receiver/opencensusreceiver/config.go +++ b/receiver/opencensusreceiver/config.go @@ -33,10 +33,10 @@ type Config struct { CorsOrigins []string `mapstructure:"cors_allowed_origins"` } -func (rOpts *Config) buildOptions() ([]Option, error) { - var opts []Option +func (rOpts *Config) buildOptions() ([]ocOption, error) { + var opts []ocOption if len(rOpts.CorsOrigins) > 0 { - opts = append(opts, WithCorsOrigins(rOpts.CorsOrigins)) + opts = append(opts, withCorsOrigins(rOpts.CorsOrigins)) } grpcServerOptions, err := rOpts.GRPCServerSettings.ToServerOption() @@ -44,7 +44,7 @@ func (rOpts *Config) buildOptions() ([]Option, error) { return nil, err } if len(grpcServerOptions) > 0 { - opts = append(opts, WithGRPCServerOptions(grpcServerOptions...)) + opts = append(opts, withGRPCServerOptions(grpcServerOptions...)) } return opts, nil diff --git a/receiver/opencensusreceiver/factory.go b/receiver/opencensusreceiver/factory.go index 125ce6edaf0f..168e456b8c37 100644 --- a/receiver/opencensusreceiver/factory.go +++ b/receiver/opencensusreceiver/factory.go @@ -35,7 +35,7 @@ const ( type Factory struct { } -// Type gets the type of the Receiver config created by this Factory. +// Type gets the type of the ocReceiver config created by this Factory. func (f *Factory) Type() configmodels.Type { return typeStr } @@ -88,7 +88,7 @@ func (f *Factory) CreateMetricsReceiver(_ context.Context, _ *zap.Logger, cfg co return r, nil } -func (f *Factory) createReceiver(cfg configmodels.Receiver) (*Receiver, error) { +func (f *Factory) createReceiver(cfg configmodels.Receiver) (*ocReceiver, error) { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -104,7 +104,7 @@ func (f *Factory) createReceiver(cfg configmodels.Receiver) (*Receiver, error) { } // We don't have a receiver, so create one. - receiver, err = New( + receiver, err = newOpenCensusReceiver( rCfg.Name(), rCfg.NetAddr.Transport, rCfg.NetAddr.Endpoint, nil, nil, opts...) if err != nil { return nil, err @@ -118,5 +118,5 @@ func (f *Factory) createReceiver(cfg configmodels.Receiver) (*Receiver, error) { // This is the map of already created OpenCensus receivers for particular configurations. // We maintain this map because the Factory is asked trace and metric receivers separately // when it gets CreateTraceReceiver() and CreateMetricsReceiver() but they must not -// create separate objects, they must use one Receiver object per configuration. -var receivers = map[*Config]*Receiver{} +// create separate objects, they must use one ocReceiver object per configuration. +var receivers = map[*Config]*ocReceiver{} diff --git a/receiver/opencensusreceiver/opencensus.go b/receiver/opencensusreceiver/opencensus.go index f944349a31a8..12540b5f0ac6 100644 --- a/receiver/opencensusreceiver/opencensus.go +++ b/receiver/opencensusreceiver/opencensus.go @@ -37,8 +37,8 @@ import ( "go.opentelemetry.io/collector/receiver/opencensusreceiver/octrace" ) -// Receiver is the type that exposes Trace and Metrics reception. -type Receiver struct { +// ocReceiver is the type that exposes Trace and Metrics reception. +type ocReceiver struct { mu sync.Mutex ln net.Listener serverGRPC *grpc.Server @@ -63,24 +63,24 @@ type Receiver struct { instanceName string } -// New just creates the OpenCensus receiver services. It is the caller's +// newOpenCensusReceiver just creates the OpenCensus receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func New( +func newOpenCensusReceiver( instanceName string, transport string, addr string, tc consumer.TraceConsumerOld, mc consumer.MetricsConsumerOld, - opts ...Option, -) (*Receiver, error) { + opts ...ocOption, +) (*ocReceiver, error) { // TODO: (@odeke-em) use options to enable address binding changes. ln, err := net.Listen(transport, addr) if err != nil { return nil, fmt.Errorf("failed to bind to address %q: %v", addr, err) } - ocr := &Receiver{ + ocr := &ocReceiver{ ln: ln, corsOrigins: []string{}, // Disable CORS by default. gatewayMux: gatewayruntime.NewServeMux(), @@ -99,11 +99,11 @@ func New( // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. -func (ocr *Receiver) Start(ctx context.Context, host component.Host) error { +func (ocr *ocReceiver) Start(ctx context.Context, host component.Host) error { return ocr.start(host) } -func (ocr *Receiver) registerTraceConsumer() error { +func (ocr *ocReceiver) registerTraceConsumer() error { var err = componenterror.ErrAlreadyStarted ocr.startTraceReceiverOnce.Do(func() { @@ -118,7 +118,7 @@ func (ocr *Receiver) registerTraceConsumer() error { return err } -func (ocr *Receiver) registerMetricsConsumer() error { +func (ocr *ocReceiver) registerMetricsConsumer() error { var err = componenterror.ErrAlreadyStarted ocr.startMetricsReceiverOnce.Do(func() { @@ -132,7 +132,7 @@ func (ocr *Receiver) registerMetricsConsumer() error { return err } -func (ocr *Receiver) grpcServer() *grpc.Server { +func (ocr *ocReceiver) grpcServer() *grpc.Server { ocr.mu.Lock() defer ocr.mu.Unlock() @@ -144,7 +144,7 @@ func (ocr *Receiver) grpcServer() *grpc.Server { } // Shutdown is a method to turn off receiving. -func (ocr *Receiver) Shutdown(context.Context) error { +func (ocr *ocReceiver) Shutdown(context.Context) error { if err := ocr.stop(); err != componenterror.ErrAlreadyStopped { return err } @@ -152,7 +152,7 @@ func (ocr *Receiver) Shutdown(context.Context) error { } // start runs all the receivers/services namely, Trace and Metrics services. -func (ocr *Receiver) start(host component.Host) error { +func (ocr *ocReceiver) start(host component.Host) error { hasConsumer := false if ocr.traceConsumer != nil { hasConsumer = true @@ -182,7 +182,7 @@ func (ocr *Receiver) start(host component.Host) error { } // stop stops the underlying gRPC server and all the services running on it. -func (ocr *Receiver) stop() error { +func (ocr *ocReceiver) stop() error { ocr.mu.Lock() defer ocr.mu.Unlock() @@ -208,7 +208,7 @@ func (ocr *Receiver) stop() error { return err } -func (ocr *Receiver) httpServer() *http.Server { +func (ocr *ocReceiver) httpServer() *http.Server { ocr.mu.Lock() defer ocr.mu.Unlock() @@ -224,7 +224,7 @@ func (ocr *Receiver) httpServer() *http.Server { return ocr.serverHTTP } -func (ocr *Receiver) startServer(host component.Host) error { +func (ocr *ocReceiver) startServer(host component.Host) error { err := componenterror.ErrAlreadyStarted ocr.startServerOnce.Do(func() { err = nil diff --git a/receiver/opencensusreceiver/opencensus_test.go b/receiver/opencensusreceiver/opencensus_test.go index 0ab80a7aa065..a5a5ee43b6c4 100644 --- a/receiver/opencensusreceiver/opencensus_test.go +++ b/receiver/opencensusreceiver/opencensus_test.go @@ -49,7 +49,7 @@ import ( "go.opentelemetry.io/collector/testutil" ) -const ocReceiver = "oc_receiver_test" +const ocReceiverName = "oc_receiver_test" // TODO(ccaraman): Migrate tests to use assert for validating functionality. func TestGrpcGateway_endToEnd(t *testing.T) { @@ -57,7 +57,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) { // Set the buffer count to 1 to make it flush the test span immediately. sink := new(exportertest.SinkTraceExporterOld) - ocr, err := New(ocReceiver, "tcp", addr, sink, nil) + ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, sink, nil) require.NoError(t, err, "Failed to create trace receiver: %v", err) err = ocr.Start(context.Background(), componenttest.NewNopHost()) @@ -155,7 +155,7 @@ func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) { corsOrigins := []string{"allowed-*.com"} sink := new(exportertest.SinkTraceExporterOld) - ocr, err := New(ocReceiver, "tcp", addr, sink, nil, WithCorsOrigins(corsOrigins)) + ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, sink, nil, withCorsOrigins(corsOrigins)) require.NoError(t, err, "Failed to create trace receiver: %v", err) defer ocr.Shutdown(context.Background()) @@ -180,7 +180,7 @@ func TestMetricsGrpcGatewayCors_endToEnd(t *testing.T) { corsOrigins := []string{"allowed-*.com"} sink := new(exportertest.SinkMetricsExporterOld) - ocr, err := New(ocReceiver, "tcp", addr, nil, sink, WithCorsOrigins(corsOrigins)) + ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, nil, sink, withCorsOrigins(corsOrigins)) require.NoError(t, err, "Failed to create metrics receiver: %v", err) defer ocr.Shutdown(context.Background()) @@ -208,7 +208,7 @@ func TestAcceptAllGRPCProtoAffiliatedContentTypes(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) cbts := new(exportertest.SinkTraceExporterOld) - ocr, err := New(ocReceiver, "tcp", addr, cbts, nil) + ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, cbts, nil) require.NoError(t, err, "Failed to create trace receiver: %v", err) err = ocr.Start(context.Background(), componenttest.NewNopHost()) @@ -237,7 +237,7 @@ func TestAcceptAllGRPCProtoAffiliatedContentTypes(t *testing.T) { wantLen := len(protoAffiliatedContentSubTypes) + len(protoAffiliatedContentTypes) gotReqs := cbts.AllTraces() if len(gotReqs) != wantLen { - t.Errorf("Receiver ExportTraceServiceRequest length mismatch:: Got %d Want %d", len(gotReqs), wantLen) + t.Errorf("ocReceiver ExportTraceServiceRequest length mismatch:: Got %d Want %d", len(gotReqs), wantLen) } } @@ -331,7 +331,7 @@ func verifyCorsResp(t *testing.T, url string, origin string, wantStatus int, wan func TestStopWithoutStartNeverCrashes(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - ocr, err := New(ocReceiver, "tcp", addr, nil, nil) + ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, nil, nil) require.NoError(t, err, "Failed to create an OpenCensus receiver: %v", err) // Stop it before ever invoking Start*. ocr.stop() @@ -343,14 +343,14 @@ func TestNewPortAlreadyUsed(t *testing.T) { require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - r, err := New(ocReceiver, "tcp", addr, nil, nil) + r, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, nil, nil) require.Error(t, err) require.Nil(t, r) } func TestMultipleStopReceptionShouldNotError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r, err := New(ocReceiver, "tcp", addr, new(exportertest.SinkTraceExporterOld), new(exportertest.SinkMetricsExporterOld)) + r, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, new(exportertest.SinkTraceExporterOld), new(exportertest.SinkMetricsExporterOld)) require.NoError(t, err) require.NotNil(t, r) @@ -360,7 +360,7 @@ func TestMultipleStopReceptionShouldNotError(t *testing.T) { func TestStartWithoutConsumersShouldFail(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r, err := New(ocReceiver, "tcp", addr, nil, nil) + r, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, nil, nil) require.NoError(t, err) require.NotNil(t, r) @@ -379,7 +379,7 @@ func tempSocketName(t *testing.T) string { func TestReceiveOnUnixDomainSocket_endToEnd(t *testing.T) { socketName := tempSocketName(t) cbts := new(exportertest.SinkTraceExporterOld) - r, err := New(ocReceiver, "unix", socketName, cbts, nil) + r, err := newOpenCensusReceiver(ocReceiverName, "unix", socketName, cbts, nil) require.NoError(t, err) require.NotNil(t, r) require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -522,8 +522,8 @@ func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) { sink := new(exportertest.SinkTraceExporterOld) - var opts []Option - ocr, err := New(exporter.receiverTag, "tcp", addr, nil, nil, opts...) + var opts []ocOption + ocr, err := newOpenCensusReceiver(exporter.receiverTag, "tcp", addr, nil, nil, opts...) require.Nil(t, err) require.NotNil(t, ocr) @@ -671,8 +671,8 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) { sink := new(exportertest.SinkMetricsExporterOld) - var opts []Option - ocr, err := New(exporter.receiverTag, "tcp", addr, nil, nil, opts...) + var opts []ocOption + ocr, err := newOpenCensusReceiver(exporter.receiverTag, "tcp", addr, nil, nil, opts...) require.Nil(t, err) require.NotNil(t, ocr) diff --git a/receiver/opencensusreceiver/options.go b/receiver/opencensusreceiver/options.go index ceab1b0bfa59..d53d5b2be5fc 100644 --- a/receiver/opencensusreceiver/options.go +++ b/receiver/opencensusreceiver/options.go @@ -18,39 +18,39 @@ import ( "google.golang.org/grpc" ) -// Option interface defines for configuration settings to be applied to receivers. +// ocOption interface defines for configuration settings to be applied to receivers. // // withReceiver applies the configuration to the given receiver. -type Option interface { - withReceiver(*Receiver) +type ocOption interface { + withReceiver(*ocReceiver) } type corsOrigins struct { origins []string } -var _ Option = (*corsOrigins)(nil) +var _ ocOption = (*corsOrigins)(nil) -func (co *corsOrigins) withReceiver(ocr *Receiver) { +func (co *corsOrigins) withReceiver(ocr *ocReceiver) { ocr.corsOrigins = co.origins } -// WithCorsOrigins is an option to specify the allowed origins to enable writing +// withCorsOrigins is an option to specify the allowed origins to enable writing // HTTP/JSON requests to the grpc-gateway adapter using CORS. -func WithCorsOrigins(origins []string) Option { +func withCorsOrigins(origins []string) ocOption { return &corsOrigins{origins: origins} } -var _ Option = (grpcServerOptions)(nil) +var _ ocOption = (grpcServerOptions)(nil) type grpcServerOptions []grpc.ServerOption -func (gsvo grpcServerOptions) withReceiver(ocr *Receiver) { +func (gsvo grpcServerOptions) withReceiver(ocr *ocReceiver) { ocr.grpcServerOptions = gsvo } -// WithGRPCServerOptions allows one to specify the options for starting a gRPC server. -func WithGRPCServerOptions(gsOpts ...grpc.ServerOption) Option { +// withGRPCServerOptions allows one to specify the options for starting a gRPC server. +func withGRPCServerOptions(gsOpts ...grpc.ServerOption) ocOption { gsvOpts := grpcServerOptions(gsOpts) return gsvOpts } diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 173376a99f0f..550f167ebc36 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -165,7 +165,7 @@ func createLogReceiver( return r, nil } -func createReceiver(cfg configmodels.Receiver) (*Receiver, error) { +func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -176,7 +176,7 @@ func createReceiver(cfg configmodels.Receiver) (*Receiver, error) { if !ok { var err error // We don't have a receiver, so create one. - receiver, err = New(rCfg) + receiver, err = newOtlpReceiver(rCfg) if err != nil { return nil, err } @@ -189,5 +189,5 @@ func createReceiver(cfg configmodels.Receiver) (*Receiver, error) { // This is the map of already created OTLP receivers for particular configurations. // We maintain this map because the Factory is asked trace and metric receivers separately // when it gets CreateTraceReceiver() and CreateMetricsReceiver() but they must not -// create separate objects, they must use one Receiver object per configuration. -var receivers = map[*Config]*Receiver{} +// create separate objects, they must use one otlpReceiver object per configuration. +var receivers = map[*Config]*otlpReceiver{} diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index abcf7613332e..800f9589663e 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -348,7 +348,7 @@ func TestCreateLogReceiver(t *testing.T) { require.NoError(t, mr.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, mr.Shutdown(context.Background())) } - receivers = map[*Config]*Receiver{} + receivers = map[*Config]*otlpReceiver{} }) } } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 0c69f72ae316..49fc11b94ef6 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -35,8 +35,8 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver/trace" ) -// Receiver is the type that exposes Trace and Metrics reception. -type Receiver struct { +// otlpReceiver is the type that exposes Trace and Metrics reception. +type otlpReceiver struct { cfg *Config serverGRPC *grpc.Server gatewayMux *gatewayruntime.ServeMux @@ -50,11 +50,11 @@ type Receiver struct { startServerOnce sync.Once } -// New just creates the OpenTelemetry receiver services. It is the caller's +// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func New(cfg *Config) (*Receiver, error) { - r := &Receiver{ +func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { + r := &otlpReceiver{ cfg: cfg, } if cfg.GRPC != nil { @@ -75,7 +75,7 @@ func New(cfg *Config) (*Receiver, error) { // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. -func (r *Receiver) Start(ctx context.Context, host component.Host) error { +func (r *otlpReceiver) Start(ctx context.Context, host component.Host) error { if r.traceReceiver == nil && r.metricsReceiver == nil && r.logReceiver == nil { return errors.New("cannot start receiver: no consumers were specified") } @@ -112,7 +112,7 @@ func (r *Receiver) Start(ctx context.Context, host component.Host) error { } // Shutdown is a method to turn off receiving. -func (r *Receiver) Shutdown(context.Context) error { +func (r *otlpReceiver) Shutdown(context.Context) error { var err error r.stopOnce.Do(func() { err = nil @@ -128,7 +128,7 @@ func (r *Receiver) Shutdown(context.Context) error { return err } -func (r *Receiver) registerTraceConsumer(ctx context.Context, tc consumer.TraceConsumer) error { +func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.TraceConsumer) error { if tc == nil { return componenterror.ErrNilNextConsumer } @@ -142,7 +142,7 @@ func (r *Receiver) registerTraceConsumer(ctx context.Context, tc consumer.TraceC return nil } -func (r *Receiver) registerMetricsConsumer(ctx context.Context, mc consumer.MetricsConsumer) error { +func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.MetricsConsumer) error { if mc == nil { return componenterror.ErrNilNextConsumer } @@ -156,7 +156,7 @@ func (r *Receiver) registerMetricsConsumer(ctx context.Context, mc consumer.Metr return nil } -func (r *Receiver) registerLogsConsumer(ctx context.Context, tc consumer.LogsConsumer) error { +func (r *otlpReceiver) registerLogsConsumer(ctx context.Context, tc consumer.LogsConsumer) error { if tc == nil { return componenterror.ErrNilNextConsumer } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 6eec3360bdac..c1e80720a652 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -51,14 +51,14 @@ import ( "go.opentelemetry.io/collector/translator/conventions" ) -const otlpReceiver = "otlp_receiver_test" +const otlpReceiverName = "otlp_receiver_test" func TestGrpcGateway_endToEnd(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. sink := new(exportertest.SinkTraceExporter) - ocr := newHTTPReceiver(t, otlpReceiver, addr, sink, nil) + ocr := newHTTPReceiver(t, otlpReceiverName, addr, sink, nil) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") defer ocr.Shutdown(context.Background()) @@ -173,7 +173,7 @@ func TestProtoHttp(t *testing.T) { // Set the buffer count to 1 to make it flush the test span immediately. tSink := new(exportertest.SinkTraceExporter) mSink := new(exportertest.SinkMetricsExporter) - ocr := newHTTPReceiver(t, otlpReceiver, addr, tSink, mSink) + ocr := newHTTPReceiver(t, otlpReceiverName, addr, tSink, mSink) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") defer ocr.Shutdown(context.Background()) @@ -233,7 +233,7 @@ func TestGRPCNewPortAlreadyUsed(t *testing.T) { require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - r := newGRPCReceiver(t, otlpReceiver, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + r := newGRPCReceiver(t, otlpReceiverName, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) require.NotNil(t, r) require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -245,7 +245,7 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) { require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - r := newHTTPReceiver(t, otlpReceiver, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + r := newHTTPReceiver(t, otlpReceiverName, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) require.NotNil(t, r) require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -253,14 +253,14 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) { func TestGRPCStartWithoutConsumers(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r := newGRPCReceiver(t, otlpReceiver, addr, nil, nil) + r := newGRPCReceiver(t, otlpReceiverName, addr, nil, nil) require.NotNil(t, r) require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } func TestHTTPStartWithoutConsumers(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r := newHTTPReceiver(t, otlpReceiver, addr, nil, nil) + r := newHTTPReceiver(t, otlpReceiverName, addr, nil, nil) require.NotNil(t, r) require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } @@ -435,7 +435,7 @@ func TestHTTPInvalidTLSCredentials(t *testing.T) { `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } -func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { +func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *otlpReceiver { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.SetName(name) @@ -444,7 +444,7 @@ func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.Tra return newReceiver(t, factory, cfg, tc, mc) } -func newHTTPReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { +func newHTTPReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *otlpReceiver { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.SetName(name) @@ -453,7 +453,7 @@ func newHTTPReceiver(t *testing.T, name string, endpoint string, tc consumer.Tra return newReceiver(t, factory, cfg, tc, mc) } -func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { +func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *otlpReceiver { r, err := createReceiver(cfg) require.NoError(t, err) if tc != nil { diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 471800568e2b..90cf8a829b83 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -30,8 +30,8 @@ import ( "go.opentelemetry.io/collector/receiver/prometheusreceiver/internal" ) -// Preceiver is the type that provides Prometheus scraper/receiver functionality. -type Preceiver struct { +// pReceiver is the type that provides Prometheus scraper/receiver functionality. +type pReceiver struct { startOnce sync.Once stopOnce sync.Once cfg *Config @@ -41,8 +41,8 @@ type Preceiver struct { } // New creates a new prometheus.Receiver reference. -func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.MetricsConsumerOld) *Preceiver { - pr := &Preceiver{ +func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.MetricsConsumerOld) *pReceiver { + pr := &pReceiver{ cfg: cfg, consumer: next, logger: logger, @@ -52,7 +52,7 @@ func newPrometheusReceiver(logger *zap.Logger, cfg *Config, next consumer.Metric // Start is the method that starts Prometheus scraping and it // is controlled by having previously defined a Configuration using perhaps New. -func (pr *Preceiver) Start(ctx context.Context, host component.Host) error { +func (pr *pReceiver) Start(ctx context.Context, host component.Host) error { pr.startOnce.Do(func() { ctx := context.Background() c, cancel := context.WithCancel(ctx) @@ -106,15 +106,8 @@ func (pr *Preceiver) Start(ctx context.Context, host component.Host) error { return nil } -// Flush triggers the Flush method on the underlying Prometheus scrapers and instructs -// them to immediately sned over the metrics they've collected, to the MetricsConsumer. -// it's not needed on the new prometheus receiver implementation, let it do nothing -func (pr *Preceiver) Flush() { - -} - // Shutdown stops and cancels the underlying Prometheus scrapers. -func (pr *Preceiver) Shutdown(context.Context) error { +func (pr *pReceiver) Shutdown(context.Context) error { pr.stopOnce.Do(pr.cancel) return nil } diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index 8a021c445baa..3febb4d32832 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -57,7 +57,7 @@ import ( "go.opentelemetry.io/collector/translator/trace/zipkin" ) -const zipkinReceiver = "zipkin_receiver_test" +const zipkinReceiverName = "zipkin_receiver_test" func TestNew(t *testing.T) { type args struct { @@ -85,7 +85,7 @@ func TestNew(t *testing.T) { t.Run(tt.name, func(t *testing.T) { cfg := &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: zipkinReceiver, + NameVal: zipkinReceiverName, }, HTTPServerSettings: confighttp.HTTPServerSettings{ Endpoint: tt.args.address, @@ -110,7 +110,7 @@ func TestZipkinReceiverPortAlreadyInUse(t *testing.T) { require.NoError(t, err, "failed to split listener address: %v", err) cfg := &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: zipkinReceiver, + NameVal: zipkinReceiverName, }, HTTPServerSettings: confighttp.HTTPServerSettings{ Endpoint: "localhost:" + portStr, @@ -375,7 +375,7 @@ func TestStartTraceReception(t *testing.T) { sink := new(exportertest.SinkTraceExporter) cfg := &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: zipkinReceiver, + NameVal: zipkinReceiverName, }, HTTPServerSettings: confighttp.HTTPServerSettings{ Endpoint: "localhost:0", @@ -473,7 +473,7 @@ func TestReceiverContentTypes(t *testing.T) { } cfg := &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: zipkinReceiver, + NameVal: zipkinReceiverName, }, HTTPServerSettings: confighttp.HTTPServerSettings{ Endpoint: "", @@ -510,7 +510,7 @@ func TestReceiverInvalidContentType(t *testing.T) { } cfg := &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: zipkinReceiver, + NameVal: zipkinReceiverName, }, HTTPServerSettings: confighttp.HTTPServerSettings{ Endpoint: "", @@ -540,7 +540,7 @@ func TestReceiverConsumerError(t *testing.T) { } cfg := &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: zipkinReceiver, + NameVal: zipkinReceiverName, }, HTTPServerSettings: confighttp.HTTPServerSettings{ Endpoint: "localhost:9411",