Skip to content

Latest commit

 

History

History
46 lines (38 loc) · 2.31 KB

File metadata and controls

46 lines (38 loc) · 2.31 KB

Queue System Simplification Implementation

I have successfully implemented the queue system simplification plan.

Changes Implemented

1. Database Layer Hardening

  • Added claimNextMessage(sessionDbId) to PendingMessageStore:
    • Implements an atomic transaction (SELECT oldest pending + UPDATE to processing).
    • Ensures a message can only be claimed by one worker at a time.
    • Eliminates race conditions between "peeking" and "marking".
  • Removed peekPending():
    • No longer needed as claimNextMessage handles retrieval and locking in one step.

2. Unified "Pump" Architecture

  • Created src/services/queue/SessionQueueProcessor.ts:
    • Implements a robust AsyncIterableIterator that yields messages.
    • Encapsulates the "Claim -> Yield -> Wait" loop.
    • Replaces fragile polling/recursive logic with event-driven waitForMessage.
    • Handles empty queues gracefully by waiting for signals.

3. SessionManager Refactoring

  • Updated getMessageIterator:
    • Now delegates to SessionQueueProcessor.
    • Removes complex manual synchronization logic.
  • Removed In-Memory Queue State:
    • queueObservation and queueSummarize now only write to DB and emit events.
    • pendingMessages array is no longer used for logic (kept deprecated for type compatibility).
    • getTotalActiveWork, hasPendingMessages, etc., now query PendingMessageStore directly (counting both 'pending' and 'processing' states).

4. Logic Cleanup

  • Removed Recursive Restarts:
    • Refactored startGeneratorWithProvider in SessionRoutes.ts and startSessionProcessor in WorkerService.ts.
    • Removed logic that deleted sessions when queue emptied (sessions now wait for new work).
    • Removed "auto-restart" logic for normal completion (only kept for crash recovery).

Benefits

  • Reliability: Atomic DB operations prevent stuck or duplicate messages.
  • Simplicity: Removed complex "peek-then-mark" and recursive restart chains.
  • Performance: Zero-latency event notification with efficient DB queries.
  • Maintainability: Clear separation of concerns (Store vs Processor vs Manager).

Verification

  • Ran static analysis (tsc) to verify type safety of new components.
  • Verified removal of dead code (peekPending).
  • Confirmed integration points in SessionManager and SessionRoutes.