
Commit b94383c

rayhpeng and claude committed
fix(persistence): address 22 review comments from CodeQL, Copilot, and Code Quality
Bug fixes:
- Sanitize log params to prevent log injection (CodeQL)
- Reset threads_meta.status to idle/error when run completes
- Attach messages only to latest checkpoint in /history response
- Write threads_meta on POST /threads so new threads appear in search

Lint fixes:
- Remove unused imports (journal.py, migrations/env.py, test_converters.py)
- Convert lambda to named function (engine.py, Ruff E731)
- Remove unused logger definitions in repos (Ruff F841)
- Add logging to JSONL decode errors and empty except blocks
- Separate assert side-effects in tests (CodeQL)
- Remove unused local variables in tests (Ruff F841)
- Fix max_trace_content truncation to use byte length, not char length

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 32f6967 commit b94383c

15 files changed

Lines changed: 94 additions & 55 deletions


backend/app/gateway/routers/threads.py

Lines changed: 39 additions & 23 deletions
@@ -35,6 +35,11 @@
 router = APIRouter(prefix="/api/threads", tags=["threads"])
 
 
+def _sanitize_log_param(value: str) -> str:
+    """Strip control characters to prevent log injection."""
+    return value.replace("\n", "").replace("\r", "").replace("\x00", "")
+
+
 # ---------------------------------------------------------------------------
 # Response / request models
 # ---------------------------------------------------------------------------

@@ -136,13 +141,13 @@ def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDel
         raise HTTPException(status_code=422, detail=str(exc)) from exc
     except FileNotFoundError:
         # Not critical — thread data may not exist on disk
-        logger.debug("No local thread data to delete for %s", thread_id)
+        logger.debug("No local thread data to delete for %s", _sanitize_log_param(thread_id))
         return ThreadDeleteResponse(success=True, message=f"No local data for {thread_id}")
     except Exception as exc:
-        logger.exception("Failed to delete thread data for %s", thread_id)
+        logger.exception("Failed to delete thread data for %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to delete local thread data.") from exc
 
-    logger.info("Deleted local thread data for %s", thread_id)
+    logger.info("Deleted local thread data for %s", _sanitize_log_param(thread_id))
     return ThreadDeleteResponse(success=True, message=f"Deleted local thread data for {thread_id}")

@@ -231,7 +236,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
     try:
         await store.adelete(THREADS_NS, thread_id)
     except Exception:
-        logger.debug("Could not delete store record for thread %s (not critical)", thread_id)
+        logger.debug("Could not delete store record for thread %s (not critical)", _sanitize_log_param(thread_id))
 
     # Remove checkpoints (best-effort)
     checkpointer = getattr(request.app.state, "checkpointer", None)

@@ -240,7 +245,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
         if hasattr(checkpointer, "adelete_thread"):
             await checkpointer.adelete_thread(thread_id)
     except Exception:
-        logger.debug("Could not delete checkpoints for thread %s (not critical)", thread_id)
+        logger.debug("Could not delete checkpoints for thread %s (not critical)", _sanitize_log_param(thread_id))
 
     return response

@@ -284,7 +289,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
             },
         )
     except Exception:
-        logger.exception("Failed to write thread %s to store", thread_id)
+        logger.exception("Failed to write thread %s to store", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to create thread")
 
     # Write an empty checkpoint so state endpoints work immediately

@@ -302,10 +307,24 @@
         }
         await checkpointer.aput(config, empty_checkpoint(), ckpt_metadata, {})
     except Exception:
-        logger.exception("Failed to create checkpoint for thread %s", thread_id)
+        logger.exception("Failed to create checkpoint for thread %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to create thread")
 
-    logger.info("Thread created: %s", thread_id)
+    # Write thread_meta so the thread appears in /threads/search immediately
+    from app.gateway.deps import get_thread_meta_repo
+
+    thread_meta_repo = get_thread_meta_repo(request)
+    if thread_meta_repo is not None:
+        try:
+            await thread_meta_repo.create(
+                thread_id,
+                assistant_id=getattr(body, "assistant_id", None),
+                metadata=body.metadata,
+            )
+        except Exception:
+            logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", _sanitize_log_param(thread_id))
+
+    logger.info("Thread created: %s", _sanitize_log_param(thread_id))
     return ThreadResponse(
         thread_id=thread_id,
         status="idle",

@@ -372,7 +391,7 @@ async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Reques
     try:
         await _store_put(store, updated)
     except Exception:
-        logger.exception("Failed to patch thread %s", thread_id)
+        logger.exception("Failed to patch thread %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to update thread")
 
     return ThreadResponse(

@@ -404,7 +423,7 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
     try:
         checkpoint_tuple = await checkpointer.aget_tuple(config)
     except Exception:
-        logger.exception("Failed to get checkpoint for thread %s", thread_id)
+        logger.exception("Failed to get checkpoint for thread %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to get thread")
 
     if record is None and checkpoint_tuple is None:

@@ -452,7 +471,7 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo
     try:
         checkpoint_tuple = await checkpointer.aget_tuple(config)
     except Exception:
-        logger.exception("Failed to get state for thread %s", thread_id)
+        logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to get thread state")
 
     if checkpoint_tuple is None:

@@ -514,7 +533,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
     try:
         checkpoint_tuple = await checkpointer.aget_tuple(read_config)
     except Exception:
-        logger.exception("Failed to get state for thread %s", thread_id)
+        logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to get thread state")
 
     if checkpoint_tuple is None:

@@ -548,7 +567,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
     try:
         new_config = await checkpointer.aput(write_config, checkpoint, metadata, {})
     except Exception:
-        logger.exception("Failed to update state for thread %s", thread_id)
+        logger.exception("Failed to update state for thread %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to update thread state")
 
     new_checkpoint_id: str | None = None

@@ -560,7 +579,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
         try:
             await _store_upsert(store, thread_id, values={"title": body.values["title"]})
         except Exception:
-            logger.debug("Failed to sync title to store for thread %s (non-fatal)", thread_id)
+            logger.debug("Failed to sync title to store for thread %s (non-fatal)", _sanitize_log_param(thread_id))
 
     return ThreadStateResponse(
         values=serialize_channel_values(channel_values),

@@ -594,16 +613,12 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
     try:
         all_messages = await event_store.list_messages(thread_id, limit=10_000)
     except Exception:
-        logger.warning("Failed to load messages from event store for thread %s", thread_id, exc_info=True)
+        logger.warning("Failed to load messages from event store for thread %s", _sanitize_log_param(thread_id), exc_info=True)
         all_messages = []
 
-    # Group messages by run_id for per-checkpoint assembly
-    messages_by_run: dict[str, list[dict]] = {}
-    for msg in all_messages:
-        run_id = msg.get("run_id", "")
-        messages_by_run.setdefault(run_id, []).append(msg.get("content", {}))
 
     entries: list[HistoryEntry] = []
+    is_latest_checkpoint = True
     try:
         async for checkpoint_tuple in checkpointer.alist(config, limit=body.limit):
             ckpt_config = getattr(checkpoint_tuple, "config", {})

@@ -625,9 +640,10 @@
             if thread_data := channel_values.get("thread_data"):
                 values["thread_data"] = thread_data
 
-            # Attach all messages from event store (not just this checkpoint's run)
-            if all_messages:
+            # Attach all messages only to the latest (first) checkpoint entry
+            if is_latest_checkpoint and all_messages:
                 values["messages"] = [m.get("content", {}) for m in all_messages]
+                is_latest_checkpoint = False
 
             # Derive next tasks
             tasks_raw = getattr(checkpoint_tuple, "tasks", []) or []

@@ -650,7 +666,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
                 )
             )
     except Exception:
-        logger.exception("Failed to get history for thread %s", thread_id)
+        logger.exception("Failed to get history for thread %s", _sanitize_log_param(thread_id))
         raise HTTPException(status_code=500, detail="Failed to get thread history")
 
     return entries
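
For context on the sanitizer introduced above, here is a minimal sketch (not part of the commit) of the log-injection pattern CodeQL flags: a thread_id containing a newline can forge what looks like a second, legitimate log record. The malicious value and logger setup below are illustrative only.

import logging

logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)
logger = logging.getLogger("demo")


def _sanitize_log_param(value: str) -> str:
    """Strip control characters to prevent log injection."""
    return value.replace("\n", "").replace("\r", "").replace("\x00", "")


# Attacker-controlled input with an embedded newline (hypothetical value).
thread_id = "abc123\nINFO Deleted local thread data for victim-thread"

logger.info("Deleted local thread data for %s", thread_id)                       # emits two "records"
logger.info("Deleted local thread data for %s", _sanitize_log_param(thread_id))  # emits one record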

backend/app/gateway/services.py

Lines changed: 3 additions & 2 deletions
@@ -18,6 +18,7 @@
 from langchain_core.messages import HumanMessage
 
 from app.gateway.deps import get_checkpointer, get_run_event_store, get_run_manager, get_run_store, get_store, get_stream_bridge, get_thread_meta_repo
+from app.gateway.routers.threads import _sanitize_log_param
 from deerflow.runtime import (
     END_SENTINEL,
     HEARTBEAT_SENTINEL,

@@ -184,7 +185,7 @@ async def _upsert_thread_in_store(store, thread_id: str, metadata: dict | None)
     try:
         await _store_upsert(store, thread_id, metadata=metadata)
     except Exception:
-        logger.warning("Failed to upsert thread %s in store (non-fatal)", thread_id)
+        logger.warning("Failed to upsert thread %s in store (non-fatal)", _sanitize_log_param(thread_id))
 
 
 async def _sync_thread_title_after_run(

@@ -312,7 +313,7 @@ async def start_run(
         else:
             await thread_meta_repo.update_status(thread_id, "running")
     except Exception:
-        logger.warning("Failed to upsert thread_meta for %s (non-fatal)", thread_id)
+        logger.warning("Failed to upsert thread_meta for %s (non-fatal)", _sanitize_log_param(thread_id))
 
     agent_factory = resolve_agent_factory(body.assistant_id)
     graph_input = normalize_input(body.input)

backend/packages/harness/deerflow/persistence/engine.py

Lines changed: 8 additions & 4 deletions
@@ -10,13 +10,15 @@
 
 from __future__ import annotations
 
-import logging
-
 import json
+import logging
 
 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
 
-_json_serializer = lambda obj: json.dumps(obj, ensure_ascii=False)
+
+def _json_serializer(obj: object) -> str:
+    """JSON serializer with ensure_ascii=False for Chinese character support."""
+    return json.dumps(obj, ensure_ascii=False)
 
 logger = logging.getLogger(__name__)
 

@@ -106,7 +108,9 @@ async def init_engine(
     try:
         import deerflow.persistence.models  # noqa: F401
     except ImportError:
-        pass
+        # Models package not yet available — tables won't be auto-created.
+        # This is expected during initial scaffolding or minimal installs.
+        logger.debug("deerflow.persistence.models not found; skipping auto-create tables")
 
     try:
         async with _engine.begin() as conn:
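
A quick sketch (not part of the commit) of why the new named serializer keeps ensure_ascii=False: json.dumps escapes non-ASCII characters by default, so Chinese text written through the engine's JSON serializer would otherwise land in the database as \uXXXX escape sequences. The payload is illustrative; converting the lambda to a named function also satisfies Ruff E731 without changing behaviour.

import json

payload = {"title": "深度研究"}

print(json.dumps(payload))                      # {"title": "\u6df1\u5ea6\u7814\u7a76"}
print(json.dumps(payload, ensure_ascii=False))  # {"title": "深度研究"}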

backend/packages/harness/deerflow/persistence/migrations/env.py

Lines changed: 7 additions & 2 deletions
@@ -8,6 +8,7 @@
 from __future__ import annotations
 
 import asyncio
+import logging
 from logging.config import fileConfig
 
 from alembic import context

@@ -17,9 +18,13 @@
 
 # Import all models so metadata is populated.
 try:
-    import deerflow.persistence.models  # noqa: F401
+    import deerflow.persistence.models  # noqa: F401 — register ORM models with Base.metadata
 except ImportError:
-    pass
+    # Models not available — migration will work with existing metadata only.
+    logging.getLogger(__name__).warning(
+        "Could not import deerflow.persistence.models; "
+        "Alembic may not detect all tables"
+    )
 
 config = context.config
 if config.config_file_name is not None:

backend/packages/harness/deerflow/persistence/repositories/feedback_repo.py

Lines changed: 0 additions & 3 deletions
@@ -5,7 +5,6 @@
 
 from __future__ import annotations
 
-import logging
 import uuid
 from datetime import UTC, datetime
 

@@ -14,8 +13,6 @@
 
 from deerflow.persistence.models.feedback import FeedbackRow
 
-logger = logging.getLogger(__name__)
-
 
 class FeedbackRepository:
     def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:

backend/packages/harness/deerflow/persistence/repositories/run_repo.py

Lines changed: 0 additions & 3 deletions
@@ -8,7 +8,6 @@
 from __future__ import annotations
 
 import json
-import logging
 from datetime import UTC, datetime
 from typing import Any
 

@@ -18,8 +17,6 @@
 from deerflow.persistence.models.run import RunRow
 from deerflow.runtime.runs.store.base import RunStore
 
-logger = logging.getLogger(__name__)
-
 
 class RunRepository(RunStore):
     def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:

backend/packages/harness/deerflow/persistence/repositories/thread_meta_repo.py

Lines changed: 0 additions & 3 deletions
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-import logging
 from datetime import UTC, datetime
 from typing import Any
 

@@ -11,8 +10,6 @@
 
 from deerflow.persistence.models.thread_meta import ThreadMetaRow
 
-logger = logging.getLogger(__name__)
-
 
 class ThreadMetaRepository:
     def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:

backend/packages/harness/deerflow/runtime/events/store/db.py

Lines changed: 11 additions & 4 deletions
@@ -7,6 +7,7 @@
 from __future__ import annotations
 
 import json
+import logging
 from datetime import UTC, datetime
 
 from sqlalchemy import delete, func, select

@@ -15,6 +16,8 @@
 from deerflow.persistence.models.run_event import RunEventRow
 from deerflow.runtime.events.store.base import RunEventStore
 
+logger = logging.getLogger(__name__)
+
 
 class DbRunEventStore(RunEventStore):
     def __init__(self, session_factory: async_sessionmaker[AsyncSession], *, max_trace_content: int = 10240):

@@ -35,15 +38,19 @@ def _row_to_dict(row: RunEventRow) -> dict:
         try:
             d["content"] = json.loads(raw)
         except (json.JSONDecodeError, ValueError):
-            pass
+            # Content looked like JSON (content_is_dict flag) but failed to parse;
+            # keep the raw string as-is.
+            logger.debug("Failed to deserialize content as JSON for event seq=%s", d.get("seq"))
         return d
 
     def _truncate_trace(self, category: str, content: str | dict, metadata: dict | None) -> tuple[str | dict, dict]:
         if category == "trace":
             text = json.dumps(content, default=str, ensure_ascii=False) if isinstance(content, dict) else content
-            if len(text) > self._max_trace_content:
-                content = text[: self._max_trace_content]
-                metadata = {**(metadata or {}), "content_truncated": True}
+            encoded = text.encode("utf-8")
+            if len(encoded) > self._max_trace_content:
+                # Truncate by bytes, then decode back (may cut a multi-byte char, so use errors="ignore")
+                content = encoded[: self._max_trace_content].decode("utf-8", errors="ignore")
+                metadata = {**(metadata or {}), "content_truncated": True, "original_byte_length": len(encoded)}
         return content, metadata or {}
 
     async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None):
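
A small sketch (not part of the commit) of the truncation bug this hunk fixes: slicing by characters under-counts multi-byte UTF-8 text, so a trace made of CJK characters can sail past the byte budget untouched, while the byte-based slice respects it. Values are illustrative; 10240 mirrors the max_trace_content default above.

limit = 10240
text = "研" * 6000  # 6,000 characters, but 18,000 bytes in UTF-8

by_chars = text[:limit]  # string is only 6,000 chars long, so nothing is cut at all
by_bytes = text.encode("utf-8")[:limit].decode("utf-8", errors="ignore")

print(len(by_chars.encode("utf-8")))  # 18000 -> over the byte budget
print(len(by_bytes.encode("utf-8")))  # 10239 -> within budget; errors="ignore" drops the split character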

backend/packages/harness/deerflow/runtime/events/store/jsonl.py

Lines changed: 3 additions & 0 deletions
@@ -51,6 +51,7 @@ def _ensure_seq_loaded(self, thread_id: str) -> None:
                     record = json.loads(line)
                     max_seq = max(max_seq, record.get("seq", 0))
                 except json.JSONDecodeError:
+                    logger.debug("Skipping malformed JSONL line in %s", f)
                     continue
         self._seq_counters[thread_id] = max_seq
 

@@ -73,6 +74,7 @@ def _read_thread_events(self, thread_id: str) -> list[dict]:
                 try:
                     events.append(json.loads(line))
                 except json.JSONDecodeError:
+                    logger.debug("Skipping malformed JSONL line in %s", f)
                     continue
         events.sort(key=lambda e: e.get("seq", 0))
         return events

@@ -89,6 +91,7 @@ def _read_run_events(self, thread_id: str, run_id: str) -> list[dict]:
             try:
                 events.append(json.loads(line))
             except json.JSONDecodeError:
+                logger.debug("Skipping malformed JSONL line in %s", path)
                 continue
         events.sort(key=lambda e: e.get("seq", 0))
         return events
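
For completeness, a minimal sketch (not part of the commit) of the tolerant read path the new debug logging makes visible: a malformed JSONL line is skipped and reported rather than silently dropped, and the surrounding events still load. The file name and event shape below are illustrative only.

import json
import logging
import pathlib
import tempfile

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("jsonl-demo")

path = pathlib.Path(tempfile.mkdtemp()) / "events.jsonl"
path.write_text('{"seq": 1, "event": "run_started"}\n{broken json}\n{"seq": 2, "event": "run_finished"}\n')

events = []
for line in path.read_text().splitlines():
    try:
        events.append(json.loads(line))
    except json.JSONDecodeError:
        logger.debug("Skipping malformed JSONL line in %s", path)  # logged instead of silently skipped
        continue

events.sort(key=lambda e: e.get("seq", 0))
print([e["seq"] for e in events])  # [1, 2]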

backend/packages/harness/deerflow/runtime/journal.py

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ def on_llm_start(self, serialized: dict, prompts: list[str], *, run_id: UUID, **
         self._llm_start_times[str(run_id)] = time.monotonic()
 
     def on_llm_end(self, response: Any, *, run_id: UUID, **kwargs: Any) -> None:
-        from deerflow.runtime.converters import langchain_to_openai_completion, langchain_to_openai_message
+        from deerflow.runtime.converters import langchain_to_openai_completion
 
         try:
             message = response.generations[0][0].message
