@@ -105,7 +105,7 @@ public abstract class CloudSolrClient extends SolrClient {
105105 private final boolean directUpdatesToLeadersOnly ;
106106 private final RequestReplicaListTransformerGenerator requestRLTGenerator ;
107107 private final boolean parallelUpdates ;
108- private ExecutorService threadPool =
108+ private final ExecutorService threadPool =
109109 ExecutorUtil .newMDCAwareCachedThreadPool (
110110 new SolrNamedThreadFactory ("CloudSolrClient ThreadPool" ));
111111
@@ -642,9 +642,8 @@ protected boolean wasCommError(Throwable t) {
642642 public void close () {
643643 closed = true ;
644644 collectionRefreshes .clear ();
645- if (this . threadPool != null && !ExecutorUtil .isShutdown (this .threadPool )) {
645+ if (!ExecutorUtil .isShutdown (this .threadPool )) {
646646 ExecutorUtil .shutdownAndAwaitTermination (this .threadPool );
647- this .threadPool = null ;
648647 }
649648 }
650649
@@ -1658,41 +1657,34 @@ protected DocCollection getDocCollection(String collection, Integer expectedVers
16581657 }
16591658
16601659 private CompletableFuture <DocCollection > triggerCollectionRefresh (String collection ) {
1661- if (closed ) {
1662- ExpiringCachedDocCollection cacheEntry = collectionStateCache .peek (collection );
1663- DocCollection cached = cacheEntry == null ? null : cacheEntry .cached ;
1664- return CompletableFuture .completedFuture (cached );
1665- }
1666- return collectionRefreshes .computeIfAbsent (
1660+ return collectionRefreshes .compute (
16671661 collection ,
1668- key -> {
1669- ExecutorService executor = threadPool ;
1670- CompletableFuture <DocCollection > future ;
1671- if (executor == null || ExecutorUtil .isShutdown (executor )) {
1672- future = new CompletableFuture <>();
1673- try {
1674- future .complete (loadDocCollection (key ));
1675- } catch (Throwable t ) {
1676- future .completeExceptionally (t );
1677- }
1662+ (key , existingFuture ) -> {
1663+ // A refresh is still in progress; return it.
1664+ if (existingFuture != null && !existingFuture .isDone ()) {
1665+ return existingFuture ;
1666+ }
1667+ // No refresh is in-progress, so trigger it.
1668+
1669+ if (ExecutorUtil .isShutdown (threadPool )) {
1670+ assert closed ; // see close() for the sequence
1671+ ExpiringCachedDocCollection cacheEntry = collectionStateCache .peek (key );
1672+ DocCollection cached = cacheEntry == null ? null : cacheEntry .cached ;
1673+ return CompletableFuture .completedFuture (cached );
16781674 } else {
1679- future =
1680- CompletableFuture .supplyAsync (
1681- () -> {
1682- stateRefreshSemaphore .acquireUninterruptibly ();
1683- try {
1684- return loadDocCollection (key );
1685- } finally {
1686- stateRefreshSemaphore .release ();
1687- }
1688- },
1689- executor );
1675+ return CompletableFuture .supplyAsync (
1676+ () -> {
1677+ stateRefreshSemaphore .acquireUninterruptibly ();
1678+ try {
1679+ return loadDocCollection (key );
1680+ } finally {
1681+ stateRefreshSemaphore .release ();
1682+ // Remove the entry in case of many collections
1683+ collectionRefreshes .remove (key );
1684+ }
1685+ },
1686+ threadPool );
16901687 }
1691- future .whenCompleteAsync (
1692- (result , error ) -> {
1693- collectionRefreshes .remove (key , future );
1694- });
1695- return future ;
16961688 });
16971689 }
16981690
0 commit comments