Skip to content
Open
45 changes: 45 additions & 0 deletions changelog/fragments/1778052475-fix-k8s-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# REQUIRED for all kinds
# Change summary; an 80ish-character description of the change.
summary: Introduce `wait_for_processor_ready` and `wait_for_processor_ready_timeout` settings to the `add_kubernetes_metadata` processor to allow waiting for Kubernetes API availability before processing events.

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: all

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link to the PR that added the changeset.
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
6 changes: 6 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
DefaultIndexers Enabled `config:"default_indexers"`

AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
WaitReady bool `config:"wait_for_processor_ready"`
WaitReadyTimeout time.Duration `config:"wait_for_processor_ready_timeout"`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WaitReadyTimeout time.Duration `config:"wait_for_processor_ready_timeout"`
WaitMetadata bool `config:"wait_for_metadata"`
WaitMetadataTimeout time.Duration `config:"wait_for_metadata_timeout"`

I like the k8sattributes names, they are specific about what we are waiting for.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly I don't like including the word processor again in something that is already obviously a processor configuration :)

}

type Enabled struct {
Expand All @@ -58,6 +60,7 @@
k.DefaultIndexers = Enabled{true}
k.Scope = "node"
k.AddResourceMetadata = metadata.GetDefaultResourceMetadataConfig()
k.WaitReadyTimeout = 30 * time.Second
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think WaitReady/WaitMetadata should be true by default, that is the least surprising way for this to work. People who have problems with it can manually switch back. I don't view this as breaking because if everything is working properly it isn't.

}

func (k *kubeAnnotatorConfig) Validate() error {
Expand All @@ -69,6 +72,9 @@
k.Node = ""
}

if k.WaitReady && k.WaitReadyTimeout <= 0 {
return fmt.Errorf("wait_for_processor_ready_timeout must be a positive duration")
}
// Checks below were added to warn the users early on and avoid initialising the processor in case the `logs_path`
// matcher config is not valid: supported paths defined as a `logs_path` configuration setting are strictly defined
// if `resource_type` is set
Expand All @@ -90,7 +96,7 @@
if logsPathMatcher.ResourceType != "pod" && logsPathMatcher.ResourceType != "container" {
return fmt.Errorf("invalid resource_type %s, valid values include `pod`, `container`", logsPathMatcher.ResourceType)
}
if logsPathMatcher.ResourceType == "pod" && !(logsPathMatcher.LogsPath == "/var/lib/kubelet/pods/" || logsPathMatcher.LogsPath == "/var/log/pods/") {

Check failure on line 99 in libbeat/processors/add_kubernetes_metadata/config.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

QF1001: could apply De Morgan's law (staticcheck)

Check failure on line 99 in libbeat/processors/add_kubernetes_metadata/config.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

QF1001: could apply De Morgan's law (staticcheck)

Check failure on line 99 in libbeat/processors/add_kubernetes_metadata/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1001: could apply De Morgan's law (staticcheck)
return fmt.Errorf("invalid logs_path defined for resource_type: %s, valid values include `/var/lib/kubelet/pods/`, `/var/log/pods/`", logsPathMatcher.ResourceType)
}
if logsPathMatcher.ResourceType == "container" && logsPathMatcher.LogsPath != "/var/log/containers/" {
Expand Down
21 changes: 21 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ func TestConfigValidate(t *testing.T) {
cfg: map[string]interface{}{},
error: false,
},
{
cfg: map[string]interface{}{
"wait_for_processor_ready_timeout": "invalid_duration",
"wait_for_processor_ready": true,
},
error: true,
},
{
cfg: map[string]interface{}{
"wait_for_processor_ready_timeout": "20s",
"wait_for_processor_ready": true,
},
error: false,
},
{
cfg: map[string]interface{}{
"wait_for_processor_ready_timeout": "0s",
"wait_for_processor_ready": true,
},
error: true,
},
}

