diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index fe637883d2b..bfc51ec41a1 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -44,11 +44,11 @@ const ( // FactoryOption applies changes to kafkaReceiverFactory. type FactoryOption func(factory *kafkaReceiverFactory) -// WithAddUnmarshallers adds marshallers. -func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOption { +// WithAddTracesUnmarshallers adds marshallers. +func WithAddTracesUnmarshallers(encodingMarshaller map[string]TracesUnmarshaller) FactoryOption { return func(factory *kafkaReceiverFactory) { for encoding, unmarshaller := range encodingMarshaller { - factory.unmarshalers[encoding] = unmarshaller + factory.tracesUnmarshalers[encoding] = unmarshaller } } } @@ -65,8 +65,8 @@ func WithAddLogsUnmarshallers(encodingMarshaller map[string]LogsUnmarshaller) Fa // NewFactory creates Kafka receiver factory. func NewFactory(options ...FactoryOption) component.ReceiverFactory { f := &kafkaReceiverFactory{ - unmarshalers: defaultUnmarshallers(), - logsUnmarshaller: defaultLogsUnmarshallers(), + tracesUnmarshalers: defaultTracesUnmarshallers(), + logsUnmarshaller: defaultLogsUnmarshallers(), } for _, o := range options { o(f) @@ -101,8 +101,8 @@ func createDefaultConfig() config.Receiver { } type kafkaReceiverFactory struct { - unmarshalers map[string]Unmarshaller - logsUnmarshaller map[string]LogsUnmarshaller + tracesUnmarshalers map[string]TracesUnmarshaller + logsUnmarshaller map[string]LogsUnmarshaller } func (f *kafkaReceiverFactory) createTracesReceiver( @@ -112,7 +112,7 @@ func (f *kafkaReceiverFactory) createTracesReceiver( nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { c := cfg.(*Config) - r, err := newReceiver(*c, params, f.unmarshalers, nextConsumer) + r, err := newTracesReceiver(*c, params, f.tracesUnmarshalers, nextConsumer) if err != nil { return nil, err } diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index bd6b0fb6b1c..9d523229ab6 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -40,7 +40,7 @@ func TestCreateTracesReceiver(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Brokers = []string{"invalid:9092"} cfg.ProtocolVersion = "2.0.0" - f := kafkaReceiverFactory{unmarshalers: defaultUnmarshallers()} + f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshallers()} r, err := f.createTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) // no available broker require.Error(t, err) @@ -52,15 +52,15 @@ func TestCreateTracesReceiver_error(t *testing.T) { cfg.ProtocolVersion = "2.0.0" // disable contacting broker at startup cfg.Metadata.Full = false - f := kafkaReceiverFactory{unmarshalers: defaultUnmarshallers()} + f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshallers()} r, err := f.createTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) require.NoError(t, err) assert.NotNil(t, r) } -func TestWithUnmarshallers(t *testing.T) { - unmarshaller := &customUnmarshaller{} - f := NewFactory(WithAddUnmarshallers(map[string]Unmarshaller{unmarshaller.Encoding(): unmarshaller})) +func TestWithTracesUnmarshallers(t *testing.T) { + unmarshaller := &customTracesUnmarshaller{} + f := NewFactory(WithAddTracesUnmarshallers(map[string]TracesUnmarshaller{unmarshaller.Encoding(): unmarshaller})) cfg := createDefaultConfig().(*Config) // disable contacting broker cfg.Metadata.Full = false @@ -124,19 +124,19 @@ func TestWithLogsUnmarshallers(t *testing.T) { }) } -type customUnmarshaller struct { +type customTracesUnmarshaller struct { } type customLogsUnmarshaller struct { } -var _ Unmarshaller = (*customUnmarshaller)(nil) +var _ TracesUnmarshaller = (*customTracesUnmarshaller)(nil) -func (c customUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) { +func (c customTracesUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) { panic("implement me") } -func (c customUnmarshaller) Encoding() string { +func (c customTracesUnmarshaller) Encoding() string { return "custom" } diff --git a/receiver/kafkareceiver/jaeger_unmarshaller.go b/receiver/kafkareceiver/jaeger_unmarshaller.go index 579f7f6e1d5..6a48865f0eb 100644 --- a/receiver/kafkareceiver/jaeger_unmarshaller.go +++ b/receiver/kafkareceiver/jaeger_unmarshaller.go @@ -27,7 +27,7 @@ import ( type jaegerProtoSpanUnmarshaller struct { } -var _ Unmarshaller = (*jaegerProtoSpanUnmarshaller)(nil) +var _ TracesUnmarshaller = (*jaegerProtoSpanUnmarshaller)(nil) func (j jaegerProtoSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { span := &jaegerproto.Span{} @@ -45,7 +45,7 @@ func (j jaegerProtoSpanUnmarshaller) Encoding() string { type jaegerJSONSpanUnmarshaller struct { } -var _ Unmarshaller = (*jaegerJSONSpanUnmarshaller)(nil) +var _ TracesUnmarshaller = (*jaegerJSONSpanUnmarshaller)(nil) func (j jaegerJSONSpanUnmarshaller) Unmarshal(data []byte) (pdata.Traces, error) { span := &jaegerproto.Span{} diff --git a/receiver/kafkareceiver/jaeger_unmarshaller_test.go b/receiver/kafkareceiver/jaeger_unmarshaller_test.go index 919488ca12d..af3aa4574c3 100644 --- a/receiver/kafkareceiver/jaeger_unmarshaller_test.go +++ b/receiver/kafkareceiver/jaeger_unmarshaller_test.go @@ -47,7 +47,7 @@ func TestUnmarshallJaeger(t *testing.T) { jsonMarshaller.Marshal(jsonBytes, batches[0].Spans[0]) tests := []struct { - unmarshaller Unmarshaller + unmarshaller TracesUnmarshaller encoding string bytes []byte }{ diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 65825a1118a..42164259540 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -36,14 +36,14 @@ const ( var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") -// kafkaConsumer uses sarama to consume and handle messages from kafka. -type kafkaConsumer struct { +// kafkaTracesConsumer uses sarama to consume and handle messages from kafka. +type kafkaTracesConsumer struct { name string consumerGroup sarama.ConsumerGroup nextConsumer consumer.Traces topics []string cancelConsumeLoop context.CancelFunc - unmarshaller Unmarshaller + unmarshaller TracesUnmarshaller logger *zap.Logger } @@ -60,10 +60,10 @@ type kafkaLogsConsumer struct { logger *zap.Logger } -var _ component.Receiver = (*kafkaConsumer)(nil) +var _ component.Receiver = (*kafkaTracesConsumer)(nil) var _ component.Receiver = (*kafkaLogsConsumer)(nil) -func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.Traces) (*kafkaConsumer, error) { +func newTracesReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]TracesUnmarshaller, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) { unmarshaller := unmarshalers[config.Encoding] if unmarshaller == nil { return nil, errUnrecognizedEncoding @@ -88,7 +88,7 @@ func newReceiver(config Config, params component.ReceiverCreateParams, unmarshal if err != nil { return nil, err } - return &kafkaConsumer{ + return &kafkaTracesConsumer{ name: config.Name(), consumerGroup: client, topics: []string{config.Topic}, @@ -98,10 +98,10 @@ func newReceiver(config Config, params component.ReceiverCreateParams, unmarshal }, nil } -func (c *kafkaConsumer) Start(context.Context, component.Host) error { +func (c *kafkaTracesConsumer) Start(context.Context, component.Host) error { ctx, cancel := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancel - consumerGroup := &consumerGroupHandler{ + consumerGroup := &tracesConsumerGroupHandler{ name: c.name, logger: c.logger, unmarshaller: c.unmarshaller, @@ -113,7 +113,7 @@ func (c *kafkaConsumer) Start(context.Context, component.Host) error { return nil } -func (c *kafkaConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -129,7 +129,7 @@ func (c *kafkaConsumer) consumeLoop(ctx context.Context, handler sarama.Consumer } } -func (c *kafkaConsumer) Shutdown(context.Context) error { +func (c *kafkaTracesConsumer) Shutdown(context.Context) error { c.cancelConsumeLoop() return c.consumerGroup.Close() } @@ -205,9 +205,9 @@ func (c *kafkaLogsConsumer) Shutdown(context.Context) error { return c.consumerGroup.Close() } -type consumerGroupHandler struct { +type tracesConsumerGroupHandler struct { name string - unmarshaller Unmarshaller + unmarshaller TracesUnmarshaller nextConsumer consumer.Traces ready chan bool readyCloser sync.Once @@ -225,10 +225,10 @@ type logsConsumerGroupHandler struct { logger *zap.Logger } -var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil) +var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil) var _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil) -func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { +func (c *tracesConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { c.readyCloser.Do(func() { close(c.ready) }) @@ -237,13 +237,13 @@ func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error return nil } -func (c *consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { +func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)} _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1)) return nil } -func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { +func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition())) for message := range claim.Messages() { c.logger.Debug("Kafka message claimed", diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 114612a263f..2e290f3459f 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -38,27 +38,27 @@ import ( "go.opentelemetry.io/collector/internal/testdata" ) -func TestNewReceiver_version_err(t *testing.T) { +func TestNewTracesReceiver_version_err(t *testing.T) { c := Config{ Encoding: defaultEncoding, ProtocolVersion: "none", } - r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), consumertest.NewNop()) + r, err := newTracesReceiver(c, component.ReceiverCreateParams{}, defaultTracesUnmarshallers(), consumertest.NewNop()) assert.Error(t, err) assert.Nil(t, r) } -func TestNewReceiver_encoding_err(t *testing.T) { +func TestNewTracesReceiver_encoding_err(t *testing.T) { c := Config{ Encoding: "foo", } - r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), consumertest.NewNop()) + r, err := newTracesReceiver(c, component.ReceiverCreateParams{}, defaultTracesUnmarshallers(), consumertest.NewNop()) require.Error(t, err) assert.Nil(t, r) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) } -func TestNewReceiver_err_auth_type(t *testing.T) { +func TestNewTracesReceiver_err_auth_type(t *testing.T) { c := Config{ ProtocolVersion: "2.0.0", Authentication: kafkaexporter.Authentication{ @@ -73,15 +73,15 @@ func TestNewReceiver_err_auth_type(t *testing.T) { Full: false, }, } - r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), consumertest.NewNop()) + r, err := newTracesReceiver(c, component.ReceiverCreateParams{}, defaultTracesUnmarshallers(), consumertest.NewNop()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") assert.Nil(t, r) } -func TestReceiverStart(t *testing.T) { +func TestTracesReceiverStart(t *testing.T) { testClient := testConsumerGroup{once: &sync.Once{}} - c := kafkaConsumer{ + c := kafkaTracesConsumer{ nextConsumer: consumertest.NewNop(), logger: zap.NewNop(), consumerGroup: testClient, @@ -92,9 +92,9 @@ func TestReceiverStart(t *testing.T) { c.Shutdown(context.Background()) } -func TestReceiverStartConsume(t *testing.T) { +func TestTracesReceiverStartConsume(t *testing.T) { testClient := testConsumerGroup{once: &sync.Once{}} - c := kafkaConsumer{ + c := kafkaTracesConsumer{ nextConsumer: consumertest.NewNop(), logger: zap.NewNop(), consumerGroup: testClient, @@ -102,19 +102,19 @@ func TestReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc c.Shutdown(context.Background()) - err := c.consumeLoop(ctx, &consumerGroupHandler{ + err := c.consumeLoop(ctx, &tracesConsumerGroupHandler{ ready: make(chan bool), }) assert.EqualError(t, err, context.Canceled.Error()) } -func TestReceiver_error(t *testing.T) { +func TestTracesReceiver_error(t *testing.T) { zcore, logObserver := observer.New(zapcore.ErrorLevel) logger := zap.New(zcore) expectedErr := fmt.Errorf("handler error") testClient := testConsumerGroup{once: &sync.Once{}, err: expectedErr} - c := kafkaConsumer{ + c := kafkaTracesConsumer{ nextConsumer: consumertest.NewNop(), logger: logger, consumerGroup: testClient, @@ -129,12 +129,12 @@ func TestReceiver_error(t *testing.T) { assert.True(t, logObserver.FilterField(zap.Error(expectedErr)).Len() > 0) } -func TestConsumerGroupHandler(t *testing.T) { +func TestTracesConsumerGroupHandler(t *testing.T) { views := MetricViews() view.Register(views...) defer view.Unregister(views...) - c := consumerGroupHandler{ + c := tracesConsumerGroupHandler{ unmarshaller: &otlpTracesPbUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), @@ -177,8 +177,8 @@ func TestConsumerGroupHandler(t *testing.T) { wg.Wait() } -func TestConsumerGroupHandler_error_unmarshall(t *testing.T) { - c := consumerGroupHandler{ +func TestTracesConsumerGroupHandler_error_unmarshall(t *testing.T) { + c := tracesConsumerGroupHandler{ unmarshaller: &otlpTracesPbUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), @@ -200,9 +200,9 @@ func TestConsumerGroupHandler_error_unmarshall(t *testing.T) { wg.Wait() } -func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) { +func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consumer") - c := consumerGroupHandler{ + c := tracesConsumerGroupHandler{ unmarshaller: &otlpTracesPbUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), @@ -419,8 +419,6 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { wg.Wait() } -// end - type testConsumerGroupClaim struct { messageChan chan *sarama.ConsumerMessage } diff --git a/receiver/kafkareceiver/otlp_unmarshaller.go b/receiver/kafkareceiver/otlp_unmarshaller.go index a6dd53f5326..d798f547636 100644 --- a/receiver/kafkareceiver/otlp_unmarshaller.go +++ b/receiver/kafkareceiver/otlp_unmarshaller.go @@ -21,7 +21,7 @@ import ( type otlpTracesPbUnmarshaller struct { } -var _ Unmarshaller = (*otlpTracesPbUnmarshaller)(nil) +var _ TracesUnmarshaller = (*otlpTracesPbUnmarshaller)(nil) func (p *otlpTracesPbUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { return pdata.TracesFromOtlpProtoBytes(bytes) diff --git a/receiver/kafkareceiver/otlp_unmarshaller_test.go b/receiver/kafkareceiver/otlp_unmarshaller_test.go index 2259e1f8aaf..350fa3f7cc1 100644 --- a/receiver/kafkareceiver/otlp_unmarshaller_test.go +++ b/receiver/kafkareceiver/otlp_unmarshaller_test.go @@ -24,7 +24,7 @@ import ( "go.opentelemetry.io/collector/internal/testdata" ) -func TestUnmarshallOTLP(t *testing.T) { +func TestUnmarshallOTLPTraces(t *testing.T) { td := pdata.NewTraces() td.ResourceSpans().Resize(1) td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar") @@ -40,7 +40,7 @@ func TestUnmarshallOTLP(t *testing.T) { assert.Equal(t, "otlp_proto", p.Encoding()) } -func TestUnmarshallOTLP_error(t *testing.T) { +func TestUnmarshallOTLPTraces_error(t *testing.T) { p := otlpTracesPbUnmarshaller{} _, err := p.Unmarshal([]byte("+$%")) assert.Error(t, err) diff --git a/receiver/kafkareceiver/unmarshaller.go b/receiver/kafkareceiver/unmarshaller.go index 57973b75a0d..9ae6a23f70a 100644 --- a/receiver/kafkareceiver/unmarshaller.go +++ b/receiver/kafkareceiver/unmarshaller.go @@ -18,8 +18,8 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) -// Unmarshaller deserializes the message body. -type Unmarshaller interface { +// TracesUnmarshaller deserializes the message body. +type TracesUnmarshaller interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (pdata.Traces, error) @@ -36,15 +36,15 @@ type LogsUnmarshaller interface { Encoding() string } -// defaultUnmarshallers returns map of supported encodings with Unmarshaller. -func defaultUnmarshallers() map[string]Unmarshaller { +// defaultTracesUnmarshallers returns map of supported encodings with TracesUnmarshaller. +func defaultTracesUnmarshallers() map[string]TracesUnmarshaller { otlp := &otlpTracesPbUnmarshaller{} jaegerProto := jaegerProtoSpanUnmarshaller{} jaegerJSON := jaegerJSONSpanUnmarshaller{} zipkinProto := zipkinProtoSpanUnmarshaller{} zipkinJSON := zipkinJSONSpanUnmarshaller{} zipkinThrift := zipkinThriftSpanUnmarshaller{} - return map[string]Unmarshaller{ + return map[string]TracesUnmarshaller{ otlp.Encoding(): otlp, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, diff --git a/receiver/kafkareceiver/unmarshaller_test.go b/receiver/kafkareceiver/unmarshaller_test.go index 8c6b39ed834..c7795b8db25 100644 --- a/receiver/kafkareceiver/unmarshaller_test.go +++ b/receiver/kafkareceiver/unmarshaller_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestDefaultUnMarshaller(t *testing.T) { +func TestDefaultTracesUnMarshaller(t *testing.T) { expectedEncodings := []string{ "otlp_proto", "jaeger_proto", @@ -30,7 +30,7 @@ func TestDefaultUnMarshaller(t *testing.T) { "zipkin_json", "zipkin_thrift", } - marshallers := defaultUnmarshallers() + marshallers := defaultTracesUnmarshallers() assert.Equal(t, len(expectedEncodings), len(marshallers)) for _, e := range expectedEncodings { t.Run(e, func(t *testing.T) { diff --git a/receiver/kafkareceiver/zipkin_unmarshaller.go b/receiver/kafkareceiver/zipkin_unmarshaller.go index 698cffa97f5..c873bae62be 100644 --- a/receiver/kafkareceiver/zipkin_unmarshaller.go +++ b/receiver/kafkareceiver/zipkin_unmarshaller.go @@ -29,7 +29,7 @@ import ( type zipkinProtoSpanUnmarshaller struct { } -var _ Unmarshaller = (*zipkinProtoSpanUnmarshaller)(nil) +var _ TracesUnmarshaller = (*zipkinProtoSpanUnmarshaller)(nil) func (z zipkinProtoSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { parseSpans, err := zipkin_proto3.ParseSpans(bytes, false) @@ -46,7 +46,7 @@ func (z zipkinProtoSpanUnmarshaller) Encoding() string { type zipkinJSONSpanUnmarshaller struct { } -var _ Unmarshaller = (*zipkinJSONSpanUnmarshaller)(nil) +var _ TracesUnmarshaller = (*zipkinJSONSpanUnmarshaller)(nil) func (z zipkinJSONSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { var spans []*zipkinmodel.SpanModel @@ -63,7 +63,7 @@ func (z zipkinJSONSpanUnmarshaller) Encoding() string { type zipkinThriftSpanUnmarshaller struct { } -var _ Unmarshaller = (*zipkinThriftSpanUnmarshaller)(nil) +var _ TracesUnmarshaller = (*zipkinThriftSpanUnmarshaller)(nil) func (z zipkinThriftSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { spans, err := deserializeZipkinThrift(bytes) diff --git a/receiver/kafkareceiver/zipkin_unmarshaller_test.go b/receiver/kafkareceiver/zipkin_unmarshaller_test.go index ed947672032..469f5859092 100644 --- a/receiver/kafkareceiver/zipkin_unmarshaller_test.go +++ b/receiver/kafkareceiver/zipkin_unmarshaller_test.go @@ -64,7 +64,7 @@ func TestUnmarshallZipkin(t *testing.T) { require.NoError(t, err) tests := []struct { - unmarshaller Unmarshaller + unmarshaller TracesUnmarshaller encoding string bytes []byte expected pdata.Traces