Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1706,37 +1706,55 @@ private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster)

protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
.thenCompose(clusterExists -> {
if (!clusterExists) {
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
return removeReplicator(remoteCluster).thenApply(__ -> null);
}
return brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData));
})
.thenAccept(replicationClient -> {
if (replicationClient == null) {
return;
CompletableFuture<Void> replicationStartFuture = new CompletableFuture<>();
AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
}
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
return null;
});
// clean up replicator if startup is failed
if (replicator == null) {
replicators.removeNullValue(remoteCluster);
}
}).whenComplete((ignore, ex) -> {
if (ex == null){
replicationStartFuture.complete(null);
} else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to print the ex warning here.

checkReplicationCluster(remoteCluster).thenCompose(clusterExists -> {
if (!clusterExists) {
log.warn("[{}] Start remove the replicator because the cluster '{}' does not exist",
topic, remoteCluster);
return removeReplicator(remoteCluster).whenComplete((ignore2, removeCursorEx) -> {
if (removeCursorEx != null) {
log.error("[{}] Remove the cursor of replicator[{}] is failed, please reload topic."
, topic, remoteCluster);
replicationStartFuture.failedFuture(removeCursorEx);
} else {
log.warn("[{}] Remove the cursor of replicator[{}] successfully",
topic, remoteCluster);
replicationStartFuture.complete(null);
}
});
} else {
// Start replication is failed.
log.error("[{}] The replicator startup failed {}", topic, remoteCluster, ex);
replicationStartFuture.completeExceptionally(ex);
return CompletableFuture.completedFuture(null);
}
return null;
});

// clean up replicator if startup is failed
if (replicator == null) {
replicators.removeNullValue(remoteCluster);
}
});
}
});
return replicationStartFuture;
}

CompletableFuture<Void> removeReplicator(String remoteCluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -558,7 +559,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
admin.tenants().updateTenant("prop", tenantInfo);

if (topicLevelPolicy) {
admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
// setReplicationClusters may fail, so do retry.
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
});
} else {
admin.namespaces().setNamespaceReplicationClustersAsync(
namespace, Collections.singleton(remoteCluster)).get();
Expand All @@ -584,8 +588,13 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
admin.clusters().deleteCluster(remoteCluster);
// Now the cluster and its related policy has been removed but the replicator cursor still exists

topic.initialize().get(3, TimeUnit.SECONDS);
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
// Verify:
// 1. Topic can load success. If the topic loading by client is failed, it will retry,
// so we can do retry "initialize topic".
// 2. The repl cursor will be deleted.
Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
topic.initialize().get(3, TimeUnit.SECONDS);
assertFalse(topic.getManagedLedger().getCursors().iterator().hasNext());
});
}
}