Skip to content

Commit 2ddb445

Browse files
belimawrCopilot
andauthored
Add tests for Filestream's fileWatcher to test the rotation using the copy-truncate strategy (#50442)
GenAI-Assisted: Yes Human-Reviewed: Yes Tool: Cursor-CLI, Model: GPT-5.3 Codex High Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent 79a0485 commit 2ddb445

1 file changed

Lines changed: 201 additions & 0 deletions

File tree

filebeat/input/filestream/fswatch_test.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
conf "github.com/elastic/elastic-agent-libs/config"
4040
"github.com/elastic/elastic-agent-libs/logp"
4141
"github.com/elastic/elastic-agent-libs/logp/logptest"
42+
"github.com/elastic/elastic-agent-libs/testing/fs"
4243
)
4344

4445
func TestFileWatcher(t *testing.T) {
@@ -455,6 +456,206 @@ scanner:
455456
})
456457
}
457458

459+
func TestFileWatcherCopyTruncateWithFingerprint(t *testing.T) {
460+
t.Run("copy truncate happens at once", func(t *testing.T) {
461+
w, activePath, rotatedPath := newFileWatcherForCopyTruncateTests(t)
462+
ctx := context.Background()
463+
464+
// 1. A single file exists
465+
initialContent := strings.Repeat("a", 96)
466+
require.NoError(t, os.WriteFile(activePath, []byte(initialContent), 0o600), "failed to write initial active file")
467+
w.watch(ctx)
468+
469+
initialEvents := drainPendingFSEvents(w.events)
470+
requireEventSignatures(t, initialEvents, []loginp.FSEvent{
471+
{Op: loginp.OpCreate, NewPath: activePath},
472+
})
473+
initialCreateEvt := findEvent(initialEvents, loginp.FSEvent{Op: loginp.OpCreate, NewPath: activePath})
474+
initialFingerprint := initialCreateEvt.Descriptor.Fingerprint
475+
require.NotEmpty(t, initialFingerprint, "initial active file fingerprint must be present")
476+
477+
// 2. Copy+truncate:
478+
// - copy foo.log -> foo.log.1
479+
// - truncate foo.log and add data (less than previously)
480+
copyFile(t, activePath, rotatedPath)
481+
require.NoError(t, os.WriteFile(activePath, []byte(strings.Repeat("b", 64)), 0o600), "failed to rewrite active file after rotation")
482+
w.watch(ctx)
483+
484+
events := drainPendingFSEvents(w.events)
485+
requireEventSignatures(t, events, []loginp.FSEvent{
486+
{Op: loginp.OpRename, OldPath: activePath, NewPath: rotatedPath},
487+
{Op: loginp.OpCreate, NewPath: activePath},
488+
})
489+
490+
renamedEvt := findEvent(events, loginp.FSEvent{Op: loginp.OpRename, OldPath: activePath, NewPath: rotatedPath})
491+
createdActiveEvt := findEvent(events, loginp.FSEvent{Op: loginp.OpCreate, NewPath: activePath})
492+
require.Equal(t, initialFingerprint, renamedEvt.Descriptor.Fingerprint, "rotated file should keep initial fingerprint")
493+
require.NotEqual(t, initialFingerprint, createdActiveEvt.Descriptor.Fingerprint, "rewritten active file should get a new fingerprint")
494+
})
495+
496+
t.Run("copy truncate happens in two steps", func(t *testing.T) {
497+
w, activePath, rotatedPath := newFileWatcherForCopyTruncateTests(t)
498+
ctx := context.Background()
499+
500+
// 1. A single file exists
501+
initialContent := strings.Repeat("c", 96)
502+
require.NoError(t, os.WriteFile(activePath, []byte(initialContent), 0o600), "failed to write initial active file")
503+
w.watch(ctx)
504+
505+
initialEvents := drainPendingFSEvents(w.events)
506+
requireEventSignatures(t, initialEvents, []loginp.FSEvent{
507+
{Op: loginp.OpCreate, NewPath: activePath},
508+
})
509+
initialCreateEvt := findEvent(initialEvents, loginp.FSEvent{Op: loginp.OpCreate, NewPath: activePath})
510+
initialFingerprint := initialCreateEvt.Descriptor.Fingerprint
511+
require.NotEmpty(t, initialFingerprint, "initial active file fingerprint must be present")
512+
513+
// 2. The file is copied: foo.log -> foo.log.1
514+
copyFile(t, activePath, rotatedPath)
515+
w.watch(ctx)
516+
517+
// Expectation: no file events, because both files are considered the same
518+
copyStepEvents := drainPendingFSEvents(w.events)
519+
require.Empty(t, copyStepEvents, "no file events when a file is copied (same fingerprint)")
520+
requireEventSignatures(t, copyStepEvents, []loginp.FSEvent{})
521+
522+
// 3. foo.log is truncated & written to (less data than before).
523+
require.NoError(t, os.WriteFile(activePath, []byte(strings.Repeat("d", 64)), 0o600), "failed to truncate and rewrite active file")
524+
w.watch(ctx)
525+
526+
// Expectation: 'foo.log' is considered new and 'foo.log.1' is considered a rename
527+
truncateStepEvents := drainPendingFSEvents(w.events)
528+
requireEventSignatures(t, truncateStepEvents, []loginp.FSEvent{
529+
{Op: loginp.OpCreate, NewPath: activePath},
530+
{Op: loginp.OpRename, OldPath: activePath, NewPath: rotatedPath},
531+
})
532+
})
533+
534+
t.Run("copy truncate happens in three steps", func(t *testing.T) {
535+
w, activePath, rotatedPath := newFileWatcherForCopyTruncateTests(t)
536+
ctx := context.Background()
537+
538+
// 1. A single file exists
539+
initialContent := strings.Repeat("e", 96)
540+
require.NoError(t, os.WriteFile(activePath, []byte(initialContent), 0o600), "failed to write initial active file")
541+
w.watch(ctx)
542+
543+
initialEvents := drainPendingFSEvents(w.events)
544+
requireEventSignatures(t, initialEvents, []loginp.FSEvent{
545+
{Op: loginp.OpCreate, NewPath: activePath},
546+
})
547+
initialCreateEvt := findEvent(initialEvents, loginp.FSEvent{Op: loginp.OpCreate, NewPath: activePath})
548+
initialFingerprint := initialCreateEvt.Descriptor.Fingerprint
549+
require.NotEmpty(t, initialFingerprint, "initial active file fingerprint must be present")
550+
551+
// 2. The file is copied: foo.log -> foo.log.1
552+
copyFile(t, activePath, rotatedPath)
553+
w.watch(ctx)
554+
555+
// Expectation: no file events, because both files are considered the same
556+
copyStepEvents := drainPendingFSEvents(w.events)
557+
require.Empty(t, copyStepEvents, "no file events when a file is copied (same fingerprint)")
558+
requireEventSignatures(t, copyStepEvents, []loginp.FSEvent{})
559+
560+
// 3. foo.log is truncated (0 bytes)
561+
require.NoError(t, os.WriteFile(activePath, nil, 0o600), "failed to truncate active file to empty")
562+
w.watch(ctx)
563+
564+
// Expectation: foo.log is considered renamed: foo.log -> foo.log.1
565+
// the empty file foo.log is ignored because it is empty
566+
emptyStepEvents := drainPendingFSEvents(w.events)
567+
requireEventSignatures(t, emptyStepEvents, []loginp.FSEvent{
568+
{Op: loginp.OpRename, OldPath: activePath, NewPath: rotatedPath},
569+
})
570+
571+
// 4. data is added to foo.log
572+
require.NoError(t, os.WriteFile(activePath, []byte(strings.Repeat("f", 64)), 0o600), "failed to add new data to active file")
573+
w.watch(ctx)
574+
575+
// Expectation: foo.log is discovered as a new file
576+
newDataStepEvents := drainPendingFSEvents(w.events)
577+
requireEventSignatures(t, newDataStepEvents, []loginp.FSEvent{
578+
{Op: loginp.OpCreate, NewPath: activePath},
579+
})
580+
newActiveEvt := findEvent(newDataStepEvents, loginp.FSEvent{Op: loginp.OpCreate, NewPath: activePath})
581+
require.NotEqual(t, initialFingerprint, newActiveEvt.Descriptor.Fingerprint, "newly recreated active file should have a different fingerprint")
582+
})
583+
}
584+
585+
// newFileWatcherForCopyTruncateTests returns a file watcher configured to
586+
// harvest rotated files and two file paths used for rotation.
587+
func newFileWatcherForCopyTruncateTests(t *testing.T) (watcher *fileWatcher, activePath string, rotatedPath string) {
588+
dir := fs.TempDir(t, "..", "..", "build")
589+
activePath = filepath.Join(dir, "foo.log")
590+
rotatedPath = filepath.Join(dir, "foo.log.1")
591+
paths := []string{filepath.Join(dir, "foo.log*")}
592+
cfgStr := `
593+
scanner:
594+
check_interval: 10ms
595+
fingerprint:
596+
length: 64
597+
`
598+
599+
logger := logptest.NewFileLogger(t, dir)
600+
w := createWatcherWithConfig(t, logger.Logger, paths, cfgStr)
601+
w.events = make(chan loginp.FSEvent, 16)
602+
return w, activePath, rotatedPath
603+
}
604+
605+
func copyFile(t *testing.T, from, to string) {
606+
t.Helper()
607+
608+
content, err := os.ReadFile(from)
609+
require.NoError(t, err, "failed to read source file %q", from)
610+
require.NoError(t, os.WriteFile(to, content, 0o600), "failed to write destination file %q", to)
611+
}
612+
613+
// fsEventToString returns a stable string representation that includes
614+
// only the fields: Op, OldPath and NewPath. The returned string is
615+
// human-friendly.
616+
func fsEventToString(e loginp.FSEvent) string {
617+
return fmt.Sprintf("Op: '%s'|OldPath: '%s'|NewPath: '%s'", e.Op, e.OldPath, e.NewPath)
618+
}
619+
620+
func requireEventSignatures(t *testing.T, events, expected []loginp.FSEvent) {
621+
t.Helper()
622+
623+
actualKeys := make([]string, 0, len(events))
624+
for _, e := range events {
625+
actualKeys = append(actualKeys, fsEventToString(e))
626+
}
627+
628+
expectedKeys := make([]string, 0, len(expected))
629+
for _, e := range expected {
630+
expectedKeys = append(expectedKeys, fsEventToString(e))
631+
}
632+
633+
require.ElementsMatch(t, expectedKeys, actualKeys, "unexpected file watcher events (order ignored)")
634+
}
635+
636+
// findEvent finds expected in events by comparing Op, OldPath and NewPath.
637+
func findEvent(events []loginp.FSEvent, expected loginp.FSEvent) loginp.FSEvent {
638+
for _, e := range events {
639+
if e.Op == expected.Op && e.OldPath == expected.OldPath && e.NewPath == expected.NewPath {
640+
return e
641+
}
642+
}
643+
return loginp.FSEvent{}
644+
}
645+
646+
func drainPendingFSEvents(events <-chan loginp.FSEvent) []loginp.FSEvent {
647+
drained := make([]loginp.FSEvent, 0)
648+
649+
for {
650+
select {
651+
case e := <-events:
652+
drained = append(drained, e)
653+
default:
654+
return drained
655+
}
656+
}
657+
}
658+
458659
func TestFileScanner(t *testing.T) {
459660
dir := t.TempDir()
460661
dir2 := t.TempDir() // for symlink testing

0 commit comments

Comments
 (0)