Skip to content

Commit ceae760

Browse files
lnyngfacebook-github-bot
authored andcommitted
Collector plugin do not send error to consumer
Summary: Below has a collector plugin framework to collect extra data async. Currently if the collector fails (timeout, incorrect config, dependency missing etc), an error is returned to the collecting thread, and also stored as a shared value later retrieved by the consumer (main) thread. When the consumer sees the error, the whole sample fails. This is undesirable because we already log the error in the collecting thread. Below follows the model that most failures result in a None field of the sample instead of failing the whole sample. This diff updates to log the error in the collecting thread, and report None to the consumer for any error. Differential Revision: D76748582 fbshipit-source-id: 96b095e215cedb8fbb176e6801a25751f047783b
1 parent 9d940b9 commit ceae760

2 files changed

Lines changed: 29 additions & 57 deletions

File tree

below/model/src/collector.rs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,8 @@ fn collect_sample(
275275
if let Some(gpu_stats_receiver) = &options.gpu_stats_receiver {
276276
// It is possible to receive no sample if the
277277
// collector has not updated since the previous take
278-
// or the collector encountered a recoverable error
279-
// (e.g. timeout). The behavior for now is to store an
280-
// empty map. Alternatively we could store the latest
281-
// sample and read that, but then we have to decide how
282-
// stale the data can be.
283-
Some(
284-
gpu_stats_receiver
285-
.try_take()
286-
.context("GPU stats collector had an error")?
287-
.unwrap_or_default(),
288-
)
278+
// or the collector encountered errors (e.g. timeout).
279+
gpu_stats_receiver.take()
289280
} else {
290281
None
291282
}
@@ -320,12 +311,7 @@ fn collect_sample(
320311
}
321312
},
322313
tc: if let Some(tc_stats_receiver) = &options.tc_stats_receiver {
323-
Some(
324-
tc_stats_receiver
325-
.try_take()
326-
.context("TC stats collector had an error")?
327-
.unwrap_or_default(),
328-
)
314+
tc_stats_receiver.take()
329315
} else {
330316
None
331317
},
@@ -396,13 +382,13 @@ fn collect_cgroup_sample(
396382
None
397383
};
398384
Ok(CgroupSample {
399-
cpu_stat: wrap(reader.read_cpu_stat())?.map(Into::into),
385+
cpu_stat: wrap(reader.read_cpu_stat())?,
400386
io_stat,
401387
tids_current: wrap(reader.read_pids_current())?,
402388
tids_max: wrap(reader.read_pids_max())?,
403389
memory_current: wrap(reader.read_memory_current().map(|v| v as i64))?,
404-
memory_stat: wrap(reader.read_memory_stat())?.map(Into::into),
405-
pressure: pressure_wrap(reader.read_pressure())?.map(Into::into),
390+
memory_stat: wrap(reader.read_memory_stat())?,
391+
pressure: pressure_wrap(reader.read_pressure())?,
406392
// We transpose at the end here to convert the
407393
// Option<Result<BTreeMap... into Result<Option<BTreeMap and
408394
// then bail any errors with `?` - leaving us with the
@@ -448,17 +434,17 @@ fn collect_cgroup_sample(
448434
memory_swap_max: wrap(reader.read_memory_swap_max())?,
449435
memory_zswap_max: wrap(reader.read_memory_zswap_max())?,
450436
memory_zswap_writeback: wrap(reader.read_memory_zswap_writeback())?,
451-
memory_events: wrap(reader.read_memory_events())?.map(Into::into),
452-
memory_events_local: wrap(reader.read_memory_events_local())?.map(Into::into),
437+
memory_events: wrap(reader.read_memory_events())?,
438+
memory_events_local: wrap(reader.read_memory_events_local())?,
453439
inode_number: match reader.read_inode_number() {
454440
Ok(st_ino) => Some(st_ino as i64),
455441
Err(e) => {
456442
error!(logger, "Fail to collect inode number: {:#}", e);
457443
None
458444
}
459445
},
460-
cgroup_stat: wrap(reader.read_cgroup_stat())?.map(Into::into),
461-
memory_numa_stat: wrap(reader.read_memory_numa_stat())?.map(Into::into),
446+
cgroup_stat: wrap(reader.read_cgroup_stat())?,
447+
memory_numa_stat: wrap(reader.read_memory_numa_stat())?,
462448
cpuset_cpus: wrap(reader.read_cpuset_cpus())?,
463449
cpuset_cpus_effective: wrap(reader.read_cpuset_cpus_effective())?,
464450
cpuset_mems: wrap(reader.read_cpuset_mems())?,

below/model/src/collector_plugin.rs

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,14 @@ pub trait AsyncCollectorPlugin {
2828
//
2929
// On success, this should return `Ok(Some(sample))`.
3030
//
31-
// On a recoverable error, this should return `Ok(None)`. The
31+
// On a error handled by the collector, this should return `Ok(None)`. The
3232
// function itself should consume the error (e.g. log the error)
33-
// so that it does not get sent to a consumer thread
3433
//
35-
// On unrecoverable error, this should return `Err(e)`.
34+
// On unhandled errors, this should return `Err(e)`.
3635
async fn try_collect(&mut self) -> Result<Option<Self::T>>;
3736
}
3837

39-
type SharedVal<T> = Arc<Mutex<Option<Result<T>>>>;
38+
type SharedVal<T> = Arc<Mutex<Option<T>>>;
4039

4140
// A wrapper around an `AsyncCollectorPlugin` that allows samples to
4241
// be sent to a `Consumer`.
@@ -50,35 +49,27 @@ impl<T, Plugin: AsyncCollectorPlugin<T = T>> AsyncCollector<T, Plugin> {
5049
Self { shared, plugin }
5150
}
5251

53-
fn update(&self, value: Result<T>) {
54-
*self.shared.lock().unwrap() = Some(value);
52+
fn update(&self, value: Option<T>) {
53+
*self.shared.lock().unwrap() = value;
5554
}
5655

5756
// Collect sample and update value shared with consumer. Replaces
5857
// any existing sample that consumer has not consumed yet.
5958
//
6059
// Returns true if data was collected and sent. Returns false if
6160
// there was a recoverable error. Returns an error if there was an
62-
// unrecoverable error.
61+
// unrecoverable error. Errors are never sent to the consumer.
6362
pub async fn collect_and_update(&mut self) -> Result<bool> {
64-
let collect_result = self
65-
.plugin
66-
.try_collect()
67-
.await
68-
.context("Collector failed to read");
69-
70-
match collect_result {
71-
Ok(Some(sample)) => {
72-
self.update(Ok(sample));
73-
Ok(true)
74-
}
75-
Ok(None) => Ok(false),
63+
let maybe_sample = match self.plugin.try_collect().await {
64+
Ok(maybe_sample) => maybe_sample,
7665
Err(e) => {
77-
let error_msg = format!("{:#}", e);
78-
self.update(Err(e));
79-
Err(anyhow!(error_msg))
66+
self.update(None);
67+
return Err(e).context("Collector failed to collect");
8068
}
81-
}
69+
};
70+
let collected = maybe_sample.is_some();
71+
self.update(maybe_sample);
72+
Ok(collected)
8273
}
8374
}
8475

@@ -93,13 +84,8 @@ impl<T> Consumer<T> {
9384
}
9485

9586
// Try to get latest sample of data if it exists.
96-
// Returns the error if the collector sent an error.
97-
pub fn try_take(&self) -> Result<Option<T>> {
98-
match self.shared.lock().unwrap().take() {
99-
Some(Ok(v)) => Ok(Some(v)),
100-
Some(Err(e)) => Err(e),
101-
None => Ok(None),
102-
}
87+
pub fn take(&self) -> Option<T> {
88+
self.shared.lock().unwrap().take()
10389
}
10490
}
10591

@@ -169,15 +155,15 @@ mod test {
169155
});
170156
// Collector overwriting sample
171157
barrier.wait(); // <-- 1
172-
assert_eq!(Some(2), consumer.try_take().unwrap());
158+
assert_eq!(Some(2), consumer.take());
173159
barrier.wait(); // <-- 2
174160
// Collector sending None
175161
barrier.wait(); // <-- 3
176-
assert_eq!(None, consumer.try_take().unwrap());
162+
assert!(consumer.take().is_none());
177163
barrier.wait(); // <-- 4
178164
// Collector sending error
179165
barrier.wait(); // <-- 5
180-
assert!(consumer.try_take().is_err());
166+
assert!(consumer.take().is_none());
181167

182168
handle.join().unwrap();
183169
}

0 commit comments

Comments
 (0)