# openai: async streaming spans never finished — AsyncAPIResponse.parse() unawaited in sync hook #17060
## Summary
When using the OpenAI Agents SDK with Runner.run_streamed() and ddtrace's OpenAI integration, streaming spans for responses.create(stream=True) are never finished. This breaks LLM Observability trace grouping — sub-agent spans appear disconnected from the parent trace.
Runner.run() (non-streaming) works correctly.
## Root cause
In `ddtrace/contrib/internal/openai/_endpoint_hooks.py`, `_EndpointHook.handle_request` (a sync generator) calls `resp.parse()` on `AsyncAPIResponse`:

```python
def handle_request(self, pin, integration, instance, span, args, kwargs):
    self._record_request(pin, integration, instance, span, args, kwargs)
    resp, error = yield
    if hasattr(resp, "parse"):
        self._record_response(pin, integration, span, args, kwargs, resp.parse(), error)
    return resp
```

For async responses, `resp.parse()` returns a coroutine that is never awaited. The unawaited coroutine flows into `_record_response` → `_handle_streamed_response`, where `_is_async_generator()` returns `False` (a coroutine is not an `AsyncStream`). The stream is never wrapped in `TracedAsyncStream`, so:
- The span for `responses.create(stream=True)` is never finished (the `TracedAsyncStream` that would call `span.finish()` in `finalize_stream` was never created).
- The dangling open span corrupts parent context for subsequent sub-agent OpenAI calls.
- At process exit: `RuntimeWarning: coroutine 'AsyncAPIResponse.parse' was never awaited`.
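The failure mode can be reproduced in isolation without ddtrace: a sync function cannot await `parse()`, so calling it only creates a coroutine object, and that object fails an async-generator check, which skips any stream-wrapping logic. A minimal sketch (the class and hook names below are illustrative stand-ins, not ddtrace's actual code):

```python
import inspect

class AsyncAPIResponseStub:
    """Stand-in for openai's AsyncAPIResponse: parse() is a coroutine function."""
    async def parse(self):
        return "parsed-stream"

def sync_hook(resp):
    # A sync hook cannot await, so calling parse() only creates a coroutine object.
    parsed = resp.parse()
    # A coroutine is not an async generator, so stream-wrapping would be skipped.
    return inspect.isasyncgen(parsed), parsed

is_asyncgen, dangling = sync_hook(AsyncAPIResponseStub())
print(is_asyncgen)                    # False -> traced wrapper is never created
print(inspect.iscoroutine(dangling))  # True  -> left unawaited by the sync hook
dangling.close()  # closed here only to silence the RuntimeWarning in this demo
```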
## Fix (two parts, both required)
The fix lives in `ddtrace/contrib/internal/openai/patch.py`, in `async_wrapper` (inside `_patched_endpoint_async`) and `_trace_and_await` (inside `_TracedAsyncPaginator.__await__`):

- Pre-parse `AsyncAPIResponse` in the async caller before sending it into the sync `_traced_endpoint` generator, so the hook receives the actual `AsyncStream` and can wrap it in `TracedAsyncStream`.
- Inject the hook's `TracedAsyncStream` back into the response's parse cache (`_parsed_by_type` in OpenAI SDK v2.x), so the SDK caller iterates the traced version, which finishes the span on stream completion.
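As a sanity check of the mechanism, the two parts can be sketched against stubs (everything below is a stand-in: `FakeAsyncAPIResponse` mimics a v2.x-style `_parsed_by_type` cache, and `TracedAsyncStream` is reduced to a wrapper that calls a finish callback at end of stream):

```python
import asyncio

class TracedAsyncStream:
    """Stand-in: wraps an async iterator and 'finishes the span' when it ends."""
    def __init__(self, stream, on_finish):
        self._stream = stream
        self._on_finish = on_finish
    def __aiter__(self):
        return self
    async def __anext__(self):
        try:
            return await self._stream.__anext__()
        except StopAsyncIteration:
            self._on_finish()  # the finalize_stream / span.finish() moment
            raise

class FakeAsyncAPIResponse:
    """Stand-in for AsyncAPIResponse with a parse cache."""
    def __init__(self, stream):
        self._parsed_by_type = {None: stream}  # assumption: cache keyed by cast type
    async def parse(self):
        return self._parsed_by_type[None]

def sync_hook(parsed_stream, on_finish):
    # Part 1: the sync hook now receives the real stream, already awaited.
    return TracedAsyncStream(parsed_stream, on_finish)

async def demo():
    finished = []
    async def raw_stream():
        yield "chunk-1"
        yield "chunk-2"
    resp = FakeAsyncAPIResponse(raw_stream())
    parsed = await resp.parse()          # pre-parse in the async caller
    traced = sync_hook(parsed, lambda: finished.append(True))
    resp._parsed_by_type[None] = traced  # Part 2: inject into the parse cache
    chunks = [c async for c in await resp.parse()]  # caller now gets the traced stream
    return chunks, finished

chunks, finished = asyncio.run(demo())
print(chunks, finished)  # ['chunk-1', 'chunk-2'] [True]
```

The point of part 2 is visible in the last line of `demo()`: the SDK caller's own `parse()` call yields the traced wrapper, so the finish callback fires exactly when the stream is exhausted.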
## Reproduction
Minimal repro with the OpenAI Agents SDK (orchestrator plus two sub-agents via `as_tool()`):
```shell
pip install "ddtrace>=4.6.1" "openai-agents>=0.12.3"
export OPENAI_API_KEY="sk-..." DD_API_KEY="..." DD_LLMOBS_ENABLED=1 DD_LLMOBS_AGENTLESS_ENABLED=1

# Baseline — spans grouped correctly:
ddtrace-run python repro.py non-streamed

# Broken — orphaned sub-agent spans + RuntimeWarning:
ddtrace-run python repro.py streamed

# With runtime fix applied — spans grouped correctly:
ddtrace-run python repro.py streamed-fix
```

**repro.py**

```python
import asyncio
import sys

from agents import Agent, Runner
from ddtrace.llmobs import LLMObs

research_agent = Agent(
    name="ResearchAgent",
    instructions="You are a research assistant. Given a topic, provide a short 2-3 sentence summary of key facts.",
    model="gpt-4o-mini",
)

analysis_agent = Agent(
    name="AnalysisAgent",
    instructions="You are an analysis assistant. Given some information, provide a brief analytical assessment in 2-3 sentences.",
    model="gpt-4o-mini",
)

orchestrator = Agent(
    name="OrchestratorAgent",
    instructions=(
        "You coordinate research and analysis. For any user question:\n"
        "1. First call research_topic to gather facts.\n"
        "2. Then call analyze_info to get an assessment.\n"
        "3. Combine both into a final answer.\n"
        "Always call BOTH tools."
    ),
    tools=[
        research_agent.as_tool(tool_name="research_topic", tool_description="Research a topic and return key facts."),
        analysis_agent.as_tool(tool_name="analyze_info", tool_description="Analyze information and return an assessment."),
    ],
    model="gpt-4o-mini",
)

QUESTION = "What are the pros and cons of microservices architecture?"

async def run_non_streamed():
    with LLMObs.workflow(name="repro_non_streamed", ml_app="dd-agents-repro"):
        result = await Runner.run(starting_agent=orchestrator, input=QUESTION)
        print(result.final_output)

async def run_streamed():
    with LLMObs.workflow(name="repro_streamed", ml_app="dd-agents-repro"):
        result = Runner.run_streamed(starting_agent=orchestrator, input=QUESTION)
        async for _ in result.stream_events():
            pass
        print(result.final_output)

async def main():
    mode = sys.argv[1] if len(sys.argv) > 1 else "both"
    if mode == "streamed-fix":
        import fix
        fix.apply()
    if mode in ("non-streamed", "both"):
        await run_non_streamed()
    if mode in ("streamed", "streamed-fix", "both"):
        await run_streamed()
    LLMObs.flush()

if __name__ == "__main__":
    asyncio.run(main())
```

## Environment
- ddtrace 4.6.1
- openai 2.29.0
- openai-agents 0.12.5
- Python 3.12.12
- macOS (arm64)
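For completeness: the repro's local `fix` module (invoked via `fix.apply()` in `streamed-fix` mode) is not reproduced above; it monkeypatches ddtrace's async wrapper at runtime. The skeleton below shows only the general shape against a stand-in target, since the real patch points at `ddtrace/contrib/internal/openai/patch.py` internals; every name here is hypothetical:

```python
import asyncio
import functools
import inspect
import types

# Stand-in for the module being patched; the real fix targets
# ddtrace.contrib.internal.openai.patch.
_target = types.SimpleNamespace()

async def _original_async_wrapper(resp):
    # Stand-in for the existing async wrapper, which hands resp to the sync hook.
    return resp

_target.async_wrapper = _original_async_wrapper

def apply():
    """Monkeypatch so AsyncAPIResponse.parse() is awaited before the sync hook runs."""
    original = _target.async_wrapper

    @functools.wraps(original)
    async def patched(resp):
        parse = getattr(resp, "parse", None)
        if inspect.iscoroutinefunction(parse):
            resp = await parse()  # part 1 of the fix, applied from the outside
        return await original(resp)

    _target.async_wrapper = patched

class _FakeAsyncResponse:
    async def parse(self):
        return "real-async-stream"

apply()
result = asyncio.run(_target.async_wrapper(_FakeAsyncResponse()))
print(result)  # the wrapper now receives the parsed value, not a coroutine
```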