diff --git a/changelog/fragments/1778052475-fix-k8s-processor.yaml b/changelog/fragments/1778052475-fix-k8s-processor.yaml new file mode 100644 index 000000000000..a72290272c15 --- /dev/null +++ b/changelog/fragments/1778052475-fix-k8s-processor.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: Introduce `wait_for_processor_ready` and `wait_for_processor_ready_timeout` settings to `add_kubernetes_metadata` processor to allow waiting for Kubernetes API availability before processing events. + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: all + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index aaf219f8642b..d6ac35396b19 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -43,6 +43,8 @@ type kubeAnnotatorConfig struct { DefaultIndexers Enabled `config:"default_indexers"` AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` + WaitReady bool `config:"wait_for_processor_ready"` + WaitReadyTimeout time.Duration `config:"wait_for_processor_ready_timeout"` } type Enabled struct { @@ -58,6 +60,7 @@ func (k *kubeAnnotatorConfig) InitDefaults() { k.DefaultIndexers = Enabled{true} k.Scope = "node" k.AddResourceMetadata = metadata.GetDefaultResourceMetadataConfig() + k.WaitReadyTimeout = 30 * time.Second } func (k *kubeAnnotatorConfig) Validate() error { @@ -69,6 +72,9 @@ func (k *kubeAnnotatorConfig) Validate() error { k.Node = "" } + if k.WaitReady && k.WaitReadyTimeout <= 0 { + return fmt.Errorf("wait_for_processor_ready_timeout must be a positive duration") + } // Checks below were added to warn the users early on and avoid initialising the processor in case the `logs_path` // matcher config is not valid: supported paths defined as a `logs_path` configuration setting are strictly defined // if `resource_type` is set diff --git a/libbeat/processors/add_kubernetes_metadata/config_test.go b/libbeat/processors/add_kubernetes_metadata/config_test.go index 68054979d093..9a733e9756ec 100644 --- a/libbeat/processors/add_kubernetes_metadata/config_test.go +++ b/libbeat/processors/add_kubernetes_metadata/config_test.go @@ -46,6 +46,27 @@ func TestConfigValidate(t *testing.T) { cfg: map[string]interface{}{}, error: false, }, + { + cfg: map[string]interface{}{ + "wait_for_processor_ready_timeout": "invalid_duration", + "wait_for_processor_ready": true, + }, + error: true, + }, + { + cfg: map[string]interface{}{ + "wait_for_processor_ready_timeout": "20s", + "wait_for_processor_ready": true, + }, + error: false, + }, + { + cfg: map[string]interface{}{ + "wait_for_processor_ready_timeout": "0s", + "wait_for_processor_ready": true, + }, + error: true, + }, } for _, test := range tests { diff --git a/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc b/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc index df795135803a..4be8d1ca5107 100644 --- a/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc +++ b/libbeat/processors/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc @@ -133,6 +133,10 @@ It is unset by default. The enrichment of `node` or `namespace` metadata can be individually disabled by setting `enabled: false`. - `deployment`: If resource is `pod` and it is created from a `deployment`, by default the deployment name is added, this can be disabled by setting `deployment: false`. - `cronjob`: If resource is `pod` and it is created from a `cronjob`, by default the cronjob name is added, this can be disabled by setting `cronjob: false`. +`wait_for_processor_ready` :: (false) This specifies if startup should wait for processor initialization. If Kubernetes cannot be reached within `wait_for_processor_ready_timeout`, the processor is marked unavailable. +If `false`, initialization runs asynchronously and some early events could be processed without Kubernetes metadata enrichment. +`wait_for_processor_ready_timeout` :: (10s) Maximum time to wait for Kubernetes connectivity when `wait_for_processor_ready` is enabled. + + Example: ["source","yaml",subs="attributes"] diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 5fef49f2d184..d8d43db271e9 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -57,6 +57,7 @@ type kubernetesAnnotator struct { cache *cache kubernetesAvailable bool initOnce sync.Once + wg sync.WaitGroup } func init() { @@ -71,29 +72,43 @@ func init() { Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher) } -func isKubernetesAvailable(client k8sclient.Interface) (bool, error) { +func isKubernetesAvailable(client k8sclient.Interface, logger *logp.Logger) (bool, error) { server, err := client.Discovery().ServerVersion() if err != nil { return false, err } - logp.Info("%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server) + logger.Infof("add_kubernetes_metadata: kubernetes env detected, with version: %v", server) return true, nil } -func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool { - connectionAttempts := 1 +func isKubernetesAvailableWithTimeout(client k8sclient.Interface, + waitReady bool, + waitReadyTimeout time.Duration, + logger *logp.Logger, +) bool { + var timer *time.Timer + var err error + var kubernetesAvailable bool + if waitReady { + timer = time.NewTimer(waitReadyTimeout) + } else { + // hard coding a 5 minutes timeout in case the function is called without waiting for metadata, to avoid infinite loops + timer = time.NewTimer(5 * time.Minute) + } + for { - kubernetesAvailable, err := isKubernetesAvailable(client) - if kubernetesAvailable { - return true - } - if connectionAttempts > checkNodeReadyAttempts { - logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err) + select { + case <-timer.C: + logger.Infof("add_kubernetes_metadata: could not detect kubernetes env: %v", err) return false + default: + kubernetesAvailable, err = isKubernetesAvailable(client, logger) + if kubernetesAvailable { + return true + } } - time.Sleep(3 * time.Second) - connectionAttempts += 1 } + } // kubernetesMetadataExist checks whether an event is already enriched with kubernetes metadata @@ -112,15 +127,26 @@ func New(cfg *config.C, log *logp.Logger) (beat.Processor, error) { } log = log.Named(selector).With("libbeat.processor", "add_kubernetes_metadata") + processor := &kubernetesAnnotator{ - log: log, - cache: newCache(config.CleanupTimeout), - kubernetesAvailable: false, + log: log, + cache: newCache(config.CleanupTimeout), + wg: sync.WaitGroup{}, } - // complete processor's initialisation asynchronously to re-try on failing k8s client initialisations in case - // the k8s node is not yet ready. - go processor.init(config, cfg) + if config.WaitReady { + processor.init(config, cfg) + if !processor.kubernetesAvailable { + log.Info("Kubernetes environment is not available after waiting for metadata") + return processor, nil + } + } else { + // complete processor's initialisation asynchronously to re-try on failing k8s client initialisations in case + // the k8s node is not yet ready. + go func() { + processor.init(config, cfg) + }() + } return processor, nil } @@ -170,7 +196,7 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { return } - if !isKubernetesAvailableWithRetry(client) { + if !isKubernetesAvailableWithTimeout(client, config.WaitReady, config.WaitReadyTimeout, k.log) { return } @@ -333,10 +359,11 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { // contains a map with various Kubernetes metadata. // This processor does not access or modify the `Meta` of the event. func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { - if !k.kubernetesAvailable { + if kubernetesMetadataExist(event) { return event, nil } - if kubernetesMetadataExist(event) { + + if !k.kubernetesAvailable { return event, nil }