Skip to content

Commit 614b816

Browse files
authored
[ISSUE #10203] Support wildcard subscription and and consumer suspend for LiteTopic (#10204)
- Add wildcard (*) subscription support for liteTopic - Implement consume suspend mechanism with invalid scan count threshold - Refactor subscriber query interface with SubscriberWrapper for flexible retrieval - Add wildcard client cache with 30s TTL for performance optimization - Update related components and enhance test coverage Change-Id: I4ecaceec7daa2f4364d911437007df98dc49d542
1 parent 860de80 commit 614b816

File tree

24 files changed

+1454
-1183
lines changed

24 files changed

+1454
-1183
lines changed

WORKSPACE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ maven_install(
7171
"org.bouncycastle:bcpkix-jdk15on:1.69",
7272
"com.google.code.gson:gson:2.8.9",
7373
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
74-
"org.apache.rocketmq:rocketmq-proto:2.1.1",
74+
"org.apache.rocketmq:rocketmq-proto:2.1.2",
7575
"com.google.protobuf:protobuf-java:3.20.1",
7676
"com.google.protobuf:protobuf-java-util:3.20.1",
7777
"com.conversantmedia:disruptor:1.2.10",

broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.rocketmq.broker.lite;
1919

2020
import com.google.common.collect.Sets;
21+
import org.apache.commons.lang3.tuple.Triple;
2122
import org.apache.rocketmq.broker.BrokerController;
2223
import org.apache.rocketmq.common.MixAll;
2324
import org.apache.rocketmq.common.Pair;
@@ -32,6 +33,8 @@
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.Set;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
import java.util.function.Function;
3538

3639
import static org.apache.rocketmq.broker.offset.ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
3740

@@ -41,13 +44,15 @@
4144
*/
4245
public abstract class AbstractLiteLifecycleManager extends ServiceThread {
4346
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LITE_LOGGER_NAME);
47+
private static final int MAX_INVALID_SCAN_COUNT = 5;
4448

4549
protected final BrokerController brokerController;
4650
protected final String brokerName;
4751
protected final LiteSharding liteSharding;
4852
protected MessageStore messageStore;
4953
protected Map<String, Integer> ttlMap = Collections.emptyMap();
5054
protected Map<String, Set<String>> subscriberGroupMap = Collections.emptyMap();
55+
protected Map<String, Integer> invalidScanCountMap = new ConcurrentHashMap<>();
5156

5257
public AbstractLiteLifecycleManager(BrokerController brokerController, LiteSharding liteSharding) {
5358
this.brokerController = brokerController;
@@ -77,6 +82,15 @@ public void init() {
7782
*/
7883
public abstract List<String> collectByParentTopic(String parentTopic);
7984

85+
/**
86+
* Iterator of lite topic, for high frequency iteration
87+
* Triple<lmqName, maxOffsetInQueue, lastStoreTimestamp>, lastStoreTimestamp is null for now
88+
* return true to continue, false to break.
89+
*
90+
* @param function consumer func
91+
*/
92+
public abstract void forEachLiteTopic(Function<Triple<String, Long, Long>, Boolean> function);
93+
8094
/**
8195
* Check if the subscription for the given LMQ is active.
8296
* A subscription is considered active if either:
@@ -153,8 +167,16 @@ public boolean isLiteTopicExpired(String parentTopic, String lmqName, long maxOf
153167
return false;
154168
}
155169
if (maxOffset <= 0) {
156-
LOGGER.warn("unexpected condition, max offset <= 0, {}, {}", lmqName, maxOffset);
170+
int invalidCount = invalidScanCountMap.getOrDefault(lmqName, 0) + 1;
171+
LOGGER.warn("unexpected condition, max offset <= 0, {}, {}, scanCount:{}", lmqName, maxOffset, invalidCount);
172+
if (invalidCount > MAX_INVALID_SCAN_COUNT) { // check more times in case of concurrent issue
173+
invalidScanCountMap.remove(lmqName);
174+
return true;
175+
}
176+
invalidScanCountMap.put(lmqName, invalidCount);
157177
return false;
178+
} else {
179+
invalidScanCountMap.remove(lmqName);
158180
}
159181
long latestStoreTime =
160182
this.brokerController.getMessageStore().getMessageStoreTimeStamp(lmqName, 0, maxOffset - 1);

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java

Lines changed: 132 additions & 112 deletions
Large diffs are not rendered by default.

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteLifecycleManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.rocketmq.broker.lite;
1919

2020
import org.apache.commons.lang3.StringUtils;
21+
import org.apache.commons.lang3.tuple.Triple;
2122
import org.apache.rocketmq.broker.BrokerController;
2223
import org.apache.rocketmq.common.Pair;
2324
import org.apache.rocketmq.common.constant.LoggerName;
@@ -32,6 +33,7 @@
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.concurrent.ConcurrentMap;
36+
import java.util.function.Function;
3537

3638
public class LiteLifecycleManager extends AbstractLiteLifecycleManager {
3739
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LITE_LOGGER_NAME);
@@ -86,4 +88,29 @@ public List<Pair<String, String>> collectExpiredLiteTopic() {
8688
}
8789
return lmqToDelete;
8890
}
91+
92+
@Override
93+
public void forEachLiteTopic(Function<Triple<String, Long, Long>, Boolean> function) {
94+
Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> iterator =
95+
messageStore.getQueueStore().getConsumeQueueTable().entrySet().iterator();
96+
while (iterator.hasNext()) {
97+
Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> entry = iterator.next();
98+
if (!LiteUtil.isLiteTopicQueue(entry.getKey())) {
99+
continue;
100+
}
101+
ConsumeQueueInterface consumeQueueInterface = entry.getValue().get(0);
102+
if (null == consumeQueueInterface) {
103+
continue;
104+
}
105+
Triple<String, Long, Long> triple = Triple.of(entry.getKey(), consumeQueueInterface.getMaxOffsetInQueue(), null);
106+
try {
107+
if (!function.apply(triple)) {
108+
break;
109+
}
110+
} catch (Throwable e) {
111+
LOGGER.error("forEachLiteTopic error. {}", entry.getKey(), e);
112+
break;
113+
}
114+
}
115+
}
89116
}

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteMetadataUtil.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,15 @@ public static int getMaxClientEventCount(String group, BrokerController brokerCo
103103
return groupConfig.getMaxClientEventCount();
104104
}
105105

106+
public static boolean isWildcardGroup(String group, BrokerController brokerController) {
107+
if (null == group || null == brokerController) {
108+
return false;
109+
}
110+
SubscriptionGroupConfig groupConfig =
111+
brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
112+
return groupConfig != null && groupConfig.isWildcardLiteGroup();
113+
}
114+
106115
public static Map<String, Integer> getTopicTtlMap(BrokerController brokerController) {
107116
if (null == brokerController) {
108117
return Collections.emptyMap();

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import java.util.List;
2323
import java.util.Set;
24-
import org.apache.rocketmq.common.entity.ClientGroup;
2524
import org.apache.rocketmq.common.lite.LiteSubscription;
2625
import org.apache.rocketmq.common.lite.OffsetOption;
2726

@@ -43,7 +42,9 @@ public interface LiteSubscriptionRegistry {
4342

4443
void addListener(LiteCtlListener listener);
4544

46-
Set<ClientGroup> getSubscriber(String lmqName);
45+
SubscriberWrapper getAllSubscriber(String group, String lmqName);
46+
47+
SubscriberWrapper.ListWrapper getWildcardSubscriber(String group, String parentTopic);
4748

4849
List<String> getAllClientIdByGroup(String group);
4950

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java

Lines changed: 116 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,23 @@
1818
package org.apache.rocketmq.broker.lite;
1919

2020
import com.google.common.annotations.VisibleForTesting;
21+
import com.google.common.cache.Cache;
22+
import com.google.common.cache.CacheBuilder;
2123
import io.netty.channel.Channel;
2224
import java.util.ArrayList;
25+
import java.util.Collections;
2326
import java.util.List;
2427
import java.util.Map;
2528
import java.util.Objects;
2629
import java.util.Set;
2730
import java.util.concurrent.ConcurrentHashMap;
2831
import java.util.concurrent.ConcurrentMap;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.TimeUnit;
2934
import java.util.concurrent.atomic.AtomicInteger;
3035
import java.util.stream.Collectors;
3136
import org.apache.commons.collections.CollectionUtils;
37+
import org.apache.commons.lang3.StringUtils;
3238
import org.apache.rocketmq.broker.BrokerController;
3339
import org.apache.rocketmq.common.ServiceThread;
3440
import org.apache.rocketmq.common.constant.LoggerName;
@@ -47,8 +53,11 @@ public class LiteSubscriptionRegistryImpl extends ServiceThread implements LiteS
4753
protected final ConcurrentMap<String/*clientId*/, Channel> clientChannels = new ConcurrentHashMap<>();
4854
protected final ConcurrentMap<String/*clientId*/, LiteSubscription> client2Subscription = new ConcurrentHashMap<>();
4955
protected final ConcurrentMap<String/*lmqName*/, Set<ClientGroup>> liteTopic2Group = new ConcurrentHashMap<>();
56+
protected final ConcurrentMap<String/*topic*/, Set<String/*group*/>> wildcardGroupMap = new ConcurrentHashMap<>();
57+
private final Cache<String/*group*/, List<ClientGroup>> wildcardClientCache =
58+
CacheBuilder.newBuilder().maximumSize(2000).expireAfterWrite(30, TimeUnit.SECONDS).build();
5059

51-
private final List<LiteCtlListener> listeners = new ArrayList<>();
60+
protected final List<LiteCtlListener> listeners = new ArrayList<>();
5261
private final BrokerController brokerController;
5362
private final AbstractLiteLifecycleManager liteLifecycleManager;
5463

@@ -75,6 +84,9 @@ public void addPartialSubscription(String clientId, String group, String topic,
7584
// No need to check existence, if reach here, it must be new.
7685
throw new LiteQuotaException("lite subscription quota exceeded " + maxCount);
7786
}
87+
if (LiteMetadataUtil.isWildcardGroup(group, brokerController)) {
88+
throw new IllegalStateException("subscribe lite operation is not supported for this group");
89+
}
7890

7991
LiteSubscription thisSub = getOrCreateLiteSubscription(clientId, group, topic);
8092
// Utilize existing string object
@@ -106,9 +118,15 @@ public void removePartialSubscription(String clientId, String group, String topi
106118

107119
@Override
108120
public void addCompleteSubscription(String clientId, String group, String topic, Set<String> lmqNameAll, long version) {
109-
Set<String> lmqNameNew = lmqNameAll.stream()
110-
.filter(lmqName -> liteLifecycleManager.isSubscriptionActive(topic, lmqName))
111-
.collect(Collectors.toSet());
121+
Set<String> lmqNameNew;
122+
if (LiteMetadataUtil.isWildcardGroup(group, brokerController)) {
123+
lmqNameNew = Collections.singleton(mockLmqNameForWildcardGroup(topic, group));
124+
markWildcardGroup(topic, group);
125+
} else {
126+
lmqNameNew = lmqNameAll.stream()
127+
.filter(lmqName -> liteLifecycleManager.isSubscriptionActive(topic, lmqName))
128+
.collect(Collectors.toSet());
129+
}
112130

113131
LiteSubscription thisSub = getOrCreateLiteSubscription(clientId, group, topic);
114132
Set<String> lmqNamePrev = thisSub.getLiteTopicSet();
@@ -150,9 +168,54 @@ public void addListener(LiteCtlListener listener) {
150168
listeners.add(listener);
151169
}
152170

171+
/**
172+
* Get all subscribers for a specific LMQ, with optional group filtering.
173+
* This method returns different types based on the subscription scenario:
174+
* 1. When there's only one subscriber, return List<ClientGroup>
175+
* 2. When group is specified, return List<ClientGroup> containing subscribers of that group
176+
* 3. When group is null and multiple groups exist, return Map<String, List<ClientGroup>>
177+
* mapping each group to its subscribers
178+
*/
179+
@Override
180+
public SubscriberWrapper getAllSubscriber(String group, String lmqName) {
181+
String topic = LiteUtil.getParentTopic(lmqName);
182+
183+
if (group != null) {
184+
if (LiteMetadataUtil.isWildcardGroup(group, brokerController)) {
185+
return getWildcardSubscriber(group, topic);
186+
}
187+
SubscriberWrapper.ListWrapper wrapper = new SubscriberWrapper.ListWrapper();
188+
Set<ClientGroup> subscribers = liteTopic2Group.get(lmqName);
189+
if (subscribers != null) {
190+
wrapper.getClients().addAll(subscribers.stream()
191+
.filter(clientGroup -> group.equals(clientGroup.group))
192+
.collect(Collectors.toSet()));
193+
}
194+
return wrapper;
195+
} else {
196+
SubscriberWrapper.MapWrapper wrapper = new SubscriberWrapper.MapWrapper();
197+
Set<ClientGroup> subscribers = liteTopic2Group.get(lmqName);
198+
if (subscribers != null) {
199+
for (ClientGroup clientGroup : subscribers) {
200+
wrapper.getGroupMap().computeIfAbsent(clientGroup.group, k -> new ArrayList<>()).add(clientGroup);
201+
}
202+
}
203+
Set<String> wildcardGroups = wildcardGroupMap.get(topic);
204+
if (wildcardGroups != null) {
205+
for (String wildcardGroup : wildcardGroups) {
206+
List<ClientGroup> wildcardClients = getWildcardGroupClients(topic, wildcardGroup);
207+
if (CollectionUtils.isNotEmpty(wildcardClients)) {
208+
wrapper.getGroupMap().putIfAbsent(wildcardGroup, wildcardClients);
209+
}
210+
}
211+
}
212+
return wrapper;
213+
}
214+
}
215+
153216
@Override
154-
public Set<ClientGroup> getSubscriber(String lmqName) {
155-
return liteTopic2Group.get(lmqName);
217+
public SubscriberWrapper.ListWrapper getWildcardSubscriber(String group, String topic) {
218+
return new SubscriberWrapper.ListWrapper(getWildcardGroupClients(topic, group));
156219
}
157220

158221
/**
@@ -186,6 +249,7 @@ protected void addTopicGroup(ClientGroup clientGroup, String lmqName) {
186249
.computeIfAbsent(lmqName, k -> ConcurrentHashMap.newKeySet());
187250
if (topicGroupSet.add(clientGroup)) {
188251
activeNum.incrementAndGet();
252+
invalidateWildcardCacheIfNecessary(clientGroup.group);
189253
for (LiteCtlListener listener : listeners) {
190254
listener.onRegister(clientGroup.clientId, clientGroup.group, lmqName);
191255
}
@@ -199,6 +263,7 @@ protected void removeTopicGroup(ClientGroup clientGroup, String lmqName, boolean
199263
}
200264
if (topicGroupSet.remove(clientGroup)) {
201265
activeNum.decrementAndGet();
266+
invalidateWildcardCacheIfNecessary(clientGroup.group);
202267
for (LiteCtlListener listener : listeners) {
203268
listener.onUnregister(clientGroup.clientId, clientGroup.group, lmqName);
204269
}
@@ -209,6 +274,7 @@ protected void removeTopicGroup(ClientGroup clientGroup, String lmqName, boolean
209274
}
210275
if (topicGroupSet.isEmpty()) {
211276
liteTopic2Group.remove(lmqName);
277+
unmarkWildcardGroupIfNecessary(lmqName);
212278
}
213279
}
214280

@@ -228,6 +294,10 @@ protected void excludeClientByLmqName(String newClientId, String group, String l
228294
LiteSubscription liteSubscription = client2Subscription.get(clientGroup.clientId);
229295
if (liteSubscription != null) {
230296
liteSubscription.removeLiteTopic(lmqName);
297+
// remove client if no more liteTopic
298+
if (liteSubscription.getLiteTopicSet().isEmpty()) {
299+
client2Subscription.remove(clientGroup.clientId);
300+
}
231301
}
232302
notifyUnsubscribeLite(clientGroup.clientId, clientGroup.group, lmqName);
233303
boolean resetOffset = LiteMetadataUtil.isResetOffsetInExclusiveMode(group, brokerController);
@@ -240,7 +310,7 @@ protected void excludeClientByLmqName(String newClientId, String group, String l
240310
/**
241311
* Notify the client to remove the liteTopic subscription from its local memory
242312
*/
243-
private void notifyUnsubscribeLite(String clientId, String group, String lmqName) {
313+
protected void notifyUnsubscribeLite(String clientId, String group, String lmqName) {
244314
String topic = LiteUtil.getParentTopic(lmqName);
245315
String liteTopic = LiteUtil.getLiteTopic(lmqName);
246316
Channel channel = clientChannels.get(clientId);
@@ -318,6 +388,45 @@ private LiteSubscription getOrCreateLiteSubscription(String clientId, String gro
318388
return curLiteSubscription;
319389
}
320390

391+
private void invalidateWildcardCacheIfNecessary(String group) {
392+
if (LiteMetadataUtil.isWildcardGroup(group, brokerController)) {
393+
wildcardClientCache.invalidate(group);
394+
}
395+
}
396+
397+
private void markWildcardGroup(String topic, String group) {
398+
wildcardGroupMap.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(group);
399+
}
400+
401+
private void unmarkWildcardGroupIfNecessary(String lmqName) {
402+
if (!LiteUtil.isLiteTopicQueue(lmqName)) { // must be topic@group
403+
String[] topicAtGroup = StringUtils.split(lmqName);
404+
if (null == topicAtGroup || topicAtGroup.length != 2) {
405+
return;
406+
}
407+
wildcardGroupMap.computeIfPresent(topicAtGroup[0], (k, v) -> {
408+
v.remove(topicAtGroup[1]);
409+
return v.isEmpty() ? null : v;
410+
});
411+
}
412+
}
413+
414+
private String mockLmqNameForWildcardGroup(String topic, String group) {
415+
return topic + "@" + group;
416+
}
417+
418+
private List<ClientGroup> getWildcardGroupClients(String topic, String group) {
419+
List<ClientGroup> list = null;
420+
try {
421+
list = wildcardClientCache.get(group, () -> {
422+
Set<ClientGroup> clientSet = liteTopic2Group.get(mockLmqNameForWildcardGroup(topic, group));
423+
return clientSet != null ? new ArrayList<>(clientSet) : Collections.emptyList();
424+
});
425+
} catch (ExecutionException ignored) {
426+
}
427+
return list;
428+
}
429+
321430
@Override
322431
public void run() {
323432
LOGGER.info("Start checking lite subscription.");

0 commit comments

Comments
 (0)