for _, test := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ It is unset by default.
The enrichment of `node` or `namespace` metadata can be individually disabled by setting `enabled: false`.
- `deployment`: If resource is `pod` and it is created from a `deployment`, by default the deployment name is added, this can be disabled by setting `deployment: false`.
- `cronjob`: If resource is `pod` and it is created from a `cronjob`, by default the cronjob name is added, this can be disabled by setting `cronjob: false`.
`wait_for_processor_ready` :: (false) This specifies if startup should wait for processor initialization. If Kubernetes cannot be reached within `wait_for_processor_ready_timeout`, the processor is marked unavailable.
If `false`, initialization runs asynchronously and some early events could be processed without Kubernetes metadata enrichment.
`wait_for_processor_ready_timeout` :: (30s) Maximum time to wait for Kubernetes connectivity when `wait_for_processor_ready` is enabled.
Comment thread
khushijain21 marked this conversation as resolved.

+
Example:
["source","yaml",subs="attributes"]
Expand Down
69 changes: 48 additions & 21 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type kubernetesAnnotator struct {
cache *cache
kubernetesAvailable bool
initOnce sync.Once
wg sync.WaitGroup
}

func init() {
Expand All @@ -71,29 +72,43 @@ func init() {
Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher)
}

func isKubernetesAvailable(client k8sclient.Interface) (bool, error) {
func isKubernetesAvailable(client k8sclient.Interface, logger *logp.Logger) (bool, error) {
server, err := client.Discovery().ServerVersion()
if err != nil {
return false, err
}
logp.Info("%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server)
logger.Infof("add_kubernetes_metadata: kubernetes env detected, with version: %v", server)
return true, nil
}

func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool {
connectionAttempts := 1
func isKubernetesAvailableWithTimeout(client k8sclient.Interface,
waitReady bool,
waitReadyTimeout time.Duration,
logger *logp.Logger,
) bool {
var timer *time.Timer
var err error
var kubernetesAvailable bool
if waitReady {
timer = time.NewTimer(waitReadyTimeout)
} else {
// Hard-code a 5-minute timeout when the function is called without waiting for metadata, to avoid an infinite loop.
timer = time.NewTimer(5 * time.Minute)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO 5 minutes is too long when someone has explicitly opted out of waiting for metadata.

}

for {
kubernetesAvailable, err := isKubernetesAvailable(client)
if kubernetesAvailable {
return true
}
if connectionAttempts > checkNodeReadyAttempts {
logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err)
select {
case <-timer.C:
logger.Infof("add_kubernetes_metadata: could not detect kubernetes env: %v", err)
return false
default:
kubernetesAvailable, err = isKubernetesAvailable(client, logger)
if kubernetesAvailable {
return true
}
}
time.Sleep(3 * time.Second)
connectionAttempts += 1
}

}

// kubernetesMetadataExist checks whether an event is already enriched with kubernetes metadata
Expand All @@ -112,15 +127,26 @@ func New(cfg *config.C, log *logp.Logger) (beat.Processor, error) {
}

log = log.Named(selector).With("libbeat.processor", "add_kubernetes_metadata")

processor := &kubernetesAnnotator{
log: log,
cache: newCache(config.CleanupTimeout),
kubernetesAvailable: false,
log: log,
cache: newCache(config.CleanupTimeout),
wg: sync.WaitGroup{},
}

// complete processor's initialisation asynchronously to re-try on failing k8s client initialisations in case
// the k8s node is not yet ready.
go processor.init(config, cfg)
if config.WaitReady {
processor.init(config, cfg)
if !processor.kubernetesAvailable {
log.Info("Kubernetes environment is not available after waiting for metadata")
return processor, nil
}
} else {
// complete processor's initialisation asynchronously to re-try on failing k8s client initialisations in case
// the k8s node is not yet ready.
go func() {
processor.init(config, cfg)
}()
}

return processor, nil
}
Expand Down Expand Up @@ -170,7 +196,7 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
return
}

if !isKubernetesAvailableWithRetry(client) {
if !isKubernetesAvailableWithTimeout(client, config.WaitReady, config.WaitReadyTimeout, k.log) {
return
}

Expand Down Expand Up @@ -333,10 +359,11 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
// contains a map with various Kubernetes metadata.
// This processor does not access or modify the `Meta` of the event.
func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
if !k.kubernetesAvailable {
if kubernetesMetadataExist(event) {
return event, nil
}
if kubernetesMetadataExist(event) {

if !k.kubernetesAvailable {
return event, nil
}

Expand Down
Loading