Skip to content

Commit

Permalink
[pkg/stanza] add symlink related test for fileconsumer (#33428)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

#32292
was closed due to inactivity. This PR resumes from there.

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>


Co-authored-by: Shaunak Kashyap <ycombinator@gmail.com>

---------

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
Co-authored-by: Shaunak Kashyap <ycombinator@gmail.com>
  • Loading branch information
ChrsMark and ycombinator authored Jun 10, 2024
1 parent 18d0d39 commit 35fd68a
Showing 1 changed file with 60 additions and 0 deletions.
60 changes: 60 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,54 @@ func TestStartAtEnd(t *testing.T) {
sink.ExpectToken(t, []byte("testlog2"))
}

// TestSymlinkedFiles tests reading from a single file that's actually a symlink
// to another file, while the symlink target is changed frequently, reads all
// the logs from all the files ever targeted by that symlink.
func TestSymlinkedFiles(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Time sensitive tests disabled for now on Windows. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32715#issuecomment-2107737828")
}

t.Parallel()

// Create 30 files with a predictable naming scheme, each containing
// 100 log lines.
const numFiles = 30
const logLinesPerFile = 100
const pollInterval = 10 * time.Millisecond
tempDir := t.TempDir()
expectedTokens := [][]byte{}
for i := 1; i <= numFiles; i++ {
expectedTokensBatch := symlinkTestCreateLogFile(t, tempDir, i, logLinesPerFile)
expectedTokens = append(expectedTokens, expectedTokensBatch...)
}

targetTempDir := t.TempDir()
symlinkFilePath := filepath.Join(targetTempDir, "sym.log")
cfg := NewConfig().includeDir(targetTempDir)
cfg.StartAt = "beginning"
cfg.PollInterval = pollInterval
sink := emittest.NewSink(emittest.WithCallBuffer(numFiles * logLinesPerFile))
operator := testManagerWithSink(t, cfg, sink)

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

sink.ExpectNoCalls(t)

// Create and update symlink to each of the files over time.
for i := 1; i <= numFiles; i++ {
targetLogFilePath := filepath.Join(tempDir, fmt.Sprintf("%d.log", i))
require.NoError(t, os.Symlink(targetLogFilePath, symlinkFilePath))
// The sleep time here must be larger than the poll_interval value
time.Sleep(pollInterval + 1*time.Millisecond)
require.NoError(t, os.Remove(symlinkFilePath))
}
sink.ExpectTokens(t, expectedTokens...)
}

// StartAtEndNewFile tests that when `start_at` is configured to `end`,
// a file created after the operator has been started is read from the
// beginning
Expand Down Expand Up @@ -1468,3 +1516,15 @@ func TestNoTracking(t *testing.T) {
})
}
}

func symlinkTestCreateLogFile(t *testing.T, tempDir string, fileIdx, numLogLines int) (tokens [][]byte) {
logFilePath := fmt.Sprintf("%s/%d.log", tempDir, fileIdx)
temp1 := filetest.OpenFile(t, logFilePath)
for i := 0; i < numLogLines; i++ {
msg := fmt.Sprintf("[fileIdx %2d] This is a simple log line with the number %3d", fileIdx, i)
filetest.WriteString(t, temp1, msg+"\n")
tokens = append(tokens, []byte(msg))
}
temp1.Close()
return tokens
}

0 comments on commit 35fd68a

Please sign in to comment.