Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions changelog/fragments/1778107949-fix-disk-queue-initialisation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; an 80ish characters long description of the change.
summary: Initialize disk queue frame IDs from persisted state

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link to the PR that added the changeset.
# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/acks.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newDiskQueueACKs(
) *diskQueueACKs {
return &diskQueueACKs{
logger: logger,
nextFrameID: 0,
nextFrameID: frameID(position.frameIndex),
nextPosition: position,
frameSize: make(map[frameID]uint64),
segmentBoundaries: make(map[frameID]*queueSegment),
Expand Down
2 changes: 0 additions & 2 deletions libbeat/publisher/queue/diskqueue/core_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,6 @@ func (dq *diskQueue) maybeReadPending() {
// the reading position to the end of the segment header.
// The first segment we read might not have the initial nextReadPosition
// set to 0 if it was already partially read on a previous run.
// However that can only happen when nextReadFrameID == 0, so in that
// case firstFrameID is already initialized to the correct value.
segment.firstFrameID = dq.segments.nextReadFrameID
dq.segments.nextReadPosition = segment.headerSize()
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func NewQueue(
reading: initialSegments,
acked: ackedSegments,
nextID: nextSegmentID,
nextReadFrameID: frameID(nextReadPosition.frameIndex),
nextReadPosition: nextReadPosition.byteIndex,
},

Expand Down
132 changes: 132 additions & 0 deletions libbeat/publisher/queue/diskqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@ package diskqueue
import (
"flag"
"math/rand/v2"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
"github.com/elastic/elastic-agent-libs/testing/fs"
)

var seed int64
Expand Down Expand Up @@ -91,3 +98,128 @@ func (t testQueue) Close(force bool) error {
err := t.diskQueue.Close(force)
return err
}

func TestQueueDoesNotReplayLastEventAfterRestart(t *testing.T) {
workDir := fs.TempDir(t, "..", "..", "..", "build", "integration-tests")
diskQueuePath := filepath.Join(workDir, "queue")
settings := DefaultSettings()
settings.Path = diskQueuePath
// Keep segment size small enough to produce multiple segments quickly.
settings.MaxSegmentSize = 4 * 1024

fileLogger := logptest.NewFileLogger(t, workDir)

// Run 1: publish and ACK two events.
run1Queue, err := NewQueue(fileLogger.Logger, nil, settings, nil, &paths.Path{})
require.NoError(t, err, "run1 queue should be created successfully")

run1Producer := run1Queue.Producer(queue.ProducerConfig{})
publishAndACKSingleEvent(t, run1Queue, run1Producer, "event-1")
publishAndACKSingleEvent(t, run1Queue, run1Producer, "event-2")
run1Producer.Close()
closeQueueAndWait(t, run1Queue)

// Run 2: reopen queue, publish one event and ACK it.
run2Queue, err := NewQueue(fileLogger.Logger, nil, settings, nil, &paths.Path{})
require.NoError(t, err, "run2 queue should be created successfully")

run2Producer := run2Queue.Producer(queue.ProducerConfig{})
publishAndACKSingleEvent(t, run2Queue, run2Producer, "event-3")
run2Producer.Close()
requireSegmentFiles(t, diskQueuePath, 2)
closeQueueAndWait(t, run2Queue)
Comment thread
belimawr marked this conversation as resolved.

// Run 3: reopen queue without publishing a new event. Correct behavior is
// that no event is replayed. This used to fail with the last event being
// replayed.
run3Queue, err := NewQueue(fileLogger.Logger, nil, settings, nil, &paths.Path{})
require.NoError(t, err, "run3 queue should be created successfully")

replayedBatch := readBatch(t, run3Queue, 3*time.Second)
if replayedBatch != nil {
count := replayedBatch.Count()
var msg interface{}
if count > 0 {
msg, _ = replayedBatch.Entry(0).Content.Fields.GetValue("message")
}
replayedBatch.Done()
t.Fatalf("unexpected replayed event after restart"+
"found replayed batch with count=%d and first message=%v",
count,
msg,
)
}
closeQueueAndWait(t, run3Queue)
}

// publishAndACKSingleEvent publishes msg through producer, reads it back from
// the queue as a single-event batch, verifies its "message" field, and marks
// the batch done (which triggers the ACK path).
func publishAndACKSingleEvent(
	t *testing.T,
	queueInstance *diskQueue,
	producer queue.Producer[publisher.Event],
	msg string,
) {
	// t.Helper() makes failures point at the calling test, not this helper.
	t.Helper()

	_, ok := producer.Publish(makeDiskQueueTestEvent(msg))
	require.True(t, ok, "publishing test event %q should succeed", msg)

	batch := readBatch(t, queueInstance, 3*time.Second)
	require.NotNil(t, batch, "queue should return a batch for message %q", msg)
	require.Equal(t, 1, batch.Count(), "queue should return a single event batch for message %q", msg)
	assertEventMessage(t, batch.Entry(0), msg)
	batch.Done()
}

// readBatch calls Get(1) on the queue in a goroutine and waits up to timeout
// for a batch. It returns nil when the timeout elapses without a batch.
//
// NOTE(review): on timeout the reader goroutine stays blocked in Get until
// the queue is closed; the buffered channel ensures it can still exit
// without blocking once Get returns.
func readBatch(t *testing.T, queueInstance *diskQueue, timeout time.Duration) queue.Batch[publisher.Event] {
	t.Helper()

	type getResult struct {
		batch queue.Batch[publisher.Event]
		err   error
	}

	results := make(chan getResult, 1)
	go func() {
		batch, err := queueInstance.Get(1)
		results <- getResult{batch: batch, err: err}
	}()

	select {
	case result := <-results:
		require.NoError(t, result.err, "reading from queue should not return an error")
		return result.batch
	case <-time.After(timeout):
		return nil
	}
}

// closeQueueAndWait closes the queue (without force) and waits up to five
// seconds for its Done channel to signal that shutdown completed.
func closeQueueAndWait(t *testing.T, queueInstance *diskQueue) {
	// t.Helper() makes failures point at the calling test, not this helper.
	t.Helper()

	err := queueInstance.Close(false)
	require.NoError(t, err, "closing queue should not return an error")

	select {
	case <-queueInstance.Done():
	case <-time.After(5 * time.Second):
		require.Fail(t, "queue did not close in time", "queue.Done() should close within timeout")
	}
}

// assertEventMessage asserts that the event's "message" field equals
// expectedMsg.
func assertEventMessage(t *testing.T, event publisher.Event, expectedMsg string) {
	// t.Helper() makes failures point at the calling test, not this helper.
	t.Helper()

	// A lookup error leaves msg nil, which assert.Equal reports clearly,
	// so the error is deliberately ignored here.
	msg, _ := event.Content.Fields.GetValue("message")
	assert.Equal(t, expectedMsg, msg, "unexpected message in consumed event")
}

// makeDiskQueueTestEvent builds a test event carrying msg plus a 2 KiB filler
// payload, so that only a few events are needed to fill a queue segment.
func makeDiskQueueTestEvent(msg string) publisher.Event {
	fields := mapstr.M{
		"message": msg,
		"payload": strings.Repeat("x", 2048),
	}
	return queuetest.MakeEvent(fields)
}

func requireSegmentFiles(t *testing.T, dir string, expected int) {
segFiles, err := filepath.Glob(filepath.Join(dir, "*.seg"))
if err != nil {
t.Fatalf("cannot resolve segment files glob: %s", err)
}

gotSegments := len(segFiles)
if expected != gotSegments {
t.Fatalf("expecting %d segment files, got %d. Segment files:\n%s", expected, gotSegments, strings.Join(segFiles, "\n"))
}
}
Loading