Skip to content

Commit ddd8613

Browse files
committed
Merge remote-tracking branch 'origin/rayhpeng/persistence-scaffold' into rayhpeng/persistence-scaffold
2 parents d592a98 + e4e4320 commit ddd8613

4 files changed

Lines changed: 75 additions & 4 deletions

File tree

backend/packages/harness/deerflow/client.py

Lines changed: 17 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -345,6 +345,7 @@ def stream(
345345
Yields:
346346
StreamEvent with one of:
347347
- type="values" data={"title": str|None, "messages": [...], "artifacts": [...]}
348+
- type="custom" data={...}
348349
- type="messages-tuple" data={"type": "ai", "content": str, "id": str}
349350
- type="messages-tuple" data={"type": "ai", "content": str, "id": str, "usage_metadata": {...}}
350351
- type="messages-tuple" data={"type": "ai", "content": "", "id": str, "tool_calls": [...]}
@@ -365,7 +366,22 @@ def stream(
365366
seen_ids: set[str] = set()
366367
cumulative_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
367368

368-
for chunk in self._agent.stream(state, config=config, context=context, stream_mode="values"):
369+
for item in self._agent.stream(
370+
state,
371+
config=config,
372+
context=context,
373+
stream_mode=["values", "custom"],
374+
):
375+
if isinstance(item, tuple) and len(item) == 2:
376+
mode, chunk = item
377+
mode = str(mode)
378+
else:
379+
mode, chunk = "values", item
380+
381+
if mode == "custom":
382+
yield StreamEvent(type="custom", data=chunk)
383+
continue
384+
369385
messages = chunk.get("messages", [])
370386

371387
for msg in messages:

backend/tests/test_client.py

Lines changed: 55 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -5,6 +5,7 @@
55
import json
66
import tempfile
77
import zipfile
8+
from enum import Enum
89
from pathlib import Path
910
from unittest.mock import MagicMock, patch
1011

@@ -205,6 +206,33 @@ def test_basic_message(self, client):
205206
msg_events = _ai_events(events)
206207
assert msg_events[0].data["content"] == "Hello!"
207208

209+
def test_custom_events_are_forwarded(self, client):
210+
"""stream() forwards custom stream events alongside normal values output."""
211+
ai = AIMessage(content="Hello!", id="ai-1")
212+
agent = MagicMock()
213+
agent.stream.return_value = iter(
214+
[
215+
("custom", {"type": "task_started", "task_id": "task-1"}),
216+
("values", {"messages": [HumanMessage(content="hi", id="h-1"), ai]}),
217+
]
218+
)
219+
220+
with (
221+
patch.object(client, "_ensure_agent"),
222+
patch.object(client, "_agent", agent),
223+
):
224+
events = list(client.stream("hi", thread_id="t-custom"))
225+
226+
agent.stream.assert_called_once()
227+
call_kwargs = agent.stream.call_args.kwargs
228+
assert call_kwargs["stream_mode"] == ["values", "custom"]
229+
230+
assert events[0].type == "custom"
231+
assert events[0].data == {"type": "task_started", "task_id": "task-1"}
232+
assert any(event.type == "messages-tuple" and event.data["content"] == "Hello!" for event in events)
233+
assert any(event.type == "values" for event in events)
234+
assert events[-1].type == "end"
235+
208236
def test_context_propagation(self, client):
209237
"""stream() passes agent_name to the context."""
210238
agent = _make_agent_mock([{"messages": [AIMessage(content="ok", id="ai-1")]}])
@@ -222,6 +250,33 @@ def test_context_propagation(self, client):
222250
assert call_kwargs["context"]["thread_id"] == "t1"
223251
assert call_kwargs["context"]["agent_name"] == "test-agent-1"
224252

253+
def test_custom_mode_is_normalized_to_string(self, client):
254+
"""stream() forwards custom events even when the mode is not a plain string."""
255+
256+
class StreamMode(Enum):
257+
CUSTOM = "custom"
258+
259+
def __str__(self):
260+
return self.value
261+
262+
agent = _make_agent_mock(
263+
[
264+
(StreamMode.CUSTOM, {"type": "task_started", "task_id": "task-1"}),
265+
{"messages": [AIMessage(content="Hello!", id="ai-1")]},
266+
]
267+
)
268+
269+
with (
270+
patch.object(client, "_ensure_agent"),
271+
patch.object(client, "_agent", agent),
272+
):
273+
events = list(client.stream("hi", thread_id="t-custom-enum"))
274+
275+
assert events[0].type == "custom"
276+
assert events[0].data == {"type": "task_started", "task_id": "task-1"}
277+
assert any(event.type == "messages-tuple" and event.data["content"] == "Hello!" for event in events)
278+
assert events[-1].type == "end"
279+
225280
def test_tool_call_and_result(self, client):
226281
"""stream() emits messages-tuple events for tool calls and results."""
227282
ai = AIMessage(content="", id="ai-1", tool_calls=[{"name": "bash", "args": {"cmd": "ls"}, "id": "tc-1"}])

docker/docker-compose-dev.yaml

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -123,7 +123,7 @@ services:
123123
UV_IMAGE: ${UV_IMAGE:-ghcr.io/astral-sh/uv:0.7.20}
124124
UV_INDEX_URL: ${UV_INDEX_URL:-https://pypi.org/simple}
125125
container_name: deer-flow-gateway
126-
command: sh -c "cd backend && uv sync && PYTHONPATH=. uv run uvicorn app.gateway.app:app --host 0.0.0.0 --port 8001 --reload --reload-include='*.yaml .env' > /app/logs/gateway.log 2>&1"
126+
command: sh -c "{ cd backend && (uv sync || (echo '[startup] uv sync failed; recreating .venv and retrying once' && uv venv --allow-existing .venv && uv sync)) && PYTHONPATH=. uv run uvicorn app.gateway.app:app --host 0.0.0.0 --port 8001 --reload --reload-include='*.yaml .env'; } > /app/logs/gateway.log 2>&1"
127127
volumes:
128128
- ../backend/:/app/backend/
129129
# Preserve the .venv built during Docker image build — mounting the full backend/
@@ -180,7 +180,7 @@ services:
180180
UV_IMAGE: ${UV_IMAGE:-ghcr.io/astral-sh/uv:0.7.20}
181181
UV_INDEX_URL: ${UV_INDEX_URL:-https://pypi.org/simple}
182182
container_name: deer-flow-langgraph
183-
command: sh -c "cd backend && uv sync && allow_blocking='' && if [ \"\${LANGGRAPH_ALLOW_BLOCKING:-0}\" = '1' ]; then allow_blocking='--allow-blocking'; fi && uv run langgraph dev --no-browser \${allow_blocking} --host 0.0.0.0 --port 2024 --n-jobs-per-worker \${LANGGRAPH_JOBS_PER_WORKER:-10} > /app/logs/langgraph.log 2>&1"
183+
command: sh -c "cd backend && { (uv sync || (echo '[startup] uv sync failed; recreating .venv and retrying once' && uv venv --allow-existing .venv && uv sync)) && allow_blocking='' && if [ \"\${LANGGRAPH_ALLOW_BLOCKING:-0}\" = '1' ]; then allow_blocking='--allow-blocking'; fi && uv run langgraph dev --no-browser \${allow_blocking} --host 0.0.0.0 --port 2024 --n-jobs-per-worker \${LANGGRAPH_JOBS_PER_WORKER:-10}; } > /app/logs/langgraph.log 2>&1"
184184
volumes:
185185
- ../backend/:/app/backend/
186186
# Preserve the .venv built during Docker image build — mounting the full backend/

docker/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -127,7 +127,7 @@ services:
127127
UV_INDEX_URL: ${UV_INDEX_URL:-https://pypi.org/simple}
128128
UV_EXTRAS: ${UV_EXTRAS:-}
129129
container_name: deer-flow-langgraph
130-
command: sh -c 'cd /app/backend && allow_blocking_flag="" && if [ "${LANGGRAPH_ALLOW_BLOCKING:-0}" = "1" ]; then allow_blocking_flag="--allow-blocking"; fi && uv run langgraph dev --no-browser ${allow_blocking_flag} --no-reload --host 0.0.0.0 --port 2024 --n-jobs-per-worker ${LANGGRAPH_JOBS_PER_WORKER:-10}'
130+
command: sh -c 'cd /app/backend && allow_blocking="" && if [ "\${LANGGRAPH_ALLOW_BLOCKING:-0}" = "1" ]; then allow_blocking="--allow-blocking"; fi && uv run langgraph dev --no-browser \${allow_blocking} --no-reload --host 0.0.0.0 --port 2024 --n-jobs-per-worker \${LANGGRAPH_JOBS_PER_WORKER:-10}'
131131
volumes:
132132
- ${DEER_FLOW_CONFIG_PATH}:/app/backend/config.yaml:ro
133133
- ${DEER_FLOW_EXTENSIONS_CONFIG_PATH}:/app/backend/extensions_config.json:ro

0 commit comments

Comments
 (0)