diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index bd19a8e860255..2512028b381df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -117,7 +117,7 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc completableFuture.complete(null); } else { pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames - .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()) + .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId())) .thenRun(() -> internalPinnedExecutor.execute(() -> { final Semaphore tcLoadSemaphore = this.tcLoadSemaphores .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 32fe4ca14493f..a92acf2177e9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -43,9 +43,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1225,35 +1223,6 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName())); } - /** - * @deprecated This method is only used in test now. - */ - @Deprecated - public boolean isServiceUnitActive(TopicName topicName) { - try { - return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig() - .getMetadataStoreOperationTimeoutSeconds(), SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e); - throw new RuntimeException(e); - } - } - - public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) { - // TODO: Add unit tests cover it. - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { - return getBundleAsync(topicName) - .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); - } - return getBundleAsync(topicName).thenCompose(bundle -> { - Optional> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle); - if (optionalFuture.isEmpty()) { - return CompletableFuture.completedFuture(false); - } - return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive()); - }); - } - private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { // TODO: Add unit tests cover it. if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index e8253771eded4..8f32963759431 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -762,7 +762,7 @@ public CompletableFuture> addProducer(Producer producer, CompletableFuture producerQueuedFuture) { checkArgument(producer.getTopic() == this); - return brokerService.checkTopicNsOwnership(getName()) + return brokerService.checkTopicNsOwnership(TopicName.get(topic)) .thenCompose(__ -> incrementTopicEpochIfNeeded(producer, producerQueuedFuture)) .thenCompose(producerEpoch -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f67a16ed8da7e..769234d054061 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -81,7 +81,6 @@ import java.util.function.Consumer; import java.util.function.Predicate; import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -196,6 +195,7 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1082,13 +1082,18 @@ public CompletableFuture> getTopic(final String topic, boolean c * completes exceptionally with NotAllowedException if validation fails */ private CompletableFuture validateTopicConsistency(TopicName topicName) { + final var partitionMetadataFuture = fetchPartitionedTopicMetadataAsync(topicName.isPartitioned() + ? TopicName.get(topicName.getPartitionedTopicName()) : topicName); + return validateTopicConsistency(topicName, partitionMetadataFuture); + } + + private CompletableFuture validateTopicConsistency( + TopicName topicName, CompletableFuture partitionedMetadataFuture) { if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { // Skip validation for heartbeat namespace. return CompletableFuture.completedFuture(null); } - TopicName baseTopicName = - topicName.isPartitioned() ? TopicName.get(topicName.getPartitionedTopicName()) : topicName; - return fetchPartitionedTopicMetadataAsync(baseTopicName) + return partitionedMetadataFuture .thenCompose(metadata -> { if (topicName.isPartitioned()) { if (metadata.partitions == 0) { @@ -1146,7 +1151,7 @@ private CompletableFuture validateTopicConsistency(TopicName topicName) { * @return CompletableFuture with an Optional of the topic if found or created, otherwise empty. */ public CompletableFuture> getTopic(final TopicName topicName, boolean createIfMissing, - Map properties) { + @Nullable Map properties) { try { // If topic future exists in the cache returned directly regardless of whether it fails or timeout. CompletableFuture> tp = topics.get(topicName.toString()); @@ -1162,24 +1167,17 @@ public CompletableFuture> getTopic(final TopicName topicName, bo return FutureUtil.failedFuture(new NotAllowedException( "Broker is unable to load persistent topic")); } + if (isTransactionInternalName(topicName)) { + String msg = String.format("Can not create transaction system topic %s", topicName); + log.warn(msg); + return CompletableFuture.failedFuture(new NotAllowedException(msg)); + } return checkNonPartitionedTopicExists(topicName).thenCompose(exists -> { if (!exists && !createIfMissing) { return CompletableFuture.completedFuture(Optional.empty()); } - // The topic level policies are not needed now, but the meaning of calling - // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization. - return getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) - .exceptionally(ex -> { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - final String errorInfo = String.format("Topic creation encountered an exception by initialize" - + " topic policies service. topic_name=%s error_message=%s", topicName, - rc.getMessage()); - log.error(errorInfo, rc); - throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> { - return topics.computeIfAbsent(topicName.toString(), - (tpName) -> loadOrCreatePersistentTopic(tpName, createIfMissing, properties)); - }); + return topics.computeIfAbsent(topicName.toString(), __ -> + loadOrCreatePersistentTopic(topicName, createIfMissing, properties)); }); } else { if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { @@ -1325,11 +1323,13 @@ public void deleteLedgerFailed(ManagedLedgerException exception, } public CompletableFuture getManagedLedgerFactoryForTopic(TopicName topicName) { - return getManagedLedgerConfig(topicName) - .thenApply(config -> { - String storageClassName = config.getStorageClassName(); - return getManagedLedgerFactoryForTopic(topicName, storageClassName); - }); + return getManagedLedgerFactoryForTopic(topicName, getManagedLedgerConfig(topicName)); + } + + private CompletableFuture getManagedLedgerFactoryForTopic( + TopicName topicName, CompletableFuture mlConfigFuture) { + return mlConfigFuture.thenApply(config -> getManagedLedgerFactoryForTopic(topicName, + config.getStorageClassName())); } public ManagedLedgerFactory getManagedLedgerFactoryForTopic(TopicName topicName, String storageClassName) { @@ -1391,8 +1391,9 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.completeExceptionally(e); return topicFuture; } - checkTopicNsOwnership(topic) - .thenCompose((__) -> validateTopicConsistency(TopicName.get(topic))) + final var topicName = TopicName.get(topic); + checkTopicNsOwnership(topicName) + .thenCompose((__) -> validateTopicConsistency(topicName)) .thenRun(() -> { nonPersistentTopic.initialize() .thenCompose(__ -> nonPersistentTopic.checkReplication()) @@ -1651,13 +1652,9 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c /** * It creates a topic async and returns CompletableFuture. It also throttles down configured max-concurrent topic * loading and puts them into queue once in-process topics are created. - * - * @param topic persistent-topic name - * @return CompletableFuture - * @throws RuntimeException */ - protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, - boolean createIfMissing, Map properties) { + protected CompletableFuture> loadOrCreatePersistentTopic(TopicName topicName, + boolean createIfMissing, @Nullable Map properties) { final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); @@ -1667,54 +1664,47 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return null; }); - checkTopicNsOwnership(topic) - .thenRun(() -> { - final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); - - if (topicLoadSemaphore.tryAcquire()) { - checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, - properties); - topicFuture.handle((persistentTopic, ex) -> { - // release permit and process pending topic - topicLoadSemaphore.release(); - // do not recreate topic if topic is already migrated and deleted by broker - // so, avoid creating a new topic if migration is already started - if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex.getCause()); - return null; - } - createPendingLoadTopic(); - return null; - }); - } else { - pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, - createIfMissing, topicFuture, properties)); - if (log.isDebugEnabled()) { - log.debug("topic-loading for {} added into pending queue", topic); - } - } - }).exceptionally(ex -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex.getCause()); + final var topic = topicName.toString(); + final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); + + if (topicLoadSemaphore.tryAcquire()) { + checkOwnershipAndCreatePersistentTopic(topicName, createIfMissing, topicFuture, + properties, false); + topicFuture.handle((persistentTopic, ex) -> { + // release permit and process pending topic + topicLoadSemaphore.release(); + // do not recreate topic if topic is already migrated and deleted by broker + // so, avoid creating a new topic if migration is already started + if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { + failTopicFuture(topic, topicFuture, ex.getCause()); return null; - }); + } + createPendingLoadTopic(); + return null; + }); + } else { + pendingTopicLoadingQueue.add(new TopicLoadingContext(topicName, + createIfMissing, topicFuture, properties)); + if (log.isDebugEnabled()) { + log.debug("topic-loading for {} added into pending queue", topic); + } + } return topicFuture; } @VisibleForTesting - protected CompletableFuture> fetchTopicPropertiesAsync(TopicName topicName) { + protected CompletableFuture> fetchTopicPropertiesAsync( + TopicName topicName, CompletableFuture mlConfigFuture, + CompletableFuture partitionMetadataFuture) { if (!topicName.isPartitioned()) { - return getManagedLedgerFactoryForTopic(topicName).thenCompose( - managedLedgerFactory -> managedLedgerFactory.getManagedLedgerPropertiesAsync( - topicName.getPersistenceNamingEncoding())); + return getManagedLedgerFactoryForTopic(topicName, mlConfigFuture).thenCompose(managedLedgerFactory -> + managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding())); } else { - TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(partitionedTopicName) + return partitionMetadataFuture .thenCompose(metadata -> { if (metadata.partitions == PartitionedTopicMetadata.NON_PARTITIONED) { - return getManagedLedgerFactoryForTopic(topicName).thenCompose( + return getManagedLedgerFactoryForTopic(topicName, mlConfigFuture).thenCompose( managedLedgerFactory -> managedLedgerFactory.getManagedLedgerPropertiesAsync( topicName.getPersistenceNamingEncoding())); } else { @@ -1732,68 +1722,55 @@ protected CompletableFuture> fetchTopicPropertiesAsync(Topic } } - private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, - CompletableFuture> topicFuture, - Map properties) { - TopicName topicName = TopicName.get(topic); - pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) - .thenAccept(isActive -> { - if (isActive) { - CompletableFuture> propertiesFuture; - if (properties == null) { - //Read properties from storage when loading topic. - propertiesFuture = fetchTopicPropertiesAsync(topicName); - } else { - propertiesFuture = CompletableFuture.completedFuture(properties); - } - propertiesFuture.thenAccept(finalProperties -> - //TODO add topicName in properties? - createPersistentTopic0(topic, createIfMissing, topicFuture, - finalProperties) - ).exceptionally(throwable -> { - log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(throwable); - return null; - }); - } else { - // namespace is being unloaded - String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - }).exceptionally(ex -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex); - return null; - }); + private void checkOwnershipAndCreatePersistentTopic(TopicName topicName, boolean createIfMissing, + CompletableFuture> topicFuture, + @Nullable Map properties, + boolean fromPendingLoadTopic) { + final var topic = topicName.toString(); + final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + checkTopicNsOwnership(topicName).thenRun(() -> + createPersistentTopic0(topicName, createIfMissing, topicFuture, properties, topicCreateTimeMs) + ).exceptionally(ex -> { + if (fromPendingLoadTopic) { + inactivityMonitor.schedule(this::createPendingLoadTopic, 100, MILLISECONDS); + } + failTopicFuture(topic, topicFuture, ex); + return null; + }); } @VisibleForTesting - public void createPersistentTopic0(final String topic, boolean createIfMissing, + public void createPersistentTopic0(TopicName topicName, boolean createIfMissing, CompletableFuture> topicFuture, - Map properties) { - TopicName topicName = TopicName.get(topic); - final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - - if (isTransactionInternalName(topicName)) { - String msg = String.format("Can not create transaction system topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new NotAllowedException(msg)); - return; - } - - CompletableFuture maxTopicsCheck = createIfMissing - ? checkMaxTopicsPerNamespace(topicName) - : CompletableFuture.completedFuture(null); - - CompletableFuture isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName); - maxTopicsCheck.thenCompose(partitionedTopicMetadata -> validateTopicConsistency(topicName)) - .thenCompose(__ -> isTopicAlreadyMigrated) - .thenCompose(__ -> getManagedLedgerConfig(topicName)) - .thenAccept(managedLedgerConfig -> { + @Nullable Map originalProperties, + long topicCreateTimeMs) { + final var beforeGetManagedLedgerConfig = System.currentTimeMillis(); + final var mlConfigFuture = getManagedLedgerConfig(topicName); + mlConfigFuture.thenRun(() -> { + // Log the latency specially for getManagedLedgerConfig() because it needs to load the topic policies, + // which could be a time-consuming task with no metrics yet. + final var latencyMs = System.currentTimeMillis() - beforeGetManagedLedgerConfig; + log.info("Got managed ledger config for {} after {} ms", topicName, latencyMs); + }); + final var partitionedMetadataFuture = fetchPartitionedTopicMetadataAsync(topicName.isPartitioned() + ? TopicName.get(topicName.getPartitionedTopicName()) : topicName); + final CompletableFuture> propertiesFuture; + if (originalProperties == null) { + //Read properties from storage when loading topic. + propertiesFuture = fetchTopicPropertiesAsync(topicName, mlConfigFuture, partitionedMetadataFuture); + } else { + propertiesFuture = CompletableFuture.completedFuture(originalProperties); + } + final var topic = topicName.toString(); + + // The validations can be performed concurrently + final var validateFuture = CompletableFuture.allOf(mlConfigFuture, propertiesFuture, partitionedMetadataFuture, + createIfMissing ? checkMaxTopicsPerNamespace(topicName) : CompletableFuture.completedFuture(null), + checkTopicAlreadyMigrated(topicName), + validateTopicConsistency(topicName, partitionedMetadataFuture)); + validateFuture.thenRun(() -> { + final var managedLedgerConfig = mlConfigFuture.join(); + final var properties = propertiesFuture.join(); if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { // init managedLedger interceptor Set interceptors = new HashSet<>(); @@ -1887,20 +1864,18 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, closeEx) -> { - topics.remove(topic, topicFuture); if (closeEx != null) { log.warn("[{}] Get an error when closing topic.", topic, closeEx); } - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); }); }); return null; }); } catch (Exception e) { log.warn("Failed to create topic {}: {}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(e); + failTopicFuture(topic, topicFuture, e); } } @@ -1913,8 +1888,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new PersistenceException(exception)); + failTopicFuture(topic, topicFuture, new PersistenceException(exception)); } } }, () -> isTopicNsOwnedByBrokerAsync(topicName), null); @@ -1924,11 +1898,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { String msg = migrationFailure ? "Topic is already migrated" : "Failed to get topic configuration:"; log.warn("[{}] {} {}", topic, msg, exception.getMessage(), exception); - // remove topic from topics-map in different thread to avoid possible deadlock if - // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(exception); - return null; + return failTopicFuture(topic, topicFuture, exception); }); } @@ -2363,8 +2333,7 @@ public CompletableFuture isTopicNsOwnedByBrokerAsync(TopicName topicNam }); } - public CompletableFuture checkTopicNsOwnership(final String topic) { - TopicName topicName = TopicName.get(topic); + public CompletableFuture checkTopicNsOwnership(TopicName topicName) { final var namespaceService = pulsar.getNamespaceService(); return namespaceService.getBundleAsync(topicName).thenCompose(bundle -> @@ -2373,7 +2342,7 @@ public CompletableFuture checkTopicNsOwnership(final String topic) { return CompletableFuture.completedFuture(null); } else { String msg = String.format("Namespace bundle (%s) for topic (%s) not served by this instance:" - + "%s. Please redo the lookup.", bundle, topic, pulsar.getBrokerId()); + + "%s. Please redo the lookup.", bundle, topicName, pulsar.getBrokerId()); log.warn(msg); return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg)); } @@ -3255,29 +3224,18 @@ private void createPendingLoadTopic() { return; } - final String topic = pendingTopic.getTopic(); - checkTopicNsOwnership(topic).thenRun(() -> { - CompletableFuture> pendingFuture = pendingTopic.getTopicFuture(); - final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); - final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - checkOwnershipAndCreatePersistentTopic(topic, - pendingTopic.isCreateIfMissing(), - pendingFuture, - pendingTopic.getProperties()); - pendingFuture.handle((persistentTopic, ex) -> { - // release permit and process next pending topic - if (acquiredPermit) { - topicLoadSemaphore.release(); - } - createPendingLoadTopic(); - return null; - }); - }).exceptionally(e -> { - log.error("Failed to create pending topic {}", topic, e); - pendingTopic.getTopicFuture() - .completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e); - // schedule to process next pending topic - inactivityMonitor.schedule(this::createPendingLoadTopic, 100, MILLISECONDS); + final var topicName = pendingTopic.topicName; + final var pendingFuture = pendingTopic.topicFuture; + final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); + final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); + checkOwnershipAndCreatePersistentTopic(topicName, pendingTopic.createIfMissing, pendingFuture, + pendingTopic.properties, true); + pendingFuture.handle((persistentTopic, ex) -> { + // release permit and process next pending topic + if (acquiredPermit) { + topicLoadSemaphore.release(); + } + createPendingLoadTopic(); return null; }); } @@ -3830,12 +3788,22 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory this.pulsarChannelInitFactory = factory; } - @AllArgsConstructor - @Getter - private static class TopicLoadingContext { - private final String topic; - private final boolean createIfMissing; - private final CompletableFuture> topicFuture; - private final Map properties; + private Void failTopicFuture(String topic, CompletableFuture> topicFuture, Throwable throwable) { + // remove topic from topics-map in different thread to avoid possible deadlock if + // createPersistentTopic-thread only tries to handle this future-result + pulsar.getExecutor().execute(() -> { + if (topics.remove(topic, topicFuture)) { + log.info("Removed cached topic {} for failure {}", topic, throwable.getMessage()); + } else { + log.warn("Cached failed topic {} was not removed because it's outdated (failure: {})", topic, + throwable.getMessage()); + } + }); + topicFuture.completeExceptionally(throwable); + return null; + } + + private record TopicLoadingContext(TopicName topicName, boolean createIfMissing, + CompletableFuture> topicFuture, Map properties) { } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 6021c41142a5e..f219663bc2abe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -284,7 +284,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St Map subscriptionProperties, SchemaType schemaType) { - return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { + return brokerService.checkTopicNsOwnership(TopicName.get(topic)).thenCompose(__ -> { final CompletableFuture future = new CompletableFuture<>(); if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4d29252eafda1..1510b0d95b305 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -928,7 +928,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St "readCompacted only allowed on failover or exclusive subscriptions")); } - return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { + return brokerService.checkTopicNsOwnership(TopicName.get(topic)).thenCompose(__ -> { Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; if (replicatedSubscriptionState != null && replicatedSubscriptionState && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 1c2f2215a3e16..1db89fbff6670 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3746,6 +3746,7 @@ public void testGetStatsIfPartitionNotExists() throws Exception { final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp"); admin.topics().createPartitionedTopic(partitionedTp, 1); TopicName partition0 = TopicName.get(partitionedTp).getPartition(0); + admin.lookups().lookupTopic(partition0.toString()); // trigger loading namespace bundles boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); assertTrue(topicExists1); // Verify topics-stats works. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index c73acfe9ee81f..695c4b99f1747 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1183,7 +1183,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { // try to create topic which should fail as bundle is disable CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName, true, null); + .loadOrCreatePersistentTopic(topic, true, null); try { futureResult.get(); @@ -1227,7 +1227,7 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex for (int i = 0; i < 10; i++) { // try to create topic which should fail as bundle is disable CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName + "_" + i, false, null); + .loadOrCreatePersistentTopic(TopicName.get(topicName + "_" + i), false, null); loadFutures.add(futureResult); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 96ca2d90f0613..37cf75d84ca6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -79,7 +79,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -162,7 +161,6 @@ public void setup() throws Exception { NamespaceService nsSvc = pulsarTestContext.getPulsarService().getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 20f58f277a39c..2f8a924635116 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -51,7 +51,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +102,6 @@ public void setup(Method m) throws Exception { NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index ca8f762adc445..f021b4a4519ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -224,8 +224,7 @@ public void setup() throws Exception { NamespaceBundle bundle = mock(NamespaceBundle.class); doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any()); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); - doReturn(true).when(nsSvc).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); + doReturn(CompletableFuture.completedFuture(null)).when(brokerService).checkTopicNsOwnership(any()); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index bba8f28675533..2e1eeecd6444b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -26,7 +26,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.CALLS_REAL_METHODS; @@ -231,8 +230,7 @@ public void setup() throws Exception { .getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(), any()); doReturn(true).when(namespaceService).isServiceUnitOwned(any()); - doReturn(true).when(namespaceService).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); + doReturn(CompletableFuture.completedFuture(null)).when(brokerService).checkTopicNsOwnership(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics( @@ -1583,8 +1581,8 @@ public void testProducerOnNotOwnedTopic() throws Exception { setChannelConnected(); // Force the case where the broker doesn't own any topic - doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) - .isServiceUnitActiveAsync(any(TopicName.class)); + doReturn(CompletableFuture.failedFuture(new ServiceUnitNotReadyException("topic not owned"))) + .when(brokerService).checkTopicNsOwnership(any()); // test PRODUCER failure case ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */, @@ -3109,7 +3107,7 @@ public void testTopicIsNotReady() throws Exception { // Force the checkTopicNsOwnership method to throw ServiceUnitNotReadyException doReturn(FutureUtil.failedFuture(new ServiceUnitNotReadyException("Service unit is not ready"))) - .when(brokerService).checkTopicNsOwnership(anyString()); + .when(brokerService).checkTopicNsOwnership(any()); // 2nd subscribe command when the service unit is not ready ByteBuf clientCommand2 = Commands.newSubscribe(successTopicName, successSubName, 2 /* consumer id */, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java index 46e41be012511..9a82681c60952 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java @@ -29,6 +29,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.BookKeeperClientFactory; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -48,6 +49,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -139,7 +141,9 @@ static class TestBrokerService extends BrokerService { } @Override - protected CompletableFuture> fetchTopicPropertiesAsync(TopicName topicName) { + protected CompletableFuture> fetchTopicPropertiesAsync( + TopicName topicName, CompletableFuture mlConfigFuture, + CompletableFuture partitionedMetadataFuture) { return CompletableFuture.completedFuture(Collections.emptyMap()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 93654db2c9992..bc09266d272c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -178,7 +178,8 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), Mockito.eq(PersistentTopic.class)); - brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); + brokerService.createPersistentTopic0(TopicName.get(topic), true, new CompletableFuture<>(), + Collections.emptyMap(), System.currentTimeMillis()); Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java index 508423adce4d8..4e7e99c00eb5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java @@ -22,9 +22,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; import java.io.IOException; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,6 +37,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -69,7 +68,7 @@ protected void cleanup() throws Exception { } @Test - public void testNoOrphanClosedTopicIfTxnInternalFailed() { + public void testNoOrphanClosedTopicIfTxnInternalFailed() throws PulsarAdminException { String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); BrokerService brokerService = pulsar.getBrokerService(); @@ -89,13 +88,7 @@ public void testNoOrphanClosedTopicIfTxnInternalFailed() { pulsar.setTransactionBufferProvider(mockTransactionBufferProvider); // 2. Trigger create topic and assert topic load success. - CompletableFuture> firstLoad = brokerService.getTopic(tpName, true); - Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) - .pollInterval(200, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - assertTrue(firstLoad.isDone()); - assertFalse(firstLoad.isCompletedExceptionally()); - }); + admin.topics().createNonPartitionedTopic(tpName); // 3. Assert topic removed from cache Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index b7c323af5bcd4..3613ba516254c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -245,7 +245,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { admin.topics().createNonPartitionedTopic(tpName); admin.namespaces().unload(ns); - // Inject an error when calling "NamespaceService.isServiceUnitActiveAsync". + // Inject an error when loading the topic AtomicInteger failedTimes = new AtomicInteger(); NamespaceService namespaceService = pulsar.getNamespaceService(); doAnswer(invocation -> { @@ -258,7 +258,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { return CompletableFuture.failedFuture(new RuntimeException("mocked error")); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer can create successfully eventually. Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); @@ -295,7 +295,7 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding()); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer create failed due to pulsar does not allow to create topic automatically. try {