feat(inkless): add lag metrics for consolidated topic partitions#590
EelisK wants to merge 1 commit into
Conversation
Pull request overview
This PR introduces per-partition JMX metrics to track diskless→tiered-storage consolidation progress (lag vs local log and vs remote tier, plus “deletable” backlog), wiring them into the consolidation fetcher lifecycle so metrics are registered/unregistered as partitions start/stop consolidating.
Changes:
- Add `ConsolidationMetrics` to register per-partition gauges and update them from the consolidation fetcher thread.
- Wire the metrics lifecycle into the `ReplicaManager` partition start/stop flows and close metrics on broker shutdown.
- Expose `UnifiedLog.highestOffsetInRemoteStorage()` publicly so consolidation code can compute remote-tier lag.
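The lifecycle described above — register a gauge when a partition starts consolidating, update it from the fetcher thread, and remove it when the partition stops or the broker shuts down — can be sketched as follows. This is an illustrative sketch only: the class and method names (`ConsolidationGauges`, `register`, `updateLag`, `unregister`) are assumptions, not the PR's actual `ConsolidationMetrics` code, and a plain map of counters stands in for the real JMX gauge registry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-partition gauge registry; stands in for a JMX-backed metrics group. */
public class ConsolidationGauges {
    private final Map<String, AtomicLong> lagByPartition = new ConcurrentHashMap<>();

    /** Called when a partition starts consolidating (e.g. on a leader transition). */
    public void register(String topicPartition) {
        lagByPartition.putIfAbsent(topicPartition, new AtomicLong(0L));
    }

    /** Called from the fetcher thread after processing fetched partition data. */
    public void updateLag(String topicPartition, long logEndOffset, long remoteLogEndOffset) {
        AtomicLong gauge = lagByPartition.get(topicPartition);
        if (gauge != null) {
            gauge.set(Math.max(0L, logEndOffset - remoteLogEndOffset));
        }
    }

    /** Called when the partition stops consolidating or on broker shutdown. */
    public void unregister(String topicPartition) {
        lagByPartition.remove(topicPartition);
    }

    /** Current lag for a partition, or null if it is not being tracked. */
    public Long lag(String topicPartition) {
        AtomicLong gauge = lagByPartition.get(topicPartition);
        return gauge == null ? null : gauge.get();
    }
}
```

Keying the map by partition and removing entries on stop mirrors the register/unregister pairing the PR wires into the fetcher lifecycle, so stale partitions do not keep reporting metrics.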
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java | Makes highestOffsetInRemoteStorage() publicly accessible for consolidation lag calculations. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Creates/owns ConsolidationMetrics, registers/unregisters per-partition gauges during leader/follower transitions, closes metrics on shutdown. |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationMetrics.scala | New metrics helper that manages per-partition gauges (lag/local lag/deletable). |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala | Updates consolidation metrics after processing fetched partition data. |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala | Passes optional metrics into fetcher threads. |
viktorsomogyi
left a comment
Would you mind creating a few tests at the ConsolidationFetcherThread level?
```scala
replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
consolidationFetcherManager.foreach(_.removeFetcherForPartitions(
  partitions.filter(p => _inklessMetadataView.isConsolidatingDisklessTopic(p.topic))))
val consolidatingPartitionsToStop = partitions.filter(p => _inklessMetadataView.isConsolidatingDisklessTopic(p.topic))
```
Please also add an `&& config.disklessRemoteStorageConsolidationEnabled` check here, so that partitions are only filtered when the feature is actually enabled.
Introduce per-partition metrics to monitor the consolidation process:
- inkless.remote.consolidation.lag: diskless log end offset minus remote log end offset
- inkless.remote.consolidation.local.lag: diskless log end offset minus local log end offset
- inkless.remote.consolidation.deletable.messages: messages consolidated to tiered storage that can be removed
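The offset arithmetic behind these three metrics can be sketched as below. The first two follow directly from the descriptions above; the `deletableMessages` formula is an assumption (the span between the local log start offset and the highest offset already in remote storage), since the commit message does not spell out how the deletable count is derived. All identifiers are illustrative, not the PR's actual code.

```java
/** Hedged sketch of the consolidation-lag arithmetic; names are hypothetical. */
public class ConsolidationLag {
    /** inkless.remote.consolidation.lag: diskless log end offset minus remote log end offset. */
    public static long remoteLag(long disklessLogEndOffset, long remoteLogEndOffset) {
        return disklessLogEndOffset - remoteLogEndOffset;
    }

    /** inkless.remote.consolidation.local.lag: diskless log end offset minus local log end offset. */
    public static long localLag(long disklessLogEndOffset, long localLogEndOffset) {
        return disklessLogEndOffset - localLogEndOffset;
    }

    /**
     * inkless.remote.consolidation.deletable.messages: records already copied to tiered
     * storage and therefore eligible for local deletion. Assumed here to be the inclusive
     * span from the local log start offset up to the highest offset in remote storage.
     */
    public static long deletableMessages(long highestOffsetInRemoteStorage, long localLogStartOffset) {
        return Math.max(0L, highestOffsetInRemoteStorage - localLogStartOffset + 1);
    }
}
```

With offsets logEndOffset=200, remoteLogEndOffset=150, localLogEndOffset=180, the remote lag is 50 and the local lag is 20, matching the "end offset minus end offset" definitions above.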