Skip to content

Commit f1ba712

Browse files
author
Senrian
committed
[ISSUE apache#10114] Add removeOffsetByTopicAndGroup method for precise consumer offset cleanup
This commit adds a new method removeOffsetByTopicAndGroup() to ConsumerOffsetManager that allows users to precisely clean up consumer offset for a specific topic under a specific consumer group, without removing the entire group or topic. This addresses the enhancement request in issue apache#10114 where users need the ability to precisely clean up the consumer offset for a specific topic under a specific consumer group when a consumer group subscribes to an unintended topic by mistake. Signed-off-by: Senrian <senrian@github.com>
1 parent 628230d commit f1ba712

1 file changed

Lines changed: 22 additions & 0 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,28 @@ public void cleanOffsetByTopic(String topic) {
103103
}
104104
}
105105

106+
/**
107+
* Remove consumer offset for a specific topic under a specific consumer group.
108+
* This allows precise cleanup without removing the entire group or topic.
109+
*
110+
* @param topic the topic to remove offset for
111+
* @param group the consumer group to remove offset for
112+
*/
113+
public void removeOffsetByTopicAndGroup(String topic, String group) {
114+
if (topic == null || group == null) {
115+
LOG.warn("Cannot remove offset: topic or group is null. topic={}, group={}", topic, group);
116+
return;
117+
}
118+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
119+
ConcurrentMap<Integer, Long> removed = this.offsetTable.remove(key);
120+
if (removed != null) {
121+
removeConsumerOffset(key);
122+
pullOffsetTable.remove(key);
123+
resetOffsetTable.remove(key);
124+
LOG.info("Removed consumer offset for topic={}, group={}, offsetCount={}", topic, group, removed.size());
125+
}
126+
}
127+
106128
public void scanUnsubscribedTopic() {
107129
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
108130
while (it.hasNext()) {

0 commit comments

Comments
 (0)