🚀 feat(celery-library): add Redis secondary indexes for O(log N) task queries #9025
Conversation
Codecov Report

```
@@ Coverage Diff @@
## master #9025 +/- ##
==========================================
+ Coverage 87.35% 87.48% +0.12%
==========================================
Files 2056 2044 -12
Lines 80971 80196 -775
Branches 1451 1387 -64
==========================================
- Hits 70731 70156 -575
+ Misses 9829 9650 -179
+ Partials 411 390 -21
```

Continue to review the full report in Codecov by Sentry.
Pull request overview
This PR refactors the Celery Redis backend task listing to use secondary indexes (Redis ZSETs) instead of SCAN, aiming to make task queries scale better as the number of tasks grows.
Changes:
- Added owner-based ZSET index key construction and indexing on task/group creation.
- Updated `list_tasks()` to query ZSET members and fetch task metadata via pipelined `HGET`s (with stale-member pruning).
- Added unit tests verifying `list_tasks()` no longer uses `scan_iter`, supports wildcard filtering, and prunes stale index entries.
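The listing flow these changes introduce can be sketched as a pure-Python simulation. The dicts stand in for the Redis ZSET index and the per-task hashes, and all names are illustrative rather than the PR's actual API; the real code issues `ZRANGE`, pipelined `HGET`s, and `ZREM` against Redis:

```python
# Simulation of ZSET-backed task listing with stale-member pruning.
# `zset_index` stands in for a Redis ZSET of task keys (member -> score),
# `task_hashes` for the per-task Redis hashes holding metadata.

def list_tasks_sim(
    zset_index: dict[str, float], task_hashes: dict[str, dict]
) -> tuple[list[dict], list[str]]:
    """Return (task metadata found, stale members pruned from the index)."""
    found: list[dict] = []
    stale: list[str] = []
    for task_key in zset_index:           # ZRANGE index_key 0 -1
        meta = task_hashes.get(task_key)  # pipelined HGET per member
        if meta is None:                  # hash expired -> stale index entry
            stale.append(task_key)
        else:
            found.append(meta)
    for task_key in stale:                # ZREM index_key <member>
        del zset_index[task_key]
    return found, stale
```

The key point is that the index lookup replaces a full `SCAN` over the keyspace, while members whose backing hash has expired are dropped from the index as a side effect of listing.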
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `packages/celery-library/src/celery_library/backends/_redis.py` | Introduces ZSET-based owner indexes and updates task listing/removal to use them instead of key scanning. |
| `packages/celery-library/tests/unit/test_redis_store.py` | Adds unit tests covering ZSET-backed listing behavior, wildcard filtering, deletion cleanup, and stale index pruning. |
```python
index_score = datetime.now(tz=UTC).timestamp()

pipe = self._redis_client_sdk.redis.pipeline()
pipe.hset(
    name=redis_key,
    key=_CELERY_TASK_EXEC_METADATA_KEY,
    value=execution_metadata.model_dump_json(),
)
for index_key in _build_redis_owner_index_keys_for_task(task_key):
    pipe.zadd(index_key, {task_key: index_score})
await pipe.execute()
```
ZSET index keys/members are never given a TTL, so when the task hash expires naturally the index entries can remain indefinitely unless list_tasks() happens to be called (and only for that exact queried index key). This risks unbounded Redis memory growth over time.
Consider encoding an expiry timestamp in the ZSET score (e.g. now + expiry) and pruning via ZREMRANGEBYSCORE during listing, and/or applying/refreshing an EXPIRE on the index keys when adding members so old indexes can age out even if list_tasks() is never called.
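The reviewer's expiry-in-score suggestion can be sketched in pure Python (the dict stands in for the ZSET; the helper names and TTL handling are illustrative, not the PR's code). Storing `now + ttl` as the score lets a single range deletion age out dead entries:

```python
# Expiry encoded in the ZSET score: members whose score (deadline) is in the
# past can be dropped with one ZREMRANGEBYSCORE before listing.

def add_member(index: dict[str, float], member: str, ttl_seconds: float, now: float) -> None:
    index[member] = now + ttl_seconds  # ZADD index {member: now + ttl}

def prune_expired(index: dict[str, float], now: float) -> list[str]:
    """Remove and return members whose deadline has passed."""
    expired = [m for m, deadline in index.items() if deadline <= now]
    for m in expired:                  # ZREMRANGEBYSCORE index -inf <now>
        del index[m]
    return expired
```

Combined with a refreshed `EXPIRE` on the index key itself at `ZADD` time, this bounds index growth even if `list_tasks()` is never called for a given owner.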
My friend, you're absolutely right. Claude is the guilty one.
```python
def _build_redis_owner_index_keys_for_task(task_or_group_key: TaskKey | GroupKey) -> list[str]:
    owner_tokens = [
        token.split("=", maxsplit=1)
        for token in _without_uuid_token(task_or_group_key).split(_CELERY_TASK_DELIMTATOR)
    ]
    wildcard_value = json_dumps(WILDCARD)

    keys: list[str] = []
    for mask in product((False, True), repeat=len(owner_tokens)):
        query_owner_key = _CELERY_TASK_DELIMTATOR.join(
            f"{key}={wildcard_value if use_wildcard else value}"
            for (key, value), use_wildcard in zip(owner_tokens, mask, strict=True)
        )
        keys.append(_build_redis_owner_index_key(query_owner_key))
    return keys
```
_build_redis_owner_index_keys_for_task creates 2**N index keys per task where N is the number of owner-metadata fields (excluding uuid). Since OwnerMetadata supports arbitrary extra fields, this can become very expensive in both write amplification and total Redis key count for schemas with more than a handful of fields.
It would help to either (a) bound the number of indexed fields / supported wildcard fields, or (b) switch to an indexing strategy that doesn't require generating all wildcard combinations (e.g. index only specific dimensions that are actually queried).
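The write amplification is easy to demonstrate with a standalone version of the wildcard enumeration (field names and the `:` delimiter here are made up for illustration):

```python
from itertools import product

# With N owner fields, enumerating every concrete/wildcard mask yields
# 2**N index keys per task -- each extra field doubles the count.

def build_index_keys(owner_fields: dict[str, str], wildcard: str = "*") -> list[str]:
    items = list(owner_fields.items())
    keys: list[str] = []
    for mask in product((False, True), repeat=len(items)):
        keys.append(":".join(
            f"{k}={wildcard if use_wildcard else v}"
            for (k, v), use_wildcard in zip(items, mask)
        ))
    return keys

keys = build_index_keys({"user_id": "42", "product": "osparc", "client": "web"})
# 3 fields -> 2**3 = 8 index keys; 10 fields would already mean 1024 ZADDs per task.
```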
```python
for index_key in _build_redis_owner_index_keys_for_task(group_key):
    pipe.zadd(index_key, {group_key: index_score})

# group sub-tasks: store hash only, no ZSET index (filtered out in list_tasks)
for task_key, (task_execution_metadata, _) in zip(task_keys, execution_metadata.tasks, strict=True):
```
The comment says group sub-tasks have "no ZSET index", but in the current flow TaskManager.submit_group() calls RedisTaskStore.create_task() for each sub-task before create_group(), and create_task() now adds every task key to the ZSET indexes. That means group sub-tasks will still be indexed (and only filtered out later via ExecutorType.GROUP_TASK), which adds a lot of extra ZSET writes for large groups.
If the intent is to avoid indexing group sub-tasks, consider adding a way to skip indexing in create_task() (or a dedicated method for group sub-tasks) and/or removing sub-task entries from the indexes once they’re re-labeled as GROUP_TASK.
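A minimal sketch of that suggestion, assuming an opt-out flag on task creation (the `index` parameter, class name, and in-memory stores are hypothetical, not the PR's actual signatures):

```python
# Hypothetical store where group sub-tasks skip ZSET indexing entirely:
# the hash write (HSET) always happens, the index write (ZADD) is optional.

class TaskStoreSim:
    def __init__(self) -> None:
        self.hashes: dict[str, dict] = {}   # stands in for Redis task hashes
        self.index: dict[str, float] = {}   # stands in for the owner ZSET

    def create_task(self, task_key: str, metadata: dict, *, index: bool = True) -> None:
        self.hashes[task_key] = metadata    # HSET always
        if index:                           # ZADD only for standalone tasks
            self.index[task_key] = 0.0

store = TaskStoreSim()
store.create_task("task:standalone", {"kind": "plain"})
store.create_task("task:sub1", {"kind": "group_sub"}, index=False)
```

With this shape, `submit_group()` would pass `index=False` for each sub-task, so only the group key itself lands in the owner indexes.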
Co-authored-by: Copilot <[email protected]>
…condary-indexes-for-task-queries-speedup



What do these changes do?
This pull request refactors the way tasks are indexed and listed in the Redis backend for the celery library. The main improvement is the introduction of a ZSET-based indexing system for task ownership, which replaces the previous approach that relied on scanning Redis keys. This change significantly improves the efficiency and scalability of task listing operations.

Related issue/s
How to test
```shell
cd packages/celery-library
make install-dev
pytest -vv --pdb tests/unit/test_redis_store.py
```

Dev-ops
No changes.