Skip to content

Commit 5997d1f

Browse files
authored
Be more conservative in RPC peer rate limiting (#9841)
* tmp * fix * refactor * rate limit via storageLimitHit * update default limits * fix * fix and add tests * tmp * improvement * add logs * Revert "add logs" This reverts commit 2b2efc740b0134e0e4b68d22fe0f22e575a1c33a. * Revert "improvement" This reverts commit 17206d02fc77f4b981814998048eee4b9196928f. * Revert "tmp" This reverts commit 5f53cf433840c5a3ad5ca102e0aa94f182f65e3c. * revert some complexity * revert more some complexity * revert more some complexity * do not adjust on error everywhere * add comment
1 parent 16ceff1 commit 5997d1f

13 files changed

Lines changed: 42 additions & 121 deletions

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ public class P2PConfig {
5555
// DEFAULT_BACKGROUND_THREAD_COUNT)
5656
// The storage query channel allows up to 10 parallel queries (STORAGE_QUERY_CHANNEL_PARALLELISM)
5757
// To avoid resource saturation and ensure capacity for other tasks, we limit historical data
58-
// queries to 5
59-
public static final int DEFAULT_HISTORICAL_DATA_MAX_CONCURRENT_QUERIES = 5;
60-
public static final int DEFAULT_HISTORICAL_MAX_QUERY_QUEUE_SIZE = 100_000;
58+
// queries to 3
59+
public static final int DEFAULT_HISTORICAL_DATA_MAX_CONCURRENT_QUERIES = 3;
60+
public static final int DEFAULT_HISTORICAL_MAX_QUERY_QUEUE_SIZE = 500;
6161

6262
private final Spec spec;
6363
private final NetworkConfig networkConfig;

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BeaconBlocksByRangeMessageHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ public void onIncomingMessage(
156156
callback.completeSuccessfully();
157157
},
158158
error -> {
159-
peer.adjustBlocksRequest(maybeRequestKey.get(), 0);
160159
final Throwable rootCause = Throwables.getRootCause(error);
161160
if (rootCause instanceof RpcException) {
162161
LOG.trace("Rejecting beacon blocks by range request", error); // Keep full context

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BeaconBlocksByRootMessageHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,7 @@ public void onIncomingMessage(
137137
}
138138
callback.completeSuccessfully();
139139
},
140-
err -> {
141-
peer.adjustBlocksRequest(maybeRequestKey.get(), 0);
142-
handleError(callback, err);
143-
});
140+
err -> handleError(callback, err));
144141
}
145142

146143
private SafeFuture<Optional<SignedBeaconBlock>> retrieveBlock(final Bytes32 blockRoot) {

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,7 @@ public void onIncomingMessage(
213213
LOG.trace("Sent {} blob sidecars to peer {}.", sentBlobSidecars, peer.getId());
214214
callback.completeSuccessfully();
215215
},
216-
error -> {
217-
peer.adjustBlobSidecarsRequest(maybeRequestKey.get(), 0);
218-
handleProcessingRequestError(error, callback);
219-
});
216+
error -> handleProcessingRequestError(error, callback));
220217
}
221218

222219
private int calculateRequestedCount(

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRootMessageHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,7 @@ public void onIncomingMessage(
145145
}
146146
callback.completeSuccessfully();
147147
},
148-
err -> {
149-
peer.adjustBlobSidecarsRequest(maybeRequestKey.get(), 0);
150-
handleError(callback, err);
151-
});
148+
err -> handleError(callback, err));
152149
}
153150

154151
private int getMaxRequestBlobSidecars() {

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRangeMessageHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,7 @@ public void onIncomingMessage(
178178
}
179179
responseCallbackWithLogging.completeSuccessfully();
180180
},
181-
error -> {
182-
peer.adjustDataColumnSidecarsRequest(maybeRequestKey.get(), 0);
183-
handleProcessingRequestError(error, responseCallbackWithLogging);
184-
});
181+
error -> handleProcessingRequestError(error, responseCallbackWithLogging));
185182
}
186183

