diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index e8e5cc277e3..a9ebd1d27de 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -29,6 +29,7 @@ import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.SimpleDecoratingClient; +import com.linecorp.armeria.client.retry.limiter.RetryLimiter; import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.Request; @@ -199,8 +200,9 @@ protected final boolean setResponseTimeout(ClientRequestContext ctx) { * {@code currentAttemptNo} exceeds the {@code maxAttempts} or the {@code nextDelay} is after * the moment which timeout happens. */ - protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { - return getNextDelay(ctx, backoff, -1); + protected final long getNextDelay(ClientRequestContext ctx, @Nullable RetryLimiter limiter, + Backoff backoff) { + return getNextDelay(ctx, limiter, backoff, -1); } /** @@ -214,7 +216,8 @@ protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { * the moment which timeout happens. */ @SuppressWarnings("MethodMayBeStatic") // Intentionally left non-static for better user experience. 
- protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff, long millisAfterFromServer) { + protected final long getNextDelay(ClientRequestContext ctx, @Nullable RetryLimiter limiter, + Backoff backoff, long millisAfterFromServer) { requireNonNull(ctx, "ctx"); requireNonNull(backoff, "backoff"); final State state = state(ctx); @@ -237,6 +240,9 @@ protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff, lon return -1; } + if (!RetryLimiterExecutor.shouldRetry(limiter, ctx, currentAttemptNo)) { + return -1; + } return nextDelay; } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java index 31f7892815d..ae68f5bd368 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfig.java @@ -22,6 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.linecorp.armeria.client.retry.limiter.RetryLimiter; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.Response; import com.linecorp.armeria.common.RpcResponse; @@ -87,25 +88,30 @@ static RetryConfigBuilder builder0( private final RetryRule fromRetryRuleWithContent; @Nullable private RetryRuleWithContent fromRetryRule; + @Nullable + private RetryLimiter retryLimiter; - RetryConfig(RetryRule retryRule, int maxTotalAttempts, long responseTimeoutMillisForEachAttempt) { - this(requireNonNull(retryRule, "retryRule"), null, - maxTotalAttempts, responseTimeoutMillisForEachAttempt, 0); + RetryConfig(RetryRule retryRule, @Nullable RetryLimiter retryLimiter, int maxTotalAttempts, + long responseTimeoutMillisForEachAttempt) { + this(requireNonNull(retryRule, "retryRule"), null, retryLimiter, + maxTotalAttempts, responseTimeoutMillisForEachAttempt, 0); checkArguments(maxTotalAttempts, responseTimeoutMillisForEachAttempt); } RetryConfig( RetryRuleWithContent 
retryRuleWithContent, + @Nullable RetryLimiter retryLimiter, int maxContentLength, int maxTotalAttempts, long responseTimeoutMillisForEachAttempt) { - this(null, requireNonNull(retryRuleWithContent, "retryRuleWithContent"), - maxTotalAttempts, responseTimeoutMillisForEachAttempt, maxContentLength); + this(null, requireNonNull(retryRuleWithContent, "retryRuleWithContent"), retryLimiter, + maxTotalAttempts, responseTimeoutMillisForEachAttempt, maxContentLength); } private RetryConfig( @Nullable RetryRule retryRule, @Nullable RetryRuleWithContent retryRuleWithContent, + @Nullable RetryLimiter retryLimiter, int maxTotalAttempts, long responseTimeoutMillisForEachAttempt, int maxContentLength) { @@ -120,6 +126,7 @@ private RetryConfig( } else { fromRetryRuleWithContent = RetryRuleUtil.fromRetryRuleWithContent(retryRuleWithContent); } + this.retryLimiter = retryLimiter; } private static void checkArguments(int maxTotalAttempts, long responseTimeoutMillisForEachAttempt) { @@ -145,9 +152,12 @@ public RetryConfigBuilder toBuilder() { assert retryRule != null; builder = builder0(retryRule); } - return builder - .maxTotalAttempts(maxTotalAttempts) - .responseTimeoutMillisForEachAttempt(responseTimeoutMillisForEachAttempt); + builder.maxTotalAttempts(maxTotalAttempts) + .responseTimeoutMillisForEachAttempt(responseTimeoutMillisForEachAttempt); + if (retryLimiter != null) { + builder.limiter(retryLimiter); + } + return builder; } /** @@ -183,6 +193,15 @@ public RetryRuleWithContent retryRuleWithContent() { return retryRuleWithContent; } + /** + * Returns the {@link RetryLimiter} which was specified with + * {@link RetryConfigBuilder#limiter(RetryLimiter)}. + */ + @Nullable + public RetryLimiter retryLimiter() { + return retryLimiter; + } + /** * Returns the {@code maxContentLength}, which is non-zero only if a {@link RetryRuleWithContent} is used. 
*/ diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java index cbda418e44f..1d6b245f929 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryConfigBuilder.java @@ -24,6 +24,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; +import com.linecorp.armeria.client.retry.limiter.RetryLimiter; import com.linecorp.armeria.common.Flags; import com.linecorp.armeria.common.Response; import com.linecorp.armeria.common.annotation.Nullable; @@ -37,6 +38,8 @@ public final class RetryConfigBuilder { private int maxTotalAttempts = Flags.defaultMaxTotalAttempts(); private long responseTimeoutMillisForEachAttempt = Flags.defaultResponseTimeoutMillis(); private int maxContentLength; + @Nullable + private RetryLimiter retryLimiter; @Nullable private final RetryRule retryRule; @@ -50,6 +53,7 @@ public final class RetryConfigBuilder { this.retryRule = requireNonNull(retryRule, "retryRule"); retryRuleWithContent = null; maxContentLength = 0; + retryLimiter = null; } /** @@ -59,6 +63,7 @@ public final class RetryConfigBuilder { retryRule = null; this.retryRuleWithContent = requireNonNull(retryRuleWithContent, "retryRuleWithContent"); maxContentLength = Integer.MAX_VALUE; + retryLimiter = null; } /** @@ -111,16 +116,27 @@ public RetryConfigBuilder responseTimeoutForEachAttempt(Duration responseTime return this; } + /** + * Sets the specified {@link RetryLimiter} to be used for retry budgeting. + */ + public RetryConfigBuilder limiter(RetryLimiter retryLimiter) { + requireNonNull(retryLimiter, "retryLimiter"); + this.retryLimiter = retryLimiter; + return this; + } + /** * Returns a newly-created {@link RetryConfig} from this {@link RetryConfigBuilder}'s values. 
*/ public RetryConfig build() { if (retryRule != null) { - return new RetryConfig<>(retryRule, maxTotalAttempts, responseTimeoutMillisForEachAttempt); + return new RetryConfig<>(retryRule, retryLimiter, maxTotalAttempts, + responseTimeoutMillisForEachAttempt); } assert retryRuleWithContent != null; return new RetryConfig<>( retryRuleWithContent, + retryLimiter, maxContentLength, maxTotalAttempts, responseTimeoutMillisForEachAttempt); @@ -139,6 +155,7 @@ ToStringHelper toStringHelper() { .add("retryRuleWithContent", retryRuleWithContent) .add("maxTotalAttempts", maxTotalAttempts) .add("responseTimeoutMillisForEachAttempt", responseTimeoutMillisForEachAttempt) - .add("maxContentLength", maxContentLength); + .add("maxContentLength", maxContentLength) + .add("retryLimiter", retryLimiter); } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiterExecutor.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiterExecutor.java new file mode 100644 index 00000000000..06e49b249ae --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryLimiterExecutor.java @@ -0,0 +1,100 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.linecorp.armeria.client.retry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.retry.limiter.RetryLimiter; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.logging.RequestLog; + +/** + * Executes {@link RetryLimiter} operations with proper exception handling. + *

+ * This class provides static utility methods to safely execute retry limiter operations. + * It handles null limiters gracefully and catches any exceptions thrown by the limiter + * to prevent them from affecting the retry logic. + *

+ *

+ * When a limiter throws an exception, it is logged as an error and the operation + * defaults to allowing the retry to proceed (returning {@code true} for {@link #shouldRetry} + * and doing nothing for {@link #onCompletedAttempt}). + *

+ */ +final class RetryLimiterExecutor { + + private RetryLimiterExecutor() {} + + private static final Logger logger = LoggerFactory.getLogger(RetryLimiterExecutor.class); + + /** + * Determines whether a retry should be attempted based on the provided limiter. + *

+ * This method safely executes the limiter's {@link RetryLimiter#shouldRetry} method. + * If the limiter is null, this method returns {@code true} to allow retries. + * If the limiter throws an exception, it is logged and {@code true} is returned + * to ensure retries can still proceed. + *

+ * + * @param limiter the retry limiter to consult, or {@code null} if no limiter is configured + * @param ctx the client request context + * @param numAttemptsSoFar the number of attempts made so far (0-based) + * @return {@code true} if a retry should be attempted, {@code false} otherwise. + * Returns {@code true} if the limiter is null or throws an exception. + */ + public static boolean shouldRetry(@Nullable RetryLimiter limiter, ClientRequestContext ctx, + int numAttemptsSoFar) { + try { + if (limiter != null) { + return limiter.shouldRetry(ctx, numAttemptsSoFar); + } else { + return true; + } + } catch (Throwable t) { + logger.error("Failed to execute RetryLimiter.shouldRetry: limiter={}, attempts={}", limiter, + numAttemptsSoFar, t); + return true; + } + } + + /** + * Notifies the limiter that an attempt has been completed. + *

+ * This method safely executes the limiter's {@link RetryLimiter#onCompletedAttempt} method. + * If the limiter is null, this method does nothing. + * If the limiter throws an exception, it is logged but does not affect the retry flow. + *

+ * + * @param limiter the retry limiter to notify, or {@code null} if no limiter is configured + * @param ctx the client request context + * @param requestLog the request log containing details about the completed attempt + * @param numAttemptsSoFar the number of attempts made so far (0-based) + */ + public static void onCompletedAttempt(@Nullable RetryLimiter limiter, ClientRequestContext ctx, + RequestLog requestLog, int numAttemptsSoFar) { + try { + if (limiter != null) { + limiter.onCompletedAttempt(ctx, requestLog, numAttemptsSoFar); + } + } catch (Throwable t) { + logger.error("Failed to execute RetryLimiter.onCompletedAttempt: limiter={}, attempts={}", limiter, + numAttemptsSoFar, t); + } + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index d914bfece68..a71bb1790a5 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -32,6 +32,7 @@ import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.HttpClient; import com.linecorp.armeria.client.ResponseTimeoutException; +import com.linecorp.armeria.client.retry.limiter.RetryLimiter; import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.AggregationOptions; import com.linecorp.armeria.common.HttpHeaderNames; @@ -362,10 +363,11 @@ private void handleResponseWithoutContent(RetryConfig config, Clie } try { final RetryRule retryRule = retryRule(config); + final RetryLimiter limiter = config.retryLimiter(); final CompletionStage f = retryRule.shouldRetry(derivedCtx, responseCause); f.handle((decision, shouldRetryCause) -> { warnIfExceptionIsRaised(retryRule, shouldRetryCause); - handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, + handleRetryDecision(decision, limiter, ctx, derivedCtx, rootReqDuplicator, 
originalReq, returnedRes, future, response); return null; }); @@ -412,8 +414,9 @@ private void handleStreamingResponse(RetryConfig retryConfig, Clie .handle((decision, cause) -> { warnIfExceptionIsRaised(ruleWithContent, cause); truncatingHttpResponse.abort(); - handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, - originalReq, returnedRes, future, duplicated); + handleRetryDecision(decision, retryConfig.retryLimiter(), ctx, + derivedCtx, rootReqDuplicator, originalReq, + returnedRes, future, duplicated); return null; }); } catch (Throwable cause) { @@ -449,9 +452,9 @@ private void handleAggregatedResponse(RetryConfig retryConfig, Cli ruleWithContent.shouldRetry(derivedCtx, aggregatedRes.toHttpResponse(), null) .handle((decision, cause) -> { warnIfExceptionIsRaised(ruleWithContent, cause); - handleRetryDecision( - decision, ctx, derivedCtx, rootReqDuplicator, originalReq, - returnedRes, future, aggregatedRes.toHttpResponse()); + handleRetryDecision(decision, retryConfig.retryLimiter(), ctx, derivedCtx, + rootReqDuplicator, originalReq, returnedRes, future, + aggregatedRes.toHttpResponse()); return null; }); } catch (Throwable cause) { @@ -521,14 +524,20 @@ private static void handleException(ClientRequestContext ctx, ctx.logBuilder().endResponse(cause); } - private void handleRetryDecision(@Nullable RetryDecision decision, ClientRequestContext ctx, - ClientRequestContext derivedCtx, HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, HttpResponse originalRes) { + private void handleRetryDecision(@Nullable RetryDecision decision, @Nullable RetryLimiter limiter, + ClientRequestContext ctx, ClientRequestContext derivedCtx, + HttpRequestDuplicator rootReqDuplicator, HttpRequest originalReq, + HttpResponse returnedRes, CompletableFuture future, + HttpResponse originalRes) { + // Notify the retry limiter that this attempt has completed + final RetryConfig config = mappedRetryConfig(ctx); 
+ RetryLimiterExecutor.onCompletedAttempt(config.retryLimiter(), ctx, derivedCtx.log().partial(), + getTotalAttempts(ctx)); + final Backoff backoff = decision != null ? decision.backoff() : null; if (backoff != null) { final long millisAfter = useRetryAfter ? getRetryAfterMillis(derivedCtx) : -1; - final long nextDelay = getNextDelay(ctx, backoff, millisAfter); + final long nextDelay = getNextDelay(ctx, limiter, backoff, millisAfter); if (nextDelay >= 0) { abortResponse(originalRes, derivedCtx); scheduleNextRetry( diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index 32146db4c68..ff24bcf1817 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -195,10 +195,12 @@ private void doExecute0(ClientRequestContext ctx, RpcRequest req, res.handle((unused1, cause) -> { try { assert retryRule != null; + RetryLimiterExecutor.onCompletedAttempt(retryConfig.retryLimiter(), ctx, ctx.log().partial(), + totalAttempts); retryRule.shouldRetry(derivedCtx, res, cause).handle((decision, unused3) -> { final Backoff backoff = decision != null ? 
decision.backoff() : null; if (backoff != null) { - final long nextDelay = getNextDelay(derivedCtx, backoff); + final long nextDelay = getNextDelay(derivedCtx, retryConfig.retryLimiter(), backoff); if (nextDelay < 0) { onRetryComplete(ctx, derivedCtx, res, future); return null; diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/limiter/GrpcRetryLimiter.java b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/GrpcRetryLimiter.java new file mode 100644 index 00000000000..1bc83bdb13a --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/GrpcRetryLimiter.java @@ -0,0 +1,238 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.linecorp.armeria.client.retry.limiter; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_HEADERS; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_TRAILERS; + +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableSet; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.HttpHeaders; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.internal.common.InternalGrpcWebTrailers; + +/** + * Retry limiter based on a token bucket algorithm with 3 parameters: + *
+ * <ul>
+ *   <li>Max tokens: Max value of tokens that can be stored in the bucket</li>
+ *   <li>Threshold: The minimum number of tokens in the bucket required to allow retries to happen</li>
+ *   <li>TokenRatio: The number of tokens that a successful attempt adds to the bucket.</li>
+ * </ul>
+ *

The algorithm subtracts 1 token from the bucket for each retriable error and then, if we + * can retry (due to timeouts and max attempts), it checks if the bucket is over the threshold. + * On every successful request, we add X tokens where X equals to the token ratio. + * + *

In Grpc implementation, the threshold is hardcoded as half the capacity of the bucket but you can set your + * own threshold here. + * + *

Internally, the implementation stores all values multiplied by 1000, some manipulation is required as + * max tokens can be odd, and when setting the threshold to half of max tokens, rounding will be needed, this + * way we keep full precision. + * + *

The implementation is heavily based on GRPC Java implementation. + */ +public class GrpcRetryLimiter implements RetryLimiter { + + private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000; + + /** + * 1000 times the maxTokens. + * The number of tokens starts at maxTokens. The token_count will always be between 0 and maxTokens. + */ + final int maxTokens; + + /** + * Half of {@code maxTokens} or 1000 times the threshold. + */ + final int threshold; + + /** + * 1000 times the tokenRatio field. + */ + final int tokenRatio; + + final Set retryableStatuses; + + final AtomicInteger tokenCount = new AtomicInteger(); + + /** + * Default retry limiter based on GRPC implementation as described + * here + * + *

This constructor builds a limiter configured to only ever allow retries when the bucket is at least + * half filled, and only decrements tokens when the response status is UNAVAILABLE. + * + *

maxTokens and tokenRatio are multiplied by 1000 and converted to int for the internal + * operations + * + * @param maxTokens Initial token count + * @param tokenRatio Number of tokens a successful request increments + */ + public GrpcRetryLimiter(float maxTokens, float tokenRatio) { + // Validate inputs + checkArgument( + maxTokens > 0 && tokenRatio > 0, + "maxTokens and tokenRatio must be positive: " + "maxTokens=" + maxTokens + + ", tokenRatio=" + tokenRatio + ); + // tokenRatio is up to 3 decimal places + this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP); + this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP); + threshold = this.maxTokens / 2; + // The default gRPC retry configuration only considers UNAVAILABLE(14) as a retriable error + retryableStatuses = ImmutableSet.of("14"); + tokenCount.set(this.maxTokens); + } + + /** + * Constructs a {@link GrpcRetryLimiter} with the specified parameters. + * + *

maxTokens, tokenRatio and threshold are multiplied by 1000 and converted to int for the internal + * operations + * + * @param maxTokens the initial token count (must be positive) + * @param tokenRatio the number of tokens a successful request increments (must be positive) + * @param threshold the minimum token count required to allow a retry (must be positive and less than or + * equal to {@code maxTokens}) + * @param retryableStatuses the collection of gRPC status codes (as integers) that are considered + * retryable (must not be null or empty) + * @throws IllegalArgumentException if any argument is invalid + */ + public GrpcRetryLimiter(float maxTokens, float tokenRatio, float threshold, + Collection retryableStatuses) { + // Validate inputs + checkArgument( + maxTokens > 0 && tokenRatio > 0 && threshold >= 0, + "maxTokens, tokenRatio, and threshold must be positive: " + "maxTokens=" + maxTokens + + ", tokenRatio=" + tokenRatio + ", threshold=" + threshold + ); + checkArgument(threshold <= maxTokens, + "threshold must be less than or equal to maxTokens: " + threshold + " > " + maxTokens + ); + checkArgument(retryableStatuses != null && !retryableStatuses.isEmpty(), + "retryableStatuses cannot be null or empty: " + retryableStatuses); + + // tokenRatio is up to 3 decimal places + this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP); + this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP); + this.threshold = (int) (threshold * THREE_DECIMAL_PLACES_SCALE_UP); + // Convert statuses to String so we can use them later + this.retryableStatuses = retryableStatuses.stream() + .filter(Objects::nonNull) + .map(String::valueOf) + .collect(Collectors.toSet()); + + // Ensure we have at least one valid status after filtering nulls + if (this.retryableStatuses.isEmpty()) { + throw new IllegalArgumentException("retryableStatuses cannot contain only null values"); + } + + tokenCount.set(this.maxTokens); + } + + /** + * Determines whether a retry 
should be allowed based on the current token count and threshold. + * + * @param ctx the request context + * @param numAttemptsSoFar the number of attempts made so far + * @return {@code true} if a retry is allowed + */ + @Override + public boolean shouldRetry(ClientRequestContext ctx, int numAttemptsSoFar) { + return tokenCount.get() > threshold; + } + + /** + * This function handles the token increase or decrease. + * In GRPC retry throttling implementation, there are 3 cases: + *

+ * <ul>
+ *   <li>When a response is received with a retryable status, deduct a token</li>
+ *   <li>When a response is received with any other status, refill by token ratio</li>
+ *   <li>When a response is not received, the token count does not change</li>
+ * </ul>
+ * @param ctx full request context that also includes response information + * @param requestLog reduced context with request and response information + * @param numAttemptsSoFar number of attempts (starting with 1) + */ + @Override + public void onCompletedAttempt(ClientRequestContext ctx, RequestLog requestLog, int numAttemptsSoFar) { + // Check if response headers and trailers are available if not we don't have a valid response + if (!requestLog.isAvailable(RESPONSE_HEADERS, RESPONSE_TRAILERS)) { + return; + } + + // Extract the headers to be able to evaluate the gRPC status + // Check HTTP trailers first, because most gRPC responses have non-empty payload + trailers. + HttpHeaders maybeGrpcTrailers = requestLog.responseTrailers(); + if (!maybeGrpcTrailers.contains("grpc-status")) { + // Check HTTP headers secondly. + maybeGrpcTrailers = requestLog.responseHeaders(); + if (!maybeGrpcTrailers.contains("grpc-status")) { + // Check gRPC Web trailers lastly, because gRPC Web is the least used protocol + // in reality. + maybeGrpcTrailers = InternalGrpcWebTrailers.get(ctx); + } + } + // If there are no headers, it is not a valid response so no changes. + if (maybeGrpcTrailers == null) { + return; + } + // Check if the status is one of the retriable ones, if it is we should deduct a token if not + // add tokenRatio tokens. 
+ final String status = maybeGrpcTrailers.get("grpc-status"); + final boolean decrement = retryableStatuses.contains(status); + + boolean updated; + do { + final int currentCount = tokenCount.get(); + if (decrement) { + if (currentCount == 0) { + break; + } + final int decremented = currentCount - THREE_DECIMAL_PLACES_SCALE_UP; + updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0)); + } else { + if (currentCount == maxTokens) { + break; + } + final int incremented = currentCount + tokenRatio; + updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens)); + } + } while (!updated); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxTokens", maxTokens / (double) THREE_DECIMAL_PLACES_SCALE_UP) + .add("threshold", threshold / (double) THREE_DECIMAL_PLACES_SCALE_UP) + .add("tokenRatio", tokenRatio / (double) THREE_DECIMAL_PLACES_SCALE_UP) + .add("retryableStatuses", retryableStatuses) + .add("currentTokenCount", tokenCount.get() / (double) THREE_DECIMAL_PLACES_SCALE_UP) + .toString(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/limiter/RetryLimiter.java b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/RetryLimiter.java new file mode 100644 index 00000000000..228cf3921b7 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/RetryLimiter.java @@ -0,0 +1,97 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry.limiter; + +import java.util.Collection; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.logging.RequestLog; + +/** + * A strategy interface for limiting retries in Armeria clients. + * Implementations decide whether a retry should be allowed and can update internal state after each attempt. + * + *

Implementations should be thread-safe and designed to handle concurrent calls to both methods. + * They can be called concurrently by multiple threads if there are multiple calls using the same client + * instance or if the limiter is shared across multiple clients. + * + *

Handle your errors carefully. If the implementation throws an exception it will be logged and the retry + * will be allowed. + */ +@FunctionalInterface +public interface RetryLimiter { + + /** + * Creates a new {@link RetryRateLimiter} with the specified number of permits per second. + * + * @param permitsPerSecond the number of retry permits allowed per second; must be positive + */ + static RetryLimiter ofRateLimiter(double permitsPerSecond) { + return new RetryRateLimiter(permitsPerSecond); + } + + /** + * Creates a new {@link GrpcRetryLimiter} with the specified parameters. + * + * @param maxTokens the initial token count (must be positive) + * @param tokenRatio the number of tokens a successful request increments (must be positive) + */ + static RetryLimiter ofGrpc(float maxTokens, float tokenRatio) { + return new GrpcRetryLimiter(maxTokens, tokenRatio); + } + + /** + * Creates a new {@link GrpcRetryLimiter} with the specified parameters. + * + * @param maxTokens the initial token count (must be positive) + * @param tokenRatio the number of tokens a successful request increments (must be positive) + * @param threshold the minimum token count required to allow a retry (must be positive and less than or + * equal to {@code maxTokens}) + * @param retryableStatuses the collection of gRPC status codes that are considered retryable(must not be + * null or empty) + */ + static RetryLimiter ofGrpc(float maxTokens, float tokenRatio, float threshold, + Collection retryableStatuses) { + return new GrpcRetryLimiter(maxTokens, tokenRatio, threshold, retryableStatuses); + } + + /** + * Determines whether the request should be retried. + * This method is not invoked: + *

+ * <ul>
+ *   <li>if the maximum number of attempts has been reached,</li>
+ *   <li>if the request has been cancelled,</li>
+ *   <li>if the request is the first attempt,</li>
+ *   <li>if the retry rule has decided not to retry.</li>
+ * </ul>
+ * + * @param ctx the request context + * @param numAttemptsSoFar the number of attempts made so far + * @return {@code true} if the request should be retried + */ + boolean shouldRetry(ClientRequestContext ctx, int numAttemptsSoFar); + + /** + * Invoked when an attempt completes regardless of the state and if it is a retry or not. + * You can use this function to implement your retry limiting algorithm. + * + * @param ctx full request context that also includes response information + * @param requestLog reduced context with request and response information + * @param numAttemptsSoFar the number of attempts made so far, including the current one + */ + default void onCompletedAttempt(ClientRequestContext ctx, RequestLog requestLog, int numAttemptsSoFar) {} +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/limiter/RetryRateLimiter.java b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/RetryRateLimiter.java new file mode 100644 index 00000000000..195a259b3f1 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/RetryRateLimiter.java @@ -0,0 +1,64 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.linecorp.armeria.client.retry.limiter; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.MoreObjects; +import com.google.common.util.concurrent.RateLimiter; + +import com.linecorp.armeria.client.ClientRequestContext; + +/** + * A Retry limiter implementation that limits based on a rate limiter. + * + *

This limiter allows a flat number of retries per second + */ +public class RetryRateLimiter implements RetryLimiter { + + private final RateLimiter rateLimiter; + + /** + * Creates a new {@link RetryRateLimiter} with the specified number of permits per second. + * + * @param permitsPerSecond the number of retry permits allowed per second; must be positive + */ + public RetryRateLimiter(double permitsPerSecond) { + checkArgument(permitsPerSecond > 0, "permitsPerSecond must be positive: %s", permitsPerSecond); + rateLimiter = RateLimiter.create(permitsPerSecond); + } + + /** + * Returns {@code true} if a permit is available from the rate limiter and acquires it. + * Otherwise, returns {@code false} to prevent retry. + * + * @param ctx the request context + * @param numAttemptsSoFar the number of attempts made so far + * @return {@code true} if a retry is allowed + */ + @Override + public boolean shouldRetry(ClientRequestContext ctx, int numAttemptsSoFar) { + return rateLimiter.tryAcquire(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("permitsPerSecond", rateLimiter.getRate()) + .toString(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/limiter/package-info.java b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/package-info.java new file mode 100644 index 00000000000..8d6eb5a237b --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/limiter/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2017 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * An interface and implementations for applying retry budgets in + * {@link com.linecorp.armeria.client.retry.RetryingClient} decorator. + */ +@NonNullByDefault +package com.linecorp.armeria.client.retry.limiter; + +import com.linecorp.armeria.common.annotation.NonNullByDefault; diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterExecutorTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterExecutorTest.java new file mode 100644 index 00000000000..ee45120768d --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryLimiterExecutorTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
 */

package com.linecorp.armeria.client.retry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.retry.limiter.RetryLimiter;
import com.linecorp.armeria.common.logging.RequestLog;

/**
 * Tests the defensive behavior of {@code RetryLimiterExecutor}: a {@code null} limiter is a no-op
 * (retries remain allowed), and an exception thrown by a limiter must be swallowed, falling back to
 * allowing the retry rather than failing the request.
 */
class RetryLimiterExecutorTest {

    private RetryLimiter mockLimiter;
    private ClientRequestContext mockContext;
    private RequestLog mockRequestLog;

    @BeforeEach
    void setUp() {
        mockLimiter = mock(RetryLimiter.class);
        mockContext = mock(ClientRequestContext.class);
        mockRequestLog = mock(RequestLog.class);
    }

    @Test
    void shouldRetry_withNullLimiter_returnsTrue() {
        // No limiter configured: the executor must allow the retry.
        final boolean result = RetryLimiterExecutor.shouldRetry(null, mockContext, 0);

        assertThat(result).isTrue();
    }

    @Test
    void shouldRetry_withLimiterReturningTrue_returnsTrue() {
        when(mockLimiter.shouldRetry(any(), anyInt())).thenReturn(true);

        final boolean result = RetryLimiterExecutor.shouldRetry(mockLimiter, mockContext, 1);

        assertThat(result).isTrue();
        // The executor must forward the exact context and attempt number.
        verify(mockLimiter).shouldRetry(mockContext, 1);
    }

    @Test
    void shouldRetry_withLimiterReturningFalse_returnsFalse() {
        when(mockLimiter.shouldRetry(any(), anyInt())).thenReturn(false);

        final boolean result = RetryLimiterExecutor.shouldRetry(mockLimiter, mockContext, 2);

        assertThat(result).isFalse();
        verify(mockLimiter).shouldRetry(mockContext, 2);
    }

    @Test
    void shouldRetry_withLimiterThrowingException_returnsTrue() {
        // A buggy limiter must not break retrying: the exception is logged and the retry is allowed.
        when(mockLimiter.shouldRetry(any(), anyInt())).thenThrow(new RuntimeException("Test exception"));

        final boolean result = RetryLimiterExecutor.shouldRetry(mockLimiter, mockContext, 3);

        assertThat(result).isTrue();
        verify(mockLimiter).shouldRetry(mockContext, 3);
    }

    @Test
    void onCompletedAttempt_withNullLimiter_doesNothing() {
        RetryLimiterExecutor.onCompletedAttempt(null, mockContext, mockRequestLog, 0);

        verifyNoInteractions(mockLimiter);
    }

    @Test
    void onCompletedAttempt_withLimiter_callsLimiter() {
        RetryLimiterExecutor.onCompletedAttempt(mockLimiter, mockContext, mockRequestLog, 1);

        verify(mockLimiter).onCompletedAttempt(mockContext, mockRequestLog, 1);
    }

    @Test
    void onCompletedAttempt_withLimiterThrowingException_doesNotPropagateException() {
        doThrow(new RuntimeException("Test exception")).when(mockLimiter)
                .onCompletedAttempt(any(), any(), anyInt());

        // Should not throw an exception
        RetryLimiterExecutor.onCompletedAttempt(mockLimiter, mockContext, mockRequestLog, 2);

        verify(mockLimiter).onCompletedAttempt(mockContext, mockRequestLog, 2);
    }
}
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.linecorp.armeria.client.retry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.retry.limiter.RetryLimiter;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

/**
 * End-to-end tests that a {@link RetryLimiter} configured on {@link RetryingClient} is consulted
 * exactly when expected: {@code shouldRetry} only before an actual retry (never for the first attempt,
 * never once max attempts are exhausted, never when the retry rule declines), and
 * {@code onCompletedAttempt} once per attempt regardless of outcome.
 */
class RetryingClientWithLimiterTest {

    private static ClientFactory clientFactory;

    @BeforeAll
    static void beforeAll() {
        // Use different eventLoop from server's so that clients don't hang when the eventLoop in server hangs
        clientFactory = ClientFactory.builder().workerGroup(2).build();
    }

    @AfterAll
    static void afterAll() {
        clientFactory.closeAsync();
    }

    // Per-test request counter used by the "…-then-success" services; reset in setUp().
    private AtomicInteger reqCount;

    @RegisterExtension
    final ServerExtension server = new ServerExtension() {
        @Override
        protected boolean runForEachTest() {
            // Fresh server per test so the services see a fresh reqCount.
            return true;
        }

        @Override
        protected void configure(ServerBuilder sb) throws Exception {
            // Fails the first request with 500, succeeds afterwards.
            sb.service("/500-then-success", new AbstractHttpService() {
                @Override
                protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
                    if (reqCount.getAndIncrement() < 1) {
                        return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
                    } else {
                        return HttpResponse.of("Succeeded after retry");
                    }
                }
            });

            // Fails the first request with 503, succeeds afterwards.
            sb.service("/503-then-success", new AbstractHttpService() {
                @Override
                protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
                    if (reqCount.getAndIncrement() < 1) {
                        return HttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
                    } else {
                        return HttpResponse.of("Succeeded after retry");
                    }
                }
            });

            // Always fails; used to exercise max-attempt and "limiter says no" paths.
            sb.service("/always-500", new AbstractHttpService() {
                @Override
                protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
                    return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
                }
            });

            sb.service("/timeout-then-success", new AbstractHttpService() {
                @Override
                protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
                    if (reqCount.getAndIncrement() < 1) {
                        // Simulate a slow response that will timeout
                        return HttpResponse.delayed(HttpResponse.of("Success"), Duration.ofSeconds(2));
                    } else {
                        return HttpResponse.of("Succeeded after retry");
                    }
                }
            });

            sb.service("/immediate-success", new AbstractHttpService() {
                @Override
                protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
                    return HttpResponse.of("Immediate success");
                }
            });

            // Throws on the first request, succeeds afterwards.
            sb.service("/exception-then-success", new AbstractHttpService() {
                @Override
                protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
                    if (reqCount.getAndIncrement() < 1) {
                        throw new RuntimeException("Simulated exception");
                    } else {
                        return HttpResponse.of("Succeeded after retry");
                    }
                }
            });
        }
    };

    @BeforeEach
    void setUp() {
        reqCount = new AtomicInteger();
    }

    // Returns a mock limiter whose shouldRetry always answers the given value.
    private static RetryLimiter createMockLimiter(boolean shouldRetry) {
        final RetryLimiter mockLimiter = mock(RetryLimiter.class);
        when(mockLimiter.shouldRetry(any(), anyInt())).thenReturn(shouldRetry);
        return mockLimiter;
    }

    private static RetryConfig createRetryConfig(RetryRule rule, RetryLimiter limiter) {
        return RetryConfig.builder(rule)
                          .limiter(limiter)
                          .build();
    }

    private static RetryConfig createRetryConfig(RetryRuleWithContent rule,
                                                 RetryLimiter limiter) {
        return RetryConfig.builder(rule)
                          .limiter(limiter)
                          .maxContentLength(1024)
                          .build();
    }

    private static RetryConfig createRetryConfig(RetryRule rule, RetryLimiter limiter,
                                                 int maxTotalAttempts) {
        return RetryConfig.builder(rule)
                          .limiter(limiter)
                          .maxTotalAttempts(maxTotalAttempts)
                          .build();
    }

    // Config that retries on ResponseTimeoutException with a short per-attempt timeout.
    private static RetryConfig createTimeoutRetryConfig(RetryLimiter limiter) {
        return RetryConfig.builder(RetryRule.builder()
                                            .onException(ResponseTimeoutException.class)
                                            .thenBackoff())
                          .limiter(limiter)
                          .responseTimeoutMillisForEachAttempt(500)
                          .build();
    }

    private WebClient createClient(RetryConfig retryConfig) {
        return WebClient.builder(server.httpUri())
                        .factory(clientFactory)
                        .decorator(RetryingClient.builder(retryConfig).newDecorator())
                        .build();
    }

    private WebClient createClientWithMapping(RetryConfigMapping mapping) {
        return WebClient.builder(server.httpUri())
                        .factory(clientFactory)
                        .decorator(RetryingClient.builderWithMapping(mapping).newDecorator())
                        .build();
    }

    private static void verifyOnCompletedAttemptCalls(RetryLimiter limiter, int expectedCalls) {
        verify(limiter, times(expectedCalls)).onCompletedAttempt(any(), any(), anyInt());
    }

    // Also asserts each specific attempt number was reported exactly once.
    private static void verifyOnCompletedAttemptCalls(RetryLimiter limiter, int expectedCalls,
                                                      int... attemptNumbers) {
        verify(limiter, times(expectedCalls)).onCompletedAttempt(any(), any(), anyInt());
        for (int attemptNumber : attemptNumbers) {
            verify(limiter, times(1)).onCompletedAttempt(any(), any(), eq(attemptNumber));
        }
    }

    private static void verifyShouldRetryCalls(RetryLimiter limiter, int expectedCalls, int... attemptNumbers) {
        verify(limiter, times(expectedCalls)).shouldRetry(any(), anyInt());
        for (int attemptNumber : attemptNumbers) {
            verify(limiter, times(1)).shouldRetry(any(), eq(attemptNumber));
        }
    }

    private static void verifyShouldRetryNeverCalled(RetryLimiter limiter) {
        verify(limiter, never()).shouldRetry(any(), anyInt());
    }

    @Test
    void retryLimiterShouldBeCalledOnCompletedAttempt() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRuleWithContent.builder()
                                    .onServerErrorStatus()
                                    .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/500-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        // Two attempts completed; shouldRetry consulted only once, before the single retry.
        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldPreventRetryWhenShouldRetryReturnsFalse() {
        final RetryLimiter mockLimiter = createMockLimiter(false);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRuleWithContent.builder()
                                    .onServerErrorStatus()
                                    .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/always-500").aggregate().join();
        assertThat(res.status()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);

        // Limiter vetoed the retry, so only the first attempt ran.
        verifyOnCompletedAttemptCalls(mockLimiter, 1, 1);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldNotBeCalledOnFirstAttempt() {
        final RetryLimiter mockLimiter = mock(RetryLimiter.class);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/immediate-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Immediate success");

        verifyOnCompletedAttemptCalls(mockLimiter, 1);
        verifyShouldRetryNeverCalled(mockLimiter);
    }

    @Test
    void retryLimiterShouldBeCalledOnTimeoutRetry() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createTimeoutRetryConfig(mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/timeout-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldBeCalledOnExceptionRetry() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/exception-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldBeCalledWithCorrectAttemptNumbers() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter,
                3);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/always-500").aggregate().join();
        assertThat(res.status()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);

        // 3 attempts completed (1, 2, 3); shouldRetry asked before retries 2 and 3 only.
        verifyOnCompletedAttemptCalls(mockLimiter, 3, 1, 2, 3);
        verifyShouldRetryCalls(mockLimiter, 2, 1, 2);
    }

    @Test
    void retryLimiterShouldNotBeCalledWhenMaxAttemptsReached() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter,
                2);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/always-500").aggregate().join();
        assertThat(res.status()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);

        // Max-attempt exhaustion short-circuits before the limiter is consulted again.
        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldBeCalledForStreamingResponses() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/503-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldBeCalledForRetryRuleWithContent() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRuleWithContent.builder()
                                    .onServerErrorStatus()
                                    .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/503-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldNotBeCalledWhenRetryRuleDecidesNotToRetry() {
        final RetryLimiter mockLimiter = mock(RetryLimiter.class);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onStatus(HttpStatus.BAD_REQUEST) // Won't match 500
                         .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/always-500").aggregate().join();
        assertThat(res.status()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);

        verifyOnCompletedAttemptCalls(mockLimiter, 1);
        verifyShouldRetryNeverCalled(mockLimiter);
    }

    @Test
    void retryLimiterShouldBeCalledWithCorrectContextAndLog() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        final AggregatedHttpResponse res = client.get("/500-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2, 1, 2);
    }

    @Test
    void retryLimiterShouldBeCalledForMultipleRetries() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter,
                4);
        final WebClient client = createClient(retryConfig);

        // Reset counter to simulate multiple failures
        reqCount.set(0);

        final AggregatedHttpResponse res = client.get("/always-500").aggregate().join();
        assertThat(res.status()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);

        verifyOnCompletedAttemptCalls(mockLimiter, 4);
        verifyShouldRetryCalls(mockLimiter, 3, 1, 2, 3);
    }

    @Test
    void retryLimiterShouldBeCalledForRetryConfigMapping() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        final RetryConfigMapping mapping = RetryConfigMapping.of(
                (ctx, req) -> "test-key",
                (ctx, req) -> createRetryConfig(
                        RetryRule.builder()
                                 .onServerErrorStatus()
                                 .thenBackoff(),
                        mockLimiter,
                        2)
        );
        final WebClient client = createClientWithMapping(mapping);

        final AggregatedHttpResponse res = client.get("/500-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldHandleExceptionInShouldRetry() {
        final RetryLimiter mockLimiter = mock(RetryLimiter.class);
        when(mockLimiter.shouldRetry(any(), anyInt())).thenThrow(new RuntimeException("Limiter exception"));
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter);
        final WebClient client = createClient(retryConfig);

        // The exception from shouldRetry should not prevent the retry
        final AggregatedHttpResponse res = client.get("/500-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }

    @Test
    void retryLimiterShouldHandleExceptionInOnCompletedAttempt() {
        final RetryLimiter mockLimiter = createMockLimiter(true);
        doThrow(new RuntimeException("onCompletedAttempt exception"))
                .when(mockLimiter).onCompletedAttempt(any(), any(), anyInt());
        final RetryConfig retryConfig = createRetryConfig(
                RetryRule.builder()
                         .onServerErrorStatus()
                         .thenBackoff(),
                mockLimiter,
                2);
        final WebClient client = createClient(retryConfig);

        // The exception from onCompletedAttempt should not prevent the retry
        final AggregatedHttpResponse res = client.get("/500-then-success").aggregate().join();
        assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry");

        verifyOnCompletedAttemptCalls(mockLimiter, 2);
        verifyShouldRetryCalls(mockLimiter, 1, 1);
    }
}
--git a/core/src/test/java/com/linecorp/armeria/client/retry/limiter/GrpcRetryLimiterTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/limiter/GrpcRetryLimiterTest.java new file mode 100644 index 00000000000..ab46cb81468 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/retry/limiter/GrpcRetryLimiterTest.java @@ -0,0 +1,427 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retry.limiter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.HttpHeaders; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.common.logging.RequestLogProperty; + +class GrpcRetryLimiterTest { + + private final ClientRequestContext ctx = mock(ClientRequestContext.class); + + private static HttpHeaders createGrpcHeaders(String status) { + 
return HttpHeaders.builder() + .add("grpc-status", status) + .build(); + } + + private static HttpHeaders createEmptyHeaders() { + return HttpHeaders.builder().build(); + } + + private static ResponseHeaders createResponseHeaders(String status) { + return ResponseHeaders.builder(200) + .add("grpc-status", status) + .build(); + } + + private static ResponseHeaders createEmptyResponseHeaders() { + return ResponseHeaders.of(200); + } + + // ============================================================================ + // Constructor Tests + // ============================================================================ + + @Test + void defaultConstructor() { + final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f); + + assertThat(limiter.maxTokens).isEqualTo(10000); // 10 * 1000 + assertThat(limiter.threshold).isEqualTo(5000); // 10 / 2 * 1000 + assertThat(limiter.tokenRatio).isEqualTo(1000); // 1 * 1000 + assertThat(limiter.retryableStatuses).containsExactly("14"); // UNAVAILABLE + assertThat(limiter.tokenCount.get()).isEqualTo(10000); // maxTokens + } + + @Test + void defaultConstructor_withFloatValues() { + final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.5f, 1.25f); + + assertThat(limiter.maxTokens).isEqualTo(10500); // 10.5 * 1000 + assertThat(limiter.threshold).isEqualTo(5250); // 10.5 / 2 * 1000 + assertThat(limiter.tokenRatio).isEqualTo(1250); // 1.25 * 1000 + assertThat(limiter.retryableStatuses).containsExactly("14"); // UNAVAILABLE + assertThat(limiter.tokenCount.get()).isEqualTo(10500); // maxTokens + } + + @Test + void customConstructor() { + final GrpcRetryLimiter limiter = new GrpcRetryLimiter(20.0f, 2.0f, 8.0f, Arrays.asList(14, 13)); + + assertThat(limiter.maxTokens).isEqualTo(20000); // 20 * 1000 + assertThat(limiter.threshold).isEqualTo(8000); + assertThat(limiter.tokenRatio).isEqualTo(2000); // 2 * 1000 + assertThat(limiter.retryableStatuses).containsExactlyInAnyOrder("14", "13"); + 
assertThat(limiter.tokenCount.get()).isEqualTo(20000); // maxTokens + } + + @Test + void customConstructor_withFloatValues() { + final GrpcRetryLimiter limiter = new GrpcRetryLimiter(20.5f, 2.25f, 8.75f, Arrays.asList(14, 13)); + + assertThat(limiter.maxTokens).isEqualTo(20500); // 20.5 * 1000 + assertThat(limiter.threshold).isEqualTo(8750); // 8.75 * 1000 + assertThat(limiter.tokenRatio).isEqualTo(2250); // 2.25 * 1000 + assertThat(limiter.retryableStatuses).containsExactlyInAnyOrder("14", "13"); + assertThat(limiter.tokenCount.get()).isEqualTo(20500); // maxTokens + } + + @Test + void constructor_withThresholdEqualToMaxTokens() { + // This should be valid + final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f, 10.0f, + Collections.singletonList(14)); + assertThat(limiter).isNotNull(); + assertThat(limiter.maxTokens).isEqualTo(10000); + assertThat(limiter.threshold).isEqualTo(10000); + } + + // ============================================================================ + // Constructor Validation Tests + // ============================================================================ + + @Test + void defaultConstructor_withZeroMaxTokens() { + final String error = + "maxTokens and tokenRatio must be positive: maxTokens=0.0, tokenRatio=1.0"; + assertThatThrownBy(() -> new GrpcRetryLimiter(0.0f, 1.0f)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(error); + } + + @Test + void defaultConstructor_withZeroTokenRatio() { + final String error = + "maxTokens and tokenRatio must be positive: maxTokens=1.0, tokenRatio=0.0"; + assertThatThrownBy(() -> new GrpcRetryLimiter(1.0f, 0.0f)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(error); + } + + @Test + void defaultConstructor_withNegativeTokenRatio() { + final String error = + "maxTokens and tokenRatio must be positive: maxTokens=1.0, tokenRatio=-1.0"; + assertThatThrownBy(() -> new GrpcRetryLimiter(1.0f, -1.0f)) + .isInstanceOf(IllegalArgumentException.class) + 
.hasMessage(error);
+    }
+
+    @Test
+    void constructor_withZeroMaxTokens() {
+        final String error = "maxTokens, tokenRatio, and threshold must be positive:";
+        assertThatThrownBy(() -> new GrpcRetryLimiter(0.0f, 1.0f, 5.0f, Collections.singletonList(14)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(error);
+    }
+
+    @Test
+    void constructor_withZeroTokenRatio() {
+        final String error = "maxTokens, tokenRatio, and threshold must be positive:";
+        assertThatThrownBy(() -> new GrpcRetryLimiter(1.0f, 0.0f, 5.0f, Collections.singletonList(14)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(error);
+    }
+
+    @Test
+    void constructor_withNegativeThreshold() {
+        final String error = "maxTokens, tokenRatio, and threshold must be positive:";
+        assertThatThrownBy(() -> new GrpcRetryLimiter(1.0f, 1.0f, -1.0f, Collections.singletonList(14)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(error);
+    }
+
+    @Test
+    void constructor_withThresholdGreaterThanMaxTokens() {
+        assertThatThrownBy(() -> new GrpcRetryLimiter(10.0f, 1.0f, 15.0f, Collections.singletonList(14)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("threshold must be less than or equal to maxTokens: 15.0 > 10.0");
+    }
+
+    @Test
+    void constructor_withNullRetryableStatuses() {
+        assertThatThrownBy(() -> new GrpcRetryLimiter(10.0f, 1.0f, 5.0f, null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("retryableStatuses cannot be null or empty: null");
+    }
+
+    @Test
+    void constructor_withEmptyRetryableStatuses() {
+        assertThatThrownBy(() -> new GrpcRetryLimiter(10.0f, 1.0f, 5.0f, Collections.emptyList()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("retryableStatuses cannot be null or empty: []");
+    }
+
+    @Test
+    void constructor_withNullValuesInRetryableStatuses() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f, 5.0f, Arrays.asList(14, null, 13));
+
+        assertThat(limiter.retryableStatuses).containsExactlyInAnyOrder("14", "13");
+    }
+
+    @Test
+    void constructor_withOnlyNullValuesInRetryableStatuses() {
+        assertThatThrownBy(() -> new GrpcRetryLimiter(10.0f, 1.0f, 5.0f, Arrays.asList(null, null)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("retryableStatuses cannot contain only null values");
+    }
+
+    // ============================================================================
+    // shouldRetry Tests
+    // ============================================================================
+
+    @Test
+    void retryAllowed_whenTokensAboveThreshold() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+
+        // Token count starts at maxTokens (10000), threshold is 5
+        assertThat(limiter.shouldRetry(ctx, 2)).isTrue();
+        assertThat(limiter.shouldRetry(ctx, 3)).isTrue();
+    }
+
+    @Test
+    void retryNotAllowed_whenTokensAtThreshold() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f, 5.0f, Collections.singletonList(14));
+
+        // Set token count to threshold
+        limiter.tokenCount.set(5000);
+
+        assertThat(limiter.shouldRetry(ctx, 2)).isFalse();
+    }
+
+    @Test
+    void retryNotAllowed_whenTokensBelowThreshold() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f, 5.0f, Collections.singletonList(14));
+
+        // Set token count below threshold
+        limiter.tokenCount.set(4000);
+
+        assertThat(limiter.shouldRetry(ctx, 2)).isFalse();
+    }
+
+    // ============================================================================
+    // onCompletedAttempt Tests
+    // ============================================================================
+
+    @Test
+    void onCompletedAttempt_withRetryableStatus_decrementsTokens() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        final HttpHeaders headers = createGrpcHeaders("14"); // UNAVAILABLE
+
+        when(requestLog.responseTrailers()).thenReturn(headers);
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(true);
+
+        final int initialTokens = limiter.tokenCount.get();
+        limiter.onCompletedAttempt(ctx, requestLog, 1);
+
+        assertThat(limiter.tokenCount.get()).isEqualTo(initialTokens - 1000);
+    }
+
+    @Test
+    void onCompletedAttempt_withNonRetryableStatus_incrementsTokens() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        final HttpHeaders headers = createGrpcHeaders("0"); // OK
+
+        when(requestLog.responseTrailers()).thenReturn(headers);
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(true);
+
+        // Set token count to less than max to allow increment
+        limiter.tokenCount.set(5000);
+
+        final int initialTokens = limiter.tokenCount.get();
+        limiter.onCompletedAttempt(ctx, requestLog, 1);
+
+        assertThat(limiter.tokenCount.get()).isEqualTo(initialTokens + 1000);
+    }
+
+    @Test
+    void onCompletedAttempt_withException_doesNotChangeTokens() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(false);
+
+        final int initialTokens = limiter.tokenCount.get();
+        limiter.onCompletedAttempt(ctx, requestLog, 1);
+
+        assertThat(limiter.tokenCount.get()).isEqualTo(initialTokens);
+    }
+
+    @Test
+    void onCompletedAttempt_withNoGrpcStatus_doesNotChangeTokens() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        final HttpHeaders headers = createEmptyHeaders(); // No grpc-status
+
+        when(requestLog.responseTrailers()).thenReturn(headers);
+        when(requestLog.responseHeaders()).thenReturn(createEmptyResponseHeaders());
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(true);
+
+        final int initialTokens = limiter.tokenCount.get();
+        limiter.onCompletedAttempt(ctx, requestLog, 1);
+
+        assertThat(limiter.tokenCount.get()).isEqualTo(initialTokens);
+    }
+
+    @Test
+    void onCompletedAttempt_withGrpcStatusInHeaders() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        final HttpHeaders trailers = createEmptyHeaders(); // No grpc-status in trailers
+        final ResponseHeaders headers = createResponseHeaders("14"); // UNAVAILABLE in headers
+
+        when(requestLog.responseTrailers()).thenReturn(trailers);
+        when(requestLog.responseHeaders()).thenReturn(headers);
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(true);
+
+        final int initialTokens = limiter.tokenCount.get();
+        limiter.onCompletedAttempt(ctx, requestLog, 1);
+
+        assertThat(limiter.tokenCount.get()).isEqualTo(initialTokens - 1000);
+    }
+
+    @Test
+    void onCompletedAttempt_tokenCountNeverGoesBelowZero() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        final HttpHeaders headers = createGrpcHeaders("14"); // UNAVAILABLE
+
+        when(requestLog.responseTrailers()).thenReturn(headers);
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(true);
+
+        // Set token count to 0
+        limiter.tokenCount.set(0);
+
+        limiter.onCompletedAttempt(ctx, requestLog, 1);
+
+        assertThat(limiter.tokenCount.get()).isEqualTo(0);
+    }
+
+    @Test
+    void onCompletedAttempt_tokenCountNeverExceedsMaxTokens() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        final HttpHeaders headers = createGrpcHeaders("0"); // OK
+
+        when(requestLog.responseTrailers()).thenReturn(headers);
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(true);
+
+        // Set token count to max
+        limiter.tokenCount.set(limiter.maxTokens);
+
+        limiter.onCompletedAttempt(ctx, requestLog, 1);
+
+        assertThat(limiter.tokenCount.get()).isEqualTo(limiter.maxTokens);
+    }
+
+    // ============================================================================
+    // Concurrency Tests
+    // ============================================================================
+
+    @Test
+    void onCompletedAttempt_concurrentAccess() throws InterruptedException {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
+        final RequestLog requestLog = mock(RequestLog.class);
+        final HttpHeaders headers = createGrpcHeaders("14"); // UNAVAILABLE
+
+        when(requestLog.responseTrailers()).thenReturn(headers);
+        when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS))
+                .thenReturn(true);
+
+        // Start with max tokens
+        limiter.tokenCount.set(limiter.maxTokens);
+
+        // Create thread pool for concurrent access testing
+        final int threadCount = 10;
+        final int attemptsPerThread = 100;
+        final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    for (int j = 0; j < attemptsPerThread; j++) {
+                        limiter.onCompletedAttempt(ctx, requestLog, 1);
+                    }
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        // Wait for all threads to complete
+        latch.await(5, TimeUnit.SECONDS);
+        executor.shutdown();
+
+        // Verify final token count is correct (maxTokens - 1000 * threadCount * attemptsPerThread)
+        final int expectedTokens = limiter.maxTokens - (1000 * threadCount * attemptsPerThread);
+        assertThat(limiter.tokenCount.get()).isEqualTo(Math.max(0, expectedTokens));
+    }
+
+    @Test
+    void grpcRetryLimiter_toString() {
+        final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.5f, 1.25f, 5.75f, Arrays.asList(14));
+        final String toString = limiter.toString();
+
+        assertThat(toString).contains("GrpcRetryLimiter");
+        assertThat(toString).contains("maxTokens=10.5");
+        assertThat(toString).contains("threshold=5.75");
+        assertThat(toString).contains("tokenRatio=1.25");
+        assertThat(toString).contains("retryableStatuses=[14]");
+        assertThat(toString).contains("currentTokenCount=10.5");
+        // After some token changes (simulate by directly setting)
+        limiter.tokenCount.set(5500); // 5.5 tokens
+        final String updatedToString = limiter.toString();
+        assertThat(updatedToString).contains("currentTokenCount=5.5");
+    }
+}
diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/limiter/RetryLimiterTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/limiter/RetryLimiterTest.java
new file mode 100644
index 00000000000..ba6eb12423c
--- /dev/null
+++ b/core/src/test/java/com/linecorp/armeria/client/retry/limiter/RetryLimiterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.linecorp.armeria.client.retry.limiter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.jupiter.api.Test;
+
+class RetryLimiterTest {
+
+    // ============================================================================
+    // RetryRateLimiter.ofRateLimiter Tests
+    // ============================================================================
+
+    @Test
+    void ofRateLimiter() {
+        final RetryLimiter limiter = RetryLimiter.ofRateLimiter(2.5);
+
+        assertThat(limiter).isInstanceOf(RetryRateLimiter.class);
+        final RetryRateLimiter rateLimiter = (RetryRateLimiter) limiter;
+        assertThat(rateLimiter.toString()).contains("permitsPerSecond=2.5");
+    }
+
+    @Test
+    void ofGrpc() {
+        final RetryLimiter limiter = RetryLimiter.ofGrpc(10.5f, 1.25f);
+
+        assertThat(limiter).isInstanceOf(GrpcRetryLimiter.class);
+        final GrpcRetryLimiter grpcLimiter = (GrpcRetryLimiter) limiter;
+        assertThat(grpcLimiter.maxTokens).isEqualTo(10500); // 10.5 * 1000
+        assertThat(grpcLimiter.threshold).isEqualTo(5250); // 10.5 / 2 * 1000
+        assertThat(grpcLimiter.tokenRatio).isEqualTo(1250); // 1.25 * 1000
+        assertThat(grpcLimiter.retryableStatuses).containsExactly("14"); // UNAVAILABLE
+    }
+
+    @Test
+    void ofGrpc_withAllParameters() {
+        final RetryLimiter limiter = RetryLimiter.ofGrpc(20.5f, 2.25f, 8.75f, Arrays.asList(14, 13));
+
+        assertThat(limiter).isInstanceOf(GrpcRetryLimiter.class);
+        final GrpcRetryLimiter grpcLimiter = (GrpcRetryLimiter) limiter;
+        assertThat(grpcLimiter.maxTokens).isEqualTo(20500); // 20.5 * 1000
+        assertThat(grpcLimiter.threshold).isEqualTo(8750); // 8.75 * 1000
+        assertThat(grpcLimiter.tokenRatio).isEqualTo(2250); // 2.25 * 1000
+        assertThat(grpcLimiter.retryableStatuses).containsExactlyInAnyOrder("14", "13");
+    }
+
+    @Test
+    void ofRateLimiter_functionalTest() {
+        final RetryLimiter limiter = RetryLimiter.ofRateLimiter(10.0); // High rate for testing
+
+        // Should allow first retry (RateLimiter allows one initial permit)
+        assertThat(limiter.shouldRetry(null, 1)).isTrue();
+        // Second retry should be denied due to rate limiting
+        assertThat(limiter.shouldRetry(null, 2)).isFalse();
+    }
+
+    @Test
+    void ofGrpc_functionalTest() {
+        final RetryLimiter limiter = RetryLimiter.ofGrpc(10.0f, 1.0f);
+
+        // Should allow retries when tokens are above threshold
+        assertThat(limiter.shouldRetry(null, 1)).isTrue();
+        assertThat(limiter.shouldRetry(null, 2)).isTrue();
+    }
+
+    @Test
+    void ofGrpc_withCustomThreshold_functionalTest() {
+        final RetryLimiter limiter = RetryLimiter.ofGrpc(10.0f, 1.0f, 8.0f, Collections.singletonList(14));
+
+        // Should allow retries when tokens are above threshold
+        assertThat(limiter.shouldRetry(null, 1)).isTrue();
+        assertThat(limiter.shouldRetry(null, 2)).isTrue();
+    }
+}
diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/limiter/RetryRateLimiterTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/limiter/RetryRateLimiterTest.java
new file mode 100644
index 00000000000..a6b92cbc5cf
--- /dev/null
+++ b/core/src/test/java/com/linecorp/armeria/client/retry/limiter/RetryRateLimiterTest.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2025 LY Corporation
+ *
+ * LY Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.linecorp.armeria.client.retry.limiter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+
+import com.linecorp.armeria.client.ClientRequestContext;
+
+class RetryRateLimiterTest {
+
+    private final ClientRequestContext ctx = mock(ClientRequestContext.class);
+
+    @Test
+    void constructor_withPositiveRate() {
+        final RetryRateLimiter limiter = new RetryRateLimiter(10.0);
+
+        // Should not throw any exception
+        assertThat(limiter).isNotNull();
+    }
+
+    @Test
+    void constructor_withZeroRate() {
+        assertThatThrownBy(() -> new RetryRateLimiter(0.0))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void constructor_withNegativeRate() {
+        assertThatThrownBy(() -> new RetryRateLimiter(-1.0))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void shouldRetry_withHighRate_shouldAllowOneInitialRetry() {
+        final RetryRateLimiter limiter = new RetryRateLimiter(100.0); // 100 permits per second
+
+        // With any rate, only the first retry should be allowed initially
+        assertThat(limiter.shouldRetry(ctx, 2)).isTrue();
+        assertThat(limiter.shouldRetry(ctx, 3)).isFalse();
+        assertThat(limiter.shouldRetry(ctx, 4)).isFalse();
+    }
+
+    @Test
+    void shouldRetry_withLowRate_shouldLimitRetries() {
+        final RetryRateLimiter limiter = new RetryRateLimiter(1.0); // 1 permit per second
+
+        // First retry should be allowed
+        assertThat(limiter.shouldRetry(ctx, 2)).isTrue();
+
+        // Second retry immediately after should be denied due to rate limiting
+        assertThat(limiter.shouldRetry(ctx, 3)).isFalse();
+    }
+
+    @Test
+    void shouldRetry_withVeryHighRate_shouldStillLimitInitialRetries() {
+        final RetryRateLimiter limiter = new RetryRateLimiter(1000.0); // 1000 permits per second
+
+        // Even with high rate, only the first retry should be allowed initially
+        assertThat(limiter.shouldRetry(ctx, 2)).isTrue();
+        assertThat(limiter.shouldRetry(ctx, 3)).isFalse();
+        assertThat(limiter.shouldRetry(ctx, 4)).isFalse();
+    }
+
+    @Test
+    void shouldRetry_rateLimitingBehavior() throws InterruptedException {
+        final RetryRateLimiter limiter = new RetryRateLimiter(2.0); // 2 permits per second
+
+        // First retry should be allowed
+        assertThat(limiter.shouldRetry(ctx, 2)).isTrue();
+
+        // Second retry immediately after should be denied
+        assertThat(limiter.shouldRetry(ctx, 3)).isFalse();
+
+        // Wait for 0.5 seconds + some tolerance to allow rate limiter to refill
+        // (2 permits per second = 0.5 seconds per permit)
+        Thread.sleep(500 + 100);
+
+        // Should allow one more retry after waiting
+        assertThat(limiter.shouldRetry(ctx, 4)).isTrue();
+
+        // But the next one should be denied again
+        assertThat(limiter.shouldRetry(ctx, 5)).isFalse();
+    }
+
+    @Test
+    void shouldRetry_concurrentAccess() throws InterruptedException {
+        final RetryRateLimiter limiter = new RetryRateLimiter(10.0); // 10 permits per second
+
+        final int threadCount = 5;
+        final int attemptsPerThread = 10;
+        final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+        final AtomicInteger successfulRetries = new AtomicInteger(0);
+        final AtomicInteger failedRetries = new AtomicInteger(0);
+
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    for (int j = 0; j < attemptsPerThread; j++) {
+                        if (limiter.shouldRetry(ctx, j)) {
+                            successfulRetries.incrementAndGet();
+                        } else {
+                            failedRetries.incrementAndGet();
+                        }
+                    }
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        latch.await(5, TimeUnit.SECONDS);
+        executor.shutdown();
+
+        // With 10 permits per second and 5 threads making 10 attempts each,
+        // we should have some successful and some failed retries
+        assertThat(successfulRetries.get()).isGreaterThan(0);
+        assertThat(failedRetries.get()).isGreaterThan(0);
+        assertThat(successfulRetries.get() + failedRetries.get()).isEqualTo(threadCount * attemptsPerThread);
+    }
+
+    @Test
+    void shouldRetry_withFractionalRate() {
+        final RetryRateLimiter limiter = new RetryRateLimiter(0.5); // 0.5 permits per second
+
+        // First retry should be allowed
+        assertThat(limiter.shouldRetry(ctx, 2)).isTrue();
+
+        // Second retry immediately after should be denied due to rate limiting
+        assertThat(limiter.shouldRetry(ctx, 3)).isFalse();
+
+        // Wait for 2 seconds + some tolerance to allow rate limiter to refill
+        // (0.5 permits per second = 2 seconds per permit)
+        try {
+            Thread.sleep(2000 + 100);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        // Should allow one more retry after waiting
+        assertThat(limiter.shouldRetry(ctx, 4)).isTrue();
+    }
+
+    @Test
+    void multipleInstances_areIndependent() {
+        final RetryRateLimiter limiter1 = new RetryRateLimiter(1.0);
+        final RetryRateLimiter limiter2 = new RetryRateLimiter(1.0);
+
+        // Both limiters should allow their first retry
+        assertThat(limiter1.shouldRetry(ctx, 2)).isTrue();
+        assertThat(limiter2.shouldRetry(ctx, 2)).isTrue();
+
+        // Both limiters should deny their second retry due to rate limiting
+        assertThat(limiter1.shouldRetry(ctx, 3)).isFalse();
+        assertThat(limiter2.shouldRetry(ctx, 3)).isFalse();
+    }
+}