Skip to content

Commit 1bce872

Browse files
hunter32292 and Copilot committed
Fix missing labels in watch events
Storage-generated watch events did not include pod/node labels because the storage layer has no access to the pod/node listers. This caused any watcher with a non-empty label selector to receive zero events, since matchesFilter() checked labels.Set(m.Labels) which was always nil.

Fix: Add a LabelLookupFunc to metricsWatcher that enriches events with labels from the authoritative source (pod/node listers) before filtering and forwarding. The Watch helpers in the API layer create this function from the pod/node listers available at construction time.

The enrichLabels() method is called in Send() before matchesFilter(), so both filtering and the forwarded event contain correct labels.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 05b7488 commit 1bce872

4 files changed

Lines changed: 134 additions & 26 deletions

File tree

pkg/api/node.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ func newNodeMetrics(groupResource schema.GroupResource, metrics NodeMetricsGette
6262

6363
// Set up watch helper if the metrics getter supports watching
6464
if watchable, ok := metrics.(WatchableNodeMetricsGetter); ok {
65-
nm.watchHelper = NewNodeMetricsWatchHelper(watchable)
65+
labelLookup := func(_, name string) map[string]string {
66+
node, err := nodeLister.Get(name)
67+
if err != nil {
68+
return nil
69+
}
70+
return node.Labels
71+
}
72+
nm.watchHelper = NewNodeMetricsWatchHelper(watchable, labelLookup)
6673
}
6774

6875
return nm

pkg/api/pod.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,17 @@ func newPodMetrics(groupResource schema.GroupResource, metrics PodMetricsGetter,
6161

6262
// Set up watch helper if the metrics getter supports watching
6363
if watchable, ok := metrics.(WatchablePodMetricsGetter); ok {
64-
pm.watchHelper = NewPodMetricsWatchHelper(watchable)
64+
labelLookup := func(namespace, name string) map[string]string {
65+
obj, err := podLister.ByNamespace(namespace).Get(name)
66+
if err != nil {
67+
return nil
68+
}
69+
if pom, ok := obj.(*metav1.PartialObjectMetadata); ok {
70+
return pom.Labels
71+
}
72+
return nil
73+
}
74+
pm.watchHelper = NewPodMetricsWatchHelper(watchable, labelLookup)
6575
}
6676

6777
return pm

pkg/api/watcher.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ const (
3434
resourceTypeNode = "node"
3535
)
3636

37+
// LabelLookupFunc returns labels for a given resource identified by namespace and name.
38+
// For cluster-scoped resources (nodes), namespace is empty.
// A nil result is allowed and means no labels could be resolved for that
// resource (for example, the lister lookup returned an error).
39+
type LabelLookupFunc func(namespace, name string) map[string]string
40+
3741
// metricsWatcher implements watch.Interface for metrics resources.
3842
// It receives events from the storage layer and filters them based on
3943
// namespace and label selector before forwarding to clients.
@@ -48,13 +52,17 @@ type metricsWatcher struct {
4852

4953
// Resource type for correct bookmark generation
5054
resourceType string // "pod" or "node"
55+
56+
// labelLookup enriches events with labels from the authoritative source (pod/node lister).
57+
// Storage-generated events don't carry labels, so this is needed for label selector filtering.
58+
labelLookup LabelLookupFunc
5159
}
5260

5361
var _ watch.Interface = &metricsWatcher{}
5462
var _ MetricsWatcher = &metricsWatcher{}
5563

5664
// newMetricsWatcher creates a new watcher with the given filter criteria.
57-
func newMetricsWatcher(namespace string, labelSelector labels.Selector, resourceType string) *metricsWatcher {
65+
func newMetricsWatcher(namespace string, labelSelector labels.Selector, resourceType string, labelLookup LabelLookupFunc) *metricsWatcher {
5866
if labelSelector == nil {
5967
labelSelector = labels.Everything()
6068
}
@@ -64,6 +72,7 @@ func newMetricsWatcher(namespace string, labelSelector labels.Selector, resource
6472
namespace: namespace,
6573
labelSelector: labelSelector,
6674
resourceType: resourceType,
75+
labelLookup: labelLookup,
6776
}
6877
}
6978

@@ -103,15 +112,18 @@ func (w *metricsWatcher) Send(event WatchEvent) bool {
103112
default:
104113
}
105114

115+
// Enrich object with labels from the authoritative source if available
116+
obj := w.enrichLabels(event.Object)
117+
106118
// Filter the event
107-
if !w.matchesFilter(event.Object) {
119+
if !w.matchesFilter(obj) {
108120
return true // Event filtered out, but watcher is still alive
109121
}
110122

111123
// Convert to watch.Event
112124
watchEvent := watch.Event{
113125
Type: event.Type,
114-
Object: w.toRuntimeObject(event.Object),
126+
Object: w.toRuntimeObject(obj),
115127
}
116128

117129
// Try to send, close if buffer is full (slow consumer)
@@ -145,6 +157,29 @@ func (w *metricsWatcher) matchesFilter(obj interface{}) bool {
145157
}
146158
}
147159

160+
// enrichLabels returns obj with its Labels filled in from w.labelLookup when the
161+
// event carries none. Storage-generated events don't carry labels, so the lookup
162+
// function (backed by pod/node listers) supplies them for selector filtering.
//
// Behavior, as implemented below:
//   - With no labelLookup configured, obj is returned unchanged.
//   - Only metrics.PodMetrics and metrics.NodeMetrics values are handled; any
//     other type (including pointers to those types) is returned unchanged.
//   - Labels already present on the object are kept; the lookup runs only when
//     Labels is nil, and its result (possibly nil) is stored without copying.
//
// NOTE(review): because the looked-up map is stored as-is, it may be shared with
// whatever backs labelLookup (e.g. a lister cache) — confirm nothing downstream
// mutates the labels of forwarded events.
163+
func (w *metricsWatcher) enrichLabels(obj interface{}) interface{} {
164+
// No lookup configured: nothing to enrich with.
if w.labelLookup == nil {
165+
return obj
166+
}
167+
// m is a copy of the value held in obj, so assigning m.Labels below does not
// modify the caller's object; the enriched copy is what gets returned.
switch m := obj.(type) {
168+
case metrics.PodMetrics:
169+
// Only fill in labels when the event has none; existing labels win.
if m.Labels == nil {
170+
m.Labels = w.labelLookup(m.Namespace, m.Name)
171+
}
172+
return m
173+
case metrics.NodeMetrics:
174+
if m.Labels == nil {
175+
// Nodes are looked up with an empty namespace.
m.Labels = w.labelLookup("", m.Name)
176+
}
177+
return m
178+
default:
179+
// Unrecognized object types pass through untouched.
return obj
180+
}
181+
}
182+
148183
// toRuntimeObject converts the metrics object to a runtime.Object
149184
func (w *metricsWatcher) toRuntimeObject(obj interface{}) runtime.Object {
150185
switch m := obj.(type) {
@@ -232,19 +267,21 @@ func (w *metricsWatcher) sendBookmark(resourceVersion string) bool {
232267

233268
// PodMetricsWatchHelper helps create watches for pod metrics
234269
type PodMetricsWatchHelper struct {
235-
storage WatchablePodMetricsGetter
270+
storage WatchablePodMetricsGetter
271+
labelLookup LabelLookupFunc
236272
}
237273

238-
// NewPodMetricsWatchHelper creates a new watch helper for pod metrics
239-
func NewPodMetricsWatchHelper(storage WatchablePodMetricsGetter) *PodMetricsWatchHelper {
240-
return &PodMetricsWatchHelper{storage: storage}
274+
// NewPodMetricsWatchHelper creates a new watch helper for pod metrics.
275+
// labelLookup provides labels for pod metrics objects (used for label selector filtering).
276+
func NewPodMetricsWatchHelper(storage WatchablePodMetricsGetter, labelLookup LabelLookupFunc) *PodMetricsWatchHelper {
277+
return &PodMetricsWatchHelper{storage: storage, labelLookup: labelLookup}
241278
}
242279

243280
// Watch creates a new watch for pod metrics with the given filters.
244281
// If sendInitialEvents is true, it sends all current metrics as ADDED events
245282
// followed by a BOOKMARK before streaming updates.
246283
func (h *PodMetricsWatchHelper) Watch(ctx context.Context, namespace string, labelSelector labels.Selector, sendInitialEvents bool) (watch.Interface, error) {
247-
w := newMetricsWatcher(namespace, labelSelector, resourceTypePod)
284+
w := newMetricsWatcher(namespace, labelSelector, resourceTypePod, h.labelLookup)
248285

249286
var watcherID uint64
250287

@@ -287,19 +324,21 @@ func (h *PodMetricsWatchHelper) Watch(ctx context.Context, namespace string, lab
287324

288325
// NodeMetricsWatchHelper helps create watches for node metrics
289326
type NodeMetricsWatchHelper struct {
290-
storage WatchableNodeMetricsGetter
327+
storage WatchableNodeMetricsGetter
328+
labelLookup LabelLookupFunc
291329
}
292330

293-
// NewNodeMetricsWatchHelper creates a new watch helper for node metrics
294-
func NewNodeMetricsWatchHelper(storage WatchableNodeMetricsGetter) *NodeMetricsWatchHelper {
295-
return &NodeMetricsWatchHelper{storage: storage}
331+
// NewNodeMetricsWatchHelper creates a new watch helper for node metrics.
332+
// labelLookup provides labels for node metrics objects (used for label selector filtering).
333+
func NewNodeMetricsWatchHelper(storage WatchableNodeMetricsGetter, labelLookup LabelLookupFunc) *NodeMetricsWatchHelper {
334+
return &NodeMetricsWatchHelper{storage: storage, labelLookup: labelLookup}
296335
}
297336

298337
// Watch creates a new watch for node metrics with the given filters.
299338
// If sendInitialEvents is true, it sends all current metrics as ADDED events
300339
// followed by a BOOKMARK before streaming updates.
301340
func (h *NodeMetricsWatchHelper) Watch(ctx context.Context, labelSelector labels.Selector, sendInitialEvents bool) (watch.Interface, error) {
302-
w := newMetricsWatcher("", labelSelector, resourceTypeNode) // Nodes are cluster-scoped
341+
w := newMetricsWatcher("", labelSelector, resourceTypeNode, h.labelLookup) // Nodes are cluster-scoped
303342

304343
var watcherID uint64
305344

pkg/api/watcher_test.go

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
)
2828

2929
func TestMetricsWatcher_BasicSendReceive(t *testing.T) {
30-
w := newMetricsWatcher("", labels.Everything(), resourceTypePod)
30+
w := newMetricsWatcher("", labels.Everything(), resourceTypePod, nil)
3131
defer w.Stop()
3232

3333
// Send an event
@@ -51,7 +51,7 @@ func TestMetricsWatcher_BasicSendReceive(t *testing.T) {
5151
}
5252

5353
func TestMetricsWatcher_StopClosesChannel(t *testing.T) {
54-
w := newMetricsWatcher("", labels.Everything(), resourceTypePod)
54+
w := newMetricsWatcher("", labels.Everything(), resourceTypePod, nil)
5555

5656
// Stop the watcher
5757
w.Stop()
@@ -75,7 +75,7 @@ func TestMetricsWatcher_StopClosesChannel(t *testing.T) {
7575
}
7676

7777
func TestMetricsWatcher_NamespaceFilter(t *testing.T) {
78-
w := newMetricsWatcher("test-ns", labels.Everything(), resourceTypePod)
78+
w := newMetricsWatcher("test-ns", labels.Everything(), resourceTypePod, nil)
7979
defer w.Stop()
8080

8181
// Event in matching namespace
@@ -113,7 +113,7 @@ func TestMetricsWatcher_NamespaceFilter(t *testing.T) {
113113

114114
func TestMetricsWatcher_LabelSelectorFilter(t *testing.T) {
115115
selector, _ := labels.Parse("app=myapp")
116-
w := newMetricsWatcher("", selector, resourceTypeNode)
116+
w := newMetricsWatcher("", selector, resourceTypeNode, nil)
117117
defer w.Stop()
118118

119119
// Event with matching labels
@@ -150,7 +150,7 @@ func TestMetricsWatcher_LabelSelectorFilter(t *testing.T) {
150150
}
151151

152152
func TestMetricsWatcher_SendInitialEvents(t *testing.T) {
153-
w := newMetricsWatcher("", labels.Everything(), resourceTypePod)
153+
w := newMetricsWatcher("", labels.Everything(), resourceTypePod, nil)
154154
defer w.Stop()
155155

156156
// Create some initial objects
@@ -202,7 +202,7 @@ func TestMetricsWatcher_SendInitialEvents(t *testing.T) {
202202
}
203203

204204
func TestMetricsWatcher_SlowConsumer(t *testing.T) {
205-
w := newMetricsWatcher("", labels.Everything(), resourceTypePod)
205+
w := newMetricsWatcher("", labels.Everything(), resourceTypePod, nil)
206206
defer w.Stop()
207207

208208
event := WatchEvent{
@@ -239,7 +239,7 @@ func TestMetricsWatcher_ContextCancellation(t *testing.T) {
239239
allMetrics: []metrics.PodMetrics{},
240240
}
241241

242-
helper := NewPodMetricsWatchHelper(storage)
242+
helper := NewPodMetricsWatchHelper(storage, nil)
243243

244244
w, err := helper.Watch(ctx, "", labels.Everything(), false)
245245
if err != nil {
@@ -281,7 +281,7 @@ func TestPodMetricsWatchHelper_WatchWithInitialEvents(t *testing.T) {
281281
allMetrics: []metrics.PodMetrics{pod1, pod2},
282282
}
283283

284-
helper := NewPodMetricsWatchHelper(storage)
284+
helper := NewPodMetricsWatchHelper(storage, nil)
285285
ctx, cancel := context.WithCancel(context.Background())
286286
defer cancel()
287287

@@ -329,7 +329,7 @@ func TestNodeMetricsWatchHelper_WatchWithInitialEvents(t *testing.T) {
329329
allMetrics: []metrics.NodeMetrics{node1, node2},
330330
}
331331

332-
helper := NewNodeMetricsWatchHelper(storage)
332+
helper := NewNodeMetricsWatchHelper(storage, nil)
333333
ctx, cancel := context.WithCancel(context.Background())
334334
defer cancel()
335335

@@ -368,7 +368,7 @@ func TestNodeMetricsWatchHelper_WatchWithInitialEvents(t *testing.T) {
368368
func TestMetricsWatcher_BookmarkType(t *testing.T) {
369369
// Test that pod watchers get PodMetrics bookmarks
370370
t.Run("pod watcher gets PodMetrics bookmark", func(t *testing.T) {
371-
w := newMetricsWatcher("", labels.Everything(), resourceTypePod)
371+
w := newMetricsWatcher("", labels.Everything(), resourceTypePod, nil)
372372
defer w.Stop()
373373

374374
// Send bookmark directly - it goes to result channel
@@ -393,7 +393,7 @@ func TestMetricsWatcher_BookmarkType(t *testing.T) {
393393

394394
// Test that node watchers get NodeMetrics bookmarks
395395
t.Run("node watcher gets NodeMetrics bookmark", func(t *testing.T) {
396-
w := newMetricsWatcher("", labels.Everything(), resourceTypeNode)
396+
w := newMetricsWatcher("", labels.Everything(), resourceTypeNode, nil)
397397
defer w.Stop()
398398

399399
ok := w.sendBookmark("456")
@@ -416,6 +416,58 @@ func TestMetricsWatcher_BookmarkType(t *testing.T) {
416416
})
417417
}
418418

419+
// TestMetricsWatcher_LabelEnrichment checks that a watcher built with a label
// lookup function fills in labels on events that carry none, and that the
// label selector is evaluated against those looked-up labels: the event for
// pod1 (app=myapp) is delivered with labels attached, the event for pod2
// (app=other) is dropped.
func TestMetricsWatcher_LabelEnrichment(t *testing.T) {
420+
// Simulate: storage events have no labels, but labelLookup provides them
421+
lookup := func(namespace, name string) map[string]string {
422+
if name == "pod1" {
423+
return map[string]string{"app": "myapp"}
424+
}
425+
if name == "pod2" {
426+
return map[string]string{"app": "other"}
427+
}
428+
return nil
429+
}
430+
431+
// The parse error is discarded here; the selector string is a fixed literal.
selector, _ := labels.Parse("app=myapp")
432+
w := newMetricsWatcher("", selector, resourceTypePod, lookup)
433+
defer w.Stop()
434+
435+
// Send events WITHOUT labels (as storage does)
436+
pod1 := metrics.PodMetrics{}
437+
pod1.Name = "pod1"
438+
pod1.Namespace = "default"
439+
// Note: no Labels set
440+
441+
pod2 := metrics.PodMetrics{}
442+
pod2.Name = "pod2"
443+
pod2.Namespace = "default"
444+
445+
w.Send(WatchEvent{Type: watch.Modified, Object: pod1})
446+
w.Send(WatchEvent{Type: watch.Modified, Object: pod2})
447+
448+
// Should receive pod1 (labels enriched from lookup → matches selector)
449+
select {
450+
case event := <-w.ResultChan():
451+
// The delivered object is asserted to be a *metrics.PodMetrics.
pm := event.Object.(*metrics.PodMetrics)
452+
if pm.Name != "pod1" {
453+
t.Errorf("Expected pod1, got %s", pm.Name)
454+
}
455+
if pm.Labels["app"] != "myapp" {
456+
t.Errorf("Expected enriched labels, got %v", pm.Labels)
457+
}
458+
case <-time.After(time.Second):
459+
t.Fatal("Timeout waiting for enriched event")
460+
}
461+
462+
// pod2 should be filtered out (labels enriched → doesn't match selector)
463+
select {
464+
case event := <-w.ResultChan():
465+
t.Errorf("Unexpected event: %v", event)
466+
case <-time.After(100 * time.Millisecond):
467+
// Expected: nothing further arrives within the short wait.
468+
}
469+
}
470+
419471
// fakeWatchablePodStorage implements WatchablePodMetricsGetter for testing
420472
type fakeWatchablePodStorage struct {
421473
currentRV string

0 commit comments

Comments
 (0)