forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[chore][pkg/stanza] Extract checkpointing logic into internal package (…
…open-telemetry#27775) This PR moves checkpointing logic into a dedicated package. Since we only actually save and load `reader.Metadata`, the `fileconsumer` package is still responsible for pulling `Metadata` out of each `Reader` prior to saving, and wrapping `Metadata` into a `Reader` when loading.
- Loading branch information
1 parent
b61a4b8
commit e1f7e61
Showing
4 changed files
with
278 additions
and
84 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package checkpoint // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" | ||
) | ||
|
||
const knownFilesKey = "knownFiles" | ||
|
||
// Save syncs the most recent set of files to the database | ||
func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error { | ||
var buf bytes.Buffer | ||
enc := json.NewEncoder(&buf) | ||
|
||
// Encode the number of known files | ||
if err := enc.Encode(len(rmds)); err != nil { | ||
return fmt.Errorf("encode num files: %w", err) | ||
} | ||
|
||
var errs []error | ||
// Encode each known file | ||
for _, rmd := range rmds { | ||
if err := enc.Encode(rmd); err != nil { | ||
errs = append(errs, fmt.Errorf("encode metadata: %w", err)) | ||
} | ||
} | ||
|
||
if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil { | ||
errs = append(errs, fmt.Errorf("persist known files: %w", err)) | ||
} | ||
|
||
return errors.Join(errs...) | ||
} | ||
|
||
// Load loads the most recent set of files to the database | ||
func Load(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) { | ||
encoded, err := persister.Get(ctx, knownFilesKey) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if encoded == nil { | ||
return []*reader.Metadata{}, nil | ||
} | ||
|
||
dec := json.NewDecoder(bytes.NewReader(encoded)) | ||
|
||
// Decode the number of entries | ||
var knownFileCount int | ||
if err = dec.Decode(&knownFileCount); err != nil { | ||
return nil, fmt.Errorf("decoding file count: %w", err) | ||
} | ||
|
||
// Decode each of the known files | ||
var errs []error | ||
rmds := make([]*reader.Metadata, 0, knownFileCount) | ||
for i := 0; i < knownFileCount; i++ { | ||
rmd := new(reader.Metadata) | ||
if err = dec.Decode(rmd); err != nil { | ||
return nil, err | ||
} | ||
|
||
// Migrate readers that used FileAttributes.HeaderAttributes | ||
// This block can be removed in a future release, tentatively v0.90.0 | ||
if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok { | ||
switch hat := ha.(type) { | ||
case map[string]any: | ||
for k, v := range hat { | ||
rmd.FileAttributes[k] = v | ||
} | ||
delete(rmd.FileAttributes, "HeaderAttributes") | ||
default: | ||
errs = append(errs, errors.New("migrate header attributes: unexpected format")) | ||
} | ||
} | ||
|
||
// This reader won't be used for anything other than metadata reference, so just wrap the metadata | ||
rmds = append(rmds, rmd) | ||
} | ||
|
||
return rmds, errors.Join(errs...) | ||
} |
149 changes: 149 additions & 0 deletions
149
pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package checkpoint | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" | ||
) | ||
|
||
func TestLoadNothing(t *testing.T) { | ||
reloaded, err := Load(context.Background(), testutil.NewUnscopedMockPersister()) | ||
assert.NoError(t, err) | ||
assert.Equal(t, []*reader.Metadata{}, reloaded) | ||
} | ||
|
||
func TestSaveErr(t *testing.T) { | ||
assert.Error(t, Save(context.Background(), | ||
testutil.NewErrPersister(map[string]error{ | ||
"knownFiles": assert.AnError, | ||
}), []*reader.Metadata{})) | ||
} | ||
|
||
func TestLoadErr(t *testing.T) { | ||
_, err := Load(context.Background(), | ||
testutil.NewErrPersister(map[string]error{ | ||
"knownFiles": assert.AnError, | ||
})) | ||
assert.Error(t, err) | ||
} | ||
|
||
func TestNopEncodingDifferentLogSizes(t *testing.T) { | ||
testCases := []struct { | ||
name string | ||
rmds []*reader.Metadata | ||
}{ | ||
{ | ||
"empty", | ||
[]*reader.Metadata{}, | ||
}, | ||
{ | ||
"one", | ||
[]*reader.Metadata{ | ||
{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, | ||
Offset: 3, | ||
}, | ||
}, | ||
}, | ||
{ | ||
"two", | ||
[]*reader.Metadata{ | ||
{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, | ||
Offset: 3, | ||
}, | ||
{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")}, | ||
Offset: 6, | ||
}, | ||
}, | ||
}, | ||
{ | ||
"other_fields", | ||
[]*reader.Metadata{ | ||
{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, | ||
Offset: 3, | ||
FileAttributes: map[string]interface{}{ | ||
"hello": "world", | ||
}, | ||
}, | ||
{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")}, | ||
Offset: 6, | ||
HeaderFinalized: true, | ||
}, | ||
{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("ab")}, | ||
Offset: 2, | ||
FileAttributes: map[string]interface{}{ | ||
"hello2": "world2", | ||
}, | ||
HeaderFinalized: true, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
p := testutil.NewUnscopedMockPersister() | ||
assert.NoError(t, Save(context.Background(), p, tc.rmds)) | ||
reloaded, err := Load(context.Background(), p) | ||
assert.NoError(t, err) | ||
assert.Equal(t, tc.rmds, reloaded) | ||
}) | ||
} | ||
} | ||
|
||
type deprecatedMetadata struct { | ||
reader.Metadata | ||
HeaderAttributes map[string]any | ||
} | ||
|
||
func TestMigrateHeaderAttributes(t *testing.T) { | ||
p := testutil.NewUnscopedMockPersister() | ||
saveDeprecated(t, p, &deprecatedMetadata{ | ||
Metadata: reader.Metadata{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, | ||
Offset: 3, | ||
FileAttributes: map[string]any{ | ||
"HeaderAttributes": map[string]any{ | ||
"hello": "world", | ||
}, | ||
}, | ||
}, | ||
}) | ||
reloaded, err := Load(context.Background(), p) | ||
assert.NoError(t, err) | ||
assert.Equal(t, []*reader.Metadata{ | ||
{ | ||
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, | ||
Offset: 3, | ||
FileAttributes: map[string]interface{}{ | ||
"hello": "world", | ||
}, | ||
}, | ||
}, reloaded) | ||
|
||
} | ||
|
||
func saveDeprecated(t *testing.T, persister operator.Persister, dep *deprecatedMetadata) { | ||
var buf bytes.Buffer | ||
enc := json.NewEncoder(&buf) | ||
require.NoError(t, enc.Encode(1)) | ||
require.NoError(t, enc.Encode(dep)) | ||
require.NoError(t, persister.Set(context.Background(), knownFilesKey, buf.Bytes())) | ||
} |
Oops, something went wrong.