Skip to content

Initialise disk queue frame IDs from persisted state#50534

Open
belimawr wants to merge 6 commits intoelastic:mainfrom
belimawr:fix-disk-queue-initialisation
Open

Initialise disk queue frame IDs from persisted state#50534
belimawr wants to merge 6 commits intoelastic:mainfrom
belimawr:fix-disk-queue-initialisation

Conversation

@belimawr
Copy link
Copy Markdown
Member

@belimawr belimawr commented May 6, 2026

Proposed commit message

Fix a restart regression in disk queue where already-ACKed tail events could be replayed when multiple segment files existed.

On startup, `state.dat` restores `queuePosition.frameIndex`, but in-memory frame counters were reinitialized to zero. This desynchronized persisted progress from runtime frame ID tracking:

- read path used `segments.nextReadFrameID`
- ACK path used `acks.nextFrameID`
- persisted state tracked `queuePosition.frameIndex`

After restart, segment-boundary ACK bookkeeping could run with incorrect frame IDs, producing inconsistent persisted position and causing the last event from the newest segment to be replayed on a subsequent restart.

Initialize both runtime counters from persisted `frameIndex` during queue startup:

- set `segments.nextReadFrameID = frameID(nextReadPosition.frameIndex)`
- set `acks.nextFrameID = frameID(nextReadPosition.frameIndex)`

This keeps read/ACK frame ID progression aligned with persisted state across restarts and prevents duplicate replay of already-ACKed events.

Assisted-By: Codex 5.3

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

## Disruptive User Impact

How to test this PR locally

Run the tests:

cd libbeat
./publisher/queue/diskqueue -count=1 -v -run=TestQueueDoesNotReplyLastEventAfterRestart

Or follow the instructions from the bug report below

Related issues

## Use cases
## Screenshots
## Logs

@belimawr belimawr self-assigned this May 6, 2026
@belimawr belimawr added Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team backport-active-all Automated backport with mergify to all the active branches labels May 6, 2026
@botelastic botelastic Bot added the needs_team Indicates that the issue/PR needs a Team:* label label May 6, 2026
@cla-checker-service
Copy link
Copy Markdown

cla-checker-service Bot commented May 6, 2026

💚 CLA has been signed

@botelastic botelastic Bot removed the needs_team Indicates that the issue/PR needs a Team:* label label May 6, 2026
@belimawr belimawr requested a review from Copilot May 6, 2026 22:57
Fix a restart regression in disk queue where already-ACKed tail events could be replayed when multiple segment files existed.

On startup, `state.dat` restores `queuePosition.frameIndex`, but in-memory frame counters were reinitialized to zero. This desynchronized persisted progress from runtime frame ID tracking:

- read path used `segments.nextReadFrameID`
- ACK path used `acks.nextFrameID`
- persisted state tracked `queuePosition.frameIndex`

After restart, segment-boundary ACK bookkeeping could run with incorrect frame IDs, producing inconsistent persisted position and causing the last event from the newest segment to be replayed on a subsequent restart.

Initialize both runtime counters from persisted `frameIndex` during queue startup:

- set `segments.nextReadFrameID = frameID(nextReadPosition.frameIndex)`
- set `acks.nextFrameID = frameID(nextReadPosition.frameIndex)`

This keeps read/ACK frame ID progression aligned with persisted state across restarts and prevents duplicate replay of already-ACKed events.

Assisted-By: Codex 5.3
@belimawr belimawr force-pushed the fix-disk-queue-initialisation branch from e8e12ee to cfd18ab Compare May 6, 2026 23:01
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Fixes a diskqueue restart regression where already-ACKed tail events could be replayed when multiple segment files exist by aligning in-memory frame ID counters with persisted state.dat on startup.

Changes:

  • Initialize segments.nextReadFrameID and acks.nextFrameID from persisted queuePosition.frameIndex during NewQueue.
  • Add a regression test covering multi-run restart behavior and segment rollover conditions.
  • Add a changelog fragment documenting the bug fix.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
libbeat/publisher/queue/diskqueue/queue.go Initializes runtime frame ID counters from persisted state during startup.
libbeat/publisher/queue/diskqueue/core_loop.go Removes now-incorrect comment about nextReadFrameID initialization behavior.
libbeat/publisher/queue/diskqueue/queue_test.go Adds restart regression test and helpers creating multiple segments and asserting no replay.
changelog/fragments/1778107949-fix-disk-queue-initialisation.yaml Adds changelog entry for the diskqueue initialization fix.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread libbeat/publisher/queue/diskqueue/queue_test.go Outdated
Comment thread libbeat/publisher/queue/diskqueue/queue_test.go Outdated
Comment thread libbeat/publisher/queue/diskqueue/queue_test.go
Comment thread libbeat/publisher/queue/diskqueue/queue.go Outdated
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 6, 2026

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • /test : Run the Buildkite pipeline.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 7, 2026

