-
-
Notifications
You must be signed in to change notification settings - Fork 895
Expand file tree
/
Copy pathdeep_research_agent.py
More file actions
1442 lines (1244 loc) · 56.8 KB
/
deep_research_agent.py
File metadata and controls
1442 lines (1244 loc) · 56.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Deep Research Agent Module
This module provides the DeepResearchAgent class for automating complex research
workflows using Deep Research APIs from multiple providers:
- **OpenAI**: o3-deep-research, o4-mini-deep-research (via Responses API)
- **Gemini**: deep-research-pro (via Interactions API)
The agent automatically detects the provider based on the model name and uses
the appropriate API.
Example:
from praisonaiagents import DeepResearchAgent
# OpenAI Deep Research
agent = DeepResearchAgent(
name="Research Assistant",
model="o3-deep-research",
instructions="You are a professional researcher..."
)
# Gemini Deep Research
agent = DeepResearchAgent(
name="Research Assistant",
model="deep-research-pro",
instructions="You are a professional researcher..."
)
result = agent.research("What are the economic impacts of AI on healthcare?")
print(result.report)
for citation in result.citations:
print(f"- {citation.title}: {citation.url}")
"""
import os
import logging
from praisonaiagents._logging import get_logger
import time
import asyncio
from typing import List, Optional, Any, Dict, Literal
from dataclasses import dataclass, field
from enum import Enum
# Data classes for Deep Research response types
@dataclass
class Citation:
    """Represents a citation in the research report.

    The index pair marks the character span of the report text that this
    citation annotates; both default to 0 when the provider supplied no
    offsets.
    """
    # Source metadata from the provider.
    title: str
    url: str
    # Character offsets into the report text (0/0 when unknown).
    start_index: int = 0
    end_index: int = 0

    def __repr__(self) -> str:
        return f"Citation(title='{self.title}', url='{self.url}')"
@dataclass
class ReasoningStep:
    """Represents a reasoning step in the research process."""
    text: str
    # Step category; all current call sites use the default "reasoning".
    type: str = "reasoning"

    def __repr__(self) -> str:
        # Truncate long reasoning text so reprs stay readable in logs.
        return f"ReasoningStep(text='{self.text[:50]}...')" if len(self.text) > 50 else f"ReasoningStep(text='{self.text}')"
@dataclass
class WebSearchCall:
    """Represents a web search call made during research."""
    # The search query string (may be empty when not reported by the API).
    query: str
    # Provider-reported status, e.g. "searching" during streaming.
    status: str

    def __repr__(self) -> str:
        return f"WebSearchCall(query='{self.query}', status='{self.status}')"
@dataclass
class CodeExecutionStep:
    """Represents a code execution step during research."""
    # Code sent to the code interpreter.
    input_code: str
    # Interpreter output, when available.
    output: Optional[str] = None

    def __repr__(self) -> str:
        # Truncate long code snippets so reprs stay readable in logs.
        return f"CodeExecutionStep(input='{self.input_code[:50]}...')" if len(self.input_code) > 50 else f"CodeExecutionStep(input='{self.input_code}')"
@dataclass
class MCPCall:
    """Represents an MCP tool call during research."""
    # Tool name invoked on the MCP server.
    name: str
    # Label of the MCP server that handled the call.
    server_label: str
    # Arguments the model passed to the tool.
    arguments: Dict[str, Any]

    def __repr__(self) -> str:
        return f"MCPCall(name='{self.name}', server='{self.server_label}')"
@dataclass
class FileSearchCall:
    """Represents a file search call (Gemini-specific)."""
    # Names of the file-search stores that were queried.
    store_names: List[str]

    def __repr__(self) -> str:
        return f"FileSearchCall(stores={self.store_names})"
@dataclass
class DeepResearchResponse:
    """
    Complete response from a Deep Research query.

    Attributes:
        report: The final research report text
        citations: List of citations with source metadata
        reasoning_steps: List of reasoning steps taken
        web_searches: List of web search queries executed
        code_executions: List of code execution steps
        mcp_calls: List of MCP tool calls
        file_searches: List of file search calls (Gemini)
        provider: The provider used (openai, gemini, litellm)
        interaction_id: Interaction ID (Gemini) or Response ID (OpenAI)
        raw_response: The raw API response object
    """
    report: str
    citations: List[Citation] = field(default_factory=list)
    reasoning_steps: List[ReasoningStep] = field(default_factory=list)
    web_searches: List[WebSearchCall] = field(default_factory=list)
    code_executions: List[CodeExecutionStep] = field(default_factory=list)
    mcp_calls: List[MCPCall] = field(default_factory=list)
    file_searches: List[FileSearchCall] = field(default_factory=list)
    provider: str = "openai"
    interaction_id: Optional[str] = None
    raw_response: Optional[Any] = None

    def get_citation_text(self, citation: Citation) -> str:
        """Return the slice of the report text that a citation annotates.

        A citation may legitimately start at offset 0, so only a non-empty
        span (end_index > start_index) is required; previously a truthiness
        check on start_index made offset-0 citations return "".

        Returns:
            The annotated substring, or "" when no usable offsets exist.
        """
        if citation.end_index > citation.start_index:
            return self.report[citation.start_index:citation.end_index]
        return ""

    def get_all_sources(self) -> List[Dict[str, str]]:
        """Return all cited sources, deduplicated by URL (first title wins)."""
        seen = set()
        sources = []
        for c in self.citations:
            if c.url not in seen:
                seen.add(c.url)
                sources.append({"title": c.title, "url": c.url})
        return sources
class Provider(Enum):
    """Supported Deep Research providers."""
    OPENAI = "openai"    # OpenAI Deep Research via the Responses API
    GEMINI = "gemini"    # Gemini Deep Research via the Interactions API
    LITELLM = "litellm"  # LiteLLM bridge for OpenAI models
class DeepResearchAgent:
    """Agent that drives long-running "deep research" jobs across providers.

    Supported backends:
      * OpenAI Deep Research: o3-deep-research, o4-mini-deep-research
      * Gemini Deep Research: deep-research-pro-preview
      * LiteLLM: unified interface for the OpenAI models

    The backend is inferred from the model name, or may be set explicitly
    via the ``provider`` argument.

    Example:
        # OpenAI (default)
        agent = DeepResearchAgent(
            model="o3-deep-research",
            instructions="You are a professional researcher."
        )

        # Gemini
        agent = DeepResearchAgent(
            model="deep-research-pro",
            instructions="You are a professional researcher."
        )

        # Using LiteLLM
        agent = DeepResearchAgent(
            model="o3-deep-research",
            use_litellm=True
        )

        result = agent.research("Research the economic impact of AI on healthcare.")
        print(result.report)
    """

    # Friendly OpenAI model aliases mapped to pinned release identifiers.
    OPENAI_MODELS = {
        "o3-deep-research": "o3-deep-research-2025-06-26",
        "o3-deep-research-2025-06-26": "o3-deep-research-2025-06-26",
        "o4-mini-deep-research": "o4-mini-deep-research-2025-06-26",
        "o4-mini-deep-research-2025-06-26": "o4-mini-deep-research-2025-06-26",
    }

    # Friendly Gemini model aliases mapped to the pinned preview identifier.
    GEMINI_MODELS = {
        "deep-research-pro": "deep-research-pro-preview-12-2025",
        "deep-research-pro-preview": "deep-research-pro-preview-12-2025",
        "deep-research-pro-preview-12-2025": "deep-research-pro-preview-12-2025",
        "gemini-deep-research": "deep-research-pro-preview-12-2025",
    }

    # Fallback model identifiers, one per provider.
    DEFAULT_OPENAI_MODEL = "o3-deep-research-2025-06-26"
    DEFAULT_GEMINI_MODEL = "deep-research-pro-preview-12-2025"
    def __init__(
        self,
        name: Optional[str] = None,
        instructions: Optional[str] = None,
        model: str = "o3-deep-research",
        provider: Optional[Literal["openai", "gemini", "litellm", "auto"]] = "auto",
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        verbose: bool = True,
        summary_mode: Literal["auto", "detailed", "concise"] = "auto",
        enable_web_search: bool = True,
        enable_code_interpreter: bool = False,
        enable_file_search: bool = False,
        mcp_servers: Optional[List[Dict[str, Any]]] = None,
        file_search_stores: Optional[List[str]] = None,
        use_litellm: bool = False,
        poll_interval: int = 10,
        max_wait_time: int = 3600,
    ):
        """
        Initialize a DeepResearchAgent.

        Args:
            name: Name of the agent for identification
            instructions: System instructions that define the agent's behavior
            model: Deep Research model to use:
                - OpenAI: "o3-deep-research", "o4-mini-deep-research"
                - Gemini: "deep-research-pro", "deep-research-pro-preview"
            provider: Provider to use ("openai", "gemini", "litellm", or "auto")
            api_key: API key (defaults to OPENAI_API_KEY or GEMINI_API_KEY)
            base_url: Custom base URL for API endpoints
            verbose: Enable detailed logging and progress updates
            summary_mode: Summary detail level - "auto", "detailed", or "concise"
            enable_web_search: Enable web search tool
            enable_code_interpreter: Enable code interpreter for data analysis
            enable_file_search: Enable file search (Gemini only)
            mcp_servers: List of MCP server configurations (OpenAI only)
            file_search_stores: File search store names (Gemini only)
            use_litellm: Use LiteLLM for OpenAI models
            poll_interval: Seconds between status polls (Gemini background mode)
            max_wait_time: Maximum seconds to wait for research completion

        Raises:
            ValueError: from _setup_api_key when no API key is supplied and
                the provider's environment variable is unset.
        """
        self.name = name or "DeepResearchAgent"
        # Default instructions for comprehensive research
        default_instructions = """
