From 5545be179ad3262eb24264dbb36d73b354e499ce Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 24 Oct 2023 11:45:05 -0600 Subject: [PATCH] [chore][pkg/stanza] Skip persister operations if nil --- pkg/stanza/fileconsumer/file.go | 37 ++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 1d2c1be33a70..ff8881f4c843 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -43,17 +43,18 @@ func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel - m.persister = persister - - offsets, err := checkpoint.Load(ctx, m.persister) - if err != nil { - return fmt.Errorf("read known files from database: %w", err) - } - if len(offsets) > 0 { - m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.") - m.readerFactory.FromBeginning = true - for _, offset := range offsets { - m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset}) + if persister != nil { + m.persister = persister + offsets, err := checkpoint.Load(ctx, m.persister) + if err != nil { + return fmt.Errorf("read known files from database: %w", err) + } + if len(offsets) > 0 { + m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.") + m.readerFactory.FromBeginning = true + for _, offset := range offsets { + m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset}) + } } } @@ -165,12 +166,14 @@ func (m *Manager) consume(ctx context.Context, paths []string) { m.saveCurrent(readers) - rmds := make([]*reader.Metadata, 0, len(readers)) - for _, r := range readers { - rmds = append(rmds, r.Metadata) - } - if err := checkpoint.Save(ctx, m.persister, rmds); err != nil { - m.Errorw("save offsets", zap.Error(err)) + if m.persister != nil { + rmds := make([]*reader.Metadata, 0, len(readers)) + for _, r := range readers { + rmds = append(rmds, r.Metadata) + } + if err := checkpoint.Save(ctx, m.persister, rmds); err != nil { + m.Errorw("save offsets", zap.Error(err)) + } } m.clearCurrentFingerprints()