Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/filelog] Implement ExcludeOlderThan matcher criterion #31916

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/receiver-filelog-exclude-older-than.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add `exclude_older_than` configuration setting"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31053]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
40 changes: 40 additions & 0 deletions pkg/stanza/fileconsumer/matcher/internal/filter/exclude.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package filter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/filter"
import (
"os"
"time"

"go.uber.org/multierr"
)

type excludeOlderThanOption struct {
age time.Duration
}

func (eot excludeOlderThanOption) apply(items []*item) ([]*item, error) {
filteredItems := make([]*item, 0, len(items))
var errs error
for _, item := range items {
fi, err := os.Stat(item.value)
if err != nil {
errs = multierr.Append(errs, err)
continue
}

// Keep (include) the file if its age (since last modification)
// is the same or less than the configured age.
fileAge := time.Since(fi.ModTime())
if fileAge <= eot.age {
filteredItems = append(filteredItems, item)
}
}

return filteredItems, errs
}

// ExcludeOlderThan excludes files whose modification time is older than the specified age.
func ExcludeOlderThan(age time.Duration) Option {
return excludeOlderThanOption{age: age}
}
110 changes: 110 additions & 0 deletions pkg/stanza/fileconsumer/matcher/internal/filter/exclude_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package filter

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestExcludeOlderThanFilter(t *testing.T) {
twoHoursAgo := time.Now().Add(-2 * time.Hour)
threeHoursAgo := twoHoursAgo.Add(-1 * time.Hour)

cases := map[string]struct {
files []string
fileMTimes []time.Time
excludeOlderThan time.Duration

expect []string
expectedErr string
}{
"no_files": {
files: []string{},
fileMTimes: []time.Time{},
excludeOlderThan: 2 * time.Hour,

expect: []string{},
expectedErr: "",
},
"exclude_no_files": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, twoHoursAgo},
excludeOlderThan: 3 * time.Hour,

expect: []string{"a.log", "b.log"},
expectedErr: "",
},
"exclude_some_files": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, threeHoursAgo},
excludeOlderThan: 3 * time.Hour,
Comment on lines +45 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a followup PR but I think the test is set up a bit strange here because we're comparing "exactly 3 hours" to "3 hour + trivial time" (because we initialize threeHoursAgo above and compare it just a few cycles later.) I think we should instead make this an unambiguous comparison and consider ways to test the exactly equal case.


expect: []string{"a.log"},
expectedErr: "",
},
"exclude_all_files": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, threeHoursAgo},
excludeOlderThan: 90 * time.Minute,

expect: []string{},
expectedErr: "",
},
"file_not_present": {
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{twoHoursAgo, {}},
excludeOlderThan: 3 * time.Hour,

expect: []string{"a.log"},
expectedErr: "b.log: no such file or directory",
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
tmpDir := t.TempDir()
var items []*item
// Create files with specified mtime
for i, file := range tc.files {
mtime := tc.fileMTimes[i]
fullPath := filepath.Join(tmpDir, file)

// Only create file if mtime is specified
if !mtime.IsZero() {
f, err := os.Create(fullPath)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, os.Chtimes(fullPath, twoHoursAgo, mtime))
}

it, err := newItem(fullPath, nil)
require.NoError(t, err)

items = append(items, it)
}

f := ExcludeOlderThan(tc.excludeOlderThan)
result, err := f.apply(items)
if tc.expectedErr != "" {
require.ErrorContains(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
}

relativeResult := make([]string, 0, len(result))
for _, r := range result {
rel, err := filepath.Rel(tmpDir, r.value)
require.NoError(t, err)
relativeResult = append(relativeResult, rel)
}

require.Equal(t, tc.expect, relativeResult)
})
}
}
43 changes: 25 additions & 18 deletions pkg/stanza/fileconsumer/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"regexp"
"time"

"go.opentelemetry.io/collector/featuregate"

Expand All @@ -33,8 +34,12 @@ var mtimeSortTypeFeatureGate = featuregate.GlobalRegistry().MustRegister(
)

type Criteria struct {
Include []string `mapstructure:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty"`
Include []string `mapstructure:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty"`

// ExcludeOlderThan allows excluding files whose modification time is older
// than the specified age.
ExcludeOlderThan time.Duration `mapstructure:"exclude_older_than"`
OrderingCriteria OrderingCriteria `mapstructure:"ordering_criteria,omitempty"`
}