You are a professional research analyst. When conducting research:
- Include specific figures, trends, statistics, and measurable outcomes
- Prioritize reliable, up-to-date sources: peer-reviewed research, official organizations, regulatory agencies
- Include inline citations and return all source metadata
- Be analytical, avoid generalities, and ensure data-backed reasoning
- Structure your response with clear headers and formatting
- If creating comparisons, use tables for clarity
"""
        self.instructions = instructions or default_instructions
        self.verbose = verbose
        self.summary_mode = summary_mode
        self.enable_web_search = enable_web_search
        self.enable_code_interpreter = enable_code_interpreter
        self.enable_file_search = enable_file_search
        self.mcp_servers = mcp_servers or []
        self.file_search_stores = file_search_stores or []
        # NOTE: use_litellm must be assigned before _detect_provider() below,
        # because auto-detection reads self.use_litellm.
        self.use_litellm = use_litellm
        self.poll_interval = poll_interval
        self.max_wait_time = max_wait_time
        self.base_url = base_url
        # Detect provider from model name
        self.provider = self._detect_provider(model, provider)
        # Resolve model name: known aliases map to pinned release identifiers,
        # unknown names pass through unchanged.
        if self.provider == Provider.GEMINI:
            self.model = self.GEMINI_MODELS.get(model, model)
        else:
            self.model = self.OPENAI_MODELS.get(model, model)
        # Set up API key
        self._setup_api_key(api_key)
        # Initialize clients lazily (created on first property access)
        self._openai_client = None
        self._async_openai_client = None
        self._gemini_client = None
        # Configure logging
        self.logger = get_logger(__name__)
        if self.verbose:
            self.logger.debug(f"DeepResearchAgent initialized: provider={self.provider.value}, model={self.model}")
def _detect_provider(
self,
model: str,
provider: Optional[str]
) -> Provider:
"""Detect the provider based on model name or explicit setting."""
if provider and provider != "auto":
if provider == "litellm":
return Provider.LITELLM
elif provider == "gemini":
return Provider.GEMINI
else:
return Provider.OPENAI
# Auto-detect from model name
model_lower = model.lower()
if any(g in model_lower for g in ["gemini", "deep-research-pro"]):
return Provider.GEMINI
elif self.use_litellm:
return Provider.LITELLM
else:
return Provider.OPENAI
def _setup_api_key(self, api_key: Optional[str]):
"""Set up the API key based on provider."""
if api_key:
self.api_key = api_key
elif self.provider == Provider.GEMINI:
self.api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not self.api_key:
raise ValueError(
"GEMINI_API_KEY or GOOGLE_API_KEY environment variable is required for Gemini Deep Research."
)
else:
self.api_key = os.environ.get("OPENAI_API_KEY")
if not self.api_key:
raise ValueError(
"OPENAI_API_KEY environment variable is required for OpenAI Deep Research."
)
@property
def openai_client(self):
"""Get the synchronous OpenAI client (lazy initialization)."""
if self._openai_client is None:
from openai import OpenAI
self._openai_client = OpenAI(api_key=self.api_key, base_url=self.base_url)
return self._openai_client
@property
def async_openai_client(self):
"""Get the asynchronous OpenAI client (lazy initialization)."""
if self._async_openai_client is None:
from openai import AsyncOpenAI
self._async_openai_client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
return self._async_openai_client
@property
def gemini_client(self):
"""Get the Gemini client (lazy initialization)."""
if self._gemini_client is None:
try:
from google import genai
self._gemini_client = genai.Client(api_key=self.api_key)
except ImportError:
raise ImportError(
"google-genai package is required for Gemini Deep Research. "
"Install with: pip install google-genai"
)
return self._gemini_client
def _build_openai_tools(
self,
web_search: bool = True,
code_interpreter: bool = False,
mcp_servers: Optional[List[Dict[str, Any]]] = None,
file_ids: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Build tools list for OpenAI Deep Research API."""
tools = []
if web_search:
tools.append({"type": "web_search_preview"})
if code_interpreter:
tools.append({
"type": "code_interpreter",
"container": {
"type": "auto",
"file_ids": file_ids or []
}
})
if mcp_servers:
for server in mcp_servers:
tools.append({
"type": "mcp",
"server_label": server.get("label", "mcp_server"),
"server_url": server.get("url"),
"require_approval": server.get("require_approval", "never")
})
return tools
def _build_gemini_tools(
self,
file_search: bool = False,
file_search_stores: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Build tools list for Gemini Deep Research API."""
tools = []
# Gemini has google_search and url_context enabled by default
# Only need to add file_search if requested
if file_search and file_search_stores:
tools.append({
"type": "file_search",
"file_search_store_names": file_search_stores
})
return tools
    def _parse_openai_response(self, response: Any) -> DeepResearchResponse:
        """Parse OpenAI Deep Research API response.

        Walks ``response.output`` and buckets each item by its ``type``
        attribute: message text + citation annotations, reasoning summaries,
        web-search calls, code-interpreter calls and MCP calls. Missing
        attributes fall back to empty defaults throughout.
        """
        report = ""
        citations = []
        reasoning_steps = []
        web_searches = []
        code_executions = []
        mcp_calls = []
        for item in response.output:
            item_type = getattr(item, 'type', None)
            if item_type == "message":
                if hasattr(item, 'content') and item.content:
                    for content_item in item.content:
                        if hasattr(content_item, 'text'):
                            # NOTE: assignment (not +=) — a later message item
                            # overwrites an earlier one; only the last text wins.
                            report = content_item.text
                        if hasattr(content_item, 'annotations'):
                            for ann in content_item.annotations:
                                citations.append(Citation(
                                    title=getattr(ann, 'title', ''),
                                    url=getattr(ann, 'url', ''),
                                    start_index=getattr(ann, 'start_index', 0),
                                    end_index=getattr(ann, 'end_index', 0)
                                ))
            elif item_type == "reasoning":
                if hasattr(item, 'summary'):
                    for s in item.summary:
                        reasoning_steps.append(ReasoningStep(
                            text=getattr(s, 'text', str(s))
                        ))
            elif item_type == "web_search_call":
                # action is expected to be a dict; any other shape yields ''.
                action = getattr(item, 'action', {})
                query = action.get('query', '') if isinstance(action, dict) else ''
                status = getattr(item, 'status', 'unknown')
                web_searches.append(WebSearchCall(query=query, status=status))
            elif item_type == "code_interpreter_call":
                code_executions.append(CodeExecutionStep(
                    input_code=getattr(item, 'input', ''),
                    output=getattr(item, 'output', None)
                ))
            elif item_type == "mcp_call":
                # Only record MCP calls that actually carry arguments.
                if hasattr(item, 'arguments') and item.arguments:
                    mcp_calls.append(MCPCall(
                        name=getattr(item, 'name', ''),
                        server_label=getattr(item, 'server_label', ''),
                        arguments=item.arguments
                    ))
        return DeepResearchResponse(
            report=report,
            citations=citations,
            reasoning_steps=reasoning_steps,
            web_searches=web_searches,
            code_executions=code_executions,
            mcp_calls=mcp_calls,
            provider="openai",
            interaction_id=getattr(response, 'id', None),
            raw_response=response
        )
    def _parse_gemini_response(
        self,
        interaction: Any,
        fallback_text: str = "",
        fallback_reasoning: Optional[List[ReasoningStep]] = None
    ) -> DeepResearchResponse:
        """Parse Gemini Deep Research API response.

        Tries several attribute paths in order (outputs → result → response →
        streamed fallback text) so an unexpected response shape degrades
        gracefully instead of failing.

        Args:
            interaction: The Gemini interaction object
            fallback_text: Accumulated text from streaming (used if outputs parsing fails)
            fallback_reasoning: Reasoning steps collected during streaming
        """
        report = ""
        citations = []
        reasoning_steps = fallback_reasoning or []
        # Try multiple attribute paths for Gemini output structure
        # Path 1: Direct outputs with text attribute (last output wins)
        if hasattr(interaction, 'outputs') and interaction.outputs:
            last_output = interaction.outputs[-1]
            if hasattr(last_output, 'text') and last_output.text:
                report = last_output.text
            elif hasattr(last_output, 'content'):
                # Gemini nested structure: content.parts[0].text
                content = last_output.content
                if hasattr(content, 'parts') and content.parts:
                    first_part = content.parts[0]
                    if hasattr(first_part, 'text'):
                        report = first_part.text
                    else:
                        report = str(first_part)
                elif hasattr(content, 'text'):
                    report = content.text
                else:
                    report = str(content)
        # Path 2: Direct result attribute
        if not report and hasattr(interaction, 'result'):
            result = interaction.result
            if hasattr(result, 'text'):
                report = result.text
            else:
                report = str(result)
        # Path 3: Response attribute
        if not report and hasattr(interaction, 'response'):
            resp = interaction.response
            if hasattr(resp, 'text'):
                report = resp.text
            elif hasattr(resp, 'content'):
                report = str(resp.content)
        # Path 4: Fallback to streamed content (critical fix)
        if not report and fallback_text:
            report = fallback_text
            if self.verbose:
                self.logger.debug("Using fallback streamed text for report")
        # Try to extract citations from grounding metadata
        if hasattr(interaction, 'outputs') and interaction.outputs:
            for output in interaction.outputs:
                if hasattr(output, 'grounding_metadata'):
                    metadata = output.grounding_metadata
                    if hasattr(metadata, 'grounding_chunks'):
                        for chunk in metadata.grounding_chunks:
                            # Only web-grounded chunks carry title/uri metadata.
                            if hasattr(chunk, 'web') and chunk.web:
                                citations.append(Citation(
                                    title=getattr(chunk.web, 'title', ''),
                                    url=getattr(chunk.web, 'uri', ''),
                                ))
        # Log warning if report is empty
        if not report:
            self.logger.warning(
                "Gemini response parsing returned empty report. "
                f"Interaction ID: {getattr(interaction, 'id', 'unknown')}, "
                f"Status: {getattr(interaction, 'status', 'unknown')}"
            )
        return DeepResearchResponse(
            report=report,
            citations=citations,
            reasoning_steps=reasoning_steps,
            provider="gemini",
            interaction_id=getattr(interaction, 'id', None),
            raw_response=interaction
        )
def _parse_litellm_response(self, response: Any) -> DeepResearchResponse:
"""Parse LiteLLM response (same structure as OpenAI)."""
return self._parse_openai_response(response)
def _research_openai(
self,
query: str,
instructions: Optional[str] = None,
model: Optional[str] = None,
summary_mode: Optional[str] = None,
web_search: bool = True,
code_interpreter: bool = False,
mcp_servers: Optional[List[Dict[str, Any]]] = None,
file_ids: Optional[List[str]] = None,
stream: bool = False,
) -> DeepResearchResponse:
"""Perform research using OpenAI Deep Research API."""
input_messages = []
system_instructions = instructions or self.instructions
if system_instructions:
input_messages.append({
"role": "developer",
"content": [{"type": "input_text", "text": system_instructions}]
})
input_messages.append({
"role": "user",
"content": [{"type": "input_text", "text": query}]
})
tools = self._build_openai_tools(
web_search=web_search,
code_interpreter=code_interpreter,
mcp_servers=mcp_servers,
file_ids=file_ids
)
if stream:
return self._research_openai_streaming(
input_messages=input_messages,
model=model or self.model,
summary_mode=summary_mode or self.summary_mode,
tools=tools
)
response = self.openai_client.responses.create(
model=model or self.model,
input=input_messages,
reasoning={"summary": summary_mode or self.summary_mode},
tools=tools if tools else None
)
return self._parse_openai_response(response)
    def _research_openai_streaming(
        self,
        input_messages: List[Dict[str, Any]],
        model: str,
        summary_mode: str,
        tools: Optional[List[Dict[str, Any]]] = None,
    ) -> DeepResearchResponse:
        """
        Perform OpenAI research with streaming for real-time progress updates.

        Streaming provides:
        - Real-time reasoning summaries (buffered per paragraph)
        - Web search progress updates
        - Incremental text output

        Returns a fully parsed response when a ``response.completed`` event
        carries the final payload; otherwise falls back to a response
        assembled from the streamed fragments.
        """
        response_id = None
        final_text = ""
        citations = []
        reasoning_steps = []
        web_searches = []
        code_executions = []
        mcp_calls = []
        # Buffer for accumulating reasoning text before display
        reasoning_buffer = ""
        if self.verbose:
            print("\n" + "=" * 60)
            print("🔍 OPENAI DEEP RESEARCH (Streaming)")
            print("=" * 60 + "\n")
        try:
            stream = self.openai_client.responses.create(
                model=model,
                input=input_messages,
                reasoning={"summary": summary_mode},
                tools=tools if tools else None,
                stream=True
            )
            for event in stream:
                event_type = getattr(event, 'type', None)
                # Capture response ID
                if event_type == "response.created":
                    response_id = getattr(event.response, 'id', None)
                    if self.verbose:
                        print(f"📋 Research started: {response_id}\n")
                # Handle reasoning/thinking updates - buffer tokens
                elif event_type == "response.reasoning_summary_text.delta":
                    text = getattr(event, 'delta', '')
                    if text:
                        reasoning_buffer += text
                # When reasoning paragraph is complete, display it
                elif event_type == "response.reasoning_summary_text.done":
                    text = getattr(event, 'text', '')
                    if text:
                        reasoning_steps.append(ReasoningStep(text=text))
                        if self.verbose:
                            # Display the complete thought with single 💭
                            print(f"💭 {text}\n", flush=True)
                    reasoning_buffer = ""  # Reset buffer
                # Handle web search events
                elif event_type == "response.web_search_call.searching":
                    query = getattr(event, 'query', '')
                    if self.verbose:
                        print(f"🔎 Searching: {query}", flush=True)
                    web_searches.append(WebSearchCall(query=query, status="searching"))
                elif event_type == "response.web_search_call.completed":
                    if self.verbose:
                        print(" ✓ Search completed", flush=True)
                # Handle output text streaming
                elif event_type == "response.output_text.delta":
                    text = getattr(event, 'delta', '')
                    final_text += text
                    if self.verbose:
                        print(text, end="", flush=True)
                # Handle code interpreter events
                elif event_type == "response.code_interpreter_call.code.delta":
                    code = getattr(event, 'delta', '')
                    if self.verbose:
                        print(f"💻 {code}", end="", flush=True)
                elif event_type == "response.code_interpreter_call.completed":
                    input_code = getattr(event, 'code', '')
                    output = getattr(event, 'output', '')
                    code_executions.append(CodeExecutionStep(input_code=input_code, output=output))
                # Handle MCP events
                elif event_type == "response.mcp_call.arguments.delta":
                    if self.verbose:
                        args = getattr(event, 'delta', '')
                        print(f"🔧 MCP: {args}", end="", flush=True)
                # Handle completion
                elif event_type == "response.completed":
                    if self.verbose:
                        print("\n\n" + "=" * 60)
                        print("✅ Research Complete")
                        print("=" * 60 + "\n")
                    # Parse the final response for citations
                    response = getattr(event, 'response', None)
                    if response:
                        return self._parse_openai_response(response)
        except Exception as e:
            self.logger.error(f"OpenAI streaming error: {e}")
            raise
        # Fallback: construct response from streamed content
        return DeepResearchResponse(
            report=final_text,
            citations=citations,
            reasoning_steps=reasoning_steps,
            web_searches=web_searches,
            code_executions=code_executions,
            mcp_calls=mcp_calls,
            provider="openai",
            interaction_id=response_id
        )
def _research_litellm(
self,
query: str,
instructions: Optional[str] = None,
model: Optional[str] = None,
summary_mode: Optional[str] = None,
web_search: bool = True,
code_interpreter: bool = False,
mcp_servers: Optional[List[Dict[str, Any]]] = None,
file_ids: Optional[List[str]] = None,
) -> DeepResearchResponse:
"""Perform research using LiteLLM (OpenAI Responses API bridge)."""
try:
import litellm
except ImportError:
raise ImportError(
"litellm package is required for LiteLLM provider. "
"Install with: pip install litellm"
)
tools = self._build_openai_tools(
web_search=web_search,
code_interpreter=code_interpreter,
mcp_servers=mcp_servers,
file_ids=file_ids
)
# Build input for LiteLLM responses
input_content = query
if instructions or self.instructions:
system_instructions = instructions or self.instructions
input_content = f"{system_instructions}\n\n{query}"
response = litellm.responses(
model=model or self.model,
input=input_content,
tools=tools if tools else None
)
return self._parse_litellm_response(response)
def _research_gemini(
self,
query: str,
instructions: Optional[str] = None,
model: Optional[str] = None,
file_search: bool = False,
file_search_stores: Optional[List[str]] = None,
stream: bool = False,
) -> DeepResearchResponse:
"""Perform research using Gemini Deep Research API (Interactions API)."""
# Build input with instructions
input_text = query
if instructions or self.instructions:
system_instructions = instructions or self.instructions
input_text = f"{system_instructions}\n\n{query}"
tools = self._build_gemini_tools(
file_search=file_search,
file_search_stores=file_search_stores
)
# Create interaction with background=True (required for Deep Research)
create_params = {
"input": input_text,
"agent": model or self.model,
"background": True,
}
if tools:
create_params["tools"] = tools
# Use streaming for real-time progress updates
if stream:
return self._research_gemini_streaming(create_params)
# Non-streaming: poll for completion
interaction = self.gemini_client.interactions.create(**create_params)
if self.verbose:
self.logger.debug(f"Gemini research started: {interaction.id}")
start_time = time.time()
while True:
interaction = self.gemini_client.interactions.get(interaction.id)
if interaction.status == "completed":
if self.verbose:
self.logger.debug("Gemini research completed")
break
elif interaction.status == "failed":
error = getattr(interaction, 'error', 'Unknown error')
raise RuntimeError(f"Gemini Deep Research failed: {error}")
elif interaction.status == "cancelled":
raise RuntimeError("Gemini Deep Research was cancelled")
elapsed = time.time() - start_time
if elapsed > self.max_wait_time:
raise TimeoutError(
f"Gemini Deep Research timed out after {self.max_wait_time}s"
)
if self.verbose:
self.logger.debug(f"Research in progress... ({elapsed:.0f}s)")
time.sleep(self.poll_interval)
return self._parse_gemini_response(interaction)
async def _research_gemini_async(
self,
query: str,
instructions: Optional[str] = None,
model: Optional[str] = None,
file_search: bool = False,
file_search_stores: Optional[List[Dict[str, Any]]] = None,
grounding_file_prompt: Optional[str] = None,
) -> DeepResearchResponse:
"""
Async version of _research_gemini with non-blocking polling.
Uses asyncio.sleep() to prevent blocking the event loop.
"""
import asyncio
import time
if self.provider != Provider.GEMINI:
raise ValueError("This method requires GEMINI provider")
if not self.gemini_client:
raise RuntimeError("Gemini client is required for Deep Research")
create_params = {
"query": query,
"file_search": file_search,
}
if instructions:
create_params["instructions"] = instructions
if model:
create_params["model"] = model
if file_search_stores:
create_params["file_search_stores"] = file_search_stores
if grounding_file_prompt:
create_params["grounding_file_prompt"] = grounding_file_prompt
interaction = self.gemini_client.interactions.create(**create_params)
if self.verbose:
self.logger.debug(f"Gemini research started (async): {interaction.id}")
start_time = time.time()
while True:
interaction = self.gemini_client.interactions.get(interaction.id)
if interaction.status == "completed":
if self.verbose:
self.logger.debug("Gemini research completed (async)")
break
elif interaction.status == "failed":
error = getattr(interaction, 'error', 'Unknown error')
raise RuntimeError(f"Gemini Deep Research failed: {error}")
elif interaction.status == "cancelled":
raise RuntimeError("Gemini Deep Research was cancelled")
elapsed = time.time() - start_time
if elapsed > self.max_wait_time:
raise TimeoutError(
f"Gemini Deep Research timed out after {self.max_wait_time}s"
)
if self.verbose:
self.logger.debug(f"Research in progress (async)... ({elapsed:.0f}s)")
# Use async sleep to avoid blocking event loop
await asyncio.sleep(self.poll_interval)
return self._parse_gemini_response(interaction)
def _research_gemini_streaming(
self,
create_params: Dict[str, Any],
) -> DeepResearchResponse:
"""
Perform Gemini research with streaming for real-time progress updates.
Streaming provides:
- Real-time thinking summaries (research progress)
- Incremental text output as it's generated
- Better user experience for long-running research
"""
# Enable streaming with thinking summaries
create_params["stream"] = True
create_params["agent_config"] = {
"type": "deep-research",
"thinking_summaries": "auto"
}
interaction_id = None
last_event_id = None
final_text = ""
reasoning_steps = []
is_complete = False
if self.verbose:
print("\n" + "=" * 60)
print("🔍 GEMINI DEEP RESEARCH (Streaming)")
print("=" * 60 + "\n")
try:
stream = self.gemini_client.interactions.create(**create_params)
for chunk in stream:
# Capture interaction ID for reconnection
if chunk.event_type == "interaction.start":
interaction_id = chunk.interaction.id
if self.verbose:
print(f"📋 Research started: {interaction_id}\n")
# Track event ID for potential reconnection
if hasattr(chunk, 'event_id') and chunk.event_id:
last_event_id = chunk.event_id
# Handle content deltas
if chunk.event_type == "content.delta":
delta = chunk.delta
if hasattr(delta, 'type'):
if delta.type == "text":
# Stream final report text
text = delta.text if hasattr(delta, 'text') else ""
final_text += text
if self.verbose:
print(text, end="", flush=True)
elif delta.type == "thought_summary":
# Show thinking/reasoning progress
thought = ""
if hasattr(delta, 'content') and hasattr(delta.content, 'text'):
thought = delta.content.text
elif hasattr(delta, 'text'):
thought = delta.text
if thought:
reasoning_steps.append(ReasoningStep(text=thought))
if self.verbose:
print(f"💭 {thought}", flush=True)