Skip to content

Commit

Permalink
[chore][pkg/stanza]: add new fileset package (open-telemetry#30550)
Browse files Browse the repository at this point in the history
**Description:** Following up from
open-telemetry#30219.
Adding a new package for fileset.

**Link to tracking Issue:**
[29273](open-telemetry#29273 (comment))

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
  • Loading branch information
VihasMakwana authored and cparkins committed Feb 1, 2024
1 parent 151d26e commit 60b8672
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 0 deletions.
81 changes: 81 additions & 0 deletions pkg/stanza/fileconsumer/internal/fileset/fileset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileset // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"

import (
"errors"

"golang.org/x/exp/slices"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

var errFilesetEmpty = errors.New("pop() on empty Fileset")

var (
_ Matchable = (*reader.Reader)(nil)
_ Matchable = (*reader.Metadata)(nil)
)

type Matchable interface {
GetFingerprint() *fingerprint.Fingerprint
}

type Fileset[T Matchable] struct {
readers []T
}

func New[T Matchable](capacity int) *Fileset[T] {
return &Fileset[T]{readers: make([]T, 0, capacity)}
}

func (set *Fileset[T]) Len() int {
return len(set.readers)
}

func (set *Fileset[T]) Get() []T {
return set.readers
}

func (set *Fileset[T]) Pop() (T, error) {
// return first element from the array and remove it
var val T
if len(set.readers) == 0 {
return val, errFilesetEmpty
}
r := set.readers[0]
set.readers = slices.Delete(set.readers, 0, 1)
return r, nil
}

func (set *Fileset[T]) Add(readers ...T) {
// add open readers
set.readers = append(set.readers, readers...)
}

func (set *Fileset[T]) Clear() {
// clear the underlying readers
set.readers = make([]T, 0, cap(set.readers))
}

func (set *Fileset[T]) Reset(readers ...T) []T {
// empty the underlying set and return the old array
arr := make([]T, len(set.readers))
copy(arr, set.readers)
set.Clear()
set.readers = append(set.readers, readers...)
return arr
}

func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint) T {
var val T
for idx, r := range set.readers {
if fp.StartsWith(r.GetFingerprint()) {
set.readers = append(set.readers[:idx], set.readers[idx+1:]...)
return r
}
}
return val
}
123 changes: 123 additions & 0 deletions pkg/stanza/fileconsumer/internal/fileset/fileset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileset // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type test[T Matchable] struct {
name string
fileset *Fileset[T]
ops []func(t *testing.T, fileset *Fileset[T])
}

func (t *test[T]) init() {
t.fileset = New[T](10)
}

func push[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) {
return func(t *testing.T, fileset *Fileset[T]) {
pr := fileset.Len()
fileset.Add(ele...)
require.Equal(t, pr+len(ele), fileset.Len())
}
}

func pop[T Matchable](expectedErr error, expectedElemet T) func(t *testing.T, fileset *Fileset[T]) {
return func(t *testing.T, fileset *Fileset[T]) {
el, err := fileset.Pop()
if expectedErr == nil {
require.NoError(t, err)
require.Equal(t, el, expectedElemet)
} else {
require.ErrorIs(t, err, expectedErr)
}
}
}

func reset[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) {
return func(t *testing.T, fileset *Fileset[T]) {
fileset.Reset(ele...)
require.Equal(t, fileset.Len(), len(ele))
}
}

func match[T Matchable](ele T, expect bool) func(t *testing.T, fileset *Fileset[T]) {
return func(t *testing.T, fileset *Fileset[T]) {
pr := fileset.Len()
r := fileset.Match(ele.GetFingerprint())
if expect {
require.NotNil(t, r)
require.Equal(t, pr-1, fileset.Len())
} else {
require.Nil(t, r)
require.Equal(t, pr, fileset.Len())
}

}
}

func newFingerprint(bytes []byte) *fingerprint.Fingerprint {
return &fingerprint.Fingerprint{
FirstBytes: bytes,
}
}
func newMetadata(bytes []byte) *reader.Metadata {
return &reader.Metadata{
Fingerprint: newFingerprint(bytes),
}
}

func newReader(bytes []byte) *reader.Reader {
return &reader.Reader{
Metadata: newMetadata(bytes),
}
}

func TestFilesetReader(t *testing.T) {
testCases := []test[*reader.Reader]{
{
name: "test_match_push_reset",
ops: []func(t *testing.T, fileset *Fileset[*reader.Reader]){
push(newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))),

// match() removes the matched item and returns it
match(newReader([]byte("ABCDEFGHI")), true),
match(newReader([]byte("ABCEFGHI")), false),

reset(newReader([]byte("XYZ"))),
match(newReader([]byte("ABCDEF")), false),
match(newReader([]byte("QWERT")), false),
match(newReader([]byte("XYZabc")), true),
},
},
{
name: "test_pop",
ops: []func(t *testing.T, fileset *Fileset[*reader.Reader]){
push(newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))),
pop(nil, newReader([]byte("ABCDEF"))),
pop(nil, newReader([]byte("QWERT"))),
pop(errFilesetEmpty, newReader([]byte(""))),

reset(newReader([]byte("XYZ"))),
pop(nil, newReader([]byte("XYZ"))),
pop(errFilesetEmpty, newReader([]byte(""))),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tc.init()
for _, op := range tc.ops {
op(t, tc.fileset)
}
})
}
}
4 changes: 4 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,7 @@ func (r *Reader) Validate() bool {
}
return false
}

func (m Metadata) GetFingerprint() *fingerprint.Fingerprint {
return m.Fingerprint
}
1 change: 1 addition & 0 deletions pkg/stanza/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230711023510-fffb14384f22
golang.org/x/sys v0.16.0
golang.org/x/text v0.14.0
gonum.org/v1/gonum v0.14.0
Expand Down
1 change: 1 addition & 0 deletions pkg/stanza/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 60b8672

Please sign in to comment.