Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 170 additions & 12 deletions packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ enum FetchResult {
FailureMaxAttempts = "failure_max_attempts",
}

class UnknownBlockRateLimitedError extends Error {
constructor(message: string) {
super(message);
this.name = "UnknownBlockRateLimitedError";
}
}

function getRateLimitedUntilMs(e: unknown): number | null {
if (!(e instanceof RequestError)) {
return null;
}

switch (e.type.code) {
case RequestErrorCode.RESP_RATE_LIMITED:
case RequestErrorCode.REQUEST_SELF_RATE_LIMITED:
return e.type.rateLimitedUntilMs ?? null;
default:
return null;
}
}

/**
* BlockInputSync is a class that handles ReqResp to find blocks and data related to a specific blockRoot. The
* blockRoot may have been found via object gossip, or the API. Gossip objects that can trigger a search are block,
Expand Down Expand Up @@ -106,6 +127,7 @@ export class BlockInputSync {
private readonly maxPendingBlocks;
private subscribedToNetworkEvents = false;
private peerBalancer: UnknownBlockPeerBalancer;
private rateLimitBackoffTimeout: NodeJS.Timeout | undefined;

constructor(
private readonly config: ChainForkConfig,
Expand Down Expand Up @@ -155,6 +177,7 @@ export class BlockInputSync {

unsubscribeFromNetwork(): void {
this.logger.verbose("BlockInputSync disabled.");
this.clearRateLimitBackoffTimer();
this.chain.emitter.off(ChainEvent.unknownBlockRoot, this.onUnknownBlockRoot);
this.chain.emitter.off(ChainEvent.unknownEnvelopeBlockRoot, this.onUnknownEnvelopeBlockRoot);
this.chain.emitter.off(ChainEvent.incompleteBlockInput, this.onIncompleteBlockInput);
Expand Down Expand Up @@ -402,6 +425,7 @@ export class BlockInputSync {
private onPeerDisconnected = (data: NetworkEventData[NetworkEvent.peerDisconnected]): void => {
const peerId = data.peer;
this.peerBalancer.onPeerDisconnected(peerId);
this.scheduleRateLimitBackoffRetry();
};

/**
Expand Down Expand Up @@ -519,7 +543,7 @@ export class BlockInputSync {
*/
private triggerUnknownBlockSearch = (): void => {
// Cheap early stop to prevent calling the network.getConnectedPeers()
if (this.pendingBlocks.size === 0 && this.pendingPayloads.size === 0) {
if (!this.subscribedToNetworkEvents || (this.pendingBlocks.size === 0 && this.pendingPayloads.size === 0)) {
return;
}

Expand Down Expand Up @@ -607,6 +631,36 @@ export class BlockInputSync {
}
};

private scheduleRateLimitBackoffRetry(): void {
this.clearRateLimitBackoffTimer();

if (!this.subscribedToNetworkEvents || (this.pendingBlocks.size === 0 && this.pendingPayloads.size === 0)) {
return;
}

const now = Date.now();
const retryAt = this.peerBalancer.getNextRateLimitRetryAt();
if (retryAt === null) {
return;
}

this.rateLimitBackoffTimeout = setTimeout(
() => {
this.rateLimitBackoffTimeout = undefined;
this.triggerUnknownBlockSearch();
this.scheduleRateLimitBackoffRetry();
},
Math.max(0, retryAt - now)
);
}

private clearRateLimitBackoffTimer(): void {
if (this.rateLimitBackoffTimeout !== undefined) {
clearTimeout(this.rateLimitBackoffTimeout);
this.rateLimitBackoffTimeout = undefined;
}
}

private async downloadBlock(block: BlockInputSyncCacheItem): Promise<void> {
if (block.status !== PendingBlockInputStatus.pending) {
return;
Expand Down Expand Up @@ -678,6 +732,16 @@ export class BlockInputSync {
this.onUnknownBlockRoot({rootHex: pending.blockInput.parentRootHex, source: BlockInputSource.byRoot});
}
} else {
if (res.err instanceof UnknownBlockRateLimitedError) {
const pendingBlock = this.pendingBlocks.get(rootHex);
if (pendingBlock) {
pendingBlock.status = PendingBlockInputStatus.pending;
}
this.logger.debug("Deferring unknown block download due to peer rate limit", logCtx, res.err);
this.scheduleRateLimitBackoffRetry();
return;
}

this.metrics?.blockInputSync.downloadedBlocksError.inc();
this.logger.debug("Ignoring unknown block root after many failed downloads", logCtx, res.err);
this.removeAndDownScoreAllDescendants(block);
Expand Down Expand Up @@ -994,12 +1058,19 @@ export class BlockInputSync {
let envelope = payloadInput?.hasPayloadEnvelope() ? payloadInput.getPayloadEnvelope() : undefined;

let i = 0;
let deferredByRateLimit = false;
while (i++ < this.getMaxDownloadAttempts()) {
const pendingColumns = payloadInput?.hasAllData()
? new Set<number>()
: new Set(payloadInput?.getMissingSampledColumnMeta().missing ?? []);
const peerMeta = this.peerBalancer.bestPeerForPendingColumns(pendingColumns, excludedPeers);
if (peerMeta === null) {
if (this.peerBalancer.getNextRateLimitRetryAt(pendingColumns, excludedPeers) !== null) {
throw new UnknownBlockRateLimitedError(
`Error fetching payload by root slot=${slot} root=${rootHex} after ${i}: peers with needed columns are rate-limited`
);
}

throw Error(
`Error fetching payload by root slot=${slot} root=${rootHex} after ${i}: cannot find peer with needed columns=${prettyPrintIndices(Array.from(pendingColumns))}`
);
Expand Down Expand Up @@ -1076,7 +1147,12 @@ export class BlockInputSync {
e as Error
);

if (e instanceof RequestError) {
const rateLimitedUntilMs = getRateLimitedUntilMs(e);
if (rateLimitedUntilMs !== null) {
deferredByRateLimit = true;
this.peerBalancer.onRateLimited(peerId, rateLimitedUntilMs);
this.scheduleRateLimitBackoffRetry();
} else if (e instanceof RequestError) {
switch (e.type.code) {
case RequestErrorCode.REQUEST_RATE_LIMITED:
case RequestErrorCode.REQUEST_TIMEOUT:
Expand All @@ -1093,6 +1169,12 @@ export class BlockInputSync {
}
}

if (deferredByRateLimit && this.peerBalancer.getNextRateLimitRetryAt() !== null) {
throw new UnknownBlockRateLimitedError(
`Error fetching payload with slot=${slot} root=${rootHex} after ${i - 1} attempts: peers are rate-limited`
);
}

throw Error(`Error fetching payload with slot=${slot} root=${rootHex} after ${i - 1} attempts.`);
}

Expand Down Expand Up @@ -1176,13 +1258,20 @@ export class BlockInputSync {
}

let i = 0;
let deferredByRateLimit = false;
while (i++ < this.getMaxDownloadAttempts()) {
const pendingColumns =
isPendingBlockInput(cacheItem) && isBlockInputColumns(cacheItem.blockInput)
? new Set(cacheItem.blockInput.getMissingSampledColumnMeta().missing)
: defaultPendingColumns;
const peerMeta = this.peerBalancer.bestPeerForPendingColumns(pendingColumns, excludedPeers);
if (peerMeta === null) {
if (this.peerBalancer.getNextRateLimitRetryAt(pendingColumns, excludedPeers) !== null) {
throw new UnknownBlockRateLimitedError(
`Error fetching UnknownBlockRoot slot=${slot} root=${rootHex} after ${i}: peers with needed columns are rate-limited`
);
}

// no more peer with needed columns to try, throw error
const message = `Error fetching UnknownBlockRoot slot=${slot} root=${rootHex} after ${i}: cannot find peer with needed columns=${prettyPrintIndices(Array.from(pendingColumns))}`;
this.metrics?.blockInputSync.fetchTimeSec.observe(
Expand Down Expand Up @@ -1238,16 +1327,23 @@ export class BlockInputSync {
} else if (e instanceof RequestError) {
// should look into req_resp metrics in this case
downloadByRootMetrics?.error.inc({code: "req_resp", client: peerClient});
switch (e.type.code) {
case RequestErrorCode.REQUEST_RATE_LIMITED:
case RequestErrorCode.RESP_RATE_LIMITED:
case RequestErrorCode.REQUEST_SELF_RATE_LIMITED:
case RequestErrorCode.REQUEST_TIMEOUT:
// do not exclude peer for these errors
break;
default:
excludedPeers.add(peerId);
break;
const rateLimitedUntilMs = getRateLimitedUntilMs(e);
if (rateLimitedUntilMs !== null) {
deferredByRateLimit = true;
this.peerBalancer.onRateLimited(peerId, rateLimitedUntilMs);
this.scheduleRateLimitBackoffRetry();
} else {
switch (e.type.code) {
case RequestErrorCode.REQUEST_RATE_LIMITED:
case RequestErrorCode.RESP_RATE_LIMITED:
case RequestErrorCode.REQUEST_SELF_RATE_LIMITED:
case RequestErrorCode.REQUEST_TIMEOUT:
// do not exclude peer for these errors
break;
default:
excludedPeers.add(peerId);
break;
}
}
} else {
// investigate if this happens
Expand Down Expand Up @@ -1275,6 +1371,10 @@ export class BlockInputSync {

const message = `Error fetching BlockInput with slot=${slot} root=${rootHex} after ${i - 1} attempts.`;

if (deferredByRateLimit && this.peerBalancer.getNextRateLimitRetryAt() !== null) {
throw new UnknownBlockRateLimitedError(`${message} Peers are rate-limited.`);
}

if (!isPendingBlockInput(cacheItem)) {
throw Error(`${message} No block and no data was found.`);
}
Expand Down Expand Up @@ -1406,10 +1506,12 @@ export class BlockInputSync {
export class UnknownBlockPeerBalancer {
readonly peersMeta: Map<PeerIdStr, PeerSyncMeta>;
readonly activeRequests: Map<PeerIdStr, number>;
readonly rateLimitedUntilByPeer: Map<PeerIdStr, number>;

constructor() {
this.peersMeta = new Map();
this.activeRequests = new Map();
this.rateLimitedUntilByPeer = new Map();
}

/** Trigger on each peer re-status */
Expand All @@ -1424,6 +1526,41 @@ export class UnknownBlockPeerBalancer {
onPeerDisconnected(peerId: PeerIdStr): void {
this.peersMeta.delete(peerId);
this.activeRequests.delete(peerId);
this.rateLimitedUntilByPeer.delete(peerId);
}

onRateLimited(peerId: PeerIdStr, rateLimitedUntilMs: number): void {
this.rateLimitedUntilByPeer.set(peerId, rateLimitedUntilMs);
}

getNextRateLimitRetryAt(pendingColumns?: Set<number>, excludedPeers?: Set<PeerIdStr>): number | null {
const now = Date.now();
let retryAt: number | null = null;

for (const [peerId, rateLimitedUntil] of this.rateLimitedUntilByPeer.entries()) {
if (rateLimitedUntil <= now) {
this.rateLimitedUntilByPeer.delete(peerId);
continue;
Comment thread
wemeetagain marked this conversation as resolved.
}

if (excludedPeers?.has(peerId)) {
continue;
}

const syncMeta = this.peersMeta.get(peerId);
if (syncMeta === undefined) {
this.rateLimitedUntilByPeer.delete(peerId);
continue;
}

if (pendingColumns !== undefined && !this.peerHasPendingColumns(syncMeta, pendingColumns)) {
continue;
}

retryAt = Math.min(retryAt ?? rateLimitedUntil, rateLimitedUntil);
}

return retryAt;
}

/**
Expand Down Expand Up @@ -1472,6 +1609,7 @@ export class UnknownBlockPeerBalancer {
}

private filterPeers(pendingDataColumns: Set<number>, excludedPeers: Set<PeerIdStr>): PeerIdStr[] {
const now = Date.now();
let maxColumnCount = 0;
const considerPeers: {peerId: PeerIdStr; columnCount: number}[] = [];
for (const [peerId, syncMeta] of this.peersMeta.entries()) {
Expand All @@ -1480,12 +1618,24 @@ export class UnknownBlockPeerBalancer {
continue;
}

const rateLimitedUntil = this.rateLimitedUntilByPeer.get(peerId);
if (rateLimitedUntil !== undefined) {
if (now < rateLimitedUntil) {
continue;
}
this.rateLimitedUntilByPeer.delete(peerId);
}

const activeRequests = this.activeRequests.get(peerId) ?? 0;
if (activeRequests >= MAX_CONCURRENT_REQUESTS) {
// should return peer with no more than MAX_CONCURRENT_REQUESTS active requests
continue;
}

if (!this.peerHasPendingColumns(syncMeta, pendingDataColumns)) {
continue;
}

if (pendingDataColumns.size === 0) {
considerPeers.push({peerId, columnCount: 0});
continue;
Expand Down Expand Up @@ -1519,4 +1669,12 @@ export class UnknownBlockPeerBalancer {

return eligiblePeers;
}

private peerHasPendingColumns(syncMeta: PeerSyncMeta, pendingDataColumns: Set<number>): boolean {
if (pendingDataColumns.size === 0) {
return true;
}

return syncMeta.custodyColumns.some((column) => pendingDataColumns.has(column));
}
}
20 changes: 16 additions & 4 deletions packages/beacon-node/test/e2e/network/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,31 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);

// First request: responder sends RATE_LIMITED → detected as RESP_RATE_LIMITED
await expectRejectedWithLodestarError(
await expectRejectedWithRateLimitError(
netA.sendBeaconBlocksByRange(peerIdB, {startSlot: 0, step: 1, count: 1}),
new RequestError({code: RequestErrorCode.RESP_RATE_LIMITED})
RequestErrorCode.RESP_RATE_LIMITED
);

// Second request: SelfRateLimiter has the peer in backoff → blocked before sending
await expectRejectedWithLodestarError(
await expectRejectedWithRateLimitError(
netA.sendBeaconBlocksByRange(peerIdB, {startSlot: 0, step: 1, count: 1}),
new RequestError({code: RequestErrorCode.REQUEST_SELF_RATE_LIMITED})
RequestErrorCode.REQUEST_SELF_RATE_LIMITED
);
});
}

async function expectRejectedWithRateLimitError(promise: Promise<unknown>, code: RequestErrorCode): Promise<void> {
try {
const value = await promise;
throw Error(`Expected promise to reject but returned value: \n\n\t${JSON.stringify(value, null, 2)}`);
} catch (e) {
expect(e).toBeInstanceOf(RequestError);
const type = (e as RequestError).type as {code: RequestErrorCode; rateLimitedUntilMs?: number};
expect(type.code).toBe(code);
expect(type.rateLimitedUntilMs).toEqual(expect.any(Number));
}
}

function getEmptyEncodedPayloadSignedBeaconBlock(config: ChainForkConfig): ResponseOutgoing {
return wrapBlockAsEncodedPayload(config, config.getForkTypes(0).SignedBeaconBlock.defaultValue());
}
Expand Down
5 changes: 4 additions & 1 deletion packages/beacon-node/test/unit/sync/range/chain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ describe("sync / range / chain", () => {
const downloadByRange: SyncChainFns["downloadByRange"] = async (peerMeta, request, _partialDownload) => {
if (peerMeta.peerId === peer1) {
peer1Downloads++;
throw new RequestError({code: RequestErrorCode.RESP_RATE_LIMITED});
throw new RequestError({
code: RequestErrorCode.RESP_RATE_LIMITED,
rateLimitedUntilMs: Date.now() + 50,
});
}

// peer2 returns blocks normally
Expand Down
Loading
Loading