diff --git a/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala b/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala index d0ef66a656..99647b4c67 100644 --- a/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala +++ b/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherManager.scala @@ -27,7 +27,8 @@ class ConsolidationFetcherManager(brokerConfig: KafkaConfig, replicaManager: ReplicaManager, quotaManager: ReplicationQuotaManager, fetchHandler: FetchHandler, - fetchOffsetHandler: FetchOffsetHandler) + fetchOffsetHandler: FetchOffsetHandler, + consolidationMetrics: Option[ConsolidationMetrics] = None) extends AbstractFetcherManager[ConsolidationFetcherThread](name = "ConsolidationFetcherManager on broker " + brokerConfig.brokerId, clientId = "Consolidation", numFetchers = brokerConfig.numReplicaFetchers) { @@ -47,7 +48,7 @@ class ConsolidationFetcherManager(brokerConfig: KafkaConfig, replicaManager.brokerEpochSupplier ) new ConsolidationFetcherThread(threadName, disklessLeaderEndPoint, brokerConfig, failedPartitions, replicaManager, - quotaManager, logContext.logPrefix) + quotaManager, logContext.logPrefix, consolidationMetrics) } def shutdown(): Unit = { diff --git a/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala b/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala index c0b66a8f53..3502a29321 100644 --- a/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala +++ b/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThread.scala @@ -20,8 +20,12 @@ package io.aiven.inkless.consolidation import io.aiven.inkless.consume.ConcatenatedRecords import kafka.server.{FailedPartitions, KafkaConfig, ReplicaFetcherThread, ReplicaManager, ReplicaQuota} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.server.LeaderEndPoint +import org.apache.kafka.storage.internals.log.LogAppendInfo + +import scala.util.Try class ConsolidationFetcherThread(name: String, leader: LeaderEndPoint, @@ -29,7 +33,8 @@ class ConsolidationFetcherThread(name: String, failedPartitions: FailedPartitions, replicaMgr: ReplicaManager, quota: ReplicaQuota, - logPrefix: String) extends ReplicaFetcherThread(name, leader, brokerConfig, failedPartitions, replicaMgr, quota, logPrefix) { + logPrefix: String, + consolidationMetrics: Option[ConsolidationMetrics] = None) extends ReplicaFetcherThread(name, leader, brokerConfig, failedPartitions, replicaMgr, quota, logPrefix) { override def toMemoryRecords(records: Records): MemoryRecords = { (records: @unchecked) match { @@ -37,4 +42,30 @@ class ConsolidationFetcherThread(name: String, case _ => super.toMemoryRecords(records) } } + + override def processPartitionData( + topicPartition: TopicPartition, + fetchOffset: Long, + partitionLeaderEpoch: Int, + partitionData: FetchData + ): Option[LogAppendInfo] = { + val result = super.processPartitionData(topicPartition, fetchOffset, partitionLeaderEpoch, partitionData) + + consolidationMetrics.foreach { metrics => + val logOpt = Try(replicaMgr.getPartitionOrException(topicPartition).localLogOrException).toOption + logOpt.foreach { log => + val disklessLogEndOffset = partitionData.highWatermark + val localLogEndOffset = log.logEndOffset + val remoteLogEndOffset = log.highestOffsetInRemoteStorage() + + 
metrics.updateLocalLag(topicPartition, Math.max(0L, disklessLogEndOffset - localLogEndOffset)) + if (remoteLogEndOffset >= 0) { + metrics.updateLag(topicPartition, Math.max(0L, disklessLogEndOffset - remoteLogEndOffset)) + metrics.updateDeletableMessages(topicPartition, Math.max(0L, remoteLogEndOffset - log.localLogStartOffset())) + } + } + } + + result + } } diff --git a/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationMetrics.scala b/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationMetrics.scala new file mode 100644 index 0000000000..cc81e7021e --- /dev/null +++ b/core/src/main/scala/io/aiven/inkless/consolidation/ConsolidationMetrics.scala @@ -0,0 +1,82 @@ +/* + * Inkless + * Copyright (C) 2024 - 2026 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.aiven.inkless.consolidation + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.server.metrics.KafkaMetricsGroup + +import java.io.Closeable +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong +import scala.jdk.CollectionConverters._ + +class ConsolidationMetrics extends Closeable { + private val Lag = "inkless.remote.consolidation.lag" + private val LocalLag = "inkless.remote.consolidation.local.lag" + private val DeletableMessages = "inkless.remote.consolidation.deletable.messages" + + private val metricsGroup = new KafkaMetricsGroup("io.aiven.inkless.consolidation", "ConsolidationMetrics") + + private val lagByPartition = new ConcurrentHashMap[TopicPartition, AtomicLong]() + private val localLagByPartition = new ConcurrentHashMap[TopicPartition, AtomicLong]() + private val deletableByPartition = new ConcurrentHashMap[TopicPartition, AtomicLong]() + + def registerPartition(tp: TopicPartition): Unit = { + val tags = Map("topic" -> tp.topic, "partition" -> tp.partition.toString).asJava + + lagByPartition.computeIfAbsent(tp, _ => { + val value = new AtomicLong(0) + metricsGroup.newGauge(Lag, () => value.get, tags) + value + }).set(0) + localLagByPartition.computeIfAbsent(tp, _ => { + val value = new AtomicLong(0) + metricsGroup.newGauge(LocalLag, () => value.get, tags) + value + }).set(0) + deletableByPartition.computeIfAbsent(tp, _ => { + val value = new AtomicLong(0) + metricsGroup.newGauge(DeletableMessages, () => value.get, tags) + value + }).set(0) + } + + def updateLag(tp: TopicPartition, lag: Long): Unit = + Option(lagByPartition.get(tp)).foreach(_.set(lag)) + + def updateLocalLag(tp: TopicPartition, lag: Long): Unit = + Option(localLagByPartition.get(tp)).foreach(_.set(lag)) + + def updateDeletableMessages(tp: TopicPartition, count: Long): Unit = + Option(deletableByPartition.get(tp)).foreach(_.set(count)) + + def unregisterPartition(tp: TopicPartition): Unit = { + val tags = Map("topic" -> tp.topic, "partition" -> tp.partition.toString).asJava + lagByPartition.remove(tp) + localLagByPartition.remove(tp) + 
deletableByPartition.remove(tp) + metricsGroup.removeMetric(Lag, tags) + metricsGroup.removeMetric(LocalLag, tags) + metricsGroup.removeMetric(DeletableMessages, tags) + } + + override def close(): Unit = { + lagByPartition.keys.asScala.toList.foreach(unregisterPartition) + } +} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 7cab4b1bf0..dc7b495505 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -23,7 +23,7 @@ import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchRes import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer} import io.aiven.inkless.merge.FileMerger import io.aiven.inkless.produce.AppendHandler -import io.aiven.inkless.consolidation.ConsolidationFetcherManager +import io.aiven.inkless.consolidation.{ConsolidationFetcherManager, ConsolidationMetrics} import kafka.cluster.Partition import kafka.log.LogManager import kafka.server.HostedPartition.Online @@ -284,13 +284,18 @@ class ReplicaManager(val config: KafkaConfig, private val inklessFileCleaner: Option[FileCleaner] = inklessSharedState.map(new FileCleaner(_)) // FIXME: FileMerger is having issues with hanging queries. Disabling until fixed. private val inklessFileMerger: Option[FileMerger] = None // inklessSharedState.map(new FileMerger(_)) + private val consolidationMetrics: Option[ConsolidationMetrics] = + if (config.disklessRemoteStorageConsolidationEnabled && inklessFetchHandler.isDefined && inklessFetchOffsetHandler.isDefined) + Some(new ConsolidationMetrics()) + else + None private val consolidationFetcherManager: Option[ConsolidationFetcherManager] = if (config.disklessRemoteStorageConsolidationEnabled) { if (inklessFetchHandler.isEmpty || inklessFetchOffsetHandler.isEmpty) { logger.warn("Remote storage consolidation is enabled, however Inkless doesn't seem to be configured properly.") } inklessFetchHandler.zip(inklessFetchOffsetHandler).map { case (fetchHandler, fetchOffsetHandler) => - new ConsolidationFetcherManager(config, this, quotaManagers.follower, fetchHandler, fetchOffsetHandler) + new ConsolidationFetcherManager(config, this, quotaManagers.follower, fetchHandler, fetchOffsetHandler, consolidationMetrics) } } else { None @@ -487,8 +492,13 @@ class ReplicaManager(val config: KafkaConfig, val partitions = partitionsToStop.map(_.topicPartition) replicaFetcherManager.removeFetcherForPartitions(partitions) replicaAlterLogDirsManager.removeFetcherForPartitions(partitions) - consolidationFetcherManager.foreach(_.removeFetcherForPartitions( - partitions.filter(p => _inklessMetadataView.isConsolidatingDisklessTopic(p.topic)))) + val consolidatingPartitionsToStop = if (config.disklessRemoteStorageConsolidationEnabled) + partitions.filter(p => _inklessMetadataView.isConsolidatingDisklessTopic(p.topic)) + else Set[TopicPartition]() + consolidationFetcherManager.foreach(_.removeFetcherForPartitions(consolidatingPartitionsToStop)) + consolidationMetrics.foreach { metrics => + consolidatingPartitionsToStop.foreach(tp => metrics.unregisterPartition(tp)) + } // Second remove deleted partitions from the partition map. Fetchers rely on the // ReplicaManager to get Partition's information so they must be stopped first. 
@@ -2590,6 +2600,9 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions) replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition)) consolidationFetcherManager.foreach(_.removeFetcherForPartitions(newOfflinePartitions)) + consolidationMetrics.foreach { metrics => + newOfflinePartitions.foreach(tp => metrics.unregisterPartition(tp)) + } partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false)) newOfflinePartitions.foreach { topicPartition => @@ -2645,6 +2658,7 @@ class ReplicaManager(val config: KafkaConfig, if (checkpointHW) checkpointHighWatermarks() consolidationFetcherManager.foreach(_.shutdown()) + consolidationMetrics.foreach(_.close()) replicaSelectorPlugin.foreach(_.close) removeAllTopicMetrics() addPartitionsToTxnManager.foreach(_.shutdown()) @@ -3055,6 +3069,9 @@ class ReplicaManager(val config: KafkaConfig, } consolidationFetcherManager.foreach(_.addFetcherForPartitions(consolidatingPartitionAndOffsets)) + consolidationMetrics.foreach { metrics => + consolidatingPartitionAndOffsets.keys.foreach(tp => metrics.registerPartition(tp)) + } stateChangeLogger.info(s"Started consolidating diskless fetchers as part of become-leader for ${consolidatingDisklessPartitionsToStartFetching.size} partitions") } } @@ -3186,6 +3203,9 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) consolidationFetcherManager.foreach(_.addFetcherForPartitions(consolidatingPartitionAndOffsets)) + consolidationMetrics.foreach { metrics => + consolidatingPartitionAndOffsets.keys.foreach(tp => metrics.registerPartition(tp)) + } stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions") partitionsToStartFetching.foreach{ case (topicPartition, partition) => diff --git a/core/src/test/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThreadTest.scala b/core/src/test/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThreadTest.scala new file mode 100644 index 0000000000..b7dcc70ff4 --- /dev/null +++ b/core/src/test/scala/io/aiven/inkless/consolidation/ConsolidationFetcherThreadTest.scala @@ -0,0 +1,214 @@ +/* + * Inkless + * Copyright (C) 2024 - 2026 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +package io.aiven.inkless.consolidation + +import kafka.cluster.Partition +import kafka.server._ +import kafka.server.metadata.InklessMetadataView +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.compress.Compression +import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord} +import org.apache.kafka.metadata.PartitionRegistration +import org.apache.kafka.server.LeaderEndPoint +import org.apache.kafka.server.network.BrokerEndPoint +import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.kafka.storage.internals.log.{LogAppendInfo, UnifiedLog} +import org.apache.kafka.storage.log.metrics.BrokerTopicStats +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.mockito.ArgumentMatchers.{any, anyLong} +import org.mockito.Mockito.{mock, when} + +import java.nio.charset.StandardCharsets +import java.util.Optional +import scala.jdk.CollectionConverters._ + +class ConsolidationFetcherThreadTest { + + private val topicPartition = new TopicPartition("test-topic", 0) + private val failedPartitions = new FailedPartitions + private var metrics: ConsolidationMetrics = _ + + @BeforeEach + def setUp(): Unit = { + metrics = new ConsolidationMetrics() + } + + @AfterEach + def tearDown(): Unit = { + metrics.close() + TestUtils.clearYammerMetrics() + } + + private def createConsolidationFetcherThread( + replicaManager: ReplicaManager, + consolidationMetrics: Option[ConsolidationMetrics] + ): ConsolidationFetcherThread = { + val props = TestUtils.createBrokerConfig(nodeId=1) + val config = KafkaConfig.fromProps(props) + val leader = mock(classOf[LeaderEndPoint]) + when(leader.brokerEndPoint()).thenReturn(new BrokerEndPoint(0, "localhost", 9092)) + new ConsolidationFetcherThread( + "consolidation-fetcher-test", + leader, + config, + failedPartitions, + replicaManager, + QuotaFactory.UNBOUNDED_QUOTA, + "[ConsolidationFetcherTest] ", + consolidationMetrics + ) + } + + private def mockReplicaManager( + partition: Partition, + inklessMetadataView: InklessMetadataView = null + ): ReplicaManager = { + val replicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.getPartitionOrException(any[TopicPartition])).thenReturn(partition) + when(replicaManager.brokerTopicStats).thenReturn(new BrokerTopicStats) + val view = if (inklessMetadataView != null) inklessMetadataView else { + val v = mock(classOf[InklessMetadataView]) + when(v.getClassicToDisklessStartOffset(any[TopicPartition])) + .thenReturn(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) + v + } + when(replicaManager.inklessMetadataView()).thenReturn(view) + when(replicaManager.replicaFetcherManager).thenReturn(mock(classOf[ReplicaFetcherManager])) + replicaManager + } + + private def mockPartitionWithLog( + logEndOffset: Long, + highestOffsetInRemoteStorage: Long, + localLogStartOffset: Long = 0L + ): Partition = { + val log = mock(classOf[UnifiedLog]) + when(log.logEndOffset).thenReturn(logEndOffset) + when(log.highestOffsetInRemoteStorage()).thenReturn(highestOffsetInRemoteStorage) + when(log.localLogStartOffset()).thenReturn(localLogStartOffset) + when(log.maybeUpdateHighWatermark(anyLong())).thenReturn(Optional.empty) + + val partition = mock(classOf[Partition]) + when(partition.localLogOrException).thenReturn(log) + when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean], any[Int])) + 
.thenReturn(Some(mock(classOf[LogAppendInfo]))) + partition + } + + private def buildPartitionData(highWatermark: Long): FetchResponseData.PartitionData = { + val records = MemoryRecords.withRecords(Compression.NONE, + new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))) + new FetchResponseData.PartitionData() + .setPartitionIndex(topicPartition.partition) + .setRecords(records) + .setHighWatermark(highWatermark) + .setLogStartOffset(0) + } + + @Test + def testMetricsUpdatedOnProcessPartitionData(): Unit = { + val disklessLEO = 100L + val localLEO = 80L + val remoteOffset = 60L + val localLogStart = 10L + + val partition = mockPartitionWithLog(localLEO, remoteOffset, localLogStart) + val replicaManager = mockReplicaManager(partition) + val thread = createConsolidationFetcherThread(replicaManager, Some(metrics)) + + metrics.registerPartition(topicPartition) + + thread.processPartitionData(topicPartition, localLEO, Int.MaxValue, buildPartitionData(disklessLEO)) + + assertEquals(40L, findGaugeValue("inkless.remote.consolidation.lag", topicPartition)) + assertEquals(20L, findGaugeValue("inkless.remote.consolidation.local.lag", topicPartition)) + assertEquals(50L, findGaugeValue("inkless.remote.consolidation.deletable.messages", topicPartition)) + } + + @Test + def testRemoteLagSkippedWhenRemoteStorageNotActive(): Unit = { + val disklessLEO = 100L + val localLEO = 80L + val remoteOffset = -1L + + val partition = mockPartitionWithLog(localLEO, remoteOffset) + val replicaManager = mockReplicaManager(partition) + val thread = createConsolidationFetcherThread(replicaManager, Some(metrics)) + + metrics.registerPartition(topicPartition) + + thread.processPartitionData(topicPartition, localLEO, Int.MaxValue, buildPartitionData(disklessLEO)) + + assertEquals(20L, findGaugeValue("inkless.remote.consolidation.local.lag", topicPartition)) + assertEquals(0L, findGaugeValue("inkless.remote.consolidation.lag", topicPartition)) + assertEquals(0L, findGaugeValue("inkless.remote.consolidation.deletable.messages", topicPartition)) + } + + @Test + def testLagMetricsClampedToZeroWhenLocalAheadOfDiskless(): Unit = { + val disklessLEO = 50L + val localLEO = 60L + val remoteOffset = 70L + val localLogStart = 0L + + val partition = mockPartitionWithLog(localLEO, remoteOffset, localLogStart) + val replicaManager = mockReplicaManager(partition) + val thread = createConsolidationFetcherThread(replicaManager, Some(metrics)) + + metrics.registerPartition(topicPartition) + + thread.processPartitionData(topicPartition, localLEO, Int.MaxValue, buildPartitionData(disklessLEO)) + + assertEquals(0L, findGaugeValue("inkless.remote.consolidation.lag", topicPartition)) + assertEquals(0L, findGaugeValue("inkless.remote.consolidation.local.lag", topicPartition)) + assertEquals(70L, findGaugeValue("inkless.remote.consolidation.deletable.messages", topicPartition)) + } + + @Test + def testNoMetricsRegisteredWhenConsolidationMetricsIsNone(): Unit = { + val partition = mockPartitionWithLog(80L, 60L) + val replicaManager = mockReplicaManager(partition) + val thread = createConsolidationFetcherThread(replicaManager, None) + + thread.processPartitionData(topicPartition, 80L, Int.MaxValue, buildPartitionData(100L)) + + assertNull(findGaugeOrNull("inkless.remote.consolidation.lag", topicPartition)) + assertNull(findGaugeOrNull("inkless.remote.consolidation.local.lag", topicPartition)) + assertNull(findGaugeOrNull("inkless.remote.consolidation.deletable.messages", topicPartition)) + } + + private def findGaugeOrNull(name: String, 
tp: TopicPartition): com.yammer.metrics.core.Gauge[_] = { + val expectedScope = s"partition.${tp.partition}.topic.${tp.topic.replace(".", "_")}" + KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .find { case (metricName, _) => + metricName.getName == name && metricName.getScope == expectedScope + } + .map(_._2.asInstanceOf[com.yammer.metrics.core.Gauge[_]]) + .orNull + } + + private def findGaugeValue(name: String, tp: TopicPartition): Long = { + val gauge = findGaugeOrNull(name, tp) + assertNotNull(gauge, s"Gauge $name not found for $tp") + gauge.asInstanceOf[com.yammer.metrics.core.Gauge[Long]].value() + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index dc80383dc6..467bee5e12 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -392,7 +392,7 @@ public long logStartOffset() { return logStartOffset; } - long highestOffsetInRemoteStorage() { + public long highestOffsetInRemoteStorage() { return highestOffsetInRemoteStorage; }
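
Note (not part of the patch): the three gauges added above are plain offset arithmetic over values the fetcher thread already has at hand. Below is a minimal sketch of that computation, using illustrative names that do not exist in the patch (`ConsolidationLag`, `computeLag`):

    // Sketch only; mirrors the logic in ConsolidationFetcherThread.processPartitionData above.
    // disklessEndOffset = partitionData.highWatermark reported by the diskless leader
    // localEndOffset    = log.logEndOffset of the local consolidated log
    // remoteEndOffset   = log.highestOffsetInRemoteStorage(), -1 until a segment has been uploaded
    // localStartOffset  = log.localLogStartOffset()
    final case class ConsolidationLag(localLag: Long, remoteLag: Option[Long], deletableMessages: Option[Long])

    def computeLag(disklessEndOffset: Long,
                   localEndOffset: Long,
                   remoteEndOffset: Long,
                   localStartOffset: Long): ConsolidationLag = {
      val localLag = math.max(0L, disklessEndOffset - localEndOffset)
      if (remoteEndOffset >= 0)
        ConsolidationLag(
          localLag = localLag,
          remoteLag = Some(math.max(0L, disklessEndOffset - remoteEndOffset)),
          deletableMessages = Some(math.max(0L, remoteEndOffset - localStartOffset)))
      else
        // When remote storage is not yet active, the patch leaves the remote-dependent
        // gauges untouched; None models "no update" in this sketch.
        ConsolidationLag(localLag = localLag, remoteLag = None, deletableMessages = None)
    }

    // Matches testMetricsUpdatedOnProcessPartitionData:
    // computeLag(100L, 80L, 60L, 10L) == ConsolidationLag(20L, Some(40L), Some(50L))

The gauges are registered per partition under the "io.aiven.inkless.consolidation" group, so they surface in Yammer with a "partition.<p>.topic.<topic>" scope, which is the scope string the test helper findGaugeOrNull constructs when looking them up.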