feat(apigateway): incremental embed progress and configurable worker concurrency (#430)#437
Merged
Merged
Conversation
…concurrency (#430) A 164-operation spec on a CPU-only Ollama deployment took ~10 minutes to index while the UI sat at 0/164 the entire run and snapped to 164/164 at the end. Two independent gaps surfaced by that workflow, both fixed here. The third half (Ollama batch endpoint) was closed by #435. ## Progress visibility Migration 000046 adds embedded_so_far INT NOT NULL DEFAULT 0 to api_catalog_embedding_jobs. The worker publishes the counter at every chunk boundary via a new Store.UpdateProgress call so the catalog status endpoint can render "running, N/M" while the spec's DELETE+INSERT upsert is still pending. The atomic all-or-nothing write of the embedding vectors is preserved; the counter is a separate column read only while JobStatus == running. ComputeOperationEmbeddings gains an optional progress callback that fires once with the reused-row count up front, then again after every embedInBatches chunk. The embed-jobs Computer adapter wires the callback to Store.UpdateProgress. UpdateProgress writes are best-effort: a DB error is logged at debug level but does not abort the embed pass. The counter resets to 0 only on Claim. Terminal rows (succeeded / failed) and pending rows recovered from a lease expiry may still carry a prior attempt's value; callers gate display on Status == running so the stale value never reaches the UI. ## Worker concurrency New apigateway.embed_jobs.workers config (default 1, preserves prior behavior). Worker.Start spawns N goroutines that share the queue; the existing FOR UPDATE SKIP LOCKED + lease guarantee in Claim keep two goroutines (in the same pod or across pods) from picking the same job. After each successful Claim the worker also calls Notify once so a sibling can drain the next pending job in parallel; the buffered wakeup channel coalesces redundant Notifies. Worker gains a Concurrency() accessor so the wiring test can assert the value flowed from config through WorkerConfig. ## Tests - Unit: TestUpdateProgress_{HappyPath,LeaseRotatedIsNoop,DBError} (sqlmock against PostgresStore). - Unit: TestWorker_PublishesChunkProgress proves the worker hooks the chunk callback into Store.UpdateProgress keyed by (id, worker). - Unit: TestWorker_ProgressWriteFailureIsLogged proves a DB error on the progress write does not abort the job (final Complete is the authoritative success signal). - Unit: TestWorker_ConcurrencyProcessesJobsInParallel proves 4 jobs at 50ms each run in well under the 200ms serial baseline with Concurrency=4. - Unit: TestComputeOperationEmbeddings_ProgressCallback asserts the initial reused-publish and the chunk-done publishes happen. - Wiring: TestWireAPIGatewayEmbedJobsFromDB_WiresWorkerWithConfiguredConcurrency asserts apigateway.embed_jobs.workers=3 produces a Worker reporting Concurrency()==3. - Integration (build tag integration): starts pgvector/pgvector:pg16, enqueues a job against a slow stub Computer that publishes 4/8/12 across three 100ms chunks, polls SpecStatuses while running, asserts embedded_so_far is strictly increasing across observations and terminal status is succeeded. Pre-existing test/e2e/helpers/admin.go also fixed (AuditConfig.Enabled and KnowledgeConfig.Enabled drifted to *bool; helper added a local boolPtr).
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #437 +/- ##
==========================================
+ Coverage 86.11% 86.18% +0.06%
==========================================
Files 235 235
Lines 32208 32241 +33
==========================================
+ Hits 27737 27787 +50
+ Misses 3260 3239 -21
- Partials 1211 1215 +4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Closes #430.
A 164-operation spec on a CPU-only Ollama deployment took ~10 minutes to index while the UI sat at
indexing 0/164the entire run and snapped to164/164at the end. From an operator's perspective the run looked frozen. Two independent gaps surfaced by that workflow, both fixed in this PR. The third half of #430 (Ollama batch endpoint) was closed earlier by #435.Approach
Progress visibility
Migration
000046addsembedded_so_far INTEGER NOT NULL DEFAULT 0toapi_catalog_embedding_jobs. The worker publishes the counter at every chunk boundary via a newStore.UpdateProgresscall. The catalog status endpoint reads the column. The portal'sEmbeddingStatusBadgerendersindexing N/Mfromembedded_so_farwhileJobStatus == running, distinct frompending(queued, 0) andsucceeded(the greenN/N indexedstate).The atomic all-or-nothing semantic of the spec's vector upsert is preserved: the existing
DELETE+INSERTtransaction is unchanged.embedded_so_faris a separate column on the job row, populated by best-effort writes. A DB error on the progress write is logged at debug level but does not abort the embed pass: the finalCompleteis the authoritative success signal.ComputeOperationEmbeddingsgains an optionalprogress func(int)callback that fires once with the reused-row count up front (so a fully-cached refresh ticks straight to operation_count without waiting for a no-op embed pass) and again after everyembedInBatcheschunk. The embed-jobsComputeradapter wires the callback toStore.UpdateProgresskeyed by(id, worker_id, status='running')so a stale worker whose lease was rotated cannot clobber the new lease-holder's count.The counter resets to 0 only on
Claim. Terminal rows (succeeded/failed) and pending rows recovered from a lease expiry may still carry a prior attempt's value; callers gate display onStatus == runningso the stale value never reaches the UI. The doc comments and the JSON response struct both call this out explicitly.Worker concurrency
New
apigateway.embed_jobs.workersconfig (default 1, preserves prior single-goroutine behavior).Worker.Startspawns N goroutines that share one wakeup channel and one stopCh. The existingFOR UPDATE SKIP LOCKED LIMIT 1predicate inClaimplus the 10-minute lease guarantee keep two goroutines (in the same pod or across pods) from picking the same job.After each successful
Claimthe worker also callsNotifyonce so a sibling goroutine can drain the next pending job in parallel; the buffered wakeup channel coalesces redundant Notifies. The Notify-after-Claim is placed after the error check so a DB outage that stormsClaimerrors does not also storm sibling-wakeup attempts.Workergains aConcurrency() intaccessor so the wiring test can prove the value flowed from config throughWorkerConfigwithout exporting thecfgfield itself.Wire-format change
GET /api/v1/admin/api-catalogs/{id}/embedding-statusresponse gains one new field:{ "spec_name": "users", "operation_count": 47, "embedding_count": 0, "embedded_so_far": 12, "job_status": "running", "job_attempts": 1, "job_last_error": "", "job_updated_at": "2026-05-19T11:42:03Z" }embedded_so_farisomitemptyso existing consumers that ignore unknown fields see no change in shape when the counter is zero.Tests
Unit
TestUpdateProgress_HappyPath/_LeaseRotatedIsNoop/_DBErroragainstPostgresStorevia sqlmock.TestWorker_PublishesChunkProgressproves the worker hooks the chunk callback intoStore.UpdateProgresskeyed by(id, worker_id).TestWorker_ProgressWriteFailureIsLoggedproves a DB error on the progress write does not abort the job.TestWorker_ConcurrencyProcessesJobsInParallelproves 4 jobs at 50ms each run in well under the 200ms serial baseline withConcurrency=4(typical wall time ~60ms).TestComputeOperationEmbeddings_ProgressCallbackasserts the initial reused-publish and the chunk-done publishes happen and that the cumulative count reachesoperation_count.TestWireAPIGatewayEmbedJobsFromDB_WiresWorkerWithConfiguredConcurrencyassertsapigateway.embed_jobs.workers: 3produces aWorkerreportingConcurrency() == 3.Integration (build tag
integration)pkg/platform/integration_embedjobs_progress_test.gostartspgvector/pgvector:pg16, enqueues a job against a slow stub Computer that publishes 4/8/12 across three 100ms chunks, pollsSpecStatuseswhile running, assertsembedded_so_faris strictly increasing across observations and terminal status issucceeded. Runtime ~5s.Verification
make verifyclean: tools-check, gofmt, race tests, total + patch coverage above gate (patch coverage 100%), golangci-lint (patch-scoped againstorigin/main), gosec, govulncheck, semgrep, codeql, doc-check, dead-code, mutation testing, goreleaser dry-run.make test-integrationclean against pgvector.Claimresets it, comments claimedRetrydid too) plus em dashes; both fixed. Round 2 surfaced a deadslowComputer.callsfield reintroducing a round-1 pattern, plus a wiring test whose name promised more than it asserted; both fixed (drop field, addWorker.Concurrency()+ assertion). Round 3 returned CLEAN.Drive-by fix
test/e2e/helpers/admin.gowas failing theintegrationbuild becauseAuditConfig.EnabledandKnowledgeConfig.Enabledhad drifted to*bool. The helper now wraps with a localboolPtr.Test plan
make verifypasses locally.make test-integrationpasses locally against pgvector.apigateway.embed_jobs.workers: 2, save two API specs back to back, confirm both worker goroutines pick a different spec (lease + SKIP LOCKED in action), confirm both spec badges tickindexing N/Mupward before flipping to green.0/Nbefore completion instead of staying at 0 until the final commit.