|
1 | 1 | use anyhow::Context; |
2 | 2 | use kumo_prometheus::declare_metric; |
3 | 3 | use parking_lot::Mutex; |
4 | | -use std::collections::HashMap; |
| 4 | +use std::collections::{BTreeMap, HashMap}; |
| 5 | +use std::fmt::Write; |
5 | 6 | use std::future::Future; |
6 | 7 | use std::sync::atomic::{AtomicUsize, Ordering}; |
7 | 8 | use std::sync::{Arc, LazyLock}; |
8 | 9 | use tokio::runtime::Handle; |
9 | 10 | use tokio::task::JoinHandle; |
| 11 | +use tokio::time::Duration; |
10 | 12 |
|
11 | 13 | pub static RUNTIME: LazyLock<Runtime> = |
12 | 14 | LazyLock::new(|| Runtime::new("localset", |cpus| cpus / 4, &LOCALSET_THREADS).unwrap()); |
@@ -54,6 +56,64 @@ struct RuntimeInner { |
54 | 56 | name_prefix: String, |
55 | 57 | } |
56 | 58 |
|
| 59 | +fn runtimes_by_name() -> BTreeMap<String, Runtime> { |
| 60 | + RUNTIMES |
| 61 | + .lock() |
| 62 | + .iter() |
| 63 | + .map(|(name, rt)| (name.clone(), rt.clone())) |
| 64 | + .collect() |
| 65 | +} |
| 66 | + |
/// Fallback used on every target other than Linux.
///
/// No dump is attempted on these targets; the function resolves immediately
/// with a fixed explanatory message.
///
/// `timeout_duration` is accepted only so that the signature is the same on
/// all targets; it is not used here.
#[cfg(not(target_os = "linux"))]
pub async fn dump_all_runtimes(timeout_duration: Duration) -> String {
    // Explicitly discard the argument so that non-Linux builds do not emit
    // an `unused_variables` warning for it.
    let _ = timeout_duration;
    String::from("Runtime state dumping is not supported on this system")
}
| 71 | + |
| 72 | +// NOTE: at the time of writing, calling this once will prevent a |
| 73 | +// subsequent graceful shutdown from completing. |
| 74 | +// |
| 75 | +// You will need to call this multiple times to allow the graceful |
| 76 | +// shutdown to "clock through" and finish successfully. |
| 77 | +// I do not know what exactly causes that stutter/stickiness. |
| 78 | +#[cfg(target_os = "linux")] |
| 79 | +pub async fn dump_all_runtimes(timeout_duration: Duration) -> String { |
| 80 | + let runtimes = runtimes_by_name(); |
| 81 | + let mut dumps = vec![]; |
| 82 | + |
| 83 | + async fn collect_dump( |
| 84 | + label: &str, |
| 85 | + handle: &tokio::runtime::Handle, |
| 86 | + timeout_duration: Duration, |
| 87 | + ) -> String { |
| 88 | + match tokio::time::timeout(timeout_duration, handle.dump()).await { |
| 89 | + Err(_) => format!("Runtime {label}: Timeout while collecting runtime dump"), |
| 90 | + Ok(dump) => { |
| 91 | + let label = label.to_string(); |
| 92 | + match tokio::task::spawn_blocking(move || { |
| 93 | + let mut output = format!("Runtime: {label}\n"); |
| 94 | + for (i, task) in dump.tasks().iter().enumerate() { |
| 95 | + let trace = task.trace(); |
| 96 | + writeln!(&mut output, "{label} TASK {i}:\n{trace}").ok(); |
| 97 | + } |
| 98 | + output |
| 99 | + }) |
| 100 | + .await |
| 101 | + .map_err(|err| format!("spawn_blocking: join failed: {err:#}")) |
| 102 | + { |
| 103 | + Ok(s) | Err(s) => s, |
| 104 | + } |
| 105 | + } |
| 106 | + } |
| 107 | + } |
| 108 | + |
| 109 | + dumps.push(collect_dump("main", &get_main_runtime(), timeout_duration).await); |
| 110 | + for (label, rt) in runtimes { |
| 111 | + dumps.push(collect_dump(&label, rt.handle(), timeout_duration).await); |
| 112 | + } |
| 113 | + |
| 114 | + dumps.join("\n\n") |
| 115 | +} |
| 116 | + |
57 | 117 | #[derive(Clone)] |
58 | 118 | pub struct Runtime { |
59 | 119 | inner: Arc<RuntimeInner>, |
|
0 commit comments