Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 124 additions & 38 deletions ui/src/components/chat/ChatInterface.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import { createMessageHandlers, extractMessagesFromTasks, extractApprovalMessage
import { kagentA2AClient } from "@/lib/a2aClient";
import { useChatRunInSandbox } from "@/components/chat/ChatAgentContext";
import { v4 as uuidv4 } from "uuid";
import { getStatusPlaceholder } from "@/lib/statusUtils";
import { Message, DataPart } from "@a2a-js/sdk";
import { getStatusPlaceholder, mapA2AStateToStatus } from "@/lib/statusUtils";
import { Message, DataPart, Task, TaskState } from "@a2a-js/sdk";

// Task states where the agent is actively processing — resubscribe to live stream.
const RESUBSCRIBE_TASK_STATES: TaskState[] = ["submitted", "working"];

interface ChatInterfaceProps {
selectedAgentName: string;
Expand Down Expand Up @@ -122,6 +125,8 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
setIsLoading(true);
setSessionNotFound(false);

let activeTask: Task | undefined;

try {
const sessionExistsResponse = await checkSessionExists(sessionId);
if (sessionExistsResponse.error || !sessionExistsResponse.data) {
Expand Down Expand Up @@ -156,17 +161,32 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se

if (hasPendingApproval) {
setChatStatus("input_required");
} else {
// Check for a task still actively running (not input-required, not terminal).
// input-required is excluded: it needs the approval UI, not a stream.
activeTask = messagesResponse.data.findLast(
task => RESUBSCRIBE_TASK_STATES.includes(task.status?.state as TaskState)
);
}
}
} catch (error) {
console.error("Error loading messages:", error);
toast.error("Error loading messages");
setSessionNotFound(true);
setIsLoading(false);
return;
}

setIsLoading(false);

if (activeTask) {
setChatStatus(mapA2AStateToStatus(activeTask.status?.state as TaskState));
await streamResubscribedTask(activeTask.id);
}
Comment on lines +182 to +185
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The project has no component-level tests for ChatInterface at all at present.

}

initializeChat();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [sessionId, selectedAgentName, selectedNamespace, isFirstMessage]);

useEffect(() => {
Expand All @@ -192,6 +212,7 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
}

const userMessageText = currentInputMessage;

setCurrentInputMessage("");
setChatStatus("thinking");
setStoredMessages(prev => [...prev, ...streamingMessages]);
Expand Down Expand Up @@ -289,6 +310,67 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
setCurrentInputMessage(userMessageText);
}
};

const consumeStream = async (stream: AsyncIterable<unknown>) => {
let timeoutTimer: NodeJS.Timeout | null = null;
let streamActive = true;
const STREAM_TIMEOUT_MS = 600000; // 10 minutes

const startTimeout = () => {
if (timeoutTimer) clearTimeout(timeoutTimer);
timeoutTimer = setTimeout(() => {
if (streamActive) {
console.error("⏰ Stream timeout - no events received for 10 minutes");
toast.error("⏰ Stream timed out - no events received for 10 minutes");
streamActive = false;
abortControllerRef.current?.abort();
}
}, STREAM_TIMEOUT_MS);
};
startTimeout();

try {
for await (const event of stream) {
startTimeout();
try {
handleMessageEvent(event as Message);
} catch (err) {
console.error("Error handling stream event:", err);
}
if (abortControllerRef.current?.signal.aborted) {
streamActive = false;
break;
}
}
} finally {
streamActive = false;
if (timeoutTimer) clearTimeout(timeoutTimer);
}
};

const reloadSessionFromDB = async () => {
try {
const currentSessionId = session?.id || sessionId;
if (!currentSessionId) return;
const latest = await getSessionTasks(currentSessionId);
if (latest.data && latest.data.length > 0) {
const extractedMessages = extractMessagesFromTasks(latest.data);
const { messages: pendingApprovalMessages, hasPendingApproval } = extractApprovalMessagesFromTasks(latest.data);
setStoredMessages(
hasPendingApproval
? [...extractedMessages, ...pendingApprovalMessages]
: extractedMessages
);
setSessionStats(extractTokenStatsFromTasks(latest.data));
setStreamingMessages([]);
Comment thread
onematchfox marked this conversation as resolved.
if (hasPendingApproval) {
setChatStatus("input_required");
}
}
} catch {
// Best-effort reload.
}
};

