Skip to content

Commit 343daa9

Browse files
authored
[ISSUE #10173] Improve PopLite: rename RocksDB CQ path and schedule autoClean (#10242)
1 parent 7b85a5d commit 343daa9

File tree

3 files changed

+9
-3
lines changed

3 files changed

+9
-3
lines changed

broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,7 @@ public void updateNextVisibleTime(String topic, String group, int queueId, long
291291
updateLockFreeTimestamp(topic, group, queueId, orderInfo);
292292
}
293293

294-
@VisibleForTesting
295-
protected void autoClean() {
294+
public void autoClean() {
296295
if (brokerController == null) {
297296
return;
298297
}

broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,9 @@ public GetMessageResult getMessage(String clientHost, String group, String lmqNa
438438
}
439439

440440
public class PopLiteLockManager extends ServiceThread {
441+
private static final long AUTO_CLEAN_INTERVAL = 5 * 60 * 1000;
442+
private long lastCleanTime = System.currentTimeMillis();
443+
441444
@Override
442445
public String getServiceName() {
443446
if (brokerController.getBrokerConfig().isInBrokerContainer()) {
@@ -452,6 +455,10 @@ public void run() {
452455
try {
453456
waitForRunning(60000);
454457
lockService.removeTimeout();
458+
if (System.currentTimeMillis() - lastCleanTime >= AUTO_CLEAN_INTERVAL) {
459+
((MemoryConsumerOrderInfoManager) consumerOrderInfoManager).autoClean();
460+
lastCleanTime = System.currentTimeMillis();
461+
}
455462
} catch (Exception ignored) {
456463
}
457464
}

store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public static String getStorePathBatchConsumeQueue(final String rootDir) {
3232
}
3333

3434
public static String getStorePathRocksDBConsumeQueue(final String rootDir) {
35-
return rootDir + File.separator + "consumequeue_r";
35+
return rootDir + File.separator + "consumequeue_rocksdb";
3636
}
3737

3838
public static String getStorePathIndex(final String rootDir) {

0 commit comments

Comments
 (0)