Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restructured fileexporter to prepare for group_by #31068

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions exporter/fileexporter/error_component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"

import (
"context"

"go.opentelemetry.io/collector/component"
)

// errorComponent is used to return error from a factory method. SharedComponent does
// not handle errors, so wrapping the error into a component is necessary.
type errorComponent struct {
err error
}

// Start will return the cached error.
func (e *errorComponent) Start(context.Context, component.Host) error {
return e.err
}

// Shutdown will return the cached error.
func (e *errorComponent) Shutdown(context.Context) error {
return e.err
}
108 changes: 73 additions & 35 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"context"
"io"
"os"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter/internal/metadata"
Expand All @@ -30,6 +34,13 @@ const (
compressionZSTD = "zstd"
)

type FileExporter interface {
component.Component
consumeTraces(_ context.Context, td ptrace.Traces) error
consumeMetrics(_ context.Context, md pmetric.Metrics) error
consumeLogs(_ context.Context, ld plog.Logs) error
}

// NewFactory creates a factory for OTLP exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
Expand All @@ -52,19 +63,15 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
conf := cfg.(*Config)
writer, err := buildFileWriter(conf)
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf, writer)
})
return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
fe.Unwrap().(*fileExporter).consumeTraces,
fe.consumeTraces,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
Expand All @@ -76,19 +83,15 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
conf := cfg.(*Config)
writer, err := buildFileWriter(conf)
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf, writer)
})
return exporterhelper.NewMetricsExporter(
ctx,
set,
cfg,
fe.Unwrap().(*fileExporter).consumeMetrics,
fe.consumeMetrics,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
Expand All @@ -100,59 +103,94 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
conf := cfg.(*Config)
writer, err := buildFileWriter(conf)
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf, writer)
})
return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
fe.Unwrap().(*fileExporter).consumeLogs,
fe.consumeLogs,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
)
}

func newFileExporter(conf *Config, writer io.WriteCloser) *fileExporter {
return &fileExporter{
path: conf.Path,
// getOrCreateFileExporter creates a FileExporter and caches it for a particular configuration,
// or returns the already cached one. Caching is required because the factory is asked trace and
// metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver()
// but they must not create separate objects, they must use one Exporter object per configuration.
func getOrCreateFileExporter(cfg component.Config) (FileExporter, error) {
conf := cfg.(*Config)
fe := exporters.GetOrAdd(cfg, func() component.Component {
e, err := newFileExporter(conf)
if err != nil {
return &errorComponent{err: err}
}

return e
})

component := fe.Unwrap()
if errComponent, ok := component.(*errorComponent); ok {
return nil, errComponent.err
}

return component.(FileExporter), nil
}

func newFileExporter(conf *Config) (FileExporter, error) {
marshaller := &marshaller{
formatType: conf.FormatType,
file: writer,
tracesMarshaler: tracesMarshalers[conf.FormatType],
metricsMarshaler: metricsMarshalers[conf.FormatType],
logsMarshaler: logsMarshalers[conf.FormatType],
exporter: buildExportFunc(conf),
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
flushInterval: conf.FlushInterval,
}
export := buildExportFunc(conf)

writer, err := newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export)
if err != nil {
return nil, err
}

return &fileExporter{
marshaller: marshaller,
writer: writer,
}, nil
}

func buildFileWriter(cfg *Config) (io.WriteCloser, error) {
if cfg.Rotation == nil {
f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
var wc io.WriteCloser
if rotation == nil {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
return newBufferedWriteCloser(f), nil
wc = newBufferedWriteCloser(f)
} else {
wc = &lumberjack.Logger{
Filename: path,
MaxSize: rotation.MaxMegabytes,
MaxAge: rotation.MaxDays,
MaxBackups: rotation.MaxBackups,
LocalTime: rotation.LocalTime,
}
}
return &lumberjack.Logger{
Filename: cfg.Path,
MaxSize: cfg.Rotation.MaxMegabytes,
MaxAge: cfg.Rotation.MaxDays,
MaxBackups: cfg.Rotation.MaxBackups,
LocalTime: cfg.Rotation.LocalTime,

return &fileWriter{
path: path,
file: wc,
exporter: export,
flushInterval: flushInterval,
}, nil
}

// This is the map of already created File exporters for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one Receiver object per configuration.
// create separate objects, they must use one Exporter object per configuration.
var exporters = sharedcomponent.NewSharedComponents()
35 changes: 19 additions & 16 deletions exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -96,25 +97,27 @@ func TestCreateLogsExporterError(t *testing.T) {
assert.Error(t, err)
}

func TestBuildFileWriter(t *testing.T) {
func TestNewFileWriter(t *testing.T) {
type args struct {
cfg *Config
}
tests := []struct {
name string
args args
want io.WriteCloser
validate func(*testing.T, io.WriteCloser)
validate func(*testing.T, *fileWriter)
}{
{
name: "single file",
args: args{
cfg: &Config{
Path: tempFileName(t),
Path: tempFileName(t),
FlushInterval: 5 * time.Second,
},
},
validate: func(t *testing.T, closer io.WriteCloser) {
_, ok := closer.(*bufferedWriteCloser)
validate: func(t *testing.T, writer *fileWriter) {
assert.Equal(t, 5*time.Second, writer.flushInterval)
_, ok := writer.file.(*bufferedWriteCloser)
assert.Equal(t, true, ok)
},
},
Expand All @@ -128,10 +131,10 @@ func TestBuildFileWriter(t *testing.T) {
},
},
},
validate: func(t *testing.T, closer io.WriteCloser) {
writer, ok := closer.(*lumberjack.Logger)
validate: func(t *testing.T, writer *fileWriter) {
logger, ok := writer.file.(*lumberjack.Logger)
assert.Equal(t, true, ok)
assert.Equal(t, defaultMaxBackups, writer.MaxBackups)
assert.Equal(t, defaultMaxBackups, logger.MaxBackups)
},
},
{
Expand All @@ -147,21 +150,21 @@ func TestBuildFileWriter(t *testing.T) {
},
},
},
validate: func(t *testing.T, closer io.WriteCloser) {
writer, ok := closer.(*lumberjack.Logger)
validate: func(t *testing.T, writer *fileWriter) {
logger, ok := writer.file.(*lumberjack.Logger)
assert.Equal(t, true, ok)
assert.Equal(t, 3, writer.MaxBackups)
assert.Equal(t, 30, writer.MaxSize)
assert.Equal(t, 100, writer.MaxAge)
assert.Equal(t, true, writer.LocalTime)
assert.Equal(t, 3, logger.MaxBackups)
assert.Equal(t, 30, logger.MaxSize)
assert.Equal(t, 100, logger.MaxAge)
assert.Equal(t, true, logger.LocalTime)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildFileWriter(tt.args.cfg)
got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil)
defer func() {
assert.NoError(t, got.Close())
assert.NoError(t, got.file.Close())
}()
assert.NoError(t, err)
tt.validate(t, got)
Expand Down
Loading
Loading