/**
* Shared streaming helper used by both handleSendMessage and
Expand Down Expand Up @@ -341,42 +423,7 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
runInSandbox
);

let timeoutTimer: NodeJS.Timeout | null = null;
let streamActive = true;
const streamTimeout = 600000; // 10 minutes

const handleTimeout = () => {
if (streamActive) {
console.error("⏰ Stream timeout - no events received for 10 minutes");
toast.error("⏰ Stream timed out - no events received for 10 minutes");
streamActive = false;
if (abortControllerRef.current) abortControllerRef.current.abort();
}
};

const startTimeout = () => {
if (timeoutTimer) clearTimeout(timeoutTimer);
timeoutTimer = setTimeout(handleTimeout, streamTimeout);
};
startTimeout();

try {
for await (const event of stream) {
startTimeout();
try {
handleMessageEvent(event);
} catch (error) {
console.error(`❌ Error handling event: ${error}`);
}
if (abortControllerRef.current?.signal.aborted) {
streamActive = false;
break;
}
}
} finally {
streamActive = false;
if (timeoutTimer) clearTimeout(timeoutTimer);
}
await consumeStream(stream);
} catch (error: unknown) {
if (error instanceof Error && error.name === "AbortError") {
setChatStatus("ready");
Expand All @@ -393,6 +440,45 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se
}
};

const streamResubscribedTask = async (taskId: string) => {
const isTerminalError = (err: unknown) => {
if (!(err instanceof Error)) return false;
const msg = err.message.toLowerCase();
return msg.includes("terminal state") || msg.includes("task not found") || msg.includes("404");
};

abortControllerRef.current = new AbortController();
isFirstAssistantChunkRef.current = true;

try {
const stream = await kagentA2AClient.resubscribeStream(
selectedNamespace,
selectedAgentName,
taskId,
abortControllerRef.current.signal,
runInSandbox,
);

await consumeStream(stream);

// Stream ended cleanly — reload final state from DB and settle.
await reloadSessionFromDB();
} catch (error: unknown) {
if (error instanceof Error && error.name !== "AbortError" && !isTerminalError(error)) {
console.error("Resubscribe failed:", error);
}
// Terminal, AbortError, or unexpected error — reload whatever state we have.
if (!(error instanceof Error && error.name === "AbortError")) {
await reloadSessionFromDB();
}
} finally {
abortControllerRef.current = null;
setChatStatus("ready");
setIsStreaming(false);
setStreamingContent("");
}
};

const handleCancel = (e: React.FormEvent) => {
e.preventDefault();

Expand Down
45 changes: 45 additions & 0 deletions ui/src/lib/a2aClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,51 @@ export class KagentA2AClient {
return this.processSSEStream(response.body);
}

/**
* Resubscribe to an existing in-progress task's event stream.
* Use this on page load when a task is still running to reconnect without
* sending a new message. Fails if the task is already in a terminal state.
*/
async resubscribeStream(
namespace: string,
agentName: string,
taskId: string,
signal?: AbortSignal,
runInSandbox = false
): Promise<AsyncIterable<any>> {
const request = {
jsonrpc: "2.0" as const,
method: "tasks/resubscribe",
params: { id: taskId },
id: uuidv4(),
};

const proxyUrl = runInSandbox
? `/a2a-sandboxes/${namespace}/${agentName}`
: `/a2a/${namespace}/${agentName}`;

const response = await fetch(proxyUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
},
body: JSON.stringify(request),
signal,
});

if (!response.ok) {
const errorText = await response.text();
throw new Error(`Resubscribe failed: ${response.status} ${response.statusText} - ${errorText}`);
}

if (!response.body) {
throw new Error('Response body is null');
}

return this.processSSEStream(response.body);
}

/**
* Process Server-Sent Events stream with proper event boundary detection
*/
Expand Down
Loading