Skip to content

Commit

Permalink
[chore][pkg/stanza] Skip persister operations if nil (#28580)
Browse files Browse the repository at this point in the history
Although the persister is generally expected, we can easily protect
against cases where it is not provided and save some work as well. This
becomes more important with #27823 which interacts with the persister
during the Stop function.
  • Loading branch information
djaglowski authored Oct 25, 2023
1 parent 5762219 commit 38d5bd6
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ func (m *Manager) Start(persister operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

m.persister = persister

offsets, err := checkpoint.Load(ctx, m.persister)
if err != nil {
return fmt.Errorf("read known files from database: %w", err)
}
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
if persister != nil {
m.persister = persister
offsets, err := checkpoint.Load(ctx, m.persister)
if err != nil {
return fmt.Errorf("read known files from database: %w", err)
}
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
}
}
}

Expand Down Expand Up @@ -169,12 +170,14 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

m.saveCurrent(readers)

rmds := make([]*reader.Metadata, 0, len(readers))
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
m.Errorw("save offsets", zap.Error(err))
if m.persister != nil {
rmds := make([]*reader.Metadata, 0, len(readers))
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}

m.clearCurrentFingerprints()
Expand Down

0 comments on commit 38d5bd6

Please sign in to comment.