Skip to content

Commit 19341dc

Browse files
committed
lruttl: add params to control timeout and retry on sema timeout
These are hooked up only for memoize at this time. No default behavior is changed by this commit, but you can optionally specify these parameters in order to change the behavior.
1 parent d21b98a commit 19341dc

3 files changed

Lines changed: 69 additions & 20 deletions

File tree

crates/lruttl/src/lib.rs

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::collections::HashSet;
88
use std::fmt::Debug;
99
use std::future::Future;
1010
use std::hash::Hash;
11-
use std::sync::atomic::{AtomicUsize, Ordering};
11+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1212
use std::sync::{Arc, LazyLock, Weak};
1313
use tokio::sync::Semaphore;
1414
use tokio::time::{timeout, Duration, Instant};
@@ -25,6 +25,8 @@ struct Inner<K: Clone + Hash + Eq + Debug, V: Clone + Send + Sync + Debug> {
2525
capacity: AtomicUsize,
2626
cache: DashMap<K, Item<V>>,
2727
lru_samples: AtomicUsize,
28+
sema_timeout_milliseconds: AtomicUsize,
29+
retry_on_sema_timeout: AtomicBool,
2830
lookup_counter: IntCounter,
2931
evict_counter: IntCounter,
3032
expire_counter: IntCounter,
@@ -525,6 +527,8 @@ impl<
525527
tick: AtomicUsize::new(0),
526528
capacity: AtomicUsize::new(capacity),
527529
lru_samples: AtomicUsize::new(10),
530+
sema_timeout_milliseconds: AtomicUsize::new(120_000),
531+
retry_on_sema_timeout: AtomicBool::new(false),
528532
lookup_counter,
529533
evict_counter,
530534
expire_counter,
@@ -552,6 +556,18 @@ impl<
552556
Self { inner }
553557
}
554558

559+
pub fn set_retry_on_sema_timeout(&self, value: bool) {
560+
self.inner
561+
.retry_on_sema_timeout
562+
.store(value, Ordering::Relaxed);
563+
}
564+
565+
pub fn set_sema_timeout(&self, duration: Duration) {
566+
self.inner
567+
.sema_timeout_milliseconds
568+
.store(duration.as_millis() as usize, Ordering::Relaxed);
569+
}
570+
555571
pub fn clear(&self) -> usize {
556572
self.inner.clear()
557573
}
@@ -722,22 +738,38 @@ impl<
722738
}
723739

724740
let wait_count = DecOnDrop::new(self.inner.wait_gauge.clone());
725-
let wait_result =
726-
match timeout(Duration::from_secs(120), sema.acquire_owned()).await {
727-
Err(_) => {
728-
self.inner.error_counter.inc();
729-
tracing::error!(
730-
"{} semaphore acquire for {name:?} timed out",
741+
let wait_result = match timeout(
742+
Duration::from_millis(
743+
self.inner.sema_timeout_milliseconds.load(Ordering::Relaxed) as u64,
744+
),
745+
sema.acquire_owned(),
746+
)
747+
.await
748+
{
749+
Err(_) => {
750+
self.inner.error_counter.inc();
751+
752+
if self.inner.retry_on_sema_timeout.load(Ordering::Relaxed) {
753+
tracing::warn!(
754+
"{} semaphore acquire for {name:?} timed out, \
755+
will restart cache resolve.",
731756
self.inner.name
732757
);
733-
return Err(Arc::new(anyhow::anyhow!(
734-
"{} lookup for {name:?} \
735-
timed out on semaphore acquire",
736-
self.inner.name
737-
)));
758+
continue 'retry;
738759
}
739-
Ok(r) => r,
740-
};
760+
761+
tracing::error!(
762+
"{} semaphore acquire for {name:?} timed out",
763+
self.inner.name
764+
);
765+
return Err(Arc::new(anyhow::anyhow!(
766+
"{} lookup for {name:?} \
767+
timed out on semaphore acquire",
768+
self.inner.name
769+
)));
770+
}
771+
Ok(r) => r,
772+
};
741773

742774
drop(wait_count);
743775

crates/mod-memoize/src/lib.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ pub struct MemoizeParams {
6666
pub name: String,
6767
#[serde(default)]
6868
pub invalidate_with_epoch: bool,
69+
#[serde(default)]
70+
pub retry_on_populate_timeout: bool,
71+
#[serde(default, with = "duration_serde")]
72+
pub populate_timeout: Option<Duration>,
6973
}
7074

7175
#[derive(Clone, Hash, Eq, PartialEq)]
@@ -397,12 +401,18 @@ pub fn register(lua: &Lua) -> anyhow::Result<()> {
397401
}
398402
changed
399403
});
400-
CACHES
401-
.entry(cache_name.to_string())
402-
.or_insert_with(|| MemoizeCache {
404+
CACHES.entry(cache_name.to_string()).or_insert_with(|| {
405+
let cache = LruCacheWithTtl::new(cache_name.clone(), params.capacity);
406+
cache.set_retry_on_sema_timeout(params.retry_on_populate_timeout);
407+
if let Some(duration) = params.populate_timeout {
408+
cache.set_sema_timeout(duration);
409+
}
410+
411+
MemoizeCache {
403412
params: params.clone(),
404-
cache: Arc::new(LruCacheWithTtl::new(cache_name.clone(), params.capacity)),
405-
});
413+
cache: Arc::new(cache),
414+
}
415+
});
406416

407417
let lookup_counter = CACHE_LOOKUP
408418
.get_metric_with_label_values(&[&cache_name])

docs/reference/kumo/memoize.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ the *memoized function*.
7171
The parameters it accepts are:
7272

7373
* *FUNCTION* - the function or lambda which will be called when there is a cache miss.
74-
When it is called, it will be passed the parameters that were passed to the *memoized function*.
74+
When it is called, it will be passed the parameters that were passed to the *memoized function* in order to populate the cache.
7575
* *PARAMS* is a required lua table with the following fields, all of which are required:
7676
* `name` - the name for the cache. You should create one name per function/purpose.
7777
* `ttl` - the Time To Live for cache entries; how long a previously computed
@@ -83,6 +83,13 @@ The parameters it accepts are:
8383
* `invalidate_with_epoch` - optional boolean that defaults to `false`.
8484
If true, anything that bumps the config epoch (eg: config file changes,
8585
TSA config overrides and so on) will invalidate the cache. {{since('2025.03.19-1d3f1f67', inline=True)}}
86+
* `populate_timeout` - optional duration string. The effective default
87+
value is `120 seconds`. Specifies how long to allow the cache population
88+
function to run before generating a timeout error. {{since('dev', inline=True)}}
89+
* `retry_on_populate_timeout` - optional boolean that defaults to `false`.
90+
If true, if the `populate_timeout` is reached, then instead of generating
91+
an error, memoize will retry the population attempt.
92+
{{since('dev', inline=True)}}
8693

8794
In the example above calling:
8895

0 commit comments

Comments
 (0)