From 766b54d5b9512ba58ab39fd5dc1d64bd4709dacc Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:17:13 -0600 Subject: [PATCH 01/13] Add a custom zapcore.Core for confmap logging --- confmap/provider/envprovider/provider.go | 2 + confmap/resolver.go | 3 ++ otelcol/collector.go | 58 +++++++++++++++++++++++- 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/confmap/provider/envprovider/provider.go b/confmap/provider/envprovider/provider.go index c153893cb87..07286f709fc 100644 --- a/confmap/provider/envprovider/provider.go +++ b/confmap/provider/envprovider/provider.go @@ -42,6 +42,8 @@ func NewFactory() confmap.ProviderFactory { } func (emp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { + emp.logger.Debug("env:debug_log") + if !strings.HasPrefix(uri, schemeName+":") { return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) } diff --git a/confmap/resolver.go b/confmap/resolver.go index 05f7f964d03..6cc5cb12712 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -92,6 +92,9 @@ func NewResolver(set ResolverSettings) (*Resolver, error) { if set.ProviderSettings.Logger == nil { set.ProviderSettings.Logger = zap.NewNop() + } else { + set.ProviderSettings.Logger.Info("hey look, a logger") + set.ProviderSettings.Logger.Debug("a debug log") } var providers map[string]Provider diff --git a/otelcol/collector.go b/otelcol/collector.go index bca6e9b86dc..6e46747cd66 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -15,6 +15,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -109,6 +110,50 @@ type Collector struct { signalsChannel chan os.Signal // asyncErrorChannel is used to signal a fatal error from any component. asyncErrorChannel chan error + cc *collectorCore +} + +var _ zapcore.Core = &collectorCore{} + +type collectorCore struct { + core zapcore.Core +} + +func (c *collectorCore) Enabled(l zapcore.Level) bool { + return c.core.Enabled(l) +} + +func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { + return c.core.With(f) +} + +func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + return c.core.Check(e, ce) +} + +func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { + return c.core.Write(e, f) +} + +func (c *collectorCore) Sync() error { + return c.core.Sync() +} + +func newCollectorCore(options []zap.Option) (*collectorCore, error) { + ec := zap.NewProductionEncoderConfig() + ec.EncodeTime = zapcore.ISO8601TimeEncoder + zapCfg := &zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.InfoLevel), + Encoding: "console", + EncoderConfig: ec, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + logger, err := zapCfg.Build(options...) + if err != nil { + return nil, err + } + return &collectorCore{core: logger.Core()}, nil } // NewCollector creates and returns a new instance of Collector. @@ -116,7 +161,11 @@ func NewCollector(set CollectorSettings) (*Collector, error) { var err error configProvider := set.ConfigProvider - set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.NewNop()} + cc, err := newCollectorCore(set.LoggingOptions) + if err != nil { + return nil, err + } + set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, set.LoggingOptions...)} set.ConfigProviderSettings.ResolverSettings.ConverterSettings = confmap.ConverterSettings{} if configProvider == nil { @@ -137,6 +186,7 @@ func NewCollector(set CollectorSettings) (*Collector, error) { signalsChannel: make(chan os.Signal, 3), asyncErrorChannel: make(chan error), configProvider: configProvider, + cc: cc, }, nil } @@ -203,6 +253,12 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return err } + col.cc.core = col.service.Logger().Core() + _, err = col.configProvider.Get(ctx, factories) + if err != nil { + return fmt.Errorf("failed to get config: %w", err) + } + if !col.set.SkipSettingGRPCLogger { grpclog.SetLogger(col.service.Logger(), cfg.Service.Telemetry.Logs.Level) } From cc65cfd7258e8f6e28e2a4d9b7ea0b1aaf0881b1 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Tue, 30 Apr 2024 14:58:01 -0600 Subject: [PATCH 02/13] Use observer and callback --- confmap/provider/envprovider/provider.go | 2 - confmap/resolver.go | 3 -- otelcol/collector.go | 65 ++++++++++++++---------- 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/confmap/provider/envprovider/provider.go b/confmap/provider/envprovider/provider.go index 07286f709fc..c153893cb87 100644 --- a/confmap/provider/envprovider/provider.go +++ b/confmap/provider/envprovider/provider.go @@ -42,8 +42,6 @@ func NewFactory() confmap.ProviderFactory { } func (emp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { - emp.logger.Debug("env:debug_log") - if !strings.HasPrefix(uri, schemeName+":") { return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) } diff --git a/confmap/resolver.go b/confmap/resolver.go index 6cc5cb12712..05f7f964d03 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -92,9 +92,6 @@ func NewResolver(set ResolverSettings) (*Resolver, error) { if set.ProviderSettings.Logger == nil { set.ProviderSettings.Logger = zap.NewNop() - } else { - set.ProviderSettings.Logger.Info("hey look, a logger") - set.ProviderSettings.Logger.Debug("a debug log") } var providers map[string]Provider diff --git a/otelcol/collector.go b/otelcol/collector.go index 6e46747cd66..3f18f017c48 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -10,12 +10,14 @@ import ( "fmt" "os" "os/signal" + "sync" "sync/atomic" "syscall" "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -109,51 +111,56 @@ type Collector struct { // signalsChannel is used to receive termination signals from the OS. signalsChannel chan os.Signal // asyncErrorChannel is used to signal a fatal error from any component. - asyncErrorChannel chan error - cc *collectorCore + asyncErrorChannel chan error + ol *observer.ObservedLogs + updateConfigProviderLogger func(core zapcore.Core) } var _ zapcore.Core = &collectorCore{} type collectorCore struct { core zapcore.Core + mu sync.Mutex } func (c *collectorCore) Enabled(l zapcore.Level) bool { + c.mu.Lock() + defer c.mu.Unlock() return c.core.Enabled(l) } func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { + c.mu.Lock() + defer c.mu.Unlock() return c.core.With(f) } func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + c.mu.Lock() + defer c.mu.Unlock() return c.core.Check(e, ce) } func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { + c.mu.Lock() + defer c.mu.Unlock() return c.core.Write(e, f) } func (c *collectorCore) Sync() error { + c.mu.Lock() + defer c.mu.Unlock() return c.core.Sync() } -func newCollectorCore(options []zap.Option) (*collectorCore, error) { - ec := zap.NewProductionEncoderConfig() - ec.EncodeTime = zapcore.ISO8601TimeEncoder - zapCfg := &zap.Config{ - Level: zap.NewAtomicLevelAt(zapcore.InfoLevel), - Encoding: "console", - EncoderConfig: ec, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - } - logger, err := zapCfg.Build(options...) - if err != nil { - return nil, err - } - return &collectorCore{core: logger.Core()}, nil +func (c *collectorCore) SetCore(core zapcore.Core) { + c.mu.Lock() + c.mu.Unlock() + c.core = core +} + +func newCollectorCore(core zapcore.Core) (*collectorCore, error) { + return &collectorCore{core: core}, nil } // NewCollector creates and returns a new instance of Collector. @@ -161,7 +168,8 @@ func NewCollector(set CollectorSettings) (*Collector, error) { var err error configProvider := set.ConfigProvider - cc, err := newCollectorCore(set.LoggingOptions) + core, ol := observer.New(zap.DebugLevel) + cc, err := newCollectorCore(core) if err != nil { return nil, err } @@ -183,10 +191,11 @@ func NewCollector(set CollectorSettings) (*Collector, error) { shutdownChan: make(chan struct{}), // Per signal.Notify documentation, a size of the channel equaled with // the number of signals getting notified on is recommended. - signalsChannel: make(chan os.Signal, 3), - asyncErrorChannel: make(chan error), - configProvider: configProvider, - cc: cc, + signalsChannel: make(chan os.Signal, 3), + asyncErrorChannel: make(chan error), + configProvider: configProvider, + ol: ol, + updateConfigProviderLogger: cc.SetCore, }, nil } @@ -252,11 +261,13 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { if err != nil { return err } - - col.cc.core = col.service.Logger().Core() - _, err = col.configProvider.Get(ctx, factories) - if err != nil { - return fmt.Errorf("failed to get config: %w", err) + if col.updateConfigProviderLogger != nil { + col.updateConfigProviderLogger(col.service.Logger().Core()) + } + if col.ol != nil { + for _, log := range col.ol.All() { + col.service.Logger().Log(log.Level, log.Message, log.Context...) + } } if !col.set.SkipSettingGRPCLogger { From 99e7e47cba5eae8c2ad525a3671a52df0e96e55a Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 1 May 2024 12:00:38 -0400 Subject: [PATCH 03/13] Rough implementation of a zapcore.Core for buffering logs prior to user specified logger config --- otelcol/buffered_core.go | 70 ++++++++++++++++++++++++++++++++++++++++ otelcol/collector.go | 23 ++++--------- 2 files changed, 77 insertions(+), 16 deletions(-) create mode 100644 otelcol/buffered_core.go diff --git a/otelcol/buffered_core.go b/otelcol/buffered_core.go new file mode 100644 index 00000000000..d548dbbbca5 --- /dev/null +++ b/otelcol/buffered_core.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This logger implements zapcore.Core and is based on zaptest/observer. +package otelcol // import "go.opentelemetry.io/collector/otelcol" + +import ( + "sync" + + "go.uber.org/zap/zapcore" +) + +type loggedEntry struct { + zapcore.Entry + Context []zapcore.Field +} + +func newBufferedCore(enab zapcore.LevelEnabler) *bufferedCore { + return &bufferedCore{LevelEnabler: enab} +} + +var _ zapcore.Core = (*bufferedCore)(nil) + +type bufferedCore struct { + zapcore.LevelEnabler + mu sync.RWMutex + logs []loggedEntry + context []zapcore.Field +} + +func (bc *bufferedCore) Level() zapcore.Level { + return zapcore.LevelOf(bc.LevelEnabler) +} + +func (bc *bufferedCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if bc.Enabled(ent.Level) { + return ce.AddCore(ent, bc) + } + return ce +} + +func (bc *bufferedCore) With(fields []zapcore.Field) zapcore.Core { + return &bufferedCore{ + LevelEnabler: bc.LevelEnabler, + logs: bc.logs, + context: append(bc.context[:len(bc.context):len(bc.context)], fields...), + } +} + +func (bc *bufferedCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + all := make([]zapcore.Field, 0, len(fields)+len(bc.context)) + all = append(all, bc.context...) + all = append(all, fields...) + bc.mu.Lock() + bc.logs = append(bc.logs, loggedEntry{ent, all}) + bc.mu.Unlock() + return nil +} + +func (bc *bufferedCore) Sync() error { + return nil +} + +func (bc *bufferedCore) TakeLogs() []loggedEntry { + bc.mu.Lock() + logs := bc.logs + bc.logs = nil + bc.mu.Unlock() + return logs +} diff --git a/otelcol/collector.go b/otelcol/collector.go index 3f18f017c48..c9317705acb 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -17,7 +17,6 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -112,7 +111,7 @@ type Collector struct { signalsChannel chan os.Signal // asyncErrorChannel is used to signal a fatal error from any component. asyncErrorChannel chan error - ol *observer.ObservedLogs + bc *bufferedCore updateConfigProviderLogger func(core zapcore.Core) } @@ -155,24 +154,16 @@ func (c *collectorCore) Sync() error { func (c *collectorCore) SetCore(core zapcore.Core) { c.mu.Lock() - c.mu.Unlock() + defer c.mu.Unlock() c.core = core } -func newCollectorCore(core zapcore.Core) (*collectorCore, error) { - return &collectorCore{core: core}, nil -} - // NewCollector creates and returns a new instance of Collector. func NewCollector(set CollectorSettings) (*Collector, error) { var err error configProvider := set.ConfigProvider - - core, ol := observer.New(zap.DebugLevel) - cc, err := newCollectorCore(core) - if err != nil { - return nil, err - } + bc := newBufferedCore(zapcore.DebugLevel) + cc := &collectorCore{core: bc} set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, set.LoggingOptions...)} set.ConfigProviderSettings.ResolverSettings.ConverterSettings = confmap.ConverterSettings{} @@ -194,7 +185,7 @@ func NewCollector(set CollectorSettings) (*Collector, error) { signalsChannel: make(chan os.Signal, 3), asyncErrorChannel: make(chan error), configProvider: configProvider, - ol: ol, + bc: bc, updateConfigProviderLogger: cc.SetCore, }, nil } @@ -264,8 +255,8 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { if col.updateConfigProviderLogger != nil { col.updateConfigProviderLogger(col.service.Logger().Core()) } - if col.ol != nil { - for _, log := range col.ol.All() { + if col.bc != nil { + for _, log := range col.bc.TakeLogs() { col.service.Logger().Log(log.Level, log.Message, log.Context...) } } From 97615fe0f1e9aa079e5a4fe0190a1987d1d61cfe Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 1 May 2024 16:59:38 -0600 Subject: [PATCH 04/13] Add fallback logger option --- otelcol/buffered_core.go | 3 +- otelcol/collector.go | 79 ++++++++++++++++----------------------- otelcol/collector_core.go | 53 ++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 47 deletions(-) create mode 100644 otelcol/collector_core.go diff --git a/otelcol/buffered_core.go b/otelcol/buffered_core.go index d548dbbbca5..7f0b611c2d1 100644 --- a/otelcol/buffered_core.go +++ b/otelcol/buffered_core.go @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 // This logger implements zapcore.Core and is based on zaptest/observer. + package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( @@ -43,7 +44,7 @@ func (bc *bufferedCore) With(fields []zapcore.Field) zapcore.Core { return &bufferedCore{ LevelEnabler: bc.LevelEnabler, logs: bc.logs, - context: append(bc.context[:len(bc.context):len(bc.context)], fields...), + context: append(bc.context, fields...), } } diff --git a/otelcol/collector.go b/otelcol/collector.go index c9317705acb..6d6e0da945f 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -10,9 +10,9 @@ import ( "fmt" "os" "os/signal" - "sync" "sync/atomic" "syscall" + "time" "go.uber.org/multierr" "go.uber.org/zap" @@ -115,49 +115,6 @@ type Collector struct { updateConfigProviderLogger func(core zapcore.Core) } -var _ zapcore.Core = &collectorCore{} - -type collectorCore struct { - core zapcore.Core - mu sync.Mutex -} - -func (c *collectorCore) Enabled(l zapcore.Level) bool { - c.mu.Lock() - defer c.mu.Unlock() - return c.core.Enabled(l) -} - -func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { - c.mu.Lock() - defer c.mu.Unlock() - return c.core.With(f) -} - -func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { - c.mu.Lock() - defer c.mu.Unlock() - return c.core.Check(e, ce) -} - -func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { - c.mu.Lock() - defer c.mu.Unlock() - return c.core.Write(e, f) -} - -func (c *collectorCore) Sync() error { - c.mu.Lock() - defer c.mu.Unlock() - return c.core.Sync() -} - -func (c *collectorCore) SetCore(core zapcore.Core) { - c.mu.Lock() - defer c.mu.Unlock() - c.core = core -} - // NewCollector creates and returns a new instance of Collector. func NewCollector(set CollectorSettings) (*Collector, error) { var err error @@ -236,6 +193,8 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return fmt.Errorf("invalid configuration: %w", err) } + time.Sleep(time.Second * 5) + col.serviceConfig = &cfg.Service col.service, err = service.New(ctx, service.Settings{ @@ -256,8 +215,11 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { col.updateConfigProviderLogger(col.service.Logger().Core()) } if col.bc != nil { - for _, log := range col.bc.TakeLogs() { - col.service.Logger().Log(log.Level, log.Message, log.Context...) + x := col.bc.TakeLogs() + fmt.Println(len(x)) + for _, log := range x { + ce := col.service.Logger().Core().Check(log.Entry, nil) + ce.Write(log.Context...) } } @@ -301,12 +263,37 @@ func (col *Collector) DryRun(ctx context.Context) error { return cfg.Validate() } +func newFallbackLogger(options []zap.Option) (*zap.Logger, error) { + ec := zap.NewProductionEncoderConfig() + ec.EncodeTime = zapcore.ISO8601TimeEncoder + zapCfg := &zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), + Encoding: "console", + EncoderConfig: ec, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + return zapCfg.Build(options...) +} + // Run starts the collector according to the given configuration, and waits for it to complete. // Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down. // Sets up the control logic for config reloading and shutdown. func (col *Collector) Run(ctx context.Context) error { if err := col.setupConfigurationComponents(ctx); err != nil { col.setCollectorState(StateClosed) + logger, loggerErr := newFallbackLogger(col.set.LoggingOptions) + if loggerErr != nil { + fmt.Printf("unable to create fallback logger, %v", err) + } else { + if col.bc != nil { + for _, log := range col.bc.TakeLogs() { + logger.Log(log.Level, log.Message, log.Context...) + } + } + logger.Warn("unable to resolve configuration", zap.Error(err)) + } + return err } diff --git a/otelcol/collector_core.go b/otelcol/collector_core.go new file mode 100644 index 00000000000..84af8e72d9b --- /dev/null +++ b/otelcol/collector_core.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol // import "go.opentelemetry.io/collector/otelcol" + +import ( + "sync" + + "go.uber.org/zap/zapcore" +) + +var _ zapcore.Core = (*collectorCore)(nil) + +type collectorCore struct { + core zapcore.Core + mu sync.Mutex +} + +func (c *collectorCore) Enabled(l zapcore.Level) bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.core.Enabled(l) +} + +func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { + c.mu.Lock() + defer c.mu.Unlock() + return c.core.With(f) +} + +func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + c.mu.Lock() + defer c.mu.Unlock() + return c.core.Check(e, ce) +} + +func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { + c.mu.Lock() + defer c.mu.Unlock() + return c.core.Write(e, f) +} + +func (c *collectorCore) Sync() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.core.Sync() +} + +func (c *collectorCore) SetCore(core zapcore.Core) { + c.mu.Lock() + c.core = core + c.mu.Unlock() +} From cf83cf7ce9652fd7b0e50546c224822fff969e8f Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 1 May 2024 17:00:08 -0600 Subject: [PATCH 05/13] Remove test timeout --- otelcol/collector.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/otelcol/collector.go b/otelcol/collector.go index 6d6e0da945f..db435ae0523 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -12,7 +12,6 @@ import ( "os/signal" "sync/atomic" "syscall" - "time" "go.uber.org/multierr" "go.uber.org/zap" @@ -193,8 +192,6 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return fmt.Errorf("invalid configuration: %w", err) } - time.Sleep(time.Second * 5) - col.serviceConfig = &cfg.Service col.service, err = service.New(ctx, service.Settings{ From 2c5c953ac66e67829a99adc1aa2f5ad77a590571 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Fri, 3 May 2024 12:10:13 -0600 Subject: [PATCH 06/13] Update buffered logger to use WithCaller by default --- otelcol/collector.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/otelcol/collector.go b/otelcol/collector.go index db435ae0523..3ecff6cb6c3 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -118,9 +118,11 @@ type Collector struct { func NewCollector(set CollectorSettings) (*Collector, error) { var err error configProvider := set.ConfigProvider + bc := newBufferedCore(zapcore.DebugLevel) cc := &collectorCore{core: bc} - set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, set.LoggingOptions...)} + options := append([]zap.Option{zap.WithCaller(true)}, set.LoggingOptions...) + set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, options...)} set.ConfigProviderSettings.ResolverSettings.ConverterSettings = confmap.ConverterSettings{} if configProvider == nil { @@ -213,7 +215,6 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { } if col.bc != nil { x := col.bc.TakeLogs() - fmt.Println(len(x)) for _, log := range x { ce := col.service.Logger().Core().Check(log.Entry, nil) ce.Write(log.Context...) From 88c7f7eeb7346551888e69fa3549695c6099f710 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Fri, 3 May 2024 15:35:10 -0600 Subject: [PATCH 07/13] Add unit tests --- otelcol/buffered_core_test.go | 102 +++++++++++++++++++++++++++++++++ otelcol/collector_core.go | 9 ++- otelcol/collector_core_test.go | 89 ++++++++++++++++++++++++++++ 3 files changed, 198 insertions(+), 2 deletions(-) create mode 100644 otelcol/buffered_core_test.go create mode 100644 otelcol/collector_core_test.go diff --git a/otelcol/buffered_core_test.go b/otelcol/buffered_core_test.go new file mode 100644 index 00000000000..a3e3fe34e3f --- /dev/null +++ b/otelcol/buffered_core_test.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" +) + +func Test_bufferedCore_Level(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + assert.Equal(t, zapcore.InfoLevel, bc.Level()) +} + +func Test_bufferedCore_Check(t *testing.T) { + t.Run("check passed", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.InfoLevel, + } + expected := &zapcore.CheckedEntry{} + expected = expected.AddCore(e, bc) + ce := bc.Check(e, nil) + assert.Equal(t, expected, ce) + }) + + t.Run("check did not pass", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + } + ce := bc.Check(e, nil) + assert.Nil(t, ce) + }) +} + +func Test_bufferedCore_With(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + bc.context = []zapcore.Field{ + {Key: "original", String: "context"}, + } + inputs := []zapcore.Field{ + {Key: "test", String: "passed"}, + } + expected := []zapcore.Field{ + {Key: "original", String: "context"}, + {Key: "test", String: "passed"}, + } + newBC := bc.With(inputs) + assert.Equal(t, expected, newBC.(*bufferedCore).context) +} + +func Test_bufferedCore_Write(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := bc.Write(e, fields) + require.NoError(t, err) + + expected := loggedEntry{ + e, + fields, + } + require.Equal(t, 1, len(bc.logs)) + require.Equal(t, expected, bc.logs[0]) +} + +func Test_bufferedCore_Sync(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + assert.NoError(t, bc.Sync()) +} + +func Test_bufferedCore_TakeLogs(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := bc.Write(e, fields) + require.NoError(t, err) + + expected := []loggedEntry{ + { + e, + fields, + }, + } + assert.Equal(t, expected, bc.TakeLogs()) + assert.Nil(t, bc.logs) +} diff --git a/otelcol/collector_core.go b/otelcol/collector_core.go index 84af8e72d9b..f378c303618 100644 --- a/otelcol/collector_core.go +++ b/otelcol/collector_core.go @@ -25,13 +25,18 @@ func (c *collectorCore) Enabled(l zapcore.Level) bool { func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { c.mu.Lock() defer c.mu.Unlock() - return c.core.With(f) + return &collectorCore{ + core: c.core.With(f), + } } func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { c.mu.Lock() defer c.mu.Unlock() - return c.core.Check(e, ce) + if c.core.Enabled(e.Level) { + return ce.AddCore(e, c) + } + return ce } func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { diff --git a/otelcol/collector_core_test.go b/otelcol/collector_core_test.go new file mode 100644 index 00000000000..6b9deb031e6 --- /dev/null +++ b/otelcol/collector_core_test.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelcol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" +) + +func Test_collectorCore_Enabled(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + assert.True(t, cc.Enabled(zapcore.ErrorLevel)) + assert.False(t, cc.Enabled(zapcore.DebugLevel)) +} + +func Test_collectorCore_Check(t *testing.T) { + t.Run("check passed", func(t *testing.T) { + bc := newBufferedCore(zapcore.InfoLevel) + cc := collectorCore{core: bc} + e := zapcore.Entry{ + Level: zapcore.InfoLevel, + } + expected := &zapcore.CheckedEntry{} + expected = expected.AddCore(e, &cc) + ce := cc.Check(e, nil) + assert.Equal(t, expected, ce) + }) + + t.Run("check did not pass", func(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + } + ce := cc.Check(e, nil) + assert.Nil(t, ce) + }) +} + +func Test_collectorCore_With(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + cc.core.(*bufferedCore).context = []zapcore.Field{ + {Key: "original", String: "context"}, + } + inputs := []zapcore.Field{ + {Key: "test", String: "passed"}, + } + expected := []zapcore.Field{ + {Key: "original", String: "context"}, + {Key: "test", String: "passed"}, + } + newCC := cc.With(inputs) + assert.Equal(t, expected, newCC.(*collectorCore).core.(*bufferedCore).context) +} + +func Test_collectorCore_Write(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + e := zapcore.Entry{ + Level: zapcore.DebugLevel, + Message: "test", + } + fields := []zapcore.Field{ + {Key: "field1", String: "value1"}, + } + err := cc.Write(e, fields) + require.NoError(t, err) + + expected := loggedEntry{ + e, + fields, + } + require.Equal(t, 1, len(cc.core.(*bufferedCore).logs)) + require.Equal(t, expected, cc.core.(*bufferedCore).logs[0]) +} + +func Test_collectorCore_Sync(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + assert.NoError(t, cc.Sync()) +} + +func Test_collectorCore_SetCore(t *testing.T) { + cc := collectorCore{core: newBufferedCore(zapcore.InfoLevel)} + newCore := newBufferedCore(zapcore.DebugLevel) + cc.SetCore(newCore) + assert.Equal(t, zapcore.DebugLevel, cc.core.(*bufferedCore).LevelEnabler) +} From 2529105a4a96a41b9083ddae938df624602d7d3e Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Fri, 3 May 2024 15:36:38 -0600 Subject: [PATCH 08/13] changelog --- .chloggen/confmap-logger-wrapper-idea.yaml | 25 ++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .chloggen/confmap-logger-wrapper-idea.yaml diff --git a/.chloggen/confmap-logger-wrapper-idea.yaml b/.chloggen/confmap-logger-wrapper-idea.yaml new file mode 100644 index 00000000000..ff35e92c2a3 --- /dev/null +++ b/.chloggen/confmap-logger-wrapper-idea.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otelcol + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enable logging during configuration resolution + +# One or more tracking issues or pull requests related to the change +issues: [10056] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 0d3ebda1598edd88df55c95b6e6efa8c05b8f2cb Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Mon, 6 May 2024 16:06:24 -0600 Subject: [PATCH 09/13] Apply feedback --- otelcol/buffered_core.go | 30 ++++++++++++++++++++---------- otelcol/buffered_core_test.go | 5 +++++ otelcol/collector.go | 12 +++++++++--- otelcol/collector_core.go | 26 +++++++++++++------------- 4 files changed, 47 insertions(+), 26 deletions(-) diff --git a/otelcol/buffered_core.go b/otelcol/buffered_core.go index 7f0b611c2d1..447bf13cb52 100644 --- a/otelcol/buffered_core.go +++ b/otelcol/buffered_core.go @@ -6,6 +6,7 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( + "fmt" "sync" "go.uber.org/zap/zapcore" @@ -24,9 +25,10 @@ var _ zapcore.Core = (*bufferedCore)(nil) type bufferedCore struct { zapcore.LevelEnabler - mu sync.RWMutex - logs []loggedEntry - context []zapcore.Field + mu sync.RWMutex + logs []loggedEntry + context []zapcore.Field + logsTaken bool } func (bc *bufferedCore) Level() zapcore.Level { @@ -44,17 +46,21 @@ func (bc *bufferedCore) With(fields []zapcore.Field) zapcore.Core { return &bufferedCore{ LevelEnabler: bc.LevelEnabler, logs: bc.logs, + logsTaken: bc.logsTaken, context: append(bc.context, fields...), } } func (bc *bufferedCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + bc.mu.Lock() + defer bc.mu.Unlock() + if bc.logsTaken { + return fmt.Errorf("the buffered logs have already been taken so writing is no longer supported") + } all := make([]zapcore.Field, 0, len(fields)+len(bc.context)) all = append(all, bc.context...) all = append(all, fields...) - bc.mu.Lock() bc.logs = append(bc.logs, loggedEntry{ent, all}) - bc.mu.Unlock() return nil } @@ -63,9 +69,13 @@ func (bc *bufferedCore) Sync() error { } func (bc *bufferedCore) TakeLogs() []loggedEntry { - bc.mu.Lock() - logs := bc.logs - bc.logs = nil - bc.mu.Unlock() - return logs + if !bc.logsTaken { + bc.mu.Lock() + defer bc.mu.Unlock() + logs := bc.logs + bc.logs = nil + bc.logsTaken = true + return logs + } + return nil } diff --git a/otelcol/buffered_core_test.go b/otelcol/buffered_core_test.go index a3e3fe34e3f..6417b8a1d9d 100644 --- a/otelcol/buffered_core_test.go +++ b/otelcol/buffered_core_test.go @@ -40,6 +40,7 @@ func Test_bufferedCore_Check(t *testing.T) { func Test_bufferedCore_With(t *testing.T) { bc := newBufferedCore(zapcore.InfoLevel) + bc.logsTaken = true bc.context = []zapcore.Field{ {Key: "original", String: "context"}, } @@ -52,6 +53,7 @@ func Test_bufferedCore_With(t *testing.T) { } newBC := bc.With(inputs) assert.Equal(t, expected, newBC.(*bufferedCore).context) + assert.True(t, newBC.(*bufferedCore).logsTaken) } func Test_bufferedCore_Write(t *testing.T) { @@ -99,4 +101,7 @@ func Test_bufferedCore_TakeLogs(t *testing.T) { } assert.Equal(t, expected, bc.TakeLogs()) assert.Nil(t, bc.logs) + + assert.Error(t, bc.Write(e, fields)) + assert.Nil(t, bc.TakeLogs()) } diff --git a/otelcol/collector.go b/otelcol/collector.go index 3ecff6cb6c3..faa51d2dcd5 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -217,7 +217,9 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { x := col.bc.TakeLogs() for _, log := range x { ce := col.service.Logger().Core().Check(log.Entry, nil) - ce.Write(log.Context...) + if ce != nil { + ce.Write(log.Context...) + } } } @@ -285,8 +287,12 @@ func (col *Collector) Run(ctx context.Context) error { fmt.Printf("unable to create fallback logger, %v", err) } else { if col.bc != nil { - for _, log := range col.bc.TakeLogs() { - logger.Log(log.Level, log.Message, log.Context...) + x := col.bc.TakeLogs() + for _, log := range x { + ce := logger.Core().Check(log.Entry, nil) + if ce != nil { + ce.Write(log.Context...) + } } } logger.Warn("unable to resolve configuration", zap.Error(err)) diff --git a/otelcol/collector_core.go b/otelcol/collector_core.go index f378c303618..b0a379fedc9 100644 --- a/otelcol/collector_core.go +++ b/otelcol/collector_core.go @@ -13,26 +13,26 @@ var _ zapcore.Core = (*collectorCore)(nil) type collectorCore struct { core zapcore.Core - mu sync.Mutex + rw sync.RWMutex } func (c *collectorCore) Enabled(l zapcore.Level) bool { - c.mu.Lock() - defer c.mu.Unlock() + c.rw.RLock() + defer c.rw.RUnlock() return c.core.Enabled(l) } func (c *collectorCore) With(f []zapcore.Field) zapcore.Core { - c.mu.Lock() - defer c.mu.Unlock() + c.rw.RLock() + defer c.rw.RUnlock() return &collectorCore{ core: c.core.With(f), } } func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { - c.mu.Lock() - defer c.mu.Unlock() + c.rw.RLock() + defer c.rw.RUnlock() if c.core.Enabled(e.Level) { return ce.AddCore(e, c) } @@ -40,19 +40,19 @@ func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcor } func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error { - c.mu.Lock() - defer c.mu.Unlock() + c.rw.RLock() + defer c.rw.RUnlock() return c.core.Write(e, f) } func (c *collectorCore) Sync() error { - c.mu.Lock() - defer c.mu.Unlock() + c.rw.RLock() + defer c.rw.RUnlock() return c.core.Sync() } func (c *collectorCore) SetCore(core zapcore.Core) { - c.mu.Lock() + c.rw.Lock() + defer c.rw.Unlock() c.core = core - c.mu.Unlock() } From da77d4d1672fa954f3ed7bc8efe0b9401596a324 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Tue, 7 May 2024 10:34:19 -0600 Subject: [PATCH 10/13] Update otelcol/collector.go Co-authored-by: Pablo Baeyens --- otelcol/collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/otelcol/collector.go b/otelcol/collector.go index faa51d2dcd5..bf79b663d96 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -284,7 +284,7 @@ func (col *Collector) Run(ctx context.Context) error { col.setCollectorState(StateClosed) logger, loggerErr := newFallbackLogger(col.set.LoggingOptions) if loggerErr != nil { - fmt.Printf("unable to create fallback logger, %v", err) + return errors.Join(err, fmt.Errorf("unable to create fallback logger: %w", loggerErr)) } else { if col.bc != nil { x := col.bc.TakeLogs() From cef009dba9240f7d773c255471d7565d1d037f51 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Tue, 7 May 2024 10:34:40 -0600 Subject: [PATCH 11/13] fix import --- otelcol/collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/otelcol/collector.go b/otelcol/collector.go index bf79b663d96..af71cd2ed97 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -7,6 +7,7 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( "context" + "errors" "fmt" "os" "os/signal" From 83f90b1e29ddca11eaa3e08e00dc54e21c66c137 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 8 May 2024 08:45:23 -0600 Subject: [PATCH 12/13] Fix lint --- otelcol/collector.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/otelcol/collector.go b/otelcol/collector.go index af71cd2ed97..ff059d23f8b 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -286,18 +286,18 @@ func (col *Collector) Run(ctx context.Context) error { logger, loggerErr := newFallbackLogger(col.set.LoggingOptions) if loggerErr != nil { return errors.Join(err, fmt.Errorf("unable to create fallback logger: %w", loggerErr)) - } else { - if col.bc != nil { - x := col.bc.TakeLogs() - for _, log := range x { - ce := logger.Core().Check(log.Entry, nil) - if ce != nil { - ce.Write(log.Context...) - } + } + + if col.bc != nil { + x := col.bc.TakeLogs() + for _, log := range x { + ce := logger.Core().Check(log.Entry, nil) + if ce != nil { + ce.Write(log.Context...) } } - logger.Warn("unable to resolve configuration", zap.Error(err)) } + logger.Warn("unable to resolve configuration", zap.Error(err)) return err } From c3b748877633bcdc283ef6a27ea9b7e2e9b5f84f Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Thu, 9 May 2024 08:44:28 -0600 Subject: [PATCH 13/13] Debug windows test --- otelcol/collector.go | 1 - otelcol/collector_windows.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/otelcol/collector.go b/otelcol/collector.go index ff059d23f8b..fb36217cb11 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -297,7 +297,6 @@ func (col *Collector) Run(ctx context.Context) error { } } } - logger.Warn("unable to resolve configuration", zap.Error(err)) return err } diff --git a/otelcol/collector_windows.go b/otelcol/collector_windows.go index cc0a3611628..3df08386bbf 100644 --- a/otelcol/collector_windows.go +++ b/otelcol/collector_windows.go @@ -202,7 +202,7 @@ func (w windowsEventLogCore) Sync() error { func withWindowsCore(elog *eventlog.Log, serviceConfig **service.Config) func(zapcore.Core) zapcore.Core { return func(core zapcore.Core) zapcore.Core { - if serviceConfig != nil { + if serviceConfig != nil && *serviceConfig != nil { for _, output := range (*serviceConfig).Telemetry.Logs.OutputPaths { if output != "stdout" && output != "stderr" { // A log file was specified in the configuration, so we should not use the Windows Event Log