TL;DR

Buildkite failed in Libbeat: Run check/update because make -C libbeat check update rewrites libbeat/publisher/queue/diskqueue/queue_test.go (import grouping), so make check-no-changes exits with status 2. Commit the formatter output from make -C libbeat update.

Remediation

  • Run make -C libbeat update and commit the resulting import reorder in libbeat/publisher/queue/diskqueue/queue_test.go.
  • Re-run the same CI step (make -C libbeat check update && make check-no-changes) to confirm the tree stays clean.
Investigation details

Root Cause

queue_test.go is not in the formatter-updated state expected by libbeat checks.

In PR HEAD, the imports are currently ordered with local-module imports before testify:

  • libbeat/publisher/queue/diskqueue/queue_test.go:28-36

make -C libbeat update rewrites that block to place third-party testify imports in their own group before local imports:

  • libbeat/publisher/queue/diskqueue/queue_test.go:28-37 (after formatter output)

Evidence

Error: some files are not up-to-date. Run 'make update' then review and commit the changes. Modified: [libbeat/publisher/queue/diskqueue/queue_test.go]
make: *** [scripts/Makefile:141: check] Error 1

Local reproduction of formatter drift in this PR checkout produced:

@@ -25,6 +25,9 @@ import (
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
@@ -32,8 +35,6 @@ import (
-    "github.com/stretchr/testify/assert"
-    "github.com/stretchr/testify/require"

Verification

  • Reproduced the file drift from make -C libbeat check update.
  • Full local check completion was blocked by environment Python tooling (autopep8 requiring lib2to3), but the Go import rewrite happened before that failure and matches the CI-reported modified file.

Follow-up

If CI still fails after committing the updated queue_test.go, share the new failed-step log and I can re-trace from that exact step.

Note

🔒 Integrity filter blocked 2 items

The following items were blocked because they don't meet the GitHub integrity level.

To allow these resources, lower min-integrity in your GitHub frontmatter:

tools:
  github:
    min-integrity: approved  # merged | approved | unapproved | none

What is this? | From workflow: PR Buildkite Detective

Give us feedback! React with 🚀 if perfect, 👍 if helpful, 👎 if not.

@belimawr belimawr marked this pull request as ready for review May 7, 2026 19:12
@belimawr belimawr requested a review from a team as a code owner May 7, 2026 19:12
@belimawr belimawr requested review from leehinman and mauri870 May 7, 2026 19:12
@infra-vault-gh-plugin-prod
Copy link
Copy Markdown

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 7, 2026

Review Change Stack
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: c70d1696-2c39-4be4-a6d5-a3ca188b62ce

📥 Commits

Reviewing files that changed from the base of the PR and between 80fb8bc and 32a4d3a.

📒 Files selected for processing (5)
  • changelog/fragments/1778107949-fix-disk-queue-initialisation.yaml
  • libbeat/publisher/queue/diskqueue/acks.go
  • libbeat/publisher/queue/diskqueue/core_loop.go
  • libbeat/publisher/queue/diskqueue/queue.go
  • libbeat/publisher/queue/diskqueue/queue_test.go
💤 Files with no reviewable changes (1)
  • libbeat/publisher/queue/diskqueue/core_loop.go

📝 Walkthrough

Walkthrough

This PR fixes a bug where the disk queue replays the last ACKed event on startup when multiple segment files exist. The fix initializes frame IDs (nextFrameID in ACK tracking and nextReadFrameID in segment reading) from the persisted queue position index instead of always starting from zero. Changes are made in acks.go, queue.go, and supported by a comprehensive integration test that verifies no event replay occurs across three sequential queue restart cycles.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Pull request implements the required fix to initialize disk queue frame IDs from persisted state, addressing all objectives from issue #32560.
Out of Scope Changes check ✅ Passed All changes are directly scoped to fixing disk queue frame ID initialization and include appropriate test coverage.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • 🛠️ Update Documentation: Commit on current branch
  • 🛠️ Update Documentation: Create PR

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-active-all Automated backport with mergify to all the active branches Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Disk queue re-sends last segment file when there is more than on segment on disk

2 participants