Skip to content

Commit f1009fd

Browse files
authored
Merge pull request #9 from unicef/fix/pipeline-sse-and-progress-ui
Fix pipeline SSE delivery, add LLM timeout, redesign progress tiles
2 parents 05c66db + a5d081e commit f1009fd

5 files changed

Lines changed: 190 additions & 122 deletions

File tree

apps/api/src/routes/pipeline.ts

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Hono } from "hono"
22
import { streamSSE } from "hono/streaming"
33
import { HTTPException } from "hono/http-exception"
4-
import type { PipelineService } from "../services/pipeline-service.js"
4+
import type { PipelineService, PipelineSSEEvent } from "../services/pipeline-service.js"
55

66
export function createPipelineRoutes(
77
service: PipelineService,
@@ -91,47 +91,75 @@ export function createPipelineRoutes(
9191
return
9292
}
9393

94-
// Stream real-time events
95-
let closed = false
94+
// Queue-based SSE: listener pushes events, loop drains with awaited writes
95+
const queue: PipelineSSEEvent[] = []
96+
let done = false
97+
9698
const unsubscribe = service.addListener(label, (event) => {
97-
if (closed) return
98-
try {
99-
if (event.type === "progress") {
100-
stream.writeSSE({
101-
event: "progress",
102-
data: JSON.stringify(event.data),
103-
})
104-
} else if (event.type === "pipeline-complete") {
105-
stream.writeSSE({
106-
event: "complete",
107-
data: JSON.stringify({ label: event.label }),
108-
})
109-
closed = true
110-
} else if (event.type === "pipeline-error") {
111-
stream.writeSSE({
112-
event: "error",
113-
data: JSON.stringify({
114-
label: event.label,
115-
error: event.error,
116-
}),
117-
})
118-
closed = true
119-
}
120-
} catch {
121-
// Stream write failed (client disconnected)
122-
closed = true
123-
}
99+
if (done) return
100+
queue.push(event)
124101
})
125102

126-
// Keep stream alive until pipeline completes or client disconnects
103+
// Re-check status after subscribing to avoid race where pipeline
104+
// completes between the initial check and listener registration
105+
const jobAfterSubscribe = service.getStatus(label)
106+
if (
107+
jobAfterSubscribe?.status === "completed" ||
108+
jobAfterSubscribe?.status === "failed"
109+
) {
110+
const event =
111+
jobAfterSubscribe.status === "completed" ? "complete" : "error"
112+
const data =
113+
jobAfterSubscribe.status === "completed"
114+
? { label }
115+
: { label, error: jobAfterSubscribe.error }
116+
await stream.writeSSE({ event, data: JSON.stringify(data) })
117+
unsubscribe()
118+
return
119+
}
120+
127121
stream.onAbort(() => {
128-
closed = true
122+
done = true
129123
unsubscribe()
130124
})
131125

132-
// Wait for completion
133-
while (!closed) {
134-
await new Promise((r) => setTimeout(r, 100))
126+
// Drain queue with proper await on each write
127+
while (!done) {
128+
while (queue.length > 0) {
129+
const event = queue.shift()!
130+
try {
131+
if (event.type === "progress") {
132+
await stream.writeSSE({
133+
event: "progress",
134+
data: JSON.stringify(event.data),
135+
})
136+
} else if (event.type === "pipeline-complete") {
137+
await stream.writeSSE({
138+
event: "complete",
139+
data: JSON.stringify({ label: event.label }),
140+
})
141+
done = true
142+
break
143+
} else if (event.type === "pipeline-error") {
144+
await stream.writeSSE({
145+
event: "error",
146+
data: JSON.stringify({
147+
label: event.label,
148+
error: event.error,
149+
}),
150+
})
151+
done = true
152+
break
153+
}
154+
} catch {
155+
// Stream write failed (client disconnected)
156+
done = true
157+
break
158+
}
159+
}
160+
if (!done) {
161+
await new Promise((r) => setTimeout(r, 50))
162+
}
135163
}
136164

137165
unsubscribe()

apps/api/src/services/pipeline-runner.ts

Lines changed: 72 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -150,68 +150,86 @@ export function createPipelineRunner(): PipelineRunner {
150150
let completedSection = 0
151151
let completedRender = 0
152152

153+
const failedPages: string[] = []
154+
153155
await processWithConcurrency(
154156
pages,
155157
effectiveConcurrency,
156158
async (page: PageData) => {
157-
await processPage(
158-
page,
159-
storage,
160-
{
161-
textClassifyConfig,
162-
imageClassifyConfig,
163-
sectioningConfig,
164-
renderConfig,
165-
},
166-
llmModel,
167-
progress,
168-
totalPages,
169-
{
170-
onClassifyImages: () => {
171-
completedClassifyImages++
172-
progress.emit({
173-
type: "step-progress",
174-
step: "image-classification",
175-
message: `${completedClassifyImages}/${totalPages}`,
176-
page: completedClassifyImages,
177-
totalPages,
178-
})
179-
},
180-
onClassifyText: () => {
181-
completedClassifyText++
182-
progress.emit({
183-
type: "step-progress",
184-
step: "text-classification",
185-
message: `${completedClassifyText}/${totalPages}`,
186-
page: completedClassifyText,
187-
totalPages,
188-
})
189-
},
190-
onSection: () => {
191-
completedSection++
192-
progress.emit({
193-
type: "step-progress",
194-
step: "page-sectioning",
195-
message: `${completedSection}/${totalPages}`,
196-
page: completedSection,
197-
totalPages,
198-
})
159+
try {
160+
await processPage(
161+
page,
162+
storage,
163+
{
164+
textClassifyConfig,
165+
imageClassifyConfig,
166+
sectioningConfig,
167+
renderConfig,
199168
},
200-
onRender: () => {
201-
completedRender++
202-
progress.emit({
203-
type: "step-progress",
204-
step: "web-rendering",
205-
message: `${completedRender}/${totalPages}`,
206-
page: completedRender,
207-
totalPages,
208-
})
209-
},
210-
}
211-
)
169+
llmModel,
170+
progress,
171+
totalPages,
172+
{
173+
onClassifyImages: () => {
174+
completedClassifyImages++
175+
progress.emit({
176+
type: "step-progress",
177+
step: "image-classification",
178+
message: `${completedClassifyImages}/${totalPages}`,
179+
page: completedClassifyImages,
180+
totalPages,
181+
})
182+
},
183+
onClassifyText: () => {
184+
completedClassifyText++
185+
progress.emit({
186+
type: "step-progress",
187+
step: "text-classification",
188+
message: `${completedClassifyText}/${totalPages}`,
189+
page: completedClassifyText,
190+
totalPages,
191+
})
192+
},
193+
onSection: () => {
194+
completedSection++
195+
progress.emit({
196+
type: "step-progress",
197+
step: "page-sectioning",
198+
message: `${completedSection}/${totalPages}`,
199+
page: completedSection,
200+
totalPages,
201+
})
202+
},
203+
onRender: () => {
204+
completedRender++
205+
progress.emit({
206+
type: "step-progress",
207+
step: "web-rendering",
208+
message: `${completedRender}/${totalPages}`,
209+
page: completedRender,
210+
totalPages,
211+
})
212+
},
213+
}
214+
)
215+
} catch (err) {
216+
const msg = err instanceof Error ? err.message : String(err)
217+
failedPages.push(`${page.pageId}: ${msg}`)
218+
progress.emit({
219+
type: "step-error",
220+
step: "web-rendering",
221+
error: `${page.pageId} failed: ${msg}`,
222+
})
223+
}
212224
}
213225
)
214226

227+
if (failedPages.length > 0) {
228+
throw new Error(
229+
`${failedPages.length} page(s) failed:\n${failedPages.join("\n")}`
230+
)
231+
}
232+
215233
// Emit completion for all per-page steps
216234
progress.emit({
217235
type: "step-complete",

apps/studio/src/components/pipeline/PipelineProgress.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ function getStepState(
2727
progress: PipelineProgressState
2828
): "pending" | "active" | "completed" | "error" {
2929
if (progress.completedSteps.has(step)) return "completed"
30+
if (progress.stepProgress.has(step)) return "active"
3031
if (progress.currentStep === step) return "active"
3132
if (progress.error?.startsWith(step)) return "error"
3233
return "pending"
@@ -62,7 +63,7 @@ export function PipelineProgress({
6263
</CardHeader>
6364
<CardContent>
6465
{(isRunning || isComplete || error) && (
65-
<div className="mb-4 space-y-0.5">
66+
<div className="mb-4 grid grid-cols-2 gap-2 sm:grid-cols-3">
6667
{STEP_ORDER.map((step) => (
6768
<StepIndicator
6869
key={step}

apps/studio/src/components/pipeline/StepIndicator.tsx

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ function StepIcon({ state }: { state: StepState }) {
4040
case "error":
4141
return <AlertCircle className="h-4 w-4 text-destructive" />
4242
default:
43-
return <Circle className="h-4 w-4 text-muted-foreground/40" />
43+
return <Circle className="h-4 w-4 text-muted-foreground/30" />
4444
}
4545
}
4646

@@ -49,44 +49,64 @@ export function StepIndicator({
4949
state,
5050
progress,
5151
}: StepIndicatorProps) {
52+
const pct =
53+
state === "active" && progress?.totalPages
54+
? Math.round(((progress.page ?? 0) / progress.totalPages) * 100)
55+
: state === "completed"
56+
? 100
57+
: 0
58+
5259
return (
53-
<div className="flex items-center gap-3 py-1.5">
54-
<StepIcon state={state} />
55-
<div className="flex-1">
56-
<div
60+
<div
61+
className={cn(
62+
"rounded-lg border p-3 transition-all",
63+
state === "active" && "border-blue-200 bg-blue-50/50",
64+
state === "completed" && "border-green-200 bg-green-50/30",
65+
state === "error" && "border-destructive/30 bg-destructive/5",
66+
state === "pending" && "border-border/50 bg-muted/20 opacity-60"
67+
)}
68+
>
69+
<div className="mb-2 flex items-center justify-between">
70+
<span
5771
className={cn(
58-
"text-sm",
59-
state === "active" && "font-medium text-foreground",
60-
state === "completed" && "text-muted-foreground",
61-
state === "pending" && "text-muted-foreground/60",
62-
state === "error" && "text-destructive"
72+
"text-sm font-medium",
73+
state === "completed" && "text-green-700",
74+
state === "error" && "text-destructive",
75+
state === "pending" && "text-muted-foreground"
6376
)}
6477
>
6578
{label}
66-
</div>
79+
</span>
80+
<StepIcon state={state} />
81+
</div>
82+
83+
{/* Progress bar — always visible for active/completed */}
84+
<div className="h-1.5 rounded-full bg-muted/60">
85+
<div
86+
className={cn(
87+
"h-full rounded-full transition-all duration-300",
88+
state === "active" && "bg-blue-600",
89+
state === "completed" && "bg-green-500",
90+
state === "error" && "bg-destructive",
91+
state === "pending" && "bg-transparent"
92+
)}
93+
style={{ width: `${pct}%` }}
94+
/>
95+
</div>
96+
97+
{/* Status text */}
98+
<div className="mt-1.5 text-xs text-muted-foreground">
6799
{state === "active" && progress?.totalPages && (
68-
<div className="mt-1">
69-
<div className="flex justify-between text-xs text-muted-foreground">
70-
<span>
71-
{progress.page ?? 0} / {progress.totalPages} pages
72-
</span>
73-
<span>
74-
{Math.round(
75-
((progress.page ?? 0) / progress.totalPages) * 100
76-
)}
77-
%
78-
</span>
79-
</div>
80-
<div className="mt-0.5 h-1.5 rounded-full bg-muted">
81-
<div
82-
className="h-full rounded-full bg-blue-600 transition-all"
83-
style={{
84-
width: `${((progress.page ?? 0) / progress.totalPages) * 100}%`,
85-
}}
86-
/>
87-
</div>
88-
</div>
100+
<span>
101+
{progress.page ?? 0} / {progress.totalPages} pages
102+
</span>
103+
)}
104+
{state === "active" && !progress?.totalPages && (
105+
<span>Processing...</span>
89106
)}
107+
{state === "completed" && <span>Done</span>}
108+
{state === "error" && <span>Failed</span>}
109+
{state === "pending" && <span>Waiting</span>}
90110
</div>
91111
</div>
92112
)

packages/llm/src/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ async function callLLM<T>(
209209
schema: opts.schema,
210210
system,
211211
messages: coreMessages,
212+
abortSignal: AbortSignal.timeout(90_000),
212213
}
213214
if (opts.maxTokens) {
214215
generateOpts.maxTokens = opts.maxTokens

0 commit comments

Comments
 (0)