Skip to content

Commit 3a725e2

Browse files
authored
Merge branch 'main' into batch-poc
2 parents e0b6e38 + 176721e commit 3a725e2

25 files changed

Lines changed: 680 additions & 50 deletions

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
solid_queue (1.3.1)
4+
solid_queue (1.4.0)
55
activejob (>= 7.1)
66
activerecord (>= 7.1)
77
concurrent-ruby (>= 1.3.1)

README.md

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
1717
- [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler)
1818
- [Fork vs. async mode](#fork-vs-async-mode)
1919
- [Configuration](#configuration)
20+
- [Optional scheduler configuration](#optional-scheduler-configuration)
2021
- [Queue order and priorities](#queue-order-and-priorities)
2122
- [Queues specification and performance](#queues-specification-and-performance)
2223
- [Threads, processes, and signals](#threads-processes-and-signals)
@@ -32,6 +33,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
3233
- [Puma plugin](#puma-plugin)
3334
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
3435
- [Recurring tasks](#recurring-tasks)
36+
- [Scheduling and unscheduling recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically)
3537
- [Inspiration](#inspiration)
3638
- [License](#license)
3739

@@ -210,7 +212,7 @@ By default, Solid Queue will try to find your configuration under `config/queue.
210212
bin/jobs -c config/calendar.yml
211213
```
212214

213-
You can also skip all recurring tasks by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`.
215+
You can also skip the scheduler process by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`.
214216

215217
This is what this configuration looks like:
216218

@@ -228,6 +230,10 @@ production:
228230
threads: 5
229231
polling_interval: 0.1
230232
processes: 3
233+
scheduler:
234+
dynamic_tasks_enabled: true
235+
polling_interval: 5
236+
231237
```
232238

233239
Everything is optional. If no configuration at all is provided, Solid Queue will run with one dispatcher and one worker with default settings. If you want to run only dispatchers or workers, you just need to include that section alone in the configuration. For example, with the following configuration:
@@ -272,6 +278,19 @@ It is recommended to set this value less than or equal to the queue database's c
272278
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
273279

274280

281+
### Optional scheduler configuration
282+
283+
Optionally, you can configure the scheduler process under the `scheduler` section in your `config/queue.yml` if you'd like to [schedule recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically).
284+
285+
```yaml
286+
scheduler:
287+
dynamic_tasks_enabled: true
288+
polling_interval: 5
289+
```
290+
291+
- `dynamic_tasks_enabled`: whether the scheduler should poll for [dynamically scheduled recurring tasks](#scheduling-and-unscheduling-recurring-tasks-dynamically). This is `false` by default. When enabled, the scheduler will poll the database at the given `polling_interval` to pick up tasks scheduled via `SolidQueue.schedule_recurring_task`.
292+
- `polling_interval`: how frequently (in seconds) the scheduler checks for dynamic task changes. Defaults to `5`.
293+
275294
### Queue order and priorities
276295

277296
As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`.
@@ -463,7 +482,7 @@ class MyJob < ApplicationJob
463482
- `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
464483
- `on_conflict` controls behaviour when enqueuing a job that conflicts with the concurrency limits configured. It can be set to one of the following:
465484
- (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires.
466-
- `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed.
485+
- `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded until the interval defined by `duration` has elapsed.
467486

468487
When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).
469488

@@ -473,7 +492,7 @@ Since something can happen that prevents the first job from releasing the semaph
473492

474493
It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated.
475494

476-
When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore.
495+
When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for until the `duration` interval if something happens and a running job fails to release the semaphore.
477496

478497

479498
For example:
@@ -793,6 +812,38 @@ my_periodic_resque_job:
793812

794813
and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.
795814

815+
### Scheduling and unscheduling recurring tasks dynamically
816+
817+
You can schedule and unschedule recurring tasks at runtime, without editing the configuration file. To enable this, you need to set `dynamic_tasks_enabled: true` in the `scheduler` section of your `config/queue.yml`, [as explained earlier](#optional-scheduler-configuration).
818+
819+
```yaml
820+
scheduler:
821+
dynamic_tasks_enabled: true
822+
```
823+
824+
Then you can use the following methods to add recurring tasks dynamically:
825+
826+
```ruby
827+
SolidQueue.schedule_recurring_task(
828+
"my_dynamic_task",
829+
class: "MyJob",
830+
args: [1, 2],
831+
schedule: "every 10 minutes"
832+
)
833+
```
834+
835+
This accepts the same options as the YAML configuration: `class`, `args`, `command`, `schedule`, `queue`, `priority`, and `description`.
836+
837+
To remove a dynamically scheduled task:
838+
839+
```ruby
840+
SolidQueue.unschedule_recurring_task("my_dynamic_task")
841+
```
842+
843+
Only dynamic tasks can be unscheduled at runtime. Attempting to unschedule a static task (defined in `config/recurring.yml`) will raise an `ActiveRecord::RecordNotFound` error.
844+
845+
Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them.
846+
796847
## Inspiration
797848

798849
Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.

app/models/solid_queue/blocked_execution.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ def release_many(concurrency_keys)
2626

2727
def release_one(concurrency_key)
2828
transaction do
29-
if execution = ordered.where(concurrency_key: concurrency_key).limit(1).non_blocking_lock.first
29+
if execution = ordered.where(concurrency_key: concurrency_key).limit(1)
30+
.use_index(:index_solid_queue_blocked_executions_for_release)
31+
.non_blocking_lock.first
3032
execution.release
3133
end
3234
end

app/models/solid_queue/failed_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class FailedExecution < Execution
66

77
serialize :error, coder: JSON
88

9-
before_create :expand_error_details_from_exception
9+
before_save :expand_error_details_from_exception, if: :exception
1010

1111
attr_accessor :exception
1212

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def unblock_next_blocked_job
2626
end
2727

2828
def concurrency_limited?
29-
concurrency_key.present?
29+
concurrency_key.present? && job_class.present?
3030
end
3131

3232
def blocked?

app/models/solid_queue/job/retryable.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@ def retry
1616
end
1717

1818
def failed_with(exception)
19-
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
19+
FailedExecution.transaction(requires_new: true) do
20+
FailedExecution.create!(job_id: id, exception: exception)
21+
end
22+
rescue ActiveRecord::RecordNotUnique
23+
if (failed_execution = FailedExecution.find_by(job_id: id))
24+
failed_execution.exception = exception
25+
failed_execution.save!
26+
else
27+
retry
28+
end
2029
end
2130

2231
def reset_execution_counters

app/models/solid_queue/ready_execution.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ def select_and_lock(queue_relation, process_id, limit)
3030
end
3131

3232
def select_candidates(queue_relation, limit)
33-
queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id)
33+
# Force query execution here with #to_a to avoid unintended FOR UPDATE query executions
34+
queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id).to_a
3435
end
3536

3637
def lock_candidates(executions, process_id)

app/models/solid_queue/record.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ def supports_insert_conflict_target?
2020
connection.supports_insert_conflict_target?
2121
end
2222
end
23+
24+
# Pass index hints to the query optimizer using SQL comment hints.
25+
# Uses MySQL 8 optimizer hint query comments, which SQLite and
26+
# PostgreSQL ignore.
27+
def use_index(*indexes)
28+
optimizer_hints "INDEX(#{quoted_table_name} #{indexes.join(', ')})"
29+
end
2330
end
2431
end
2532
end

app/models/solid_queue/recurring_task.rb

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ module SolidQueue
66
class RecurringTask < Record
77
serialize :arguments, coder: Arguments, default: []
88

9-
validate :supported_schedule
9+
validate :ensure_schedule_supported
1010
validate :ensure_command_or_class_present
11-
validate :existing_job_class
11+
validate :ensure_existing_job_class
1212

1313
scope :static, -> { where(static: true) }
14+
scope :dynamic, -> { where(static: false) }
1415

1516
has_many :recurring_executions, foreign_key: :task_key, primary_key: :key
1617

@@ -32,7 +33,15 @@ def from_configuration(key, **options)
3233
queue_name: options[:queue].presence,
3334
priority: options[:priority].presence,
3435
description: options[:description],
35-
static: true
36+
static: options.fetch(:static, true)
37+
end
38+
39+
def create_dynamic_task(key, **options)
40+
from_configuration(key, **options.merge(static: false)).save!
41+
end
42+
43+
def delete_dynamic_task(key)
44+
RecurringTask.dynamic.find_by!(key: key).destroy
3645
end
3746

3847
def create_or_update_all(tasks)
@@ -102,10 +111,19 @@ def attributes_for_upsert
102111
end
103112

104113
private
105-
def supported_schedule
114+
def ensure_schedule_supported
106115
unless parsed_schedule.instance_of?(Fugit::Cron)
107116
errors.add :schedule, :unsupported, message: "is not a supported recurring schedule"
108117
end
118+
rescue ArgumentError => error
119+
message = if error.message.include?("multiple crons")
120+
"generates multiple cron schedules. Please use separate recurring tasks for each schedule, " +
121+
"or use explicit cron syntax (e.g., '40 0,15 * * *' for multiple times with the same minutes)"
122+
else
123+
error.message
124+
end
125+
126+
errors.add :schedule, :unsupported, message: message
109127
end
110128

111129
def ensure_command_or_class_present
@@ -114,7 +132,7 @@ def ensure_command_or_class_present
114132
end
115133
end
116134

117-
def existing_job_class
135+
def ensure_existing_job_class
118136
if class_name.present? && job_class.nil?
119137
errors.add :class_name, :undefined, message: "doesn't correspond to an existing class"
120138
end
@@ -152,7 +170,7 @@ def arguments_with_kwargs
152170

153171

154172
def parsed_schedule
155-
@parsed_schedule ||= Fugit.parse(schedule)
173+
@parsed_schedule ||= Fugit.parse(schedule, multi: :fail)
156174
end
157175

158176
def job_class

app/models/solid_queue/semaphore.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def initialize(job)
4040
end
4141

4242
def wait
43-
if semaphore = Semaphore.find_by(key: key)
43+
if semaphore = Semaphore.lock.find_by(key: key)
4444
semaphore.value > 0 && attempt_decrement
4545
else
4646
attempt_creation

0 commit comments

Comments
 (0)