Rename traces methods/objects to include Traces
sincejune committed Apr 20, 2021
1 parent 0a2ea1b commit d432377
Showing 12 changed files with 69 additions and 71 deletions.
16 changes: 8 additions & 8 deletions receiver/kafkareceiver/factory.go
@@ -44,11 +44,11 @@ const (
// FactoryOption applies changes to kafkaReceiverFactory.
type FactoryOption func(factory *kafkaReceiverFactory)

// WithAddUnmarshallers adds marshallers.
func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOption {
// WithAddTracesUnmarshallers adds marshallers.
func WithAddTracesUnmarshallers(encodingMarshaller map[string]TracesUnmarshaller) FactoryOption {
return func(factory *kafkaReceiverFactory) {
for encoding, unmarshaller := range encodingMarshaller {
factory.unmarshalers[encoding] = unmarshaller
factory.tracesUnmarshalers[encoding] = unmarshaller
}
}
}
@@ -65,8 +65,8 @@ func WithAddLogsUnmarshallers(encodingMarshaller map[string]LogsUnmarshaller) Fa
// NewFactory creates Kafka receiver factory.
func NewFactory(options ...FactoryOption) component.ReceiverFactory {
f := &kafkaReceiverFactory{
unmarshalers: defaultUnmarshallers(),
logsUnmarshaller: defaultLogsUnmarshallers(),
tracesUnmarshalers: defaultTracesUnmarshallers(),
logsUnmarshaller: defaultLogsUnmarshallers(),
}
for _, o := range options {
o(f)
@@ -101,8 +101,8 @@ func createDefaultConfig() config.Receiver {
}

type kafkaReceiverFactory struct {
unmarshalers map[string]Unmarshaller
logsUnmarshaller map[string]LogsUnmarshaller
tracesUnmarshalers map[string]TracesUnmarshaller
logsUnmarshaller map[string]LogsUnmarshaller
}

func (f *kafkaReceiverFactory) createTracesReceiver(
@@ -112,7 +112,7 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {
c := cfg.(*Config)
r, err := newReceiver(*c, params, f.unmarshalers, nextConsumer)
r, err := newTracesReceiver(*c, params, f.tracesUnmarshalers, nextConsumer)
if err != nil {
return nil, err
}
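For code that registers additional encodings, only the option and interface names change with this commit. Below is a minimal usage sketch, not part of the diff: the "my_proto" encoding name and the myProtoUnmarshaller type are assumptions (a concrete sketch of such a type follows the factory_test.go hunks below).

package kafkareceiver

import "go.opentelemetry.io/collector/component"

// newFactoryWithCustomEncoding is a hypothetical illustration of the renamed
// WithAddTracesUnmarshallers option; it is not part of this commit.
func newFactoryWithCustomEncoding() (component.ReceiverFactory, *Config) {
	f := NewFactory(
		WithAddTracesUnmarshallers(map[string]TracesUnmarshaller{
			"my_proto": &myProtoUnmarshaller{}, // hypothetical custom encoding
		}),
	)
	cfg := createDefaultConfig().(*Config)
	cfg.Encoding = "my_proto" // select the custom unmarshaller by encoding name
	return f, cfg
}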
18 changes: 9 additions & 9 deletions receiver/kafkareceiver/factory_test.go
@@ -40,7 +40,7 @@ func TestCreateTracesReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{unmarshalers: defaultUnmarshallers()}
f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshallers()}
r, err := f.createTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
// no available broker
require.Error(t, err)
@@ -52,15 +52,15 @@ func TestCreateTracesReceiver_error(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// disable contacting broker at startup
cfg.Metadata.Full = false
f := kafkaReceiverFactory{unmarshalers: defaultUnmarshallers()}
f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshallers()}
r, err := f.createTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestWithUnmarshallers(t *testing.T) {
unmarshaller := &customUnmarshaller{}
f := NewFactory(WithAddUnmarshallers(map[string]Unmarshaller{unmarshaller.Encoding(): unmarshaller}))
func TestWithTracesUnmarshallers(t *testing.T) {
unmarshaller := &customTracesUnmarshaller{}
f := NewFactory(WithAddTracesUnmarshallers(map[string]TracesUnmarshaller{unmarshaller.Encoding(): unmarshaller}))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
@@ -124,19 +124,19 @@ func TestWithLogsUnmarshallers(t *testing.T) {
})
}

type customUnmarshaller struct {
type customTracesUnmarshaller struct {
}

type customLogsUnmarshaller struct {
}

var _ Unmarshaller = (*customUnmarshaller)(nil)
var _ TracesUnmarshaller = (*customTracesUnmarshaller)(nil)

func (c customUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) {
func (c customTracesUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) {
panic("implement me")
}

func (c customUnmarshaller) Encoding() string {
func (c customTracesUnmarshaller) Encoding() string {
return "custom"
}

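The customTracesUnmarshaller stub above panics by design. A minimal working implementation of the renamed TracesUnmarshaller interface might look like the sketch below, which simply reuses the OTLP protobuf helper that the otlp_proto unmarshaller later in this diff is built on; the myProtoUnmarshaller name, the "my_proto" encoding, and the pdata import path are assumptions, not part of this change.

package kafkareceiver

import "go.opentelemetry.io/collector/consumer/pdata"

// myProtoUnmarshaller is a hypothetical encoding that reuses the OTLP
// protobuf wire format under a custom name.
type myProtoUnmarshaller struct{}

var _ TracesUnmarshaller = (*myProtoUnmarshaller)(nil)

func (myProtoUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) {
	return pdata.TracesFromOtlpProtoBytes(bytes)
}

func (myProtoUnmarshaller) Encoding() string {
	return "my_proto"
}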
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/jaeger_unmarshaller.go
@@ -27,7 +27,7 @@ import (
type jaegerProtoSpanUnmarshaller struct {
}

var _ Unmarshaller = (*jaegerProtoSpanUnmarshaller)(nil)
var _ TracesUnmarshaller = (*jaegerProtoSpanUnmarshaller)(nil)

func (j jaegerProtoSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) {
span := &jaegerproto.Span{}
@@ -45,7 +45,7 @@ func (j jaegerProtoSpanUnmarshaller) Encoding() string {
type jaegerJSONSpanUnmarshaller struct {
}

var _ Unmarshaller = (*jaegerJSONSpanUnmarshaller)(nil)
var _ TracesUnmarshaller = (*jaegerJSONSpanUnmarshaller)(nil)

func (j jaegerJSONSpanUnmarshaller) Unmarshal(data []byte) (pdata.Traces, error) {
span := &jaegerproto.Span{}
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/jaeger_unmarshaller_test.go
@@ -47,7 +47,7 @@ func TestUnmarshallJaeger(t *testing.T) {
jsonMarshaller.Marshal(jsonBytes, batches[0].Spans[0])

tests := []struct {
unmarshaller Unmarshaller
unmarshaller TracesUnmarshaller
encoding string
bytes []byte
}{
32 changes: 16 additions & 16 deletions receiver/kafkareceiver/kafka_receiver.go
@@ -36,14 +36,14 @@ const (

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")

// kafkaConsumer uses sarama to consume and handle messages from kafka.
type kafkaConsumer struct {
// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
name string
consumerGroup sarama.ConsumerGroup
nextConsumer consumer.Traces
topics []string
cancelConsumeLoop context.CancelFunc
unmarshaller Unmarshaller
unmarshaller TracesUnmarshaller

logger *zap.Logger
}
@@ -60,10 +60,10 @@ type kafkaLogsConsumer struct {
logger *zap.Logger
}

var _ component.Receiver = (*kafkaConsumer)(nil)
var _ component.Receiver = (*kafkaTracesConsumer)(nil)
var _ component.Receiver = (*kafkaLogsConsumer)(nil)

func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.Traces) (*kafkaConsumer, error) {
func newTracesReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]TracesUnmarshaller, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
unmarshaller := unmarshalers[config.Encoding]
if unmarshaller == nil {
return nil, errUnrecognizedEncoding
@@ -88,7 +88,7 @@ func newReceiver(config Config, params component.ReceiverCreateParams, unmarshal
if err != nil {
return nil, err
}
return &kafkaConsumer{
return &kafkaTracesConsumer{
name: config.Name(),
consumerGroup: client,
topics: []string{config.Topic},
@@ -98,10 +98,10 @@ func newReceiver(config Config, params component.ReceiverCreateParams, unmarshal
}, nil
}

func (c *kafkaConsumer) Start(context.Context, component.Host) error {
func (c *kafkaTracesConsumer) Start(context.Context, component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel
consumerGroup := &consumerGroupHandler{
consumerGroup := &tracesConsumerGroupHandler{
name: c.name,
logger: c.logger,
unmarshaller: c.unmarshaller,
@@ -113,7 +113,7 @@ func (c *kafkaConsumer) Start(context.Context, component.Host) error {
return nil
}

func (c *kafkaConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
@@ -129,7 +129,7 @@ func (c *kafkaConsumer) consumeLoop(ctx context.Context, handler sarama.Consumer
}
}

func (c *kafkaConsumer) Shutdown(context.Context) error {
func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
c.cancelConsumeLoop()
return c.consumerGroup.Close()
}
@@ -205,9 +205,9 @@ func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
return c.consumerGroup.Close()
}

type consumerGroupHandler struct {
type tracesConsumerGroupHandler struct {
name string
unmarshaller Unmarshaller
unmarshaller TracesUnmarshaller
nextConsumer consumer.Traces
ready chan bool
readyCloser sync.Once
@@ -225,10 +225,10 @@ type logsConsumerGroupHandler struct {
logger *zap.Logger
}

var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil)

func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
func (c *tracesConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
})
@@ -237,13 +237,13 @@ func (c *consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error
return nil
}

func (c *consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1))
return nil
}

func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
for message := range claim.Messages() {
c.logger.Debug("Kafka message claimed",
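Putting the renamed pieces together, here is a hedged lifecycle sketch mirroring TestCreateTracesReceiver_error and TestTracesReceiverStart in the test file below; the broker address, nop logger, nop consumer, and nil host are placeholders taken from those tests, not a recommended production setup, and the Logger field on ReceiverCreateParams is assumed for this collector version.

package kafkareceiver

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.uber.org/zap"
)

// runTracesReceiverSketch is a hypothetical helper, not part of this commit.
func runTracesReceiverSketch() error {
	cfg := createDefaultConfig().(*Config)
	cfg.Brokers = []string{"localhost:9092"} // placeholder broker address
	cfg.ProtocolVersion = "2.0.0"
	cfg.Metadata.Full = false // skip broker metadata at construction, as the tests do

	r, err := newTracesReceiver(*cfg, component.ReceiverCreateParams{Logger: zap.NewNop()},
		defaultTracesUnmarshallers(), consumertest.NewNop())
	if err != nil {
		return err
	}
	if err := r.Start(context.Background(), nil); err != nil { // nil host, as in TestTracesReceiverStart
		return err
	}
	return r.Shutdown(context.Background())
}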
40 changes: 19 additions & 21 deletions receiver/kafkareceiver/kafka_receiver_test.go
@@ -38,27 +38,27 @@ import (
"go.opentelemetry.io/collector/internal/testdata"
)

func TestNewReceiver_version_err(t *testing.T) {
func TestNewTracesReceiver_version_err(t *testing.T) {
c := Config{
Encoding: defaultEncoding,
ProtocolVersion: "none",
}
r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), consumertest.NewNop())
r, err := newTracesReceiver(c, component.ReceiverCreateParams{}, defaultTracesUnmarshallers(), consumertest.NewNop())
assert.Error(t, err)
assert.Nil(t, r)
}

func TestNewReceiver_encoding_err(t *testing.T) {
func TestNewTracesReceiver_encoding_err(t *testing.T) {
c := Config{
Encoding: "foo",
}
r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), consumertest.NewNop())
r, err := newTracesReceiver(c, component.ReceiverCreateParams{}, defaultTracesUnmarshallers(), consumertest.NewNop())
require.Error(t, err)
assert.Nil(t, r)
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
}

func TestNewReceiver_err_auth_type(t *testing.T) {
func TestNewTracesReceiver_err_auth_type(t *testing.T) {
c := Config{
ProtocolVersion: "2.0.0",
Authentication: kafkaexporter.Authentication{
@@ -73,15 +73,15 @@ func TestNewReceiver_err_auth_type(t *testing.T) {
Full: false,
},
}
r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), consumertest.NewNop())
r, err := newTracesReceiver(c, component.ReceiverCreateParams{}, defaultTracesUnmarshallers(), consumertest.NewNop())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
assert.Nil(t, r)
}

func TestReceiverStart(t *testing.T) {
func TestTracesReceiverStart(t *testing.T) {
testClient := testConsumerGroup{once: &sync.Once{}}
c := kafkaConsumer{
c := kafkaTracesConsumer{
nextConsumer: consumertest.NewNop(),
logger: zap.NewNop(),
consumerGroup: testClient,
@@ -92,29 +92,29 @@ func TestReceiverStart(t *testing.T) {
c.Shutdown(context.Background())
}

func TestReceiverStartConsume(t *testing.T) {
func TestTracesReceiverStartConsume(t *testing.T) {
testClient := testConsumerGroup{once: &sync.Once{}}
c := kafkaConsumer{
c := kafkaTracesConsumer{
nextConsumer: consumertest.NewNop(),
logger: zap.NewNop(),
consumerGroup: testClient,
}
ctx, cancelFunc := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancelFunc
c.Shutdown(context.Background())
err := c.consumeLoop(ctx, &consumerGroupHandler{
err := c.consumeLoop(ctx, &tracesConsumerGroupHandler{
ready: make(chan bool),
})
assert.EqualError(t, err, context.Canceled.Error())
}

func TestReceiver_error(t *testing.T) {
func TestTracesReceiver_error(t *testing.T) {
zcore, logObserver := observer.New(zapcore.ErrorLevel)
logger := zap.New(zcore)

expectedErr := fmt.Errorf("handler error")
testClient := testConsumerGroup{once: &sync.Once{}, err: expectedErr}
c := kafkaConsumer{
c := kafkaTracesConsumer{
nextConsumer: consumertest.NewNop(),
logger: logger,
consumerGroup: testClient,
@@ -129,12 +129,12 @@ func TestReceiver_error(t *testing.T) {
assert.True(t, logObserver.FilterField(zap.Error(expectedErr)).Len() > 0)
}

func TestConsumerGroupHandler(t *testing.T) {
func TestTracesConsumerGroupHandler(t *testing.T) {
views := MetricViews()
view.Register(views...)
defer view.Unregister(views...)

c := consumerGroupHandler{
c := tracesConsumerGroupHandler{
unmarshaller: &otlpTracesPbUnmarshaller{},
logger: zap.NewNop(),
ready: make(chan bool),
@@ -177,8 +177,8 @@ func TestConsumerGroupHandler(t *testing.T) {
wg.Wait()
}

func TestConsumerGroupHandler_error_unmarshall(t *testing.T) {
c := consumerGroupHandler{
func TestTracesConsumerGroupHandler_error_unmarshall(t *testing.T) {
c := tracesConsumerGroupHandler{
unmarshaller: &otlpTracesPbUnmarshaller{},
logger: zap.NewNop(),
ready: make(chan bool),
@@ -200,9 +200,9 @@ func TestConsumerGroupHandler_error_unmarshall(t *testing.T) {
wg.Wait()
}

func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) {
func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
consumerError := errors.New("failed to consumer")
c := consumerGroupHandler{
c := tracesConsumerGroupHandler{
unmarshaller: &otlpTracesPbUnmarshaller{},
logger: zap.NewNop(),
ready: make(chan bool),
@@ -419,8 +419,6 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
wg.Wait()
}

// end

type testConsumerGroupClaim struct {
messageChan chan *sarama.ConsumerMessage
}
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/otlp_unmarshaller.go
@@ -21,7 +21,7 @@ import (
type otlpTracesPbUnmarshaller struct {
}

var _ Unmarshaller = (*otlpTracesPbUnmarshaller)(nil)
var _ TracesUnmarshaller = (*otlpTracesPbUnmarshaller)(nil)

func (p *otlpTracesPbUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) {
return pdata.TracesFromOtlpProtoBytes(bytes)
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/otlp_unmarshaller_test.go
@@ -24,7 +24,7 @@ import (
"go.opentelemetry.io/collector/internal/testdata"
)

func TestUnmarshallOTLP(t *testing.T) {
func TestUnmarshallOTLPTraces(t *testing.T) {
td := pdata.NewTraces()
td.ResourceSpans().Resize(1)
td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar")
@@ -40,7 +40,7 @@
assert.Equal(t, "otlp_proto", p.Encoding())
}

func TestUnmarshallOTLP_error(t *testing.T) {
func TestUnmarshallOTLPTraces_error(t *testing.T) {
p := otlpTracesPbUnmarshaller{}
_, err := p.Unmarshal([]byte("+$%"))
assert.Error(t, err)