From e301da96c69a388b8e077820c7ed1089fb9a18b8 Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Thu, 21 May 2026 14:09:23 +0200 Subject: [PATCH] feat(ui): reconnect to in-progress tasks on page load When a user reloads the chat page while the agent is still running, detect the active task and resubscribe to its SSE stream via tasks/resubscribe rather than showing a stale UI. Handles clean stream end, terminal errors, and AbortError cancellation. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- ui/src/components/chat/ChatInterface.tsx | 162 +++++++++++++++++------ ui/src/lib/a2aClient.ts | 45 +++++++ 2 files changed, 169 insertions(+), 38 deletions(-) diff --git a/ui/src/components/chat/ChatInterface.tsx b/ui/src/components/chat/ChatInterface.tsx index ff759d718..778c50163 100644 --- a/ui/src/components/chat/ChatInterface.tsx +++ b/ui/src/components/chat/ChatInterface.tsx @@ -26,8 +26,11 @@ import { createMessageHandlers, extractMessagesFromTasks, extractApprovalMessage import { kagentA2AClient } from "@/lib/a2aClient"; import { useChatRunInSandbox } from "@/components/chat/ChatAgentContext"; import { v4 as uuidv4 } from "uuid"; -import { getStatusPlaceholder } from "@/lib/statusUtils"; -import { Message, DataPart } from "@a2a-js/sdk"; +import { getStatusPlaceholder, mapA2AStateToStatus } from "@/lib/statusUtils"; +import { Message, DataPart, Task, TaskState } from "@a2a-js/sdk"; + +// Task states where the agent is actively processing — resubscribe to live stream. +const RESUBSCRIBE_TASK_STATES: TaskState[] = ["submitted", "working"]; interface ChatInterfaceProps { selectedAgentName: string; @@ -122,6 +125,8 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se setIsLoading(true); setSessionNotFound(false); + let activeTask: Task | undefined; + try { const sessionExistsResponse = await checkSessionExists(sessionId); if (sessionExistsResponse.error || !sessionExistsResponse.data) { @@ -156,17 +161,32 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se if (hasPendingApproval) { setChatStatus("input_required"); + } else { + // Check for a task still actively running (not input-required, not terminal). + // input-required is excluded: it needs the approval UI, not a stream. + activeTask = messagesResponse.data.findLast( + task => RESUBSCRIBE_TASK_STATES.includes(task.status?.state as TaskState) + ); } } } catch (error) { console.error("Error loading messages:", error); toast.error("Error loading messages"); setSessionNotFound(true); + setIsLoading(false); + return; } + setIsLoading(false); + + if (activeTask) { + setChatStatus(mapA2AStateToStatus(activeTask.status?.state as TaskState)); + await streamResubscribedTask(activeTask.id); + } } initializeChat(); + // eslint-disable-next-line react-hooks/exhaustive-deps }, [sessionId, selectedAgentName, selectedNamespace, isFirstMessage]); useEffect(() => { @@ -192,6 +212,7 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se } const userMessageText = currentInputMessage; + setCurrentInputMessage(""); setChatStatus("thinking"); setStoredMessages(prev => [...prev, ...streamingMessages]); @@ -289,6 +310,67 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se setCurrentInputMessage(userMessageText); } }; + + const consumeStream = async (stream: AsyncIterable) => { + let timeoutTimer: NodeJS.Timeout | null = null; + let streamActive = true; + const STREAM_TIMEOUT_MS = 600000; // 10 minutes + + const startTimeout = () => { + if (timeoutTimer) clearTimeout(timeoutTimer); + timeoutTimer = setTimeout(() => { + if (streamActive) { + console.error("⏰ Stream timeout - no events received for 10 minutes"); + toast.error("⏰ Stream timed out - no events received for 10 minutes"); + streamActive = false; + abortControllerRef.current?.abort(); + } + }, STREAM_TIMEOUT_MS); + }; + startTimeout(); + + try { + for await (const event of stream) { + startTimeout(); + try { + handleMessageEvent(event as Message); + } catch (err) { + console.error("Error handling stream event:", err); + } + if (abortControllerRef.current?.signal.aborted) { + streamActive = false; + break; + } + } + } finally { + streamActive = false; + if (timeoutTimer) clearTimeout(timeoutTimer); + } + }; + + const reloadSessionFromDB = async () => { + try { + const currentSessionId = session?.id || sessionId; + if (!currentSessionId) return; + const latest = await getSessionTasks(currentSessionId); + if (latest.data && latest.data.length > 0) { + const extractedMessages = extractMessagesFromTasks(latest.data); + const { messages: pendingApprovalMessages, hasPendingApproval } = extractApprovalMessagesFromTasks(latest.data); + setStoredMessages( + hasPendingApproval + ? [...extractedMessages, ...pendingApprovalMessages] + : extractedMessages + ); + setSessionStats(extractTokenStatsFromTasks(latest.data)); + setStreamingMessages([]); + if (hasPendingApproval) { + setChatStatus("input_required"); + } + } + } catch { + // Best-effort reload. + } + }; /** * Shared streaming helper used by both handleSendMessage and @@ -341,42 +423,7 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se runInSandbox ); - let timeoutTimer: NodeJS.Timeout | null = null; - let streamActive = true; - const streamTimeout = 600000; // 10 minutes - - const handleTimeout = () => { - if (streamActive) { - console.error("⏰ Stream timeout - no events received for 10 minutes"); - toast.error("⏰ Stream timed out - no events received for 10 minutes"); - streamActive = false; - if (abortControllerRef.current) abortControllerRef.current.abort(); - } - }; - - const startTimeout = () => { - if (timeoutTimer) clearTimeout(timeoutTimer); - timeoutTimer = setTimeout(handleTimeout, streamTimeout); - }; - startTimeout(); - - try { - for await (const event of stream) { - startTimeout(); - try { - handleMessageEvent(event); - } catch (error) { - console.error(`❌ Error handling event: ${error}`); - } - if (abortControllerRef.current?.signal.aborted) { - streamActive = false; - break; - } - } - } finally { - streamActive = false; - if (timeoutTimer) clearTimeout(timeoutTimer); - } + await consumeStream(stream); } catch (error: unknown) { if (error instanceof Error && error.name === "AbortError") { setChatStatus("ready"); @@ -393,6 +440,45 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se } }; + const streamResubscribedTask = async (taskId: string) => { + const isTerminalError = (err: unknown) => { + if (!(err instanceof Error)) return false; + const msg = err.message.toLowerCase(); + return msg.includes("terminal state") || msg.includes("task not found") || msg.includes("404"); + }; + + abortControllerRef.current = new AbortController(); + isFirstAssistantChunkRef.current = true; + + try { + const stream = await kagentA2AClient.resubscribeStream( + selectedNamespace, + selectedAgentName, + taskId, + abortControllerRef.current.signal, + runInSandbox, + ); + + await consumeStream(stream); + + // Stream ended cleanly — reload final state from DB and settle. + await reloadSessionFromDB(); + } catch (error: unknown) { + if (error instanceof Error && error.name !== "AbortError" && !isTerminalError(error)) { + console.error("Resubscribe failed:", error); + } + // Terminal, AbortError, or unexpected error — reload whatever state we have. + if (!(error instanceof Error && error.name === "AbortError")) { + await reloadSessionFromDB(); + } + } finally { + abortControllerRef.current = null; + setChatStatus("ready"); + setIsStreaming(false); + setStreamingContent(""); + } + }; + const handleCancel = (e: React.FormEvent) => { e.preventDefault(); diff --git a/ui/src/lib/a2aClient.ts b/ui/src/lib/a2aClient.ts index d62f21a1d..789f077b9 100644 --- a/ui/src/lib/a2aClient.ts +++ b/ui/src/lib/a2aClient.ts @@ -77,6 +77,51 @@ export class KagentA2AClient { return this.processSSEStream(response.body); } + /** + * Resubscribe to an existing in-progress task's event stream. + * Use this on page load when a task is still running to reconnect without + * sending a new message. Fails if the task is already in a terminal state. + */ + async resubscribeStream( + namespace: string, + agentName: string, + taskId: string, + signal?: AbortSignal, + runInSandbox = false + ): Promise> { + const request = { + jsonrpc: "2.0" as const, + method: "tasks/resubscribe", + params: { id: taskId }, + id: uuidv4(), + }; + + const proxyUrl = runInSandbox + ? `/a2a-sandboxes/${namespace}/${agentName}` + : `/a2a/${namespace}/${agentName}`; + + const response = await fetch(proxyUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + }, + body: JSON.stringify(request), + signal, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Resubscribe failed: ${response.status} ${response.statusText} - ${errorText}`); + } + + if (!response.body) { + throw new Error('Response body is null'); + } + + return this.processSSEStream(response.body); + } + /** * Process Server-Sent Events stream with proper event boundary detection */