From bcc1ba38e567e4c2e8fabf18a112d3c3501805a5 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 16 Oct 2023 16:11:17 -0600 Subject: [PATCH] [chore][pkg/stanza] Extract checkpointing logic into internal package --- pkg/stanza/fileconsumer/benchmark_test.go | 2 +- pkg/stanza/fileconsumer/file.go | 104 +++--------- pkg/stanza/fileconsumer/file_test.go | 66 ++++---- .../internal/checkpoint/checkpoint.go | 91 +++++++++++ .../internal/checkpoint/checkpoint_test.go | 149 ++++++++++++++++++ pkg/stanza/fileconsumer/rotation_test.go | 24 +-- pkg/stanza/testutil/util.go | 18 ++- 7 files changed, 324 insertions(+), 130 deletions(-) create mode 100644 pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go create mode 100644 pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index 42315e78bbb9..a373129aa4c7 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -159,7 +159,7 @@ func BenchmarkFileInput(b *testing.B) { } b.ResetTimer() - err = op.Start(testutil.NewMockPersister("test")) + err = op.Start(testutil.NewUnscopedMockPersister()) defer func() { require.NoError(b, op.Stop()) }() diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 7c32574e19f7..daed4854bb1d 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -4,9 +4,7 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( - "bytes" "context" - "encoding/json" "fmt" "os" "sync" @@ -14,6 +12,7 @@ import ( "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" @@ -28,9 +27,9 @@ type Manager struct { readerFactory reader.Factory fileMatcher *matcher.Matcher roller roller - persister operator.Persister pollInterval time.Duration + persister operator.Persister maxBatches int maxBatchFiles int @@ -43,12 +42,20 @@ type Manager struct { func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel + m.persister = persister - // Load offsets from disk - if err := m.loadLastPollFiles(ctx); err != nil { + offsets, err := checkpoint.Load(ctx, m.persister) + if err != nil { return fmt.Errorf("read known files from database: %w", err) } + if len(offsets) > 0 { + m.Infow("Resuming from previously known offset(s). 
'start_at' setting is not applicable.") + m.readerFactory.FromBeginning = true + for _, offset := range offsets { + m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset}) + } + } if _, err := m.fileMatcher.MatchFiles(); err != nil { m.Warnf("finding files: %v", err) @@ -150,7 +157,15 @@ func (m *Manager) consume(ctx context.Context, paths []string) { m.roller.roll(ctx, readers) m.saveCurrent(readers) - m.syncLastPollFiles(ctx) + + rmds := make([]*reader.Metadata, 0, len(readers)) + for _, r := range readers { + rmds = append(rmds, r.Metadata) + } + if err := checkpoint.Save(ctx, m.persister, rmds); err != nil { + m.Errorw("save offsets", zap.Error(err)) + } + m.clearCurrentFingerprints() } @@ -263,80 +278,3 @@ func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Rea } return nil, false } - -const knownFilesKey = "knownFiles" - -// syncLastPollFiles syncs the most recent set of files to the database -func (m *Manager) syncLastPollFiles(ctx context.Context) { - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - - // Encode the number of known files - if err := enc.Encode(len(m.knownFiles)); err != nil { - m.Errorw("Failed to encode known files", zap.Error(err)) - return - } - - // Encode each known file - for _, fileReader := range m.knownFiles { - if err := enc.Encode(fileReader.Metadata); err != nil { - m.Errorw("Failed to encode known files", zap.Error(err)) - } - } - - if err := m.persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil { - m.Errorw("Failed to sync to database", zap.Error(err)) - } -} - -// syncLastPollFiles loads the most recent set of files to the database -func (m *Manager) loadLastPollFiles(ctx context.Context) error { - encoded, err := m.persister.Get(ctx, knownFilesKey) - if err != nil { - return err - } - - if encoded == nil { - return nil - } - - dec := json.NewDecoder(bytes.NewReader(encoded)) - - // Decode the number of entries - var knownFileCount int - if err = dec.Decode(&knownFileCount); err != nil { - return fmt.Errorf("decoding file count: %w", err) - } - - if knownFileCount > 0 { - m.Infow("Resuming from previously known offset(s). 
'start_at' setting is not applicable.") - m.readerFactory.FromBeginning = true - } - - // Decode each of the known files - for i := 0; i < knownFileCount; i++ { - rmd := new(reader.Metadata) - if err = dec.Decode(rmd); err != nil { - return err - } - - // Migrate readers that used FileAttributes.HeaderAttributes - // This block can be removed in a future release, tentatively v0.90.0 - if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok { - switch hat := ha.(type) { - case map[string]any: - for k, v := range hat { - rmd.FileAttributes[k] = v - } - delete(rmd.FileAttributes, "HeaderAttributes") - default: - m.Errorw("migrate header attributes: unexpected format") - } - } - - // This reader won't be used for anything other than metadata reference, so just wrap the metadata - m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd}) - } - - return nil -} diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 06aea9b2cf81..c84ad5800c5e 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -44,7 +44,7 @@ func TestDefaultBehaviors(t *testing.T) { tempName := filepath.Base(temp.Name()) writeString(t, temp, " testlog1 \n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -82,7 +82,7 @@ See this issue for details: https://github.com/census-instrumentation/opencensus operator, _ := buildTestManager(t, cfg) _ = openTemp(t, tempDir) - err := operator.Start(testutil.NewMockPersister("test")) + err := operator.Start(testutil.NewUnscopedMockPersister()) require.NoError(t, err) defer func() { require.NoError(t, operator.Stop()) @@ -107,7 +107,7 @@ func TestAddFileFields(t *testing.T) { temp := openTemp(t, tempDir) writeString(t, temp, "testlog\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -159,7 +159,7 @@ func TestAddFileResolvedFields(t *testing.T) { resolved, err := filepath.Abs(realPath) require.NoError(t, err) - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -229,7 +229,7 @@ func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) { // Populate data writeString(t, file1, "testlog\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -318,7 +318,7 @@ func TestReadExistingLogs(t *testing.T) { temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\ntestlog2\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -387,7 +387,7 @@ func TestReadUsingNopEncoding(t *testing.T) { bytesWritten, err := temp.Write(tc.input) require.Greater(t, bytesWritten, 0) require.NoError(t, err) - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -471,7 +471,7 @@ func TestNopEncodingDifferentLogSizes(t 
*testing.T) { bytesWritten, err := temp.Write(tc.input) require.Greater(t, bytesWritten, 0) require.NoError(t, err) - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -490,7 +490,7 @@ func TestReadNewLogs(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() // Poll once so we know this isn't a new file operator.poll(context.Background()) @@ -518,7 +518,7 @@ func TestReadExistingAndNewLogs(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() // Start with a file with an entry in it, and expect that entry // to come through when we poll for the first time @@ -542,7 +542,7 @@ func TestStartAtEnd(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\n") @@ -570,7 +570,7 @@ func TestStartAtEndNewFile(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() operator.poll(context.Background()) temp := openTemp(t, tempDir) @@ -595,7 +595,7 @@ func TestNoNewline(t *testing.T) { temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\ntestlog2") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -616,7 +616,7 @@ func TestEmptyLine(t *testing.T) { temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\n\ntestlog2\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -639,7 +639,7 @@ func TestMultipleEmpty(t *testing.T) { temp := openTemp(t, tempDir) writeString(t, temp, "\n\ntestlog1\n\n\ntestlog2\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -666,7 +666,7 @@ func TestLeadingEmpty(t *testing.T) { temp := openTemp(t, tempDir) writeString(t, temp, "\ntestlog1\ntestlog2\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -686,7 +686,7 @@ func TestSplitWrite(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() temp := openTemp(t, tempDir) writeString(t, temp, "testlog1") @@ -706,7 +706,7 @@ func TestIgnoreEmptyFiles(t *testing.T) { cfg := 
NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() temp := openTemp(t, tempDir) temp2 := openTemp(t, tempDir) @@ -734,7 +734,7 @@ func TestDecodeBufferIsResized(t *testing.T) { cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -760,7 +760,7 @@ func TestMultiFileSimple(t *testing.T) { writeString(t, temp1, "testlog1\n") writeString(t, temp2, "testlog2\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -792,7 +792,7 @@ func TestMultiFileSort(t *testing.T) { writeString(t, temp1, "testlog1\n") writeString(t, temp2, "testlog2\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -826,7 +826,7 @@ func TestMultiFileSortTimestamp(t *testing.T) { writeString(t, temp1, "testlog1\n") writeString(t, temp2, "testlog2\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -867,7 +867,7 @@ func TestMultiFileParallel_PreloadedFiles(t *testing.T) { }(temp, i) } - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -896,7 +896,7 @@ func TestMultiFileParallel_LiveFiles(t *testing.T) { } } - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -941,7 +941,7 @@ func TestRestartOffsets(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = tc.startAt - persister := testutil.NewMockPersister("test") + persister := testutil.NewUnscopedMockPersister() logFile := openTemp(t, tempDir) @@ -989,7 +989,7 @@ func TestManyLogsDelivered(t *testing.T) { } // Start the operator - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -1027,7 +1027,7 @@ func TestFileBatching(t *testing.T) { cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() core, observedLogs := observer.New(zap.DebugLevel) operator.SugaredLogger = zap.New(core).Sugar() @@ -1338,7 +1338,7 @@ func TestEncodings(t *testing.T) { _, err := temp.Write(tc.contents) require.NoError(t, err) - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -1383,7 +1383,7 @@ func TestDeleteAfterRead(t *testing.T) { 
cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, totalLines) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() operator.poll(context.Background()) actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, totalLines)...) @@ -1414,7 +1414,7 @@ func TestMaxBatching(t *testing.T) { cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() core, observedLogs := observer.New(zap.DebugLevel) operator.SugaredLogger = zap.New(core).Sugar() @@ -1501,7 +1501,7 @@ func TestReadExistingLogsWithHeader(t *testing.T) { temp := openTemp(t, tempDir) writeString(t, temp, "#headerField: headerValue\ntestlog\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -1530,7 +1530,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, longFileLines+1) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() shortFile := openTemp(t, tempDir) _, err := shortFile.WriteString(string(shortFileLine) + "\n") @@ -1675,7 +1675,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { cfg.FingerprintSize = 18 cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() // Both of they will be include file1 := openTempWithPattern(t, tempDir, "*.log1") diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go new file mode 100644 index 000000000000..964a2e324c09 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go @@ -0,0 +1,91 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package checkpoint // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" +) + +const knownFilesKey = "knownFiles" + +// Save syncs the most recent set of files to the database +func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + + // Encode the number of known files + if err := enc.Encode(len(rmds)); err != nil { + return fmt.Errorf("encode num files: %w", err) + } + + var errs []error + // Encode each known file + for _, rmd := range rmds { + if err := enc.Encode(rmd); err != nil { + errs = append(errs, fmt.Errorf("encode metadata: %w", err)) + } + } + + if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil { + errs = append(errs, fmt.Errorf("persist known files: %w", err)) + } + + return errors.Join(errs...) 
+} + +// Load loads the most recent set of files to the database +func Load(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) { + encoded, err := persister.Get(ctx, knownFilesKey) + if err != nil { + return nil, err + } + + if encoded == nil { + return []*reader.Metadata{}, nil + } + + dec := json.NewDecoder(bytes.NewReader(encoded)) + + // Decode the number of entries + var knownFileCount int + if err = dec.Decode(&knownFileCount); err != nil { + return nil, fmt.Errorf("decoding file count: %w", err) + } + + // Decode each of the known files + var errs []error + rmds := make([]*reader.Metadata, 0, knownFileCount) + for i := 0; i < knownFileCount; i++ { + rmd := new(reader.Metadata) + if err = dec.Decode(rmd); err != nil { + return nil, err + } + + // Migrate readers that used FileAttributes.HeaderAttributes + // This block can be removed in a future release, tentatively v0.90.0 + if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok { + switch hat := ha.(type) { + case map[string]any: + for k, v := range hat { + rmd.FileAttributes[k] = v + } + delete(rmd.FileAttributes, "HeaderAttributes") + default: + errs = append(errs, errors.New("migrate header attributes: unexpected format")) + } + } + + // This reader won't be used for anything other than metadata reference, so just wrap the metadata + rmds = append(rmds, rmd) + } + + return rmds, errors.Join(errs...) +} diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go new file mode 100644 index 000000000000..406a8b262bd2 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go @@ -0,0 +1,149 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package checkpoint + +import ( + "bytes" + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" +) + +func TestLoadNothing(t *testing.T) { + reloaded, err := Load(context.Background(), testutil.NewUnscopedMockPersister()) + assert.NoError(t, err) + assert.Equal(t, []*reader.Metadata{}, reloaded) +} + +func TestSaveErr(t *testing.T) { + assert.Error(t, Save(context.Background(), + testutil.NewErrPersister(map[string]error{ + "knownFiles": assert.AnError, + }), []*reader.Metadata{})) +} + +func TestLoadErr(t *testing.T) { + _, err := Load(context.Background(), + testutil.NewErrPersister(map[string]error{ + "knownFiles": assert.AnError, + })) + assert.Error(t, err) +} + +func TestNopEncodingDifferentLogSizes(t *testing.T) { + testCases := []struct { + name string + rmds []*reader.Metadata + }{ + { + "empty", + []*reader.Metadata{}, + }, + { + "one", + []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + }, + }, + }, + { + "two", + []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + }, + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")}, + Offset: 6, + }, + }, + }, + { + "other_fields", + []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: 
[]byte("foo")}, + Offset: 3, + FileAttributes: map[string]interface{}{ + "hello": "world", + }, + }, + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")}, + Offset: 6, + HeaderFinalized: true, + }, + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("ab")}, + Offset: 2, + FileAttributes: map[string]interface{}{ + "hello2": "world2", + }, + HeaderFinalized: true, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := testutil.NewUnscopedMockPersister() + assert.NoError(t, Save(context.Background(), p, tc.rmds)) + reloaded, err := Load(context.Background(), p) + assert.NoError(t, err) + assert.Equal(t, tc.rmds, reloaded) + }) + } +} + +type deprecatedMetadata struct { + reader.Metadata + HeaderAttributes map[string]any +} + +func TestMigrateHeaderAttributes(t *testing.T) { + p := testutil.NewUnscopedMockPersister() + saveDeprecated(t, p, &deprecatedMetadata{ + Metadata: reader.Metadata{ + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + FileAttributes: map[string]any{ + "HeaderAttributes": map[string]any{ + "hello": "world", + }, + }, + }, + }) + reloaded, err := Load(context.Background(), p) + assert.NoError(t, err) + assert.Equal(t, []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + FileAttributes: map[string]interface{}{ + "hello": "world", + }, + }, + }, reloaded) + +} + +func saveDeprecated(t *testing.T, persister operator.Persister, dep *deprecatedMetadata) { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + require.NoError(t, enc.Encode(1)) + require.NoError(t, enc.Encode(dep)) + require.NoError(t, persister.Set(context.Background(), knownFilesKey, buf.Bytes())) +} diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index ff8408003124..f627312a0ad3 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -48,7 +48,7 @@ func TestMultiFileRotate(t *testing.T) { } } - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -109,7 +109,7 @@ func TestMultiFileRotateSlow(t *testing.T) { } } - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -163,7 +163,7 @@ func TestMultiCopyTruncateSlow(t *testing.T) { } } - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -263,7 +263,7 @@ func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func( expected = append(expected, []byte(fmt.Sprintf("%s %3d", baseStr, i))) } - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -358,7 +358,7 @@ func TestMoveFile(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() temp1 := openTemp(t, tempDir) writeString(t, temp1, "testlog1\n") @@ -390,7 +390,7 @@ func 
TestTrackMovedAwayFiles(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() temp1 := openTemp(t, tempDir) writeString(t, temp1, "testlog1\n") @@ -439,7 +439,7 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) { orginalName := originalFile.Name() writeString(t, originalFile, "testlog1\n") - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(t, operator.Stop()) }() @@ -474,7 +474,7 @@ func TestRotatedOutOfPatternMoveCreate(t *testing.T) { cfg.Include = append(cfg.Include, fmt.Sprintf("%s/*.log1", tempDir)) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() originalFile := openTempWithPattern(t, tempDir, "*.log1") originalFileName := originalFile.Name() @@ -511,7 +511,7 @@ func TestRotatedOutOfPatternCopyTruncate(t *testing.T) { cfg.Include = append(cfg.Include, fmt.Sprintf("%s/*.log1", tempDir)) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() originalFile := openTempWithPattern(t, tempDir, "*.log1") writeString(t, originalFile, "testlog1\n") @@ -551,7 +551,7 @@ func TestTruncateThenWrite(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() temp1 := openTemp(t, tempDir) writeString(t, temp1, "testlog1\ntestlog2\n") @@ -588,7 +588,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - operator.persister = testutil.NewMockPersister("test") + operator.persister = testutil.NewUnscopedMockPersister() temp1 := openTemp(t, tempDir) writeString(t, temp1, "testlog1\ntestlog2\n") @@ -631,7 +631,7 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) - persister := testutil.NewMockPersister("test") + persister := testutil.NewUnscopedMockPersister() log1 := tokenWithLength(1000) log2 := tokenWithLength(1000) diff --git a/pkg/stanza/testutil/util.go b/pkg/stanza/testutil/util.go index 948c4406b739..eace557ebacb 100644 --- a/pkg/stanza/testutil/util.go +++ b/pkg/stanza/testutil/util.go @@ -24,17 +24,24 @@ func Logger(t testing.TB) *zap.SugaredLogger { type mockPersister struct { data map[string][]byte dataMux sync.Mutex + errKeys map[string]error } func (p *mockPersister) Get(_ context.Context, k string) ([]byte, error) { p.dataMux.Lock() defer p.dataMux.Unlock() + if _, ok := p.errKeys[k]; ok { + return nil, p.errKeys[k] + } return p.data[k], nil } func (p *mockPersister) Set(_ context.Context, k string, v []byte) error { p.dataMux.Lock() defer p.dataMux.Unlock() + if _, ok := p.errKeys[k]; ok { + return p.errKeys[k] + } p.data[k] = v return nil } @@ -42,6 +49,9 @@ func (p *mockPersister) Set(_ context.Context, k string, v []byte) error { func (p *mockPersister) Delete(_ context.Context, k string) error { 
p.dataMux.Lock() defer p.dataMux.Unlock() + if _, ok := p.errKeys[k]; ok { + return p.errKeys[k] + } delete(p.data, k) return nil } @@ -52,11 +62,17 @@ func NewUnscopedMockPersister() operator.Persister { return &mockPersister{data: data} } -// NewMockPersister will return a new persister for testing func NewMockPersister(scope string) operator.Persister { return operator.NewScopedPersister(scope, NewUnscopedMockPersister()) } +// NewErrPersister will return a new persister for testing +// which will return an error if any of the specified keys are used +func NewErrPersister(errKeys map[string]error) operator.Persister { + data := make(map[string][]byte) + return &mockPersister{data: data, errKeys: errKeys} +} + // Trim removes white space from the lines of a string func Trim(s string) string { lines := strings.Split(s, "\n")
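
For reference, a minimal sketch (not part of the patch) of how the extracted checkpoint API fits together: Save persists a slice of *reader.Metadata under a single key and Load restores it, mirroring what Manager.Start and Manager.consume now do via the persister. The checkpoint package is internal to pkg/stanza/fileconsumer, so a standalone program like this could not actually import it; the snippet only illustrates the call pattern under that assumption, using the in-memory persister from the stanza test utilities shown above.

package main

import (
	"context"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func main() {
	ctx := context.Background()

	// In-memory persister, as used throughout the tests in this patch.
	p := testutil.NewUnscopedMockPersister()

	// Persist one known offset, as Manager.consume does after each poll.
	saved := []*reader.Metadata{
		{
			Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
			Offset:      3,
		},
	}
	if err := checkpoint.Save(ctx, p, saved); err != nil {
		fmt.Println("save offsets:", err)
		return
	}

	// Restore the offsets, as Manager.Start does before the first poll.
	reloaded, err := checkpoint.Load(ctx, p)
	if err != nil {
		fmt.Println("read known files from database:", err)
		return
	}
	fmt.Println("known files:", len(reloaded))
}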