[chore][pkg/stanza] Extract checkpointing logic into internal package
djaglowski committed Oct 16, 2023
1 parent d5344ca commit bcc1ba3
Showing 7 changed files with 324 additions and 130 deletions.
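
In short, this commit moves the JSON checkpointing that Manager previously implemented itself (the syncLastPollFiles and loadLastPollFiles methods deleted below) into the new internal package fileconsumer/internal/checkpoint. Manager.Start now stores the persister, loads saved offsets through checkpoint.Load, and seeds its known-files list from them, while Manager.consume collects each reader's Metadata and writes it back through checkpoint.Save. A reconstructed sketch of the extracted package appears after the file.go diff below.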
pkg/stanza/fileconsumer/benchmark_test.go (2 changes: 1 addition & 1 deletion)
@@ -159,7 +159,7 @@ func BenchmarkFileInput(b *testing.B) {
         }

         b.ResetTimer()
-        err = op.Start(testutil.NewMockPersister("test"))
+        err = op.Start(testutil.NewUnscopedMockPersister())
         defer func() {
             require.NoError(b, op.Stop())
         }()
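
The benchmark swaps testutil.NewMockPersister("test"), which presumably scoped keys under the given name, for testutil.NewUnscopedMockPersister. A minimal in-memory persister of the kind such a helper typically returns is sketched below; this is an illustrative stand-in rather than the actual testutil code, and the Persister interface shape is assumed (only Get and Set appear in this diff; Delete is an extra assumption).

    // Illustrative stand-in for testutil.NewUnscopedMockPersister: a
    // concurrency-safe, in-memory persister that stores keys exactly as
    // given, with no per-component prefix.
    package testutil

    import (
        "context"
        "sync"
    )

    // Persister is an assumed interface, inferred from the Get/Set calls
    // visible in this diff.
    type Persister interface {
        Get(ctx context.Context, key string) ([]byte, error)
        Set(ctx context.Context, key string, value []byte) error
        Delete(ctx context.Context, key string) error
    }

    type mockPersister struct {
        mu   sync.Mutex
        data map[string][]byte
    }

    // NewUnscopedMockPersister returns an empty in-memory persister.
    func NewUnscopedMockPersister() Persister {
        return &mockPersister{data: map[string][]byte{}}
    }

    func (p *mockPersister) Get(_ context.Context, key string) ([]byte, error) {
        p.mu.Lock()
        defer p.mu.Unlock()
        return p.data[key], nil
    }

    func (p *mockPersister) Set(_ context.Context, key string, value []byte) error {
        p.mu.Lock()
        defer p.mu.Unlock()
        p.data[key] = value
        return nil
    }

    func (p *mockPersister) Delete(_ context.Context, key string) error {
        p.mu.Lock()
        defer p.mu.Unlock()
        delete(p.data, key)
        return nil
    }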
pkg/stanza/fileconsumer/file.go (104 changes: 21 additions & 83 deletions)
@@ -4,16 +4,15 @@
 package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

 import (
-    "bytes"
     "context"
-    "encoding/json"
     "fmt"
     "os"
     "sync"
     "time"

     "go.uber.org/zap"

+    "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
@@ -28,9 +27,9 @@ type Manager struct {
     readerFactory reader.Factory
     fileMatcher   *matcher.Matcher
     roller        roller
+    persister     operator.Persister

     pollInterval  time.Duration
-    persister     operator.Persister
     maxBatches    int
     maxBatchFiles int

@@ -43,12 +42,20 @@
 func (m *Manager) Start(persister operator.Persister) error {
     ctx, cancel := context.WithCancel(context.Background())
     m.cancel = cancel

+    m.persister = persister
+
-    // Load offsets from disk
-    if err := m.loadLastPollFiles(ctx); err != nil {
+    offsets, err := checkpoint.Load(ctx, m.persister)
+    if err != nil {
         return fmt.Errorf("read known files from database: %w", err)
     }
+    if len(offsets) > 0 {
+        m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
+        m.readerFactory.FromBeginning = true
+        for _, offset := range offsets {
+            m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
+        }
+    }

     if _, err := m.fileMatcher.MatchFiles(); err != nil {
         m.Warnf("finding files: %v", err)
@@ -150,7 +157,15 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

     m.roller.roll(ctx, readers)
     m.saveCurrent(readers)
-    m.syncLastPollFiles(ctx)
+
+    rmds := make([]*reader.Metadata, 0, len(readers))
+    for _, r := range readers {
+        rmds = append(rmds, r.Metadata)
+    }
+    if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
+        m.Errorw("save offsets", zap.Error(err))
+    }
+
     m.clearCurrentFingerprints()
 }

@@ -263,80 +278,3 @@ func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) {
     }
     return nil, false
 }
-
-const knownFilesKey = "knownFiles"
-
-// syncLastPollFiles syncs the most recent set of files to the database
-func (m *Manager) syncLastPollFiles(ctx context.Context) {
-    var buf bytes.Buffer
-    enc := json.NewEncoder(&buf)
-
-    // Encode the number of known files
-    if err := enc.Encode(len(m.knownFiles)); err != nil {
-        m.Errorw("Failed to encode known files", zap.Error(err))
-        return
-    }
-
-    // Encode each known file
-    for _, fileReader := range m.knownFiles {
-        if err := enc.Encode(fileReader.Metadata); err != nil {
-            m.Errorw("Failed to encode known files", zap.Error(err))
-        }
-    }
-
-    if err := m.persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
-        m.Errorw("Failed to sync to database", zap.Error(err))
-    }
-}
-
-// syncLastPollFiles loads the most recent set of files to the database
-func (m *Manager) loadLastPollFiles(ctx context.Context) error {
-    encoded, err := m.persister.Get(ctx, knownFilesKey)
-    if err != nil {
-        return err
-    }
-
-    if encoded == nil {
-        return nil
-    }
-
-    dec := json.NewDecoder(bytes.NewReader(encoded))
-
-    // Decode the number of entries
-    var knownFileCount int
-    if err = dec.Decode(&knownFileCount); err != nil {
-        return fmt.Errorf("decoding file count: %w", err)
-    }
-
-    if knownFileCount > 0 {
-        m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
-        m.readerFactory.FromBeginning = true
-    }
-
-    // Decode each of the known files
-    for i := 0; i < knownFileCount; i++ {
-        rmd := new(reader.Metadata)
-        if err = dec.Decode(rmd); err != nil {
-            return err
-        }
-
-        // Migrate readers that used FileAttributes.HeaderAttributes
-        // This block can be removed in a future release, tentatively v0.90.0
-        if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok {
-            switch hat := ha.(type) {
-            case map[string]any:
-                for k, v := range hat {
-                    rmd.FileAttributes[k] = v
-                }
-                delete(rmd.FileAttributes, "HeaderAttributes")
-            default:
-                m.Errorw("migrate header attributes: unexpected format")
-            }
-        }
-
-        // This reader won't be used for anything other than metadata reference, so just wrap the metadata
-        m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd})
-    }
-
-    return nil
-}
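
Per the commit title, the two deleted methods above become the internal checkpoint package; this page shows only the call sites (checkpoint.Load and checkpoint.Save), so the following is a sketch reconstructed under stated assumptions. The wire format is taken verbatim from the deleted code: a JSON-encoded count stored under the "knownFiles" key, followed by one JSON document per reader.Metadata, with the legacy HeaderAttributes migration applied on load. Metadata and Persister are simplified stand-ins for the repo's reader.Metadata and operator.Persister so the sketch compiles on its own.

    // Sketch of the extracted checkpointing logic, reconstructed from the
    // deleted syncLastPollFiles/loadLastPollFiles above. Types are stand-ins.
    package checkpoint

    import (
        "bytes"
        "context"
        "encoding/json"
        "fmt"
    )

    // Persister stands in for the stanza operator.Persister.
    type Persister interface {
        Get(ctx context.Context, key string) ([]byte, error)
        Set(ctx context.Context, key string, value []byte) error
    }

    // Metadata stands in for reader.Metadata; only the field needed for
    // the migration below is spelled out.
    type Metadata struct {
        FileAttributes map[string]any `json:"FileAttributes,omitempty"`
    }

    const knownFilesKey = "knownFiles"

    // Save writes the number of known files followed by each file's
    // metadata, matching the format the deleted syncLastPollFiles produced.
    func Save(ctx context.Context, persister Persister, rmds []*Metadata) error {
        var buf bytes.Buffer
        enc := json.NewEncoder(&buf)
        if err := enc.Encode(len(rmds)); err != nil {
            return fmt.Errorf("encode known files: %w", err)
        }
        for _, rmd := range rmds {
            if err := enc.Encode(rmd); err != nil {
                return fmt.Errorf("encode known files: %w", err)
            }
        }
        if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
            return fmt.Errorf("sync to database: %w", err)
        }
        return nil
    }

    // Load reads the same format back and applies the HeaderAttributes
    // migration carried over from the deleted loadLastPollFiles.
    func Load(ctx context.Context, persister Persister) ([]*Metadata, error) {
        encoded, err := persister.Get(ctx, knownFilesKey)
        if err != nil {
            return nil, err
        }
        if encoded == nil {
            return nil, nil
        }
        dec := json.NewDecoder(bytes.NewReader(encoded))
        var knownFileCount int
        if err = dec.Decode(&knownFileCount); err != nil {
            return nil, fmt.Errorf("decoding file count: %w", err)
        }
        rmds := make([]*Metadata, 0, knownFileCount)
        for i := 0; i < knownFileCount; i++ {
            rmd := new(Metadata)
            if err = dec.Decode(rmd); err != nil {
                return nil, err
            }
            // Migrate readers that used FileAttributes.HeaderAttributes.
            // (The original logged an error on unexpected formats.)
            if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok {
                if hat, ok := ha.(map[string]any); ok {
                    for k, v := range hat {
                        rmd.FileAttributes[k] = v
                    }
                    delete(rmd.FileAttributes, "HeaderAttributes")
                }
            }
            rmds = append(rmds, rmd)
        }
        return rmds, nil
    }

With this shape, a Save followed by a Load round-trips the metadata, which is what lets Manager.Start resume from previously known offsets instead of honoring the start_at setting.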