Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tc
completableFuture.complete(null);
} else {
pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames
.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()))
.thenRun(() -> internalPinnedExecutor.execute(() -> {
final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
.computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -1225,35 +1223,6 @@ public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName)
new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
}

/**
* @deprecated This method is only used in test now.
*/
@Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
try {
return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
throw new RuntimeException(e);
}
}

public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
return getBundleAsync(topicName).thenCompose(bundle -> {
Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle);
if (optionalFuture.isEmpty()) {
return CompletableFuture.completedFuture(false);
}
return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive());
});
}

private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
// TODO: Add unit tests cover it.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
checkArgument(producer.getTopic() == this);

return brokerService.checkTopicNsOwnership(getName())
return brokerService.checkTopicNsOwnership(TopicName.get(topic))
.thenCompose(__ ->
incrementTopicEpochIfNeeded(producer, producerQueuedFuture))
.thenCompose(producerEpoch -> {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
Map<String, String> subscriptionProperties,
SchemaType schemaType) {

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
return brokerService.checkTopicNsOwnership(TopicName.get(topic)).thenCompose(__ -> {
final CompletableFuture<Consumer> future = new CompletableFuture<>();

if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
"readCompacted only allowed on failover or exclusive subscriptions"));
}

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
return brokerService.checkTopicNsOwnership(TopicName.get(topic)).thenCompose(__ -> {
Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
if (replicatedSubscriptionState != null && replicatedSubscriptionState
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3746,6 +3746,7 @@ public void testGetStatsIfPartitionNotExists() throws Exception {
final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
admin.topics().createPartitionedTopic(partitionedTp, 1);
TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
admin.lookups().lookupTopic(partition0.toString()); // trigger loading namespace bundles
boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertTrue(topicExists1);
// Verify topics-stats works.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {

// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName, true, null);
.loadOrCreatePersistentTopic(topic, true, null);

try {
futureResult.get();
Expand Down Expand Up @@ -1227,7 +1227,7 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex
for (int i = 0; i < 10; i++) {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName + "_" + i, false, null);
.loadOrCreatePersistentTopic(TopicName.get(topicName + "_" + i), false, null);
loadFutures.add(futureResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand Down Expand Up @@ -162,7 +161,6 @@ public void setup() throws Exception {

NamespaceService nsSvc = pulsarTestContext.getPulsarService().getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,7 +102,6 @@ public void setup(Method m) throws Exception {
NamespaceService nsSvc = mock(NamespaceService.class);
doReturn(nsSvc).when(pulsar).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ public void setup() throws Exception {
NamespaceBundle bundle = mock(NamespaceBundle.class);
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(brokerService).checkTopicNsOwnership(any());
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
Expand Down Expand Up @@ -231,8 +230,7 @@ public void setup() throws Exception {
.getBundleAsync(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(), any());
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
doReturn(true).when(namespaceService).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
doReturn(CompletableFuture.completedFuture(null)).when(brokerService).checkTopicNsOwnership(any());
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
Expand Down Expand Up @@ -1583,8 +1581,8 @@ public void testProducerOnNotOwnedTopic() throws Exception {
setChannelConnected();

// Force the case where the broker doesn't own any topic
doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
.isServiceUnitActiveAsync(any(TopicName.class));
doReturn(CompletableFuture.failedFuture(new ServiceUnitNotReadyException("topic not owned")))
.when(brokerService).checkTopicNsOwnership(any());

// test PRODUCER failure case
ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,
Expand Down Expand Up @@ -3109,7 +3107,7 @@ public void testTopicIsNotReady() throws Exception {

// Force the checkTopicNsOwnership method to throw ServiceUnitNotReadyException
doReturn(FutureUtil.failedFuture(new ServiceUnitNotReadyException("Service unit is not ready")))
.when(brokerService).checkTopicNsOwnership(anyString());
.when(brokerService).checkTopicNsOwnership(any());

// 2nd subscribe command when the service unit is not ready
ByteBuf clientCommand2 = Commands.newSubscribe(successTopicName, successSubName, 2 /* consumer id */,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -48,6 +49,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -139,7 +141,9 @@ static class TestBrokerService extends BrokerService {
}

@Override
protected CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName topicName) {
protected CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(
TopicName topicName, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
CompletableFuture<PartitionedTopicMetadata> partitionedMetadataFuture) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep
.newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService),
Mockito.eq(PersistentTopic.class));

brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap());
brokerService.createPersistentTopic0(TopicName.get(topic), true, new CompletableFuture<>(),
Collections.emptyMap(), System.currentTimeMillis());

Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null);
PersistentTopic persistentTopic = reference.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +37,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -69,7 +68,7 @@ protected void cleanup() throws Exception {
}

@Test
public void testNoOrphanClosedTopicIfTxnInternalFailed() {
public void testNoOrphanClosedTopicIfTxnInternalFailed() throws PulsarAdminException {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

BrokerService brokerService = pulsar.getBrokerService();
Expand All @@ -89,13 +88,7 @@ public void testNoOrphanClosedTopicIfTxnInternalFailed() {
pulsar.setTransactionBufferProvider(mockTransactionBufferProvider);

// 2. Trigger create topic and assert topic load success.
CompletableFuture<Optional<Topic>> firstLoad = brokerService.getTopic(tpName, true);
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
.pollInterval(200, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertTrue(firstLoad.isDone());
assertFalse(firstLoad.isCompletedExceptionally());
});
admin.topics().createNonPartitionedTopic(tpName);

// 3. Assert topic removed from cache
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception {
admin.topics().createNonPartitionedTopic(tpName);
admin.namespaces().unload(ns);

// Inject an error when calling "NamespaceService.isServiceUnitActiveAsync".
// Inject an error when loading the topic
AtomicInteger failedTimes = new AtomicInteger();
NamespaceService namespaceService = pulsar.getNamespaceService();
doAnswer(invocation -> {
Expand All @@ -258,7 +258,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception {
return CompletableFuture.failedFuture(new RuntimeException("mocked error"));
}
return invocation.callRealMethod();
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
}).when(namespaceService).checkBundleOwnership(any(TopicName.class), any());

// Verify: the consumer can create successfully eventually.
Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex
pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
}
return invocation.callRealMethod();
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
}).when(namespaceService).checkBundleOwnership(any(TopicName.class), any());

// Verify: the consumer create failed due to pulsar does not allow to create topic automatically.
try {
Expand Down
Loading