-
Notifications
You must be signed in to change notification settings - Fork 81
Expand file tree
/
Copy pathrq_worker_high_priority.py
More file actions
52 lines (41 loc) · 2.02 KB
/
rq_worker_high_priority.py
File metadata and controls
52 lines (41 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# /home/guido/Music/AudioMuse-AI/rq_worker_high_priority.py
import os
import sys
import logging
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Signal to app.py that we are an RQ worker, so it should skip index loading and background threads
os.environ['AUDIOMUSE_ROLE'] = 'worker'
from rq import Worker
try:
from app_helper import redis_conn
from config import APP_VERSION
except ImportError as e:
print(f"Error importing from app.py: {e}")
print("Please ensure app.py is in the Python path and does not have top-level errors.")
sys.exit(1)
# This worker ONLY listens to the 'high' queue.
queues_to_listen = ['high']
if __name__ == '__main__':
print(f"🚀 DEDICATED HIGH PRIORITY RQ Worker starting. Version: {APP_VERSION}. Listening ONLY on queues: {queues_to_listen}")
print(f"Using Redis connection: {redis_conn.connection_pool.connection_kwargs}")
# High priority worker doesn't analyze songs, so no CLAP preload needed
# Only rq_worker.py (default queue) handles song analysis tasks
worker = Worker(
queues_to_listen,
connection=redis_conn,
# --- Resilience Settings for Kubernetes ---
worker_ttl=30, # Consider worker dead if no heartbeat for 30 seconds.
job_monitoring_interval=10 # Check for dead workers every 10 seconds.
)
# Memory leak prevention: restart after N jobs
# Higher than default worker since this doesn't load CLAP model
max_jobs_before_restart = int(os.getenv('RQ_MAX_JOBS_HIGH', '100'))
logging_level = os.getenv("RQ_LOGGING_LEVEL", "INFO").upper()
print(f"RQ Worker logging level set to: {logging_level}")
print(f"Worker will restart after {max_jobs_before_restart} jobs to prevent memory leaks")
try:
# The job function itself is responsible for creating an app context if needed.
worker.work(logging_level=logging_level, max_jobs=max_jobs_before_restart)
except Exception as e:
print(f"High Priority RQ Worker failed to start or encountered an error: {e}")
sys.exit(1)