Expand Down Expand Up @@ -66,11 +71,17 @@ func New(c Criteria) (*Matcher, error) {
return nil, fmt.Errorf("exclude: %w", err)
}

m := &Matcher{
include: c.Include,
exclude: c.Exclude,
}

if c.ExcludeOlderThan != 0 {
m.filterOpts = append(m.filterOpts, filter.ExcludeOlderThan(c.ExcludeOlderThan))
}

if len(c.OrderingCriteria.SortBy) == 0 {
return &Matcher{
include: c.Include,
exclude: c.Exclude,
}, nil
return m, nil
}

if c.OrderingCriteria.TopN < 0 {
Expand All @@ -80,6 +91,7 @@ func New(c Criteria) (*Matcher, error) {
if c.OrderingCriteria.TopN == 0 {
c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN
}
m.topN = c.OrderingCriteria.TopN

var regex *regexp.Regexp
if orderingCriteriaNeedsRegex(c.OrderingCriteria.SortBy) {
Expand All @@ -92,46 +104,41 @@ func New(c Criteria) (*Matcher, error) {
if err != nil {
return nil, fmt.Errorf("compile regex: %w", err)
}

m.regex = regex
}

var filterOpts []filter.Option
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
for _, sc := range c.OrderingCriteria.SortBy {
switch sc.SortType {
case sortTypeNumeric:
f, err := filter.SortNumeric(sc.RegexKey, sc.Ascending)
if err != nil {
return nil, fmt.Errorf("numeric sort: %w", err)
}
filterOpts = append(filterOpts, f)
m.filterOpts = append(m.filterOpts, f)
case sortTypeAlphabetical:
f, err := filter.SortAlphabetical(sc.RegexKey, sc.Ascending)
if err != nil {
return nil, fmt.Errorf("alphabetical sort: %w", err)
}
filterOpts = append(filterOpts, f)
m.filterOpts = append(m.filterOpts, f)
case sortTypeTimestamp:
f, err := filter.SortTemporal(sc.RegexKey, sc.Ascending, sc.Layout, sc.Location)
if err != nil {
return nil, fmt.Errorf("timestamp sort: %w", err)
}
filterOpts = append(filterOpts, f)
m.filterOpts = append(m.filterOpts, f)
case sortTypeMtime:
if !mtimeSortTypeFeatureGate.IsEnabled() {
return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime)
}
filterOpts = append(filterOpts, filter.SortMtime())
m.filterOpts = append(m.filterOpts, filter.SortMtime())
default:
return nil, fmt.Errorf("'sort_type' must be specified")
}
}

return &Matcher{
include: c.Include,
exclude: c.Exclude,
regex: regex,
topN: c.OrderingCriteria.TopN,
filterOpts: filterOpts,
}, nil
return m, nil
}

// orderingCriteriaNeedsRegex returns true if any of the sort options require a regex to be set.
Expand Down
8 changes: 8 additions & 0 deletions pkg/stanza/fileconsumer/matcher/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -209,6 +210,13 @@ func TestNew(t *testing.T) {
},
expectedErr: `the "filelog.mtimeSortType" feature gate must be enabled to use "mtime" sort type`,
},
{
name: "ExcludeOlderThan",
criteria: Criteria{
Include: []string{"*.log"},
ExcludeOlderThan: 24 * time.Hour,
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion receiver/filelogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ Tails and parses logs from files.
| Field | Default | Description |
|-------------------------------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `include` | required | A list of file glob patterns that match the file paths to be read. |
| `exclude` | [] | A list of file glob patterns to exclude from reading. This is applied against the paths matched by `include`. |
| `exclude` | [] | A list of file glob patterns to exclude from reading. This is applied against the paths matched by `include`. |
| `exclude_older_than` | | Exclude files whose modification time is older than the specified [age](#time-parameters). |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. |
| `multiline` | | A `multiline` configuration block. See [below](#multiline-configuration) for more details. |
| `force_flush_period` | `500ms` | [Time](#time-parameters) since last read of data from file, after which currently buffered log should be send to pipeline. A value of `0` will disable forced flushing. |
Expand Down