187184
private int calculateRequestedCount(final DataColumnSidecarsByRangeRequestMessage message) {

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootMessageHandler.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger;
5151
import tech.pegasys.teku.statetransition.datacolumns.log.rpc.LoggingPeerId;
5252
import tech.pegasys.teku.statetransition.datacolumns.log.rpc.ReqRespResponseLogger;
53-
import tech.pegasys.teku.storage.api.ThrottlingStorageQueryChannel;
5453
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
5554

5655
/**
@@ -193,11 +192,7 @@ public void onIncomingMessage(
193192
}
194193
responseCallbackWithLogging.completeSuccessfully();
195194
})
196-
.finish(
197-
err -> {
198-
peer.adjustDataColumnSidecarsRequest(maybeRequestKey.get(), 0);
199-
handleError(responseCallbackWithLogging, err);
200-
});
195+
.finish(err -> handleError(responseCallbackWithLogging, err));
201196
}
202197

203198
private SafeFuture<Optional<DataColumnSidecar>> getNonCanonicalDataColumnSidecar(
@@ -233,7 +228,6 @@ private SafeFuture<Void> validateMinimumRequestEpoch(
233228
() ->
234229
combinedChainDataClient
235230
.getBlockByBlockRoot(identifier.blockRoot())
236-
.exceptionally(ThrottlingStorageQueryChannel::ignoreQueueIsFullException)
237231
.thenApply(maybeBlock -> maybeBlock.map(SignedBeaconBlock::getSlot)))
238232
.thenAcceptChecked(
239233
maybeSlot -> {
@@ -264,8 +258,7 @@ private SafeFuture<Optional<DataColumnSidecar>> retrieveDataColumnSidecar(
264258
}
265259
// Fallback to non-canonical sidecar if the canonical one is not found
266260
return getNonCanonicalDataColumnSidecar(identifier);
267-
})
268-
.exceptionally(ThrottlingStorageQueryChannel::ignoreQueueIsFullException);
261+
});
269262
}
270263

271264
private void handleError(

networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BeaconBlocksByRangeMessageHandlerTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,8 @@ public void shouldReturnErrorWhenFirstBlockIsMissing() {
283283

284284
// Requesting 5 blocks
285285
verify(peer, times(1)).approveBlocksRequest(any(), eq(Long.valueOf(count)));
286-
// Sending 0 blocks (First block is missing, return error)
287-
verify(peer, times(1))
288-
.adjustBlocksRequest(eq(maybeRequestKey.orElseThrow()), eq(Long.valueOf(0)));
286+
// Be protective: do not adjust due to error
287+
verify(peer, never()).adjustBlocksRequest(any(), anyLong());
289288

290289
final RpcException expectedError =
291290
new RpcException.ResourceUnavailableException(
@@ -309,9 +308,8 @@ public void shouldReturnErrorWhenEarliestHistoricalBlockUnknown() {
309308

310309
// Requesting 5 blocks
311310
verify(peer, times(1)).approveBlocksRequest(any(), eq(Long.valueOf(count)));
312-
// Sending 0 blocks (First block is missing, return error)
313-
verify(peer, times(1))
314-
.adjustBlocksRequest(eq(maybeRequestKey.orElseThrow()), eq(Long.valueOf(0)));
311+
// Be protective: do not adjust due to error
312+
verify(peer, never()).adjustBlocksRequest(any(), anyLong());
315313

316314
final RpcException expectedError =
317315
new RpcException.ResourceUnavailableException(

networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BeaconBlocksByRootMessageHandlerTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,8 @@ public void onIncomingMessage_interruptedByClosedStream() {
156156

157157
// Requesting 5 blocks
158158
verify(peer, times(1)).approveBlocksRequest(any(), eq(Long.valueOf(blocks.size())));
159-
// Request cancelled
160-
verify(peer, times(1))
161-
.adjustBlocksRequest(eq(maybeRequestKey.orElseThrow()), eq(Long.valueOf(0)));
159+
// Be protective: do not adjust due to error
160+
verify(peer, never()).adjustBlocksRequest(any(), anyLong());
162161

163162
// Check that we only asked for the first block
164163
verify(recentChainData, times(1)).retrieveSignedBlockByRoot(any());

networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandlerTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,8 @@ public void shouldSendResourceUnavailableIfBlobSidecarsAreNotAvailable() {
283283

284284
// Requesting 5 * maxBlobsPerBlock blob sidecars
285285
verify(peer).approveBlobSidecarsRequest(any(), eq(count.times(maxBlobsPerBlock).longValue()));
286-
// Request cancelled
287-
verify(peer)
288-
.adjustBlobSidecarsRequest(eq(allowedObjectsRequest.orElseThrow()), eq(Long.valueOf(0)));
286+
// Be protective: do not adjust due to error
287+
verify(peer, never()).adjustBlobSidecarsRequest(any(), anyLong());
289288

290289
// blob sidecars should be available from epoch 5000, but they are
291290
// available from epoch 5010

0 commit comments

Comments
 (0)