diff --git a/ui/src/components/chat/ChatInterface.tsx b/ui/src/components/chat/ChatInterface.tsx index 778c50163..aa47c72e9 100644 --- a/ui/src/components/chat/ChatInterface.tsx +++ b/ui/src/components/chat/ChatInterface.tsx @@ -31,6 +31,8 @@ import { Message, DataPart, Task, TaskState } from "@a2a-js/sdk"; // Task states where the agent is actively processing — resubscribe to live stream. const RESUBSCRIBE_TASK_STATES: TaskState[] = ["submitted", "working"]; +// Task states that mean the session is busy (used by the cross-tab send guard). +const ACTIVE_TASK_STATES: TaskState[] = ["submitted", "working", "input-required"]; interface ChatInterfaceProps { selectedAgentName: string; @@ -213,6 +215,29 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se const userMessageText = currentInputMessage; + // Cross-tab guard: fetch the latest session state before mutating anything. + // Two cases: (1) another tab is still streaming — reconnect instead of sending; + // (2) another tab completed a turn we haven't loaded — reload so the user sees + // the full context before their next message goes out. + const guardSessionId = session?.id || sessionId; + if (guardSessionId) { + // Compare only non-approval messages to avoid false negatives when + // storedMessages includes appended ToolApprovalRequest / AskUserRequest entries. + const localMessageCount = storedMessages.filter(m => { + const meta = m.metadata as ADKMetadata | undefined; + return meta?.originalType !== "ToolApprovalRequest" && meta?.originalType !== "AskUserRequest"; + }).length; + const guardResult = await checkAndSyncSessionBeforeAction(guardSessionId, { + localMessageCount, + messages: { + inFlight: "This session is already being processed — reconnecting to live updates", + inputRequired: "Session is awaiting your input — please review before sending", + staleOrChanged: "New messages loaded — please review before sending", + }, + }); + if (guardResult === "blocked") return; + } + setCurrentInputMessage(""); setChatStatus("thinking"); setStoredMessages(prev => [...prev, ...streamingMessages]); @@ -473,12 +498,91 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se } } finally { abortControllerRef.current = null; - setChatStatus("ready"); + // Don't override input_required that reloadSessionFromDB() may have set. + setChatStatus(prev => prev === "input_required" ? prev : "ready"); setIsStreaming(false); setStreamingContent(""); } }; + /** + * Cross-tab guard: fetch the latest session state and sync before any action + * that would mutate the session. Returns "proceed" if safe, "blocked" if the + * action was superseded and the handler should return early. + * + * HITL mode (expectedTaskId provided): verifies the specific task is still + * input-required; resubscribes or reloads if another tab already responded. + * + * Send-guard mode (no expectedTaskId): checks for any active task and for + * stale local messages; blocks and syncs if either is detected. + */ + const checkAndSyncSessionBeforeAction = async ( + guardSessionId: string, + opts: { + expectedTaskId?: string; + localMessageCount?: number; + messages: { + inFlight: string; + inputRequired?: string; + staleOrChanged: string; + }; + } + ): Promise<"proceed" | "blocked"> => { + let tasksCheck: Awaited>; + try { + tasksCheck = await getSessionTasks(guardSessionId); + } catch { + // Guard is best-effort: if the check fails, let the action proceed. + return "proceed"; + } + if (!tasksCheck.data) return "proceed"; + + if (opts.expectedTaskId) { + const expectedTask = tasksCheck.data.findLast(task => task.id === opts.expectedTaskId); + if ((expectedTask?.status?.state as TaskState | undefined) !== "input-required") { + const inFlightTask = tasksCheck.data.findLast( + task => RESUBSCRIBE_TASK_STATES.includes(task.status?.state as TaskState) + ); + if (inFlightTask) { + toast.info(opts.messages.inFlight); + setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState)); + await streamResubscribedTask(inFlightTask.id); + } else { + await reloadSessionFromDB(); + toast.info(opts.messages.staleOrChanged); + } + return "blocked"; + } + return "proceed"; + } + + const inFlightTask = tasksCheck.data.findLast( + task => ACTIVE_TASK_STATES.includes(task.status?.state as TaskState) + ); + if (inFlightTask) { + if ((inFlightTask.status?.state as TaskState) === "input-required") { + await reloadSessionFromDB(); + toast.info(opts.messages.inputRequired ?? opts.messages.staleOrChanged); + } else { + toast.info(opts.messages.inFlight); + setChatStatus(mapA2AStateToStatus(inFlightTask.status?.state as TaskState)); + await streamResubscribedTask(inFlightTask.id); + } + return "blocked"; + } + + if (opts.localMessageCount !== undefined) { + const dbMessages = extractMessagesFromTasks(tasksCheck.data); + if (dbMessages.length > opts.localMessageCount) { + await reloadSessionFromDB(); + toast.info(opts.messages.staleOrChanged); + return "blocked"; + } + } + + return "proceed"; + }; + const handleCancel = (e: React.FormEvent) => { e.preventDefault(); @@ -518,13 +622,25 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se displayText: string, ) => { const currentSessionId = session?.id || sessionId; - setChatStatus("thinking"); - setStreamingContent(""); - // Find the taskId from the pending approval message so the A2A framework - // reuses the existing task instead of creating a new one. + // Find the taskId first so the guard can verify the task is still input-required. const { taskId: approvalTaskId } = getPendingApprovalToolIds(); + // Cross-tab guard: another tab may have already submitted this approval. + if (currentSessionId && approvalTaskId) { + const guardResult = await checkAndSyncSessionBeforeAction(currentSessionId, { + expectedTaskId: approvalTaskId, + messages: { + inFlight: "Another tab already responded — reconnecting to live updates", + staleOrChanged: "Session state changed — please review", + }, + }); + if (guardResult === "blocked") return; + } + + setChatStatus("thinking"); + setStreamingContent(""); + // Stamp approvalDecision on the current pending approval messages so they // are excluded from getPendingApprovalToolIds on future HITL cycles. // approvalDecision is either a uniform ToolDecision or a per-tool map @@ -657,10 +773,8 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se * Handle ask_user answers submitted by the user. Sends an "approve" decision * with the answers payload attached, routed to the pending ask_user task. */ - const handleAskUserSubmit = (answers: Array<{ answer: string[] }>) => { + const handleAskUserSubmit = async (answers: Array<{ answer: string[] }>) => { const currentSessionId = session?.id || sessionId; - setChatStatus("thinking"); - setStreamingContent(""); // Find the taskId from the pending AskUserRequest message let askUserTaskId: string | undefined; @@ -673,6 +787,21 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se } } + // Cross-tab guard: another tab may have already answered this question. + if (currentSessionId && askUserTaskId) { + const guardResult = await checkAndSyncSessionBeforeAction(currentSessionId, { + expectedTaskId: askUserTaskId, + messages: { + inFlight: "Another tab already responded — reconnecting to live updates", + staleOrChanged: "Session state changed — please review", + }, + }); + if (guardResult === "blocked") return; + } + + setChatStatus("thinking"); + setStreamingContent(""); + // Stamp the ask-user message as resolved so we don't show the form again const stampAskUser = (msgs: Message[]) => msgs.map(m => { const meta = m.metadata as Record | undefined; @@ -702,7 +831,7 @@ export default function ChatInterface({ selectedAgentName, selectedNamespace, se metadata: { timestamp: Date.now() }, }; - streamA2AMessage(a2aMessage, { + await streamA2AMessage(a2aMessage, { errorLabel: "Ask user response failed", sessionIdForWait: currentSessionId, onFinally: () => {