Commit 4cf4fb9

feat: add stale job lock cleanup and production safeguards
This addresses a production incident where stale locks accumulated over time, blocking job processing. The root cause was twofold:

1. Cleanup only targeted queue locks, not job locks
2. Cleanup only ran on startup, not periodically

Changes:
- Add release_stale_job_locks() to clean orphaned job-level locks
- Add periodic cleanup task (default: every 60 seconds)
- Add separate configurable timeouts for queue vs job locks
- Add minimum timeout enforcement with warnings/errors
- Add comprehensive metrics for cleanup operations
- Add cleanup health gauge for alerting
- Add integration tests proving the SQL actually works
- Document new config options and metrics

New WorkerConfig options:
- stale_lock_cleanup_interval: periodic cleanup interval (default: 60s)
- stale_queue_lock_timeout: queue lock staleness threshold (default: 5 min)
- stale_job_lock_timeout: job lock staleness threshold (default: 30 min)

New metrics:
- backfill_cleanup_queue_locks_released (counter)
- backfill_cleanup_job_locks_released (counter)
- backfill_cleanup_failed_jobs_deleted (counter)
- backfill_cleanup_failures (counter with operation/error_type labels)
- backfill_cleanup_last_success_timestamp (gauge for health alerting)

BREAKING: startup_cleanup() now returns (u64, u64, u64) instead of (u64, u64)
1 parent 3ad1125 commit 4cf4fb9
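The BREAKING change means every caller that destructures the old two-element tuple must be updated. A minimal sketch of the migration, using a synchronous stub in place of the real async `BackfillClient::startup_cleanup` (the returned counts here are hypothetical):

```rust
// Stub standing in for BackfillClient::startup_cleanup after this commit.
// The real method is async and returns a Result; the values are made up.
fn startup_cleanup() -> (u64, u64, u64) {
    (3, 1, 7) // (queue_locks_released, job_locks_released, failed_jobs_deleted)
}

fn main() {
    // Before: let (locks_released, jobs_deleted) = client.startup_cleanup().await?;
    // After: the tuple gains a middle element for released job locks.
    let (queue_locks, job_locks, failed_jobs) = startup_cleanup();
    println!("queue={queue_locks} job={job_locks} failed={failed_jobs}");
}
```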

File tree

9 files changed, +773 −17 lines


README.md

Lines changed: 40 additions & 0 deletions
````diff
@@ -25,6 +25,7 @@ Built on top of `graphile_worker` (v0.8.6), backfill adds these production-ready
 - 🏃 **Flexible Worker Patterns** - `WorkerRunner` supporting tokio::select!, background tasks, and one-shot processing
 - 🔧 **Admin API** - Optional Axum router for HTTP-based job management (experimental)
 - 📝 **Convenience Functions** - `enqueue_fast()`, `enqueue_bulk()`, `enqueue_critical()`, etc.
+- 🧹 **Stale Lock Cleanup** - Automatic cleanup of orphaned locks from crashed workers (startup + periodic)
 
 All built on graphile_worker's rock-solid foundation of PostgreSQL SKIP LOCKED and LISTEN/NOTIFY.
 
@@ -69,6 +70,45 @@ All configuration is passed in via environment variables:
 - `POLL_INTERVAL_MS`: Job polling interval (default: 200ms)
 - `RUST_LOG`: Logging configuration
 
+### WorkerConfig Options
+
+When building a `WorkerRunner`, you can configure additional options:
+
+```rust
+use std::time::Duration;
+use backfill::{WorkerConfig, WorkerRunner};
+
+let config = WorkerConfig::new(&database_url)
+    .with_schema("graphile_worker") // PostgreSQL schema (default)
+    .with_poll_interval(Duration::from_millis(200)) // Job polling interval
+    .with_dlq_processor_interval(Some(Duration::from_secs(60))) // DLQ processing
+    // Stale lock cleanup configuration
+    .with_stale_lock_cleanup_interval(Some(Duration::from_secs(60))) // Periodic cleanup
+    .with_stale_queue_lock_timeout(Duration::from_secs(300)) // 5 min (queue locks)
+    .with_stale_job_lock_timeout(Duration::from_secs(1800)); // 30 min (job locks)
+
+let worker = WorkerRunner::builder(config).await?
+    .define_job::<MyJob>()
+    .build().await?;
+```
+
+#### Stale Lock Cleanup
+
+When workers crash without graceful shutdown, they can leave locks behind that prevent jobs from being processed. Backfill automatically cleans these up:
+
+- **Startup cleanup**: Runs when the worker starts
+- **Periodic cleanup**: Runs every 60 seconds by default (configurable)
+
+**Configuration options:**
+
+| Option | Default | Description |
+|--------|---------|-------------|
+| `stale_lock_cleanup_interval` | 60s | How often to check for stale locks. Set to `None` to disable periodic cleanup. |
+| `stale_queue_lock_timeout` | 5 min | Queue locks older than this are considered stale. Queue locks are normally held for milliseconds. |
+| `stale_job_lock_timeout` | 30 min | Job locks older than this are considered stale. **Set this longer than your longest-running job!** |
+
+**⚠️ Warning:** Setting `stale_job_lock_timeout` too short can cause duplicate job execution if jobs legitimately run longer than the timeout. This can lead to data corruption.
+
 ### SQLx Compile-Time Query Verification
 
 This library uses SQLx's compile-time query verification for production safety. Set `DATABASE_URL` during compilation to enable type-safe, compile-time checked SQL queries:
````
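The README's advice to set `stale_job_lock_timeout` longer than the longest job can be made mechanical. A sketch of one possible heuristic — the 2× multiplier and five-minute margin are assumptions, not part of the library:

```rust
use std::time::Duration;

/// Hypothetical heuristic: twice the longest observed job runtime plus a
/// fixed safety margin, but never below the library's 30-minute default.
fn suggested_job_lock_timeout(longest_job: Duration) -> Duration {
    let default = Duration::from_secs(1800); // DEFAULT_STALE_JOB_LOCK_TIMEOUT
    let padded = longest_job * 2 + Duration::from_secs(300);
    padded.max(default)
}

fn main() {
    // A 20-minute job pushes the suggestion past the default...
    assert_eq!(suggested_job_lock_timeout(Duration::from_secs(1200)).as_secs(), 2700);
    // ...while short jobs fall back to the 30-minute default.
    assert_eq!(suggested_job_lock_timeout(Duration::from_secs(60)).as_secs(), 1800);
}
```

Erring long only delays recovery of a crashed job; erring short risks the duplicate execution the warning above describes.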

docs/03-metrics.md

Lines changed: 50 additions & 0 deletions
````diff
@@ -350,6 +350,42 @@ Monitor worker pool health and utilization.
 - `result`: Poll result (jobs_found, empty, error)
 - **Use**: Monitor polling efficiency, detect issues
 
+### Cleanup Metrics
+
+Track stale lock cleanup operations. These are critical for detecting when cleanup isn't working properly.
+
+#### `backfill_cleanup_queue_locks_released`
+- **Type**: Counter
+- **Description**: Total number of stale queue locks released
+- **Labels**: None
+- **Use**: Monitor cleanup activity, detect stuck queues
+
+#### `backfill_cleanup_job_locks_released`
+- **Type**: Counter
+- **Description**: Total number of stale job locks released
+- **Labels**: None
+- **Use**: Monitor cleanup activity, detect crashed workers leaving orphaned jobs
+
+#### `backfill_cleanup_failed_jobs_deleted`
+- **Type**: Counter
+- **Description**: Total number of permanently failed jobs cleaned up from main queue
+- **Labels**: None
+- **Use**: Track cleanup of exhausted-retry jobs
+
+#### `backfill_cleanup_failures`
+- **Type**: Counter
+- **Description**: Cleanup operations that failed
+- **Labels**:
+  - `operation`: Which cleanup operation failed (queue_locks, job_locks)
+  - `error_type`: Error classification (timeout, network, etc.)
+- **Use**: Alert on cleanup failures
+
+#### `backfill_cleanup_last_success_timestamp`
+- **Type**: Gauge
+- **Description**: Unix timestamp of last successful cleanup run
+- **Labels**: None
+- **Use**: **Critical health signal** - alert if cleanup hasn't succeeded recently
+
 ### Retry Metrics
 
 Understand retry patterns and effectiveness.
@@ -511,6 +547,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     for: 5m
     severity: critical
     description: Job failure rate >10% for 5+ minutes
+
+  # Cleanup not running (CRITICAL - can cause job lock buildup)
+  - alert: CleanupNotRunning
+    expr: time() - backfill_cleanup_last_success_timestamp > 300
+    for: 5m
+    severity: critical
+    description: Stale lock cleanup hasn't succeeded in 5+ minutes
+
+  # Cleanup releasing locks (indicates crashed workers)
+  - alert: StaleLocksReleased
+    expr: increase(backfill_cleanup_job_locks_released[5m]) > 0
+    for: 0m
+    severity: warning
+    description: Cleanup released stale job locks - indicates worker crash
 ```
 
 ### Warning Alerts
````
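The `CleanupNotRunning` alert compares `time()` against the timestamp gauge; the same staleness check can be performed in application code, e.g. for a readiness probe. A dependency-free sketch — the 300-second threshold mirrors the alert rule, and the function name is illustrative:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Mirrors the PromQL `time() - backfill_cleanup_last_success_timestamp > 300`:
/// cleanup is healthy only if its last success is within the threshold.
fn cleanup_is_healthy(last_success_unix: u64, now_unix: u64, threshold_secs: u64) -> bool {
    now_unix.saturating_sub(last_success_unix) <= threshold_secs
}

fn main() {
    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
    assert!(cleanup_is_healthy(now - 60, now, 300));   // succeeded 1 min ago: healthy
    assert!(!cleanup_is_healthy(now - 600, now, 300)); // 10 min ago: would fire the alert
}
```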

examples/basic_worker.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -118,6 +118,7 @@ impl From<ExampleWorkerConfig> for WorkerConfig {
             ],
             poll_interval: value.poll_interval,
             dlq_processor_interval: Some(value.dlq_processor_interval),
+            ..Default::default()
         }
     }
 }
```
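The one-line fix uses Rust's struct update syntax, so the new `WorkerConfig` fields (such as the cleanup timeouts) pick up their `Default` values instead of breaking this conversion whenever a field is added. A minimal standalone illustration, with a hypothetical `Config` struct standing in for `WorkerConfig`:

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
struct Config {
    poll_interval: Duration,
    cleanup_interval: Option<Duration>, // newer field, added later
}

impl Default for Config {
    fn default() -> Self {
        Config {
            poll_interval: Duration::from_millis(200),
            cleanup_interval: Some(Duration::from_secs(60)),
        }
    }
}

fn main() {
    // Set only the fields you care about; `..Default::default()` fills the
    // rest, so adding fields to Config later cannot break this constructor.
    let cfg = Config {
        poll_interval: Duration::from_millis(500),
        ..Default::default()
    };
    assert_eq!(cfg.poll_interval, Duration::from_millis(500));
    assert_eq!(cfg.cleanup_interval, Some(Duration::from_secs(60)));
}
```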

src/client/cleanup.rs

Lines changed: 192 additions & 11 deletions
```diff
@@ -2,17 +2,43 @@
 //!
 //! Provides functions to clean up stale state that can accumulate when workers
 //! crash or are forcibly terminated without graceful shutdown.
+//!
+//! ## Lock Types
+//!
+//! Graphile Worker uses two types of locks:
+//!
+//! 1. **Queue locks** (`_private_job_queues.locked_at`) - Brief locks held
+//!    during job selection. These are typically held for milliseconds.
+//!
+//! 2. **Job locks** (`_private_jobs.locked_at`) - Locks held while jobs are
+//!    being processed. These can be held for the duration of job execution
+//!    (minutes).
+//!
+//! When a worker crashes, both types of locks can become orphaned. This module
+//! provides functions to clean up both.
 
 use std::time::Duration;
 
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+
 use super::BackfillClient;
 use crate::BackfillError;
 
 /// Default timeout for considering a queue lock stale.
 ///
 /// Queue locks are held briefly during job selection (milliseconds), so any
 /// lock older than this is almost certainly from a crashed worker.
-pub const DEFAULT_STALE_LOCK_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes
+pub const DEFAULT_STALE_QUEUE_LOCK_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes
+
+/// Default timeout for considering a job lock stale.
+///
+/// Job locks are held while jobs execute, which can take minutes. This timeout
+/// should be longer than your longest-running job. Default: 30 minutes.
+pub const DEFAULT_STALE_JOB_LOCK_TIMEOUT: Duration = Duration::from_secs(1800); // 30 minutes
+
+/// Default interval for periodic stale lock cleanup.
+pub const DEFAULT_STALE_LOCK_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); // 1 minute
 
 impl BackfillClient {
     /// Release stale queue locks that were left behind by crashed workers.
@@ -45,6 +71,9 @@ impl BackfillClient {
         let result = sqlx::query(&query).execute(&self.pool).await?;
         let released = result.rows_affected();
 
+        // Always emit metrics (counter increments even for 0)
+        crate::metrics::record_cleanup_queue_locks_released(released);
+
         if released > 0 {
             log::info!(
                 "Released stale queue locks (count: {}, timeout_secs: {})",
@@ -56,6 +85,49 @@ impl BackfillClient {
         Ok(released)
     }
 
+    /// Release stale job locks that were left behind by crashed workers.
+    ///
+    /// When a worker crashes while processing a job, the job remains locked
+    /// in `_private_jobs` and will never be retried. This function releases
+    /// any job locks older than the specified timeout, allowing the jobs to
+    /// be picked up again by other workers.
+    ///
+    /// # Arguments
+    /// * `timeout` - Locks older than this duration are considered stale
+    ///
+    /// # Returns
+    /// Number of job locks that were released
+    pub async fn release_stale_job_locks(&self, timeout: Duration) -> Result<u64, BackfillError> {
+        let timeout_secs = timeout.as_secs();
+
+        let query = format!(
+            r#"
+            UPDATE {schema}._private_jobs
+            SET locked_at = NULL, locked_by = NULL
+            WHERE locked_at IS NOT NULL
+              AND locked_at < NOW() - INTERVAL '{timeout_secs} seconds'
+            "#,
+            schema = self.schema,
+            timeout_secs = timeout_secs
+        );
+
+        let result = sqlx::query(&query).execute(&self.pool).await?;
+        let released = result.rows_affected();
+
+        // Always emit metrics (counter increments even for 0)
+        crate::metrics::record_cleanup_job_locks_released(released);
+
+        if released > 0 {
+            log::info!(
+                "Released stale job locks (count: {}, timeout_secs: {})",
+                released,
+                timeout_secs
+            );
+        }
+
+        Ok(released)
+    }
+
     /// Delete permanently failed jobs from the main queue.
     ///
     /// Jobs that have exhausted all retry attempts (attempts >= max_attempts)
@@ -81,6 +153,9 @@ impl BackfillClient {
         let result = sqlx::query(&query).execute(&self.pool).await?;
         let deleted = result.rows_affected();
 
+        // Emit metric
+        crate::metrics::record_cleanup_failed_jobs_deleted(deleted);
+
         if deleted > 0 {
             log::info!(
                 "Cleaned up permanently failed jobs from main queue (count: {})",
@@ -91,27 +166,133 @@ impl BackfillClient {
         Ok(deleted)
     }
 
-    /// Run all startup cleanup tasks.
+    /// Run all startup cleanup tasks with default timeouts.
     ///
     /// This should be called when a worker starts to clean up any stale state
     /// left behind by previous workers. It performs:
-    /// 1. Release stale queue locks (using default timeout)
-    /// 2. Delete permanently failed jobs from main queue
+    /// 1. Release stale queue locks (5 minute timeout)
+    /// 2. Release stale job locks (30 minute timeout)
+    /// 3. Delete permanently failed jobs from main queue
     ///
     /// # Returns
-    /// Tuple of (stale_locks_released, failed_jobs_deleted)
-    pub async fn startup_cleanup(&self) -> Result<(u64, u64), BackfillError> {
-        log::info!("Running startup cleanup tasks");
+    /// Tuple of (queue_locks_released, job_locks_released, failed_jobs_deleted)
+    pub async fn startup_cleanup(&self) -> Result<(u64, u64, u64), BackfillError> {
+        self.startup_cleanup_with_timeouts(DEFAULT_STALE_QUEUE_LOCK_TIMEOUT, DEFAULT_STALE_JOB_LOCK_TIMEOUT)
+            .await
+    }
 
-        let locks_released = self.release_stale_queue_locks(DEFAULT_STALE_LOCK_TIMEOUT).await?;
+    /// Run all startup cleanup tasks with custom timeouts.
+    ///
+    /// This allows configuring the stale lock thresholds for environments
+    /// where the defaults aren't appropriate.
+    ///
+    /// # Arguments
+    /// * `queue_lock_timeout` - Timeout for queue locks (normally held for ms)
+    /// * `job_lock_timeout` - Timeout for job locks (held during job execution)
+    ///
+    /// # Returns
+    /// Tuple of (queue_locks_released, job_locks_released, failed_jobs_deleted)
+    pub async fn startup_cleanup_with_timeouts(
+        &self,
+        queue_lock_timeout: Duration,
+        job_lock_timeout: Duration,
+    ) -> Result<(u64, u64, u64), BackfillError> {
+        log::info!(
+            "Running startup cleanup (queue_lock_timeout: {}s, job_lock_timeout: {}s)",
+            queue_lock_timeout.as_secs(),
+            job_lock_timeout.as_secs()
+        );
+
+        let queue_locks_released = self.release_stale_queue_locks(queue_lock_timeout).await?;
+        let job_locks_released = self.release_stale_job_locks(job_lock_timeout).await?;
         let jobs_deleted = self.cleanup_permanently_failed_jobs().await?;
 
         log::info!(
-            "Startup cleanup completed (stale_locks_released: {}, failed_jobs_deleted: {})",
-            locks_released,
+            "Startup cleanup completed (queue_locks: {}, job_locks: {}, failed_jobs: {})",
+            queue_locks_released,
+            job_locks_released,
             jobs_deleted
         );
 
-        Ok((locks_released, jobs_deleted))
+        Ok((queue_locks_released, job_locks_released, jobs_deleted))
+    }
+
+    /// Start a background task that periodically cleans up stale locks.
+    ///
+    /// This spawns a task that runs at the specified interval, cleaning up
+    /// both queue-level and job-level locks using separate timeout thresholds.
+    ///
+    /// # Arguments
+    /// * `interval` - How often to check for stale locks
+    /// * `queue_lock_timeout` - Timeout for queue locks (normally held for ms)
+    /// * `job_lock_timeout` - Timeout for job locks (held during job execution)
+    /// * `cancellation_token` - Token to signal when to stop the background
+    ///   task
+    ///
+    /// # Returns
+    /// A JoinHandle for the background task
+    pub fn start_stale_lock_cleanup(
+        &self,
+        interval: Duration,
+        queue_lock_timeout: Duration,
+        job_lock_timeout: Duration,
+        cancellation_token: CancellationToken,
+    ) -> JoinHandle<()> {
+        let client = self.clone();
+
+        tokio::spawn(async move {
+            let mut interval_timer = tokio::time::interval(interval);
+            interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+            log::info!(
+                "Starting stale lock cleanup task (interval: {}s, queue_timeout: {}s, job_timeout: {}s)",
+                interval.as_secs(),
+                queue_lock_timeout.as_secs(),
+                job_lock_timeout.as_secs()
+            );
+
+            loop {
+                tokio::select! {
+                    _ = cancellation_token.cancelled() => {
+                        log::info!("Stale lock cleanup task shutting down");
+                        break;
+                    }
+                    _ = interval_timer.tick() => {
+                        let mut all_succeeded = true;
+
+                        // Clean queue locks (short timeout - these are normally held briefly)
+                        match client.release_stale_queue_locks(queue_lock_timeout).await {
+                            Ok(_) => {}
+                            Err(e) => {
+                                log::warn!("Failed to release stale queue locks: {}", e);
+                                crate::metrics::record_cleanup_failure(
                                    "queue_locks",
+                                    crate::metrics::classify_error_for_metrics(&e),
+                                );
+                                all_succeeded = false;
+                            }
+                        }
+
+                        // Clean job locks (longer timeout - jobs can run for a while)
+                        match client.release_stale_job_locks(job_lock_timeout).await {
+                            Ok(_) => {}
+                            Err(e) => {
+                                log::warn!("Failed to release stale job locks: {}", e);
+                                crate::metrics::record_cleanup_failure(
+                                    "job_locks",
+                                    crate::metrics::classify_error_for_metrics(&e),
+                                );
+                                all_succeeded = false;
+                            }
+                        }
+
+                        // Update health timestamp if both operations succeeded
+                        if all_succeeded {
+                            crate::metrics::update_cleanup_health_timestamp();
+                        }
+                    }
+                }
+            }
+        })
     }
 }
```
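`release_stale_job_locks` interpolates `timeout_secs` into the SQL with `format!` rather than a bind parameter; that is acceptable here only because the value is a `u64` derived from a `Duration`, never user-supplied text. A self-contained sketch of the same string construction (the schema name `graphile_worker` is an assumption):

```rust
use std::time::Duration;

// Build the UPDATE statement the same way the diff does: numeric interpolation
// of the timeout into a Postgres INTERVAL literal.
fn build_release_job_locks_sql(schema: &str, timeout: Duration) -> String {
    let timeout_secs = timeout.as_secs();
    format!(
        "UPDATE {schema}._private_jobs \
         SET locked_at = NULL, locked_by = NULL \
         WHERE locked_at IS NOT NULL \
           AND locked_at < NOW() - INTERVAL '{timeout_secs} seconds'"
    )
}

fn main() {
    let sql = build_release_job_locks_sql("graphile_worker", Duration::from_secs(1800));
    assert!(sql.starts_with("UPDATE graphile_worker._private_jobs"));
    assert!(sql.contains("INTERVAL '1800 seconds'"));
    println!("{sql}");
}
```

If the schema name could ever come from outside the application, a bind parameter or identifier quoting would be the safer design.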

src/client/mod.rs

Lines changed: 1 addition & 2 deletions
```diff
@@ -1,10 +1,9 @@
 //! The backfill client, split across a couple of files.
 
-mod cleanup;
+pub mod cleanup;
 mod dlq;
 mod enqueue;
 
-pub use cleanup::DEFAULT_STALE_LOCK_TIMEOUT;
 pub use dlq::*;
 
 /// High-level client for the backfill job queue system.
```
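The periodic task in `cleanup.rs` pairs `tokio::time::interval` with a `CancellationToken` inside `tokio::select!`. The same shutdown pattern can be sketched without tokio, using a channel whose `recv_timeout` doubles as both the interval tick and the cancellation check — a structural analogue for illustration, not the library's implementation:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run a periodic "cleanup" loop on a background thread until a shutdown
/// signal arrives; returns how many ticks fired. The mpsc channel stands in
/// for CancellationToken, recv_timeout for the interval timer.
fn run_until_shutdown(run_for: Duration, tick: Duration) -> u32 {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || {
        let mut ticks = 0;
        loop {
            // recv_timeout covers both select! arms: a message (or a dropped
            // sender) means "cancelled", a timeout means "interval tick".
            match shutdown_rx.recv_timeout(tick) {
                Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    ticks += 1; // the real task would release stale locks here
                }
            }
        }
        ticks
    });

    thread::sleep(run_for);
    shutdown_tx.send(()).unwrap(); // analogous to cancellation_token.cancel()
    worker.join().unwrap()
}

fn main() {
    let ticks = run_until_shutdown(Duration::from_millis(80), Duration::from_millis(10));
    assert!(ticks >= 1);
    println!("cleanup ran {ticks} times before shutdown");
}
```

The tokio version has the same three properties worth preserving in any reimplementation: the loop cannot outlive a cancel signal, a missed tick is skipped rather than replayed, and the caller can `join` to confirm the task actually stopped.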
