Skip to content

Commit f8a5806

Browse files
committed
[fix][broker] Check replication cluster before starting the replicator
Fixes #20010 ### Motivation `PersistentTopicTest.testCreateTopicWithZombieReplicatorCursor` is flaky because the cursor could still be created again in `startReplicator`, which could be called by: ``` onPoliciesUpdate checkReplicationAndRetryOnFailure checkReplication ``` Sometimes the policies update might fail because the topic might be deleted in `PersistentTopic#checkReplication`: > Deleting topic [xxx] because local cluster is not part of global namespace repl list [remote] ### Modifications - Call `checkReplicationCluster` before calling `startReplicator`. - Add the local cluster to the replication cluster list - Sleep for a while in the test to reduce the flakiness caused by the asynchronous update of the policies
1 parent 42a6969 commit f8a5806

2 files changed

Lines changed: 29 additions & 8 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,13 @@ public CompletableFuture<Void> checkReplication() {
15421542
continue;
15431543
}
15441544
if (!replicators.containsKey(cluster)) {
1545-
futures.add(startReplicator(cluster));
1545+
futures.add(checkReplicationCluster(cluster).thenCompose(clusterExists -> {
1546+
if (clusterExists) {
1547+
return startReplicator(cluster);
1548+
} else {
1549+
return CompletableFuture.completedFuture(null);
1550+
}
1551+
}));
15461552
}
15471553
}
15481554

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
import java.io.ByteArrayOutputStream;
3939
import java.lang.reflect.Field;
4040
import java.nio.charset.StandardCharsets;
41+
import java.time.Duration;
4142
import java.util.ArrayList;
43+
import java.util.Arrays;
4244
import java.util.Collection;
4345
import java.util.Collections;
4446
import java.util.HashSet;
@@ -52,6 +54,7 @@
5254
import java.util.concurrent.atomic.AtomicBoolean;
5355
import java.util.function.Supplier;
5456
import lombok.Cleanup;
57+
import lombok.extern.slf4j.Slf4j;
5558
import org.apache.bookkeeper.client.LedgerHandle;
5659
import org.apache.bookkeeper.mledger.ManagedCursor;
5760
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -83,6 +86,7 @@
8386
import org.testng.annotations.DataProvider;
8487
import org.testng.annotations.Test;
8588

89+
@Slf4j
8690
@Test(groups = "broker")
8791
public class PersistentTopicTest extends BrokerTestBase {
8892

@@ -558,10 +562,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
558562
admin.tenants().updateTenant("prop", tenantInfo);
559563

560564
if (topicLevelPolicy) {
561-
admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
565+
admin.topics().setReplicationClusters(topicName, Arrays.asList("test", remoteCluster));
562566
} else {
563567
admin.namespaces().setNamespaceReplicationClustersAsync(
564-
namespace, Collections.singleton(remoteCluster)).get();
568+
namespace, Sets.newHashSet("test", remoteCluster)).get();
565569
}
566570

567571
final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
@@ -576,16 +580,27 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
576580
};
577581
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
578582

583+
// PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two
584+
// updates of the replication clusters. So here we sleep for a while to reduce the flakiness.
585+
Thread.sleep(100);
586+
587+
// Configure the local cluster to avoid the topic being deleted in PersistentTopics#checkReplication
579588
if (topicLevelPolicy) {
580-
admin.topics().setReplicationClusters(topicName, Collections.emptyList());
589+
admin.topics().setReplicationClusters(topicName, Collections.singletonList("test"));
581590
} else {
582-
admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
591+
admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.singleton("test")).get();
583592
}
584593
admin.clusters().deleteCluster(remoteCluster);
585594
// Now the cluster and its related policy has been removed but the replicator cursor still exists
586595

587-
topic.initialize().get(3, TimeUnit.SECONDS);
588-
Awaitility.await().atMost(3, TimeUnit.SECONDS)
589-
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
596+
Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
597+
log.info("Before initialize...");
598+
try {
599+
topic.initialize().get(3, TimeUnit.SECONDS);
600+
} catch (ExecutionException e) {
601+
log.warn("Failed to initialize: {}", e.getCause().getMessage());
602+
}
603+
return !topic.getManagedLedger().getCursors().iterator().hasNext();
604+
});
590605
}
591606
}

0 commit comments

Comments
 (0)