Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/kafka] support custom metrics & logs marshalers #8248

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
}
}

// WithMetricsMarshalers adds metricsMarshalers.
func WithMetricsMarshalers(metricsMarshalers ...MetricsMarshaler) FactoryOption {
Comment on lines +59 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this being references anywhere within that would allow for these to ether be added by default or added via some configuration.

Can I ask how you'd want someone to use this in their exporter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MovieStoreGuy I think the question for how to use these factory options predates this PR. The WithTraceMarshalers() option existed before this PR. I've attempted 2 methods to utilize the custom options:

  1. pass in explicit options to NewFactory() in components.go. However, this isn't supported with the collector-builder and I could no longer use it
  2. create a custom exporter factory where the NewFactory() just delegates to kafkaexporter.NewFactory() with additional options

Taking a step back, are you aware of precedence for extending subcomponents of exporters/receivers? At a high level, extensions for auth looked close, but when I looked deeper the requirements for high-level abstractions to exist in the collector didn't feel right either (e.g go.opentelemetry.io/collector/config/configauth). I know you are aware #8272 which would have been less messy if custom marshalers could be more easily added.

return func(factory *kafkaExporterFactory) {
for _, marshaler := range metricsMarshalers {
factory.metricsMarshalers[marshaler.Encoding()] = marshaler
}
}
}

// WithLogsMarshalers adds logsMarshalers.
func WithLogsMarshalers(logsMarshalers ...LogsMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaler := range logsMarshalers {
factory.logsMarshalers[marshaler.Encoding()] = marshaler
}
}
}

// NewFactory creates Kafka exporter factory.
func NewFactory(options ...FactoryOption) component.ExporterFactory {
f := &kafkaExporterFactory{
Expand Down
78 changes: 72 additions & 6 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func TestCreateLogsExporter_err(t *testing.T) {
assert.Nil(t, mr)
}

func TestWithMarshalers(t *testing.T) {
cm := &customMarshaler{}
func TestWithTracesMarshalers(t *testing.T) {
cm := &customTracesMarshaler{}
f := NewFactory(WithTracesMarshalers(cm))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
Expand All @@ -122,15 +122,81 @@ func TestWithMarshalers(t *testing.T) {
})
}

type customMarshaler struct {
type customTracesMarshaler struct {}

var _ TracesMarshaler = (*customTracesMarshaler)(nil)

func (c customTracesMarshaler) Marshal(_ pdata.Traces, topic string) ([]*sarama.ProducerMessage, error) {
panic("implement me")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly sure why this should panic in test code, what are you trying to achieve here?

}

func (c customTracesMarshaler) Encoding() string {
return "custom"
}

func TestWithLogsMarshalers(t *testing.T) {
cm := &customLogsMarshaler{}
f := NewFactory(WithLogsMarshalers(cm))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = cm.Encoding()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this tests actually are only somewhat useful. They only check that Encoding() function yields a result and factory accepts WithLogsMarshalers/WithMetricMarshalers.

TBH, the currently existing test (the only one which calls WithTracesMarshallers) is not very useful either for the same reasons.

Perhaps that's OK (what's your guidance on this @MovieStoreGuy @pavolloffay?). I was thinking that maybe some sorts of tests matching non default marshaler encoding could be created instead, but perhaps it's too much of a stretch for a relatively simple API change here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, didn't mean to leave this for so long. Github notifications are not my friend for some time now.

Currently late evening atm, so I come to review back tomorrow to full understand what is happening.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with @pmm-sumo.

The minimal required testing of custom encodings is that the encoded data matches a known output.
Testing that the set encoding matches the expected result is just the start.

exporter, err := f.CreateLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
exporter, err := f.CreateLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, exporter)
})
}

type customLogsMarshaler struct {}

var _ LogsMarshaler = (*customLogsMarshaler)(nil)

func (c customLogsMarshaler) Marshal(logs pdata.Logs, topic string) ([]*sarama.ProducerMessage, error) {
panic("implement me")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really worry about panicing in tests, it isn't go idiomatic and not discouraged by the contribution guide.

Can I ask what you're wanting to achieve here?

}

func (c customLogsMarshaler) Encoding() string {
return "custom"
}

func TestWithMetricsMarshalers(t *testing.T) {
cm := &customMetricsMarshaler{}
f := NewFactory(WithMetricsMarshalers(cm))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = cm.Encoding()
exporter, err := f.CreateMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
exporter, err := f.CreateMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, exporter)
})
}

var _ TracesMarshaler = (*customMarshaler)(nil)
type customMetricsMarshaler struct {}

var _ MetricsMarshaler = (*customMetricsMarshaler)(nil)

func (c customMarshaler) Marshal(_ pdata.Traces, topic string) ([]*sarama.ProducerMessage, error) {
func (c customMetricsMarshaler) Marshal(metrics pdata.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
panic("implement me")
}

func (c customMarshaler) Encoding() string {
func (c customMetricsMarshaler) Encoding() string {
return "custom"
}