Skip to content
Open
45 changes: 45 additions & 0 deletions changelog/fragments/1778052475-fix-k8s-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; an 80ish-character-long description of the change.
summary: Fix a bug where events were not enriched by the add_kubernetes_metadata processor when processor initialization was delayed.

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: all

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link to the PR that added the changeset.
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
39 changes: 30 additions & 9 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type kubernetesAnnotator struct {
cache *cache
kubernetesAvailable bool
initOnce sync.Once
wg sync.WaitGroup
}

func init() {
Expand Down Expand Up @@ -98,7 +99,8 @@ func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool {

// kubernetesMetadataExist checks whether an event is already enriched with kubernetes metadata
func kubernetesMetadataExist(event *beat.Event) bool {
if _, err := event.GetValue("kubernetes"); err != nil {
v, err := event.GetValue("kubernetes")
if err != nil || v == nil {
return false
}
return true
Expand All @@ -112,15 +114,21 @@ func New(cfg *config.C, log *logp.Logger) (beat.Processor, error) {
}

log = log.Named(selector).With("libbeat.processor", "add_kubernetes_metadata")

processor := &kubernetesAnnotator{
log: log,
cache: newCache(config.CleanupTimeout),
kubernetesAvailable: false,
log: log,
cache: newCache(config.CleanupTimeout),
wg: sync.WaitGroup{},
}

// complete processor's initialisation asynchronously to re-try on failing k8s client initialisations in case
// the k8s node is not yet ready.
go processor.init(config, cfg)
processor.wg.Add(1)
go func() {
defer processor.wg.Done()
log.Debug("Initializing kubernetes metadata processor")
processor.init(config, cfg)
}()

return processor, nil
}
Expand Down Expand Up @@ -333,10 +341,16 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
// contains a map with various Kubernetes metadata.
// This processor does not access or modify the `Meta` of the event.
func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
if !k.kubernetesAvailable {
if kubernetesMetadataExist(event) {
return event, nil
}
if kubernetesMetadataExist(event) {

// wait for kubernetes metadata processor to be initialized before processing any events
k.wg.Wait()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worst case scenario: We have to retry multiple times to connect to kubernetes. Until we do that, the event will not be processed and we're essentially blocked. Subsequently, filebeat cannot process more messages because the add_kubernetes_metadata.Run has not returned yet.

Is this acceptable?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would lead to a temporary reduction in throughput.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But again, k8sattributes processor blocks until it either establishes the connection, or it fails.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With k8sattributes, if it blocks forever and you want data anyway you can remove it from the configuration. For agent, you can only remove add_kubernetes_metadata once everything runs as a beats receiver. So I don't think it's completely safe to backport this, as there's no way to work around it if it doesn't work as expected.

We could similarly add configuration controlling whether to block and how to handle failure but in agent you can't configure it yet.


// if initialization fails then Kubernetes will not be available
// in that case return event as is
if !k.kubernetesAvailable {
return event, nil
}

Expand All @@ -359,8 +373,15 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
// much cheaper than cloning the full metadata. Transform it in place:
// drop container.name and rewrite container.image -> container.image.name.
if containerVal, err := kubeMeta.GetValue("kubernetes.container"); err == nil {
if cm, ok := containerVal.(mapstr.M); ok {
ociContainer := cm.Clone()
var containerMap mapstr.M
switch cm := containerVal.(type) {
case mapstr.M:
containerMap = cm
case map[string]interface{}:
containerMap = mapstr.M(cm)
}
if containerMap != nil {
ociContainer := containerMap.Clone()
_ = ociContainer.Delete("name")
if img, imgErr := ociContainer.GetValue("image"); imgErr == nil {
_ = ociContainer.Delete("image")
Expand Down
6 changes: 6 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package add_kubernetes_metadata

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -49,6 +50,7 @@ func TestAnnotatorSkipped(t *testing.T) {
matchers: &Matchers{
matchers: []Matcher{matcher},
},
wg: sync.WaitGroup{},
kubernetesAvailable: true,
}

Expand Down Expand Up @@ -108,6 +110,7 @@ func TestAnnotatorWithNoKubernetesAvailable(t *testing.T) {
matchers: &Matchers{
matchers: []Matcher{matcher},
},
wg: sync.WaitGroup{},
kubernetesAvailable: false,
}

Expand Down Expand Up @@ -249,6 +252,7 @@ func newAnnotatorForTest(t *testing.T, cacheKey string, meta mapstr.M) *kubernet
matchers: &Matchers{
matchers: []Matcher{matcher},
},
wg: sync.WaitGroup{},
kubernetesAvailable: true,
}
processor.cache.set(cacheKey, meta)
Expand Down Expand Up @@ -406,6 +410,7 @@ func TestAnnotatorRunNoContainerSubMap(t *testing.T) {
matchers: &Matchers{
matchers: []Matcher{matcher},
},
wg: sync.WaitGroup{},
kubernetesAvailable: true,
}
processor.cache.set("mypod", meta)
Expand Down Expand Up @@ -557,6 +562,7 @@ func BenchmarkKubernetesAnnotatorRun(b *testing.B) {
matchers: &Matchers{
matchers: []Matcher{matcher},
},
wg: sync.WaitGroup{},
kubernetesAvailable: true,
}

Expand Down
Loading