Commit f3486bb

greatmengqi committed
fix(backend): stream DeerFlowClient AI text as token deltas (#1969)
DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values", "custom"], which only delivers full-state snapshots at graph-node boundaries, so AI replies were dumped as a single messages-tuple event per node instead of streaming token by token. `client.stream("hello")` looked identical to `client.chat("hello")` — the bug reported in #1969.

Subscribe to "messages" mode as well, forward AIMessageChunk deltas as messages-tuple events with delta semantics (consumers accumulate by id), and dedup the values-snapshot path so it does not re-synthesize AI text that was already streamed. Introduce a per-id usage_metadata counter so the final AIMessage in the values snapshot and the final "messages" chunk — which carry the same cumulative usage — are not double-counted. chat() now accumulates per-id deltas and returns the last message's full accumulated text. Non-streaming mock sources (single event per id) are a degenerate case of the same logic, keeping existing callers and tests backward compatible.

Verified end-to-end against a real LLM: a 15-number count emits 35 messages-tuple events with BPE subword boundaries clearly visible ("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across the window, and the end-event usage matches the values-snapshot usage exactly (not doubled). tests/test_client_live.py::TestLiveStreaming passes.

New unit tests:
- test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3 delta events with correct content/id/usage; the values snapshot does not duplicate them; usage is counted once.
- test_chat_accumulates_streamed_deltas: chat() rebuilds the full text from deltas.
- test_messages_mode_tool_message: a ToolMessage delivered via messages mode is not duplicated by the values-snapshot synthesis path.

The stream() docstring now documents why this client does not reuse Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw LangChain objects vs serialized dicts, single caller vs HTTP fan-out).

Fixes #1969
1 parent 0948c7a commit f3486bb
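The delta semantics above put the accumulation burden on consumers. A minimal sketch of the pattern, and of what `chat()` now does internally, assuming a constructed `DeerFlowClient` named `client` (the message text and thread id are illustrative):

```python
# Rebuild full AI text from per-id messages-tuple deltas.
buffers: dict[str, str] = {}   # message id -> accumulated text
order: list[str] = []          # ids in order of first delta

for event in client.stream("count to fifteen", thread_id="demo"):
    if event.type == "messages-tuple" and event.data.get("type") == "ai":
        msg_id = event.data.get("id") or ""
        delta = event.data.get("content", "")
        if delta:
            if msg_id not in buffers:
                buffers[msg_id] = ""
                order.append(msg_id)
            buffers[msg_id] += delta  # deltas concatenate per id
    elif event.type == "end":
        print(event.data["usage"])  # cumulative usage, counted once per id

final_text = buffers[order[-1]] if order else ""  # == what chat() returns
```

Consumers that render progressively can skip the buffer and just apply each delta as it arrives.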

3 files changed

Lines changed: 306 additions & 32 deletions

backend/CLAUDE.md

Lines changed: 6 additions & 5 deletions
@@ -395,11 +395,12 @@ Both can be modified at runtime via Gateway API endpoints or `DeerFlowClient` me
 **Architecture**: Imports the same `deerflow` modules that LangGraph Server and Gateway API use. Shares the same config files and data directories. No FastAPI dependency.
 
 **Agent Conversation** (replaces LangGraph Server):
-- `chat(message, thread_id)` — synchronous, returns final text
-- `stream(message, thread_id)` — yields `StreamEvent` aligned with LangGraph SSE protocol:
-  - `"values"` — full state snapshot (title, messages, artifacts)
-  - `"messages-tuple"` — per-message update (AI text, tool calls, tool results)
-  - `"end"` — stream finished
+- `chat(message, thread_id)` — synchronous, accumulates streaming deltas per message-id and returns the final AI text
+- `stream(message, thread_id)` — subscribes to LangGraph `stream_mode=["values", "messages", "custom"]` and yields `StreamEvent`:
+  - `"values"` — full state snapshot (title, messages, artifacts); AI text already delivered via `messages` mode is **not** re-synthesized here to avoid duplicate deliveries
+  - `"messages-tuple"` — per-chunk update: for AI text this is a **delta** (concat per `id` to rebuild the full message); tool calls and tool results are emitted once each
+  - `"custom"` — forwarded from `StreamWriter`
+  - `"end"` — stream finished (carries cumulative `usage` counted once per message id)
 - Agent created lazily via `create_agent()` + `_build_middlewares()`, same as `make_lead_agent`
 - Supports `checkpointer` parameter for state persistence across turns
 - `reset_agent()` forces agent recreation (e.g. after memory or skill changes)
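A hedged sketch of a consumer dispatching on each event type listed above; `client` stands in for a constructed `DeerFlowClient`, and the `print` calls are placeholder handlers:

```python
# Dispatch on the StreamEvent types documented in CLAUDE.md.
for event in client.stream("summarize the report", thread_id="t-1"):
    if event.type == "values":
        # Full snapshot: safe to re-render title/artifacts wholesale.
        print("\n[title]", event.data.get("title"))
    elif event.type == "messages-tuple":
        data = event.data
        if data.get("type") == "ai" and data.get("content"):
            print(data["content"], end="", flush=True)  # delta: append, never replace
        elif data.get("type") == "ai" and data.get("tool_calls"):
            print("\n[tool calls]", [tc["name"] for tc in data["tool_calls"]])
        elif data.get("type") == "tool":
            print("\n[tool result]", data.get("name"), data.get("content", "")[:80])
    elif event.type == "custom":
        print("\n[custom]", event.data)  # forwarded from StreamWriter
    elif event.type == "end":
        print("\n[usage]", event.data["usage"])  # counted once per message id
```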

backend/packages/harness/deerflow/client.py

Lines changed: 179 additions & 25 deletions
@@ -336,6 +336,52 @@ def stream(
         consumers can switch between HTTP streaming and embedded mode
         without changing their event-handling logic.
 
+        Token-level streaming
+        ~~~~~~~~~~~~~~~~~~~~~
+        This method subscribes to LangGraph's ``messages`` stream mode, so
+        ``messages-tuple`` events for AI text are emitted as **deltas** as
+        the model generates tokens, not as one cumulative dump at node
+        completion. Each delta carries a stable ``id`` — consumers that
+        want the full text must accumulate ``content`` per ``id``.
+        ``chat()`` already does this for you.
+
+        Tool calls and tool results are still emitted once per logical
+        message. ``values`` events continue to carry full state snapshots
+        after each graph node finishes; AI text already delivered via the
+        ``messages`` stream is **not** re-synthesized from the snapshot to
+        avoid duplicate deliveries.
+
+        Why not reuse Gateway's ``run_agent``?
+        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+        Gateway (``runtime/runs/worker.py``) has a complete streaming
+        pipeline: ``run_agent`` → ``StreamBridge`` → ``sse_consumer``. It
+        looks like this client duplicates that work, but the two paths
+        serve different audiences and **cannot** share execution:
+
+        * ``run_agent`` is ``async def`` and uses ``agent.astream()``;
+          this method is a sync generator using ``agent.stream()`` so
+          callers can write ``for event in client.stream(...)`` without
+          touching asyncio. Bridging the two would require spinning up
+          an event loop + thread per call.
+        * Gateway events are JSON-serialized by ``serialize()`` for SSE
+          wire transmission. In-process callers want the raw LangChain
+          objects (``AIMessage``, ``usage_metadata`` as dataclasses), not
+          dicts.
+        * ``StreamBridge`` is an asyncio queue decoupling producers from
+          consumers across an HTTP boundary (``Last-Event-ID`` replay,
+          heartbeats, multi-subscriber fan-out). A single in-process
+          caller with a direct iterator needs none of that.
+
+        So ``DeerFlowClient.stream()`` is a parallel, sync, in-process
+        consumer of the same ``create_agent()`` factory — not a wrapper
+        around Gateway. The two paths **should** stay in sync on which
+        LangGraph stream modes they subscribe to; that invariant is
+        enforced by ``tests/test_client.py::test_messages_mode_emits_token_deltas``
+        rather than by a shared constant, because the three layers
+        (Graph, Platform SDK, HTTP) each use their own naming
+        (``messages`` vs ``messages-tuple``) and cannot literally share
+        a string.
+
         Args:
             message: User message text.
             thread_id: Thread ID for conversation context. Auto-generated if None.
@@ -346,8 +392,8 @@ def stream(
             StreamEvent with one of:
             - type="values" data={"title": str|None, "messages": [...], "artifacts": [...]}
             - type="custom" data={...}
-            - type="messages-tuple" data={"type": "ai", "content": str, "id": str}
-            - type="messages-tuple" data={"type": "ai", "content": str, "id": str, "usage_metadata": {...}}
+            - type="messages-tuple" data={"type": "ai", "content": <delta>, "id": str}
+            - type="messages-tuple" data={"type": "ai", "content": <delta>, "id": str, "usage_metadata": {...}}
             - type="messages-tuple" data={"type": "ai", "content": "", "id": str, "tool_calls": [...]}
            - type="messages-tuple" data={"type": "tool", "content": str, "name": str, "tool_call_id": str, "id": str}
             - type="end" data={"usage": {"input_tokens": int, "output_tokens": int, "total_tokens": int}}
@@ -363,14 +409,52 @@ def stream(
         if self._agent_name:
             context["agent_name"] = self._agent_name
 
+        # ids already emitted as a complete message via the ``values``
+        # snapshot path — used by the values path itself to avoid
+        # duplicate per-message synthesis when the same message appears
+        # in consecutive snapshots.
         seen_ids: set[str] = set()
+        # ids whose text / tool_calls have already been streamed via the
+        # LangGraph ``messages`` mode. The ``values`` path uses this set
+        # to skip re-emitting synthesized messages-tuple events for the
+        # same message.
+        streamed_ids: set[str] = set()
+        # ids whose ``usage_metadata`` has already been counted into
+        # ``cumulative_usage``. The same message id shows up both in
+        # ``messages`` chunks (last chunk carries usage) and in ``values``
+        # snapshots (final AIMessage carries the same usage) — count once.
+        counted_usage_ids: set[str] = set()
         cumulative_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
 
+        def _account_usage(msg_id: str | None, usage: dict | None) -> dict | None:
+            """Add *usage* to cumulative totals if this id has not been counted.
+
+            Returns the normalized usage dict (for attaching to an event)
+            when we accepted it, otherwise ``None``.
+            """
+            if not usage:
+                return None
+            if msg_id and msg_id in counted_usage_ids:
+                return None
+            if msg_id:
+                counted_usage_ids.add(msg_id)
+            input_tokens = usage.get("input_tokens", 0) or 0
+            output_tokens = usage.get("output_tokens", 0) or 0
+            total_tokens = usage.get("total_tokens", 0) or 0
+            cumulative_usage["input_tokens"] += input_tokens
+            cumulative_usage["output_tokens"] += output_tokens
+            cumulative_usage["total_tokens"] += total_tokens
+            return {
+                "input_tokens": input_tokens,
+                "output_tokens": output_tokens,
+                "total_tokens": total_tokens,
+            }
+
         for item in self._agent.stream(
             state,
             config=config,
             context=context,
-            stream_mode=["values", "custom"],
+            stream_mode=["values", "messages", "custom"],
         ):
             if isinstance(item, tuple) and len(item) == 2:
                 mode, chunk = item
@@ -382,6 +466,62 @@ def stream(
                 yield StreamEvent(type="custom", data=chunk)
                 continue
 
+            if mode == "messages":
+                # LangGraph emits ``(message_chunk, metadata_dict)`` for
+                # each LLM delta and each tool message. ``message_chunk``
+                # is typically an ``AIMessageChunk`` (subclass of
+                # ``AIMessage``) during LLM streaming; for tool nodes it
+                # is a ``ToolMessage``.
+                if isinstance(chunk, tuple) and len(chunk) == 2:
+                    msg_chunk, _metadata = chunk
+                else:
+                    msg_chunk = chunk
+
+                msg_id = getattr(msg_chunk, "id", None)
+
+                if isinstance(msg_chunk, AIMessage):
+                    text = self._extract_text(msg_chunk.content)
+                    usage = getattr(msg_chunk, "usage_metadata", None)
+                    counted_usage = _account_usage(msg_id, usage)
+
+                    if text:
+                        if msg_id:
+                            streamed_ids.add(msg_id)
+                        event_data: dict[str, Any] = {"type": "ai", "content": text, "id": msg_id}
+                        if counted_usage:
+                            event_data["usage_metadata"] = counted_usage
+                        yield StreamEvent(type="messages-tuple", data=event_data)
+
+                    tool_calls = getattr(msg_chunk, "tool_calls", None)
+                    if tool_calls:
+                        if msg_id:
+                            streamed_ids.add(msg_id)
+                        yield StreamEvent(
+                            type="messages-tuple",
+                            data={
+                                "type": "ai",
+                                "content": "",
+                                "id": msg_id,
+                                "tool_calls": [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in tool_calls],
+                            },
+                        )
+
+                elif isinstance(msg_chunk, ToolMessage):
+                    if msg_id:
+                        streamed_ids.add(msg_id)
+                    yield StreamEvent(
+                        type="messages-tuple",
+                        data={
+                            "type": "tool",
+                            "content": self._extract_text(msg_chunk.content),
+                            "name": getattr(msg_chunk, "name", None),
+                            "tool_call_id": getattr(msg_chunk, "tool_call_id", None),
+                            "id": msg_id,
+                        },
+                    )
+                continue
+
+            # mode == "values"
             messages = chunk.get("messages", [])
 
             for msg in messages:
@@ -391,13 +531,20 @@ def stream(
                 if msg_id:
                     seen_ids.add(msg_id)
 
+                # Already streamed through ``messages`` mode — capture
+                # usage once more (defensive: the final AIMessage in the
+                # snapshot may carry usage_metadata that the streamed
+                # chunks did not) but skip synthesizing messages-tuple
+                # events, which would duplicate what the consumer already
+                # received.
+                if msg_id and msg_id in streamed_ids:
+                    if isinstance(msg, AIMessage):
+                        _account_usage(msg_id, getattr(msg, "usage_metadata", None))
+                    continue
+
                 if isinstance(msg, AIMessage):
-                    # Track token usage from AI messages
                     usage = getattr(msg, "usage_metadata", None)
-                    if usage:
-                        cumulative_usage["input_tokens"] += usage.get("input_tokens", 0) or 0
-                        cumulative_usage["output_tokens"] += usage.get("output_tokens", 0) or 0
-                        cumulative_usage["total_tokens"] += usage.get("total_tokens", 0) or 0
+                    counted_usage = _account_usage(msg_id, usage)
 
                     if msg.tool_calls:
                         yield StreamEvent(
@@ -412,13 +559,9 @@ def stream(
 
                     text = self._extract_text(msg.content)
                     if text:
-                        event_data: dict[str, Any] = {"type": "ai", "content": text, "id": msg_id}
-                        if usage:
-                            event_data["usage_metadata"] = {
-                                "input_tokens": usage.get("input_tokens", 0) or 0,
-                                "output_tokens": usage.get("output_tokens", 0) or 0,
-                                "total_tokens": usage.get("total_tokens", 0) or 0,
-                            }
+                        event_data = {"type": "ai", "content": text, "id": msg_id}
+                        if counted_usage:
+                            event_data["usage_metadata"] = counted_usage
                         yield StreamEvent(type="messages-tuple", data=event_data)
 
                 elif isinstance(msg, ToolMessage):
@@ -448,26 +591,37 @@
     def chat(self, message: str, *, thread_id: str | None = None, **kwargs) -> str:
         """Send a message and return the final text response.
 
-        Convenience wrapper around :meth:`stream` that returns only the
-        **last** AI text from ``messages-tuple`` events. If the agent emits
-        multiple text segments in one turn, intermediate segments are
-        discarded. Use :meth:`stream` directly to capture all events.
+        Convenience wrapper around :meth:`stream` that accumulates delta
+        ``messages-tuple`` events per ``id`` and returns the text of the
+        **last** AI message to complete. Intermediate AI messages (e.g.
+        planner drafts) are discarded — only the final id's accumulated
+        text is returned. Use :meth:`stream` directly if you need every
+        delta as it arrives.
 
         Args:
            message: User message text.
            thread_id: Thread ID for conversation context. Auto-generated if None.
            **kwargs: Override client defaults (same as stream()).
 
        Returns:
-            The last AI message text, or empty string if no response.
+            The accumulated text of the last AI message, or empty string
+            if no AI text was produced.
        """
-        last_text = ""
+        # Accumulator keyed by message id. Token-level streaming yields
+        # multiple ``messages-tuple`` events sharing the same id, each
+        # carrying a delta that must be concatenated. Non-streaming mock
+        # sources that emit a single event per id are a degenerate case
+        # of the same logic.
+        buffers: dict[str, str] = {}
+        last_id: str = ""
         for event in self.stream(message, thread_id=thread_id, **kwargs):
             if event.type == "messages-tuple" and event.data.get("type") == "ai":
-                content = event.data.get("content", "")
-                if content:
-                    last_text = content
-        return last_text
+                msg_id = event.data.get("id") or ""
+                delta = event.data.get("content", "")
+                if delta:
+                    buffers[msg_id] = buffers.get(msg_id, "") + delta
+                    last_id = msg_id
+        return buffers.get(last_id, "")
 
     # ------------------------------------------------------------------
     # Public API — configuration queries
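The count-once invariant that `_account_usage` enforces is easy to state independently of the client: the same message id may present identical cumulative usage twice (the final `messages` chunk and the final AIMessage in the `values` snapshot), and it must be folded into the totals exactly once. A self-contained sketch of that bookkeeping (names here are illustrative, not the module's API):

```python
# Count-once usage accounting keyed by message id.
counted_ids: set[str] = set()
totals = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}

def account(msg_id: str | None, usage: dict | None) -> dict | None:
    """Fold usage into totals unless this id was already counted."""
    if not usage or (msg_id and msg_id in counted_ids):
        return None
    if msg_id:
        counted_ids.add(msg_id)
    normalized = {k: usage.get(k, 0) or 0 for k in totals}
    for k, v in normalized.items():
        totals[k] += v
    return normalized

usage = {"input_tokens": 12, "output_tokens": 35, "total_tokens": 47}
assert account("msg-1", usage) == usage   # first sighting: counted
assert account("msg-1", usage) is None    # values-snapshot repeat: skipped
assert totals["total_tokens"] == 47       # not doubled
```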
