Skip to content

feat(inkless): extended metrics for diskless migration states tracking#589

Draft
giuseppelillo wants to merge 1 commit into
mainfrom
giuseppelillo/diskless-migration-metrics
Draft

feat(inkless): extended metrics for diskless migration states tracking#589
giuseppelillo wants to merge 1 commit into
mainfrom
giuseppelillo/diskless-migration-metrics

Conversation

@giuseppelillo
Copy link
Copy Markdown
Contributor

@giuseppelillo giuseppelillo commented May 11, 2026

Add the following metrics:

  • ClassicToDisklessMigrationsInFlight
  • ClassicToDisklessMigrationsWaitingForReplicationCount
  • ClassicToDisklessMigrationsSendingToControllerCount
  • ClassicToDisklessMigrationsAwaitingMetadataCount
  • ClassicToDisklessMigrationOldestWaitingForReplicationAgeMs
  • ClassicToDisklessMigrationOldestSendingToControllerAgeMs
  • ClassicToDisklessMigrationOldestAwaitingMetadataAgeMs
  • ClassicToDisklessMigrationsCompletedPerSec
  • ClassicToDisklessMigrationsFailedPerSec
  • ClassicToDisklessMigrationsRetriedPerSec

@giuseppelillo giuseppelillo force-pushed the giuseppelillo/diskless-migration-metrics branch 2 times, most recently from 71a7db1 to 25c0544 Compare May 11, 2026 13:27
Add gauges for:
- number of migrations in flight
- partitions in intermediate migration states
- oldest age per state

Add meters for completed, failed and retried migrations.
@giuseppelillo giuseppelillo force-pushed the giuseppelillo/diskless-migration-metrics branch from 25c0544 to 931671f Compare May 11, 2026 13:36
@giuseppelillo giuseppelillo requested a review from Copilot May 11, 2026 13:37
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds expanded Yammer/JMX metrics to track classic→diskless migration progress in InitDisklessLogManager, including per-state in-flight gauges, oldest-age-per-state gauges, and completed/failed/retried meters. It also updates the batch queues and tests to validate the new metrics and to avoid cross-test metric leakage.

Changes:

  • Added InitDisklessLogManager metrics (per-state count + oldest-age gauges; completed/failed/retried meters) and wired metric refreshes into state transitions.
  • Added an onRetry callback to RetriableInitDisklessLogBatchQueue so retries can be metered.
  • Updated unit/flow tests to assert metric behavior and clean up metrics between test runs; plumbed Time into InitDisklessLogManager.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
core/src/main/scala/kafka/server/InitDisklessLogManager.scala Adds migration metrics, refresh hooks, retry/failure/completion metering, and a removeMetrics() helper.
core/src/main/scala/kafka/server/InitDisklessLogBatchQueue.scala Adds onRetry callback and invokes it when a retry is enqueued.
core/src/main/scala/kafka/server/BrokerServer.scala Passes time into InitDisklessLogManager construction.
core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala Ensures metrics are removed after each test; updates manager construction to pass time.
core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala Adds an end-to-end test asserting per-state gauges/ages/meters; cleans up metrics on shutdown.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Comment thread core/src/main/scala/kafka/server/InitDisklessLogManager.scala
Comment on lines +275 to +278
metricsGroup.newGauge(MigrationsInFlightMetricName, () => trackedSize())
metricsGroup.newGauge(WaitingForReplicationCountMetricName, () => enteredAtByTp.values().asScala.count(_.stateClass eq classOf[WaitingForReplication]))
metricsGroup.newGauge(SendingToControllerCountMetricName, () => enteredAtByTp.values().asScala.count(_.stateClass eq classOf[SendingToController]))
metricsGroup.newGauge(AwaitingMetadataCountMetricName, () => enteredAtByTp.values().asScala.count(_.stateClass eq classOf[AwaitingMetadata]))
Comment on lines +280 to +282
metricsGroup.newGauge(OldestWaitingForReplicationAgeMsMetricName, () => oldestAgeMs(classOf[WaitingForReplication]))
metricsGroup.newGauge(OldestSendingToControllerAgeMsMetricName, () => oldestAgeMs(classOf[SendingToController]))
metricsGroup.newGauge(OldestAwaitingMetadataAgeMsMetricName, () => oldestAgeMs(classOf[AwaitingMetadata]))
}
}

def removeMetrics(): Unit = metrics.removeMetrics()
Comment on lines +231 to +241
withQueueLock {
val retryAttemptNumber = attempt.attemptNumber + 1
Option(queuedByTp.get(tp)) match {
case Some(existing) =>
// Keep the already queued state (it may be fresher), but ensure retry progression is not lost.
queuedByTp.put(tp, Attempt(existing.state, Math.max(existing.attemptNumber, retryAttemptNumber)))
case None =>
queuedByTp.put(tp, Attempt(attempt.state, retryAttemptNumber))
}
}
onRetry()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants