Skip to content

Commit 75417ce

Browse files
committed
Keep checks bounded in terms of fds
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
1 parent 70b444f commit 75417ce

2 files changed

Lines changed: 80 additions & 40 deletions

File tree

cli/src/commands/authorities.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,15 +767,31 @@ impl AuthorityDiscovery {
767767
}
768768

769769
/// Returns a reference to the discovered peer info.
770+
#[allow(dead_code)]
770771
pub fn peer_info(&self) -> &HashMap<PeerId, Info> {
771772
&self.peer_info
772773
}
773774

774775
/// Returns a reference to the mapping between the authority discovery public key and the
775776
/// discovered addresses.
777+
#[allow(dead_code)]
776778
pub fn authority_to_details(&self) -> &HashMap<sr25519::PublicKey, HashSet<Multiaddr>> {
777779
&self.authority_to_details
778780
}
781+
782+
/// Consume the discovery state and return the collected results.
783+
///
784+
/// This drops the underlying swarm, freeing its network connections and
785+
/// file descriptors so that subsequent phases (e.g. TCP reachability
786+
/// checks) do not hit the open-file limit.
787+
pub fn into_results(
788+
self,
789+
) -> (
790+
HashMap<sr25519::PublicKey, HashSet<Multiaddr>>,
791+
HashMap<PeerId, Info>,
792+
) {
793+
(self.authority_to_details, self.peer_info)
794+
}
779795
}
780796

781797
/// Reach a single peer and query the identify protocol.

cli/src/commands/authority_check.rs

Lines changed: 64 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -172,60 +172,81 @@ async fn check_tcp_reachable(endpoint: &str, timeout: Duration) -> Result<(), St
172172
}
173173
}
174174

175-
/// Run TCP connectivity checks for all addresses concurrently.
175+
/// Run TCP connectivity checks for all addresses in batches.
176176
///
177177
/// Each entry in `checks` is `(authority_index, multiaddr, is_public)`.
178178
/// Returns results grouped by authority index.
179+
///
180+
/// Addresses are processed in batches of [`MAX_PARALLEL_CHECKS`] so that
181+
/// file descriptors from one batch are fully released before the next batch
182+
/// begins. This prevents "too many open files" errors on large networks
183+
/// (e.g. Kusama with 1000+ validators).
179184
async fn run_connectivity_checks(
180-
checks: Vec<(usize, Multiaddr, bool)>,
185+
mut checks: Vec<(usize, Multiaddr, bool)>,
181186
dial_timeout: Duration,
182187
w: &mut DualWriter,
183188
) -> HashMap<usize, Vec<AddressCheck>> {
184189
let total = checks.len();
185190
let public_count = checks.iter().filter(|(_, _, p)| *p).count();
186191
let start = Instant::now();
187192

188-
let results: Vec<(usize, AddressCheck)> = futures::stream::iter(checks)
189-
.map(|(idx, addr, is_public)| async move {
190-
let address_short = shorten_address(&addr);
191-
192-
let result = if !is_public {
193-
AddressResult::Skipped("private".into())
194-
} else if let Some(endpoint) = extract_tcp_endpoint(&addr) {
195-
match check_tcp_reachable(&endpoint, dial_timeout).await {
196-
Ok(()) => AddressResult::Ok,
197-
Err(e) => AddressResult::Failed(e),
198-
}
199-
} else {
200-
AddressResult::Skipped("unsupported transport".into())
201-
};
193+
let mut grouped: HashMap<usize, Vec<AddressCheck>> = HashMap::new();
194+
let mut checked = 0usize;
195+
196+
while !checks.is_empty() {
197+
let batch_end = checks.len().min(MAX_PARALLEL_CHECKS);
198+
let batch: Vec<_> = checks.drain(..batch_end).collect();
199+
200+
let batch_results: Vec<(usize, AddressCheck)> = futures::stream::iter(batch)
201+
.map(|(idx, addr, is_public)| async move {
202+
let address_short = shorten_address(&addr);
203+
204+
let result = if !is_public {
205+
AddressResult::Skipped("private".into())
206+
} else if let Some(endpoint) = extract_tcp_endpoint(&addr) {
207+
match check_tcp_reachable(&endpoint, dial_timeout).await {
208+
Ok(()) => AddressResult::Ok,
209+
Err(e) => AddressResult::Failed(e),
210+
}
211+
} else {
212+
AddressResult::Skipped("unsupported transport".into())
213+
};
214+
215+
(
216+
idx,
217+
AddressCheck {
218+
address_short,
219+
is_public,
220+
result,
221+
},
222+
)
223+
})
224+
.buffer_unordered(MAX_PARALLEL_CHECKS)
225+
.collect()
226+
.await;
227+
228+
checked += batch_results.len();
229+
for (idx, check) in batch_results {
230+
grouped.entry(idx).or_default().push(check);
231+
}
202232

203-
(
204-
idx,
205-
AddressCheck {
206-
address_short,
207-
is_public,
208-
result,
209-
},
210-
)
211-
})
212-
.buffer_unordered(MAX_PARALLEL_CHECKS)
213-
.collect()
214-
.await;
233+
let _ = write!(
234+
w,
235+
"\r Checked {}/{} addresses ...",
236+
checked, total,
237+
);
238+
let _ = w.flush();
239+
}
215240

216241
let elapsed = start.elapsed();
217242
let _ = writeln!(
218243
w,
219-
" Checked {} addresses ({} public) in {:.1}s",
244+
"\r Checked {} addresses ({} public) in {:.1}s ",
220245
total,
221246
public_count,
222247
elapsed.as_secs_f64()
223248
);
224249

225-
let mut grouped: HashMap<usize, Vec<AddressCheck>> = HashMap::new();
226-
for (idx, check) in results {
227-
grouped.entry(idx).or_default().push(check);
228-
}
229250
grouped
230251
}
231252

@@ -684,15 +705,18 @@ pub async fn check_authorities(
684705
discovery.set_show_progress(true);
685706
discovery.discover().await;
686707

687-
let dht_count = discovery.authority_to_details().len();
708+
// Extract the results and drop the swarm so its network connections and
709+
// file descriptors are released before we open new TCP sockets in Phase 4.
710+
let (authority_to_details, peer_info) = discovery.into_results();
711+
712+
let dht_count = authority_to_details.len();
688713
let identified_authorities = authorities
689714
.iter()
690715
.filter(|auth| {
691-
discovery
692-
.authority_to_details()
716+
authority_to_details
693717
.get(*auth)
694718
.and_then(|addrs| addrs.iter().find_map(get_peer_id))
695-
.map_or(false, |pid| discovery.peer_info().contains_key(&pid))
719+
.map_or(false, |pid| peer_info.contains_key(&pid))
696720
})
697721
.count();
698722
writeln!(
@@ -708,7 +732,7 @@ pub async fn check_authorities(
708732
// Phase 3: TCP connectivity checks on every discovered address.
709733
let mut pending_checks: Vec<(usize, Multiaddr, bool)> = Vec::new();
710734
for (idx, authority) in authorities.iter().enumerate() {
711-
if let Some(addrs) = discovery.authority_to_details().get(authority) {
735+
if let Some(addrs) = authority_to_details.get(authority) {
712736
for addr in addrs {
713737
let is_pub = is_public_address(addr);
714738
pending_checks.push((idx, addr.clone(), is_pub));
@@ -736,7 +760,7 @@ pub async fn check_authorities(
736760

737761
let identity_name = identity_names.get(authority).cloned();
738762

739-
let Some(addrs) = discovery.authority_to_details().get(authority) else {
763+
let Some(addrs) = authority_to_details.get(authority) else {
740764
results.push(AuthorityResult {
741765
authority_ss58,
742766
identity_name,
@@ -750,7 +774,7 @@ pub async fn check_authorities(
750774

751775
let peer_id = addrs.iter().find_map(get_peer_id);
752776
let agent_version = peer_id
753-
.and_then(|pid| discovery.peer_info().get(&pid))
777+
.and_then(|pid| peer_info.get(&pid))
754778
.map(|info| info.agent_version.clone());
755779
let addresses = check_results.remove(&idx).unwrap_or_default();
756780

0 commit comments

Comments
 (0)