Configure options on add_kubernetes_metadata to wait for processor initialization #50509
khushijain21 wants to merge 15 commits into elastic:main
Conversation
This pull request does not have a backport label.
To fix up this pull request, you need to add the backport labels for the needed branches.
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
📝 Walkthrough
This PR adds readiness coordination to the Kubernetes metadata processor to address the race condition where events are processed before Kubernetes discovery initialization completes.
🚥 Pre-merge checks: ✅ 2 passed
Actionable comments posted: 1

In `libbeat/processors/add_kubernetes_metadata/kubernetes_test.go`:
- Around lines 98-116: The test `TestAnnotatorRunWhenMatchersNil` intends to validate behavior when `k.matchers` is nil, but `kubernetesAvailable` defaults to false, so `Run` exits early. Fix by adding a nil guard in `kubernetesAnnotator.Run` that checks whether `k.matchers == nil` (or `k.matchers.MetadataIndex` is nil) and returns the event unchanged, and update the test to set `kubernetesAvailable: true` on the `kubernetesAnnotator` so the `Run` path actually reaches the matchers check.
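The guard the review asks for could be sketched as below. The stand-in types (`Event`, `Matchers`) and the trailing enrichment comment are simplifications for illustration; the real definitions in libbeat differ.

```go
package main

import "fmt"

// Minimal stand-ins for the beats types; hypothetical, trimmed to what the
// review comment discusses.
type Event struct{ Fields map[string]interface{} }

type Matchers struct{}

func (m *Matchers) MetadataIndex(fields map[string]interface{}) string { return "" }

type kubernetesAnnotator struct {
	kubernetesAvailable bool
	matchers            *Matchers
}

// Run sketches the proposed guard: when matchers is nil, behave as if no
// matcher matched and return the event unchanged instead of panicking.
func (k *kubernetesAnnotator) Run(event *Event) (*Event, error) {
	if !k.kubernetesAvailable {
		return event, nil
	}
	if k.matchers == nil {
		return event, nil
	}
	index := k.matchers.MetadataIndex(event.Fields)
	if index == "" {
		return event, nil
	}
	// ...metadata lookup and event enrichment would happen here...
	return event, nil
}

func main() {
	k := &kubernetesAnnotator{kubernetesAvailable: true, matchers: nil}
	ev := &Event{Fields: map[string]interface{}{"message": "hello"}}
	out, err := k.Run(ev)
	fmt.Println(out == ev, err == nil) // true true
}
```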
📒 Files selected for processing (3)
- changelog/fragments/1778052475-fix-k8s-processor.yaml
- libbeat/processors/add_kubernetes_metadata/kubernetes.go
- libbeat/processors/add_kubernetes_metadata/kubernetes_test.go
Caution: some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
libbeat/processors/add_kubernetes_metadata/kubernetes_test.go (1)
- Lines 1-619: ⚠️ Potential issue | 🔴 Critical. Add a nil-matchers guard to `kubernetes.go` and test coverage for that case. `Run()` in `kubernetes.go` calls `k.matchers.MetadataIndex()` at line 356 without checking if `k.matchers` is nil. If `kubernetesAvailable` is true and `k.matchers` is nil, this causes a runtime panic. The test file should include `TestAnnotatorRunWhenMatchersNil` (referenced in the PR summary but absent) to cover this scenario and prevent regression.
I see that we retry 10 times before we error out. Would it be an acceptable tradeoff to block the pipeline until initialization fails or succeeds, so that no events pass through the pipeline un-enriched? Or we could reduce the number of attempts so the pipeline is not blocked for long. cc: @rdner (see beats/libbeat/processors/add_kubernetes_metadata/kubernetes.go, lines 83 to 97 in caea6ba)
@khushijain21 looks like we have 3 seconds in between, so it's going to be at least 30 seconds in total. We need to check what kind of timeouts the k8s client itself has. We don't want to end up in a situation where a wrong k8s API address or connectivity issues result in minutes of a blocked event processing pipeline. Waiting for 30 seconds and printing a warning sounds acceptable, but I suspect we might have hidden timeouts in this code that need to be checked.
@khushijain21 another concern: once we fail to initialize the processor on startup, what do we do? Do we crash the process? Do we ignore and never retry again? I can see how both of these options could be an issue. I think it would be better to issue a warning, start ingesting, and retry again later. @cmacknz what do you think?
```go
// wait for kubernetes metadata processor to be initialized before processing any events
k.wg.Wait()
```
Worst case scenario: we have to retry multiple times to connect to Kubernetes. Until we do, the event will not be processed and we're essentially blocked. Subsequently, Filebeat cannot process more messages because `add_kubernetes_metadata.Run` has not returned yet.
Is this acceptable?
This would lead to a temporary reduction in throughput.
But again, k8sattributes processor blocks until it either establishes the connection, or it fails.
With k8sattributes, if it blocks forever and you want data anyway, you can remove it from the configuration. For agent, you can only remove add_kubernetes_metadata once everything runs as a beats receiver. So I don't think it's completely safe to backport this, as there's no way to work around it if it doesn't work as expected.
We could similarly add configuration controlling whether to block and how to handle failure, but in agent you can't configure it yet.
Crashing the process would be a breaking change. We don't do that right now.
I think the safest thing to do is to make this behaviour configurable; the trouble is we don't have a way to configure this processor in agent yet (hopefully in 9.5.0 we will).
TL;DR
The failing Buildkite job is the pipeline upload step itself, and it exits immediately.
Remediation
Investigation details
Root cause: the only failed step is the Buildkite pipeline upload step, and its log contains a single error. This indicates the upload command is being executed without a valid agent bootstrap context. I also checked the PR commit referenced by the build.
No evidence
Verification
Follow-up
```go
if waitReady && waitReadyTimeout > 0 {
	timer = time.NewTimer(waitReadyTimeout)
} else {
```
Perhaps we can do some form of validation here? Something like "waitReadyTimeout should be a positive duration if waitReady is true".
Actionable comments posted: 1

In `libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc`:
- Line 138: The docs state the `wait_for_processor_ready_timeout` default is 10s, but the actual default is set to 30s in `kubeAnnotatorConfig.InitDefaults()`. Update the documentation to match the code (or change `InitDefaults()` in config.go if the intended default is 10s) so doc and implementation agree.
📒 Files selected for processing (5)
- changelog/fragments/1778052475-fix-k8s-processor.yaml
- libbeat/processors/add_kubernetes_metadata/config.go
- libbeat/processors/add_kubernetes_metadata/config_test.go
- libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc
- libbeat/processors/add_kubernetes_metadata/kubernetes.go
🚧 Files skipped from review as they are similar to previous changes (2)
- changelog/fragments/1778052475-fix-k8s-processor.yaml
- libbeat/processors/add_kubernetes_metadata/kubernetes.go
```go
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
WaitReady           bool                                `config:"wait_for_processor_ready"`
WaitReadyTimeout    time.Duration                       `config:"wait_for_processor_ready_timeout"`
```
Suggested change:
```go
WaitMetadata        bool          `config:"wait_for_metadata"`
WaitMetadataTimeout time.Duration `config:"wait_for_metadata_timeout"`
```
I like the k8sattributes names, they are specific about what we are waiting for.
Mainly I don't like including the word processor again in something that is already obviously a processor configuration :)
```go
k.DefaultIndexers = Enabled{true}
k.Scope = "node"
k.AddResourceMetadata = metadata.GetDefaultResourceMetadataConfig()
k.WaitReadyTimeout = 30 * time.Second
```
I think WaitReady/WaitMetadata should be true by default, that is the least surprising way for this to work. People who have problems with it can manually switch back. I don't view this as breaking because if everything is working properly it isn't.
```go
	timer = time.NewTimer(waitReadyTimeout)
} else {
	// hard-code a 5-minute timeout in case the function is called without
	// waiting for metadata, to avoid infinite loops
	timer = time.NewTimer(5 * time.Minute)
```
IMO 5 minutes is too long when someone has explicitly opted out of waiting for metadata.
Proposed commit message
This PR adds two new configuration options, `wait_for_processor_ready` and `wait_for_processor_ready_timeout`. They allow users to configure whether processor initialization should occur synchronously or asynchronously; each mode has its own benefits and downsides, as listed in the docs. We set `wait_for_processor_ready` to false for now, until it can be configured from elastic-agent.

Checklist
- Used the `stresstest.sh` script to run tests under stress conditions and with the race detector to verify their stability.
- Added an entry in `./changelog/fragments` using the changelog tool.

Disruptive User Impact
None, since by default `wait_for_processor_ready` is still set to false. We can choose to enable this when it is configurable from inside elastic-agent.

How to test this PR locally
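A minimal processor configuration to exercise the new options locally might look like this. The YAML shape is inferred from the option names in this PR, so verify it against the updated docs:

```yaml
processors:
  - add_kubernetes_metadata:
      wait_for_processor_ready: true
      wait_for_processor_ready_timeout: 30s
```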
Related issues