
Commit bb3f2c4

feat: add AG2 multi-agent RAG example with Pathway (#210)
1 parent 3ef6092 commit bb3f2c4

4 files changed

Lines changed: 387 additions & 0 deletions


Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
# AG2 Multi-Agent Conversations with Pathway Real-Time RAG

This example demonstrates how to combine [AG2](https://ag2.ai/) (formerly AutoGen)
multi-agent conversations with [Pathway](https://pathway.com/)'s real-time RAG pipeline.

AG2 is a multi-agent conversation framework with 500K+ monthly PyPI downloads,
4,300+ GitHub stars, and 400+ contributors.

## Table of Contents

1. [Introduction](#introduction)
2. [Prerequisites](#prerequisites)
3. [Architecture](#architecture)
4. [Setup and Installation](#setup-and-installation)
5. [Usage](#usage)
6. [How It Works](#how-it-works)
7. [Conclusions](#conclusions)

## Introduction

This project combines two powerful frameworks:
- **Pathway** continuously indexes documents in real time using its streaming engine and serves them through a VectorStoreServer REST API
- **AG2** orchestrates multiple AI agents (Researcher + Analyst) that query Pathway's index as a tool during their conversation

The key advantage: Pathway re-indexes documents automatically whenever they change, so AG2 agents always query the **latest** version of the knowledge base — no manual re-indexing required.

## Prerequisites

- Python >= 3.10
- OpenAI API key

## Architecture

```
Documents (live folder) --> Pathway VectorStoreServer (real-time indexing)
                                        |
                              REST API /v1/retrieve
                                        |
User Query --> AG2 UserProxy --> GroupChat [Researcher + Analyst]
                                        |
                search_documents tool --> HTTP POST --> Pathway
                                        |
                Grounded, real-time answers with citations
```

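The `/v1/retrieve` endpoint in the diagram can also be exercised directly. Below is a minimal client sketch using only the Python standard library (the example's own code uses `requests` for the same call); the host, port, and response shape follow main.py, while the helper names (`build_payload`, `format_hits`, `retrieve`) are invented here for illustration.

```python
# Minimal stdlib-only client for Pathway's /v1/retrieve endpoint.
# Host and port match this example's defaults (127.0.0.1:8765).
import json
import urllib.request

PATHWAY_URL = "http://127.0.0.1:8765/v1/retrieve"


def build_payload(query: str, k: int = 5) -> bytes:
    """The endpoint expects a JSON body: {"query": ..., "k": ...}."""
    return json.dumps({"query": query, "k": k}).encode("utf-8")


def format_hits(hits: list) -> str:
    """Render the response shape [{"text", "metadata", "dist"}, ...];
    lower dist means more similar."""
    if not hits:
        return "No relevant documents found."
    return "\n".join(
        f"[{i}] {h.get('metadata', {}).get('path', '?')} "
        f"(dist: {h.get('dist', 'N/A')}): {h.get('text', '')[:80]}"
        for i, h in enumerate(hits, 1)
    )


def retrieve(query: str, k: int = 5) -> str:
    """POST a query to the running VectorStoreServer and format the hits."""
    req = urllib.request.Request(
        PATHWAY_URL,
        data=build_payload(query, k),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return format_hits(json.load(resp))


# Example (with the server running): print(retrieve("What is Pathway?"))
```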
## Setup and Installation

1. **Clone the Repository**:
   ```bash
   git clone https://github.com/pathwaycom/pathway.git
   cd pathway/examples/projects/ag2-multiagent-rag/
   ```

2. **Install Dependencies**:
   ```bash
   pip install -U pathway "ag2[openai]>=0.11.4,<1.0" requests python-dotenv
   ```

3. **Environment Variables**:
   Create a `.env` file in the project root and add your OpenAI API key:
   ```
   OPENAI_API_KEY=your_openai_api_key_here
   ```

4. **Add Documents**:
   Place your documents (TXT, MD, PDF) in the `./data/` directory. A sample document is included for testing.

## Usage

1. **Run the Pipeline**:
   ```bash
   python main.py
   ```

2. The script will:
   - Start a Pathway VectorStoreServer that indexes documents in `./data/`
   - Launch AG2 agents that query the server for information
   - Print the multi-agent conversation with grounded answers

3. **Add documents while running** — Pathway re-indexes automatically.

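Step 3 can be tried from a second terminal while main.py is running. A minimal sketch, assuming this example's default `./data` watch folder (the filename and content are arbitrary):

```python
# Drop a new file into the watched folder; Pathway's streaming reader
# picks it up and re-indexes it without a restart.
from pathlib import Path


def add_document(data_dir: str, name: str, text: str) -> Path:
    """Write a new document into the folder Pathway watches."""
    target = Path(data_dir)
    target.mkdir(parents=True, exist_ok=True)
    path = target / name
    path.write_text(text, encoding="utf-8")
    return path


# Example: add_document("./data", "update.md", "# Update\n\nFresh content.")
```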
## How It Works

- **Pathway** reads documents from `./data/`, chunks them with `TokenCountSplitter`, embeds them with OpenAI embeddings, and serves the index via HTTP
- **AG2 Researcher agent** queries Pathway's `/v1/retrieve` endpoint via the `search_documents` tool to retrieve relevant chunks
- **AG2 Analyst agent** synthesizes retrieved information into a comprehensive answer
- The agents communicate via AG2's `GroupChat`, coordinated by a `GroupChatManager`

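To make the chunking step concrete, the toy splitter below mimics the budgeting that `TokenCountSplitter(max_tokens=400)` performs, but it counts whitespace-separated words as a stand-in for real tokens. It is an illustration only, not Pathway's implementation.

```python
# Toy chunker: cap each chunk at max_tokens "tokens", where a token is
# approximated by a whitespace-separated word (real splitters tokenize
# properly; this only illustrates the budgeting behavior).
def split_by_token_count(text: str, max_tokens: int = 400) -> list[str]:
    words = text.split()
    return [
        " ".join(words[i : i + max_tokens])
        for i in range(0, len(words), max_tokens)
    ]
```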
## Conclusions

This example shows how Pathway's real-time document indexing complements AG2's multi-agent orchestration. The combination is especially useful for scenarios where documents change frequently and agents need access to the latest information — such as live knowledge bases, continuously updated reports, or streaming data pipelines.

You can find more ready-to-run pipelines in our [templates section](/developers/templates?tab=ai-pipelines).
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
# Pathway + AG2 Sample Document

This is a sample document for testing the AG2 multi-agent RAG example with Pathway.

## About Pathway

Pathway is a Python ETL framework for stream processing, real-time analytics,
LLM pipelines, and RAG. It uses a high-performance Rust engine under the hood,
providing multithreading and multiprocessing capabilities.

Key features of Pathway include:
- Real-time document indexing and re-indexing
- Support for various data sources (Kafka, S3, GDrive, PostgreSQL, and more)
- Built-in LLM integration with the xpacks.llm module
- VectorStoreServer for serving document embeddings via REST API
- Automatic handling of document updates without manual re-indexing

## About AG2

AG2 (formerly AutoGen) is a multi-agent conversation framework that enables
multiple AI agents to collaborate on complex tasks. It has 500K+ monthly PyPI
downloads, 4,300+ GitHub stars, and 400+ contributors.

Key features of AG2 include:
- Multi-agent conversations with GroupChat
- Tool registration for agents via decorator pattern
- Support for various LLM providers
- Flexible agent orchestration and conversation management
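
The "decorator pattern" bullet refers to how AG2 attaches tools to agents (`register_for_llm` / `register_for_execution`, as used in main.py). Below is a toy, dependency-free sketch of the registry mechanics; the `Agent` class and `register_tool` name are invented for illustration and are not AG2's API.

```python
# Toy illustration of decorator-based tool registration: the decorator
# records the function in the agent's tool registry and returns it unchanged.
from typing import Callable


class Agent:
    def __init__(self, name: str):
        self.name = name
        self.tools: dict[str, Callable] = {}

    def register_tool(self, description: str):
        """Return a decorator that registers the function under its name."""
        def decorator(func: Callable) -> Callable:
            func.description = description
            self.tools[func.__name__] = func
            return func
        return decorator


researcher = Agent("researcher")


@researcher.register_tool(description="Search the knowledge base.")
def search_documents(query: str, top_k: int = 5) -> str:
    return f"results for {query!r} (top {top_k})"
```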
Lines changed: 262 additions & 0 deletions
@@ -0,0 +1,262 @@
# Copyright © 2026 Pathway

"""
AG2 Multi-Agent Conversations with Pathway Real-Time RAG
=========================================================

Demonstrates AG2 (formerly AutoGen) multi-agent conversations using
Pathway's real-time VectorStoreServer as the knowledge retrieval backend.

Pathway continuously indexes documents and serves them via a REST API.
AG2 agents query this API as a tool during multi-agent conversations.

Requirements:
    pip install -U pathway "ag2[openai]>=0.11.4,<1.0" requests python-dotenv

Environment variables:
    OPENAI_API_KEY - OpenAI API key (used by both Pathway and AG2)

Usage:
    1. Place documents in ./data/ folder
    2. Run: python main.py
"""

import os
import sys
import threading
import time

import requests
from autogen import (
    AssistantAgent,
    GroupChat,
    GroupChatManager,
    LLMConfig,
    UserProxyAgent,
)
from dotenv import load_dotenv

import pathway as pw
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.splitters import TokenCountSplitter
from pathway.xpacks.llm.vector_store import VectorStoreServer

# To use advanced features with Pathway Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Community, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")

load_dotenv()

PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8765
DATA_DIR = "./data"


def start_pathway_server():
    """Start Pathway VectorStoreServer in a background thread."""
    documents = pw.io.fs.read(
        DATA_DIR,
        format="binary",
        mode="streaming",
        with_metadata=True,
    )

    embedder = OpenAIEmbedder(model="text-embedding-3-small")
    splitter = TokenCountSplitter(max_tokens=400)

    server = VectorStoreServer(
        documents,
        embedder=embedder,
        splitter=splitter,
    )

    server.run_server(
        host=PATHWAY_HOST,
        port=PATHWAY_PORT,
        threaded=False,
        with_cache=True,
    )


def query_pathway_server(query: str, k: int = 5) -> str:
    """Query the Pathway VectorStoreServer via HTTP.

    The /v1/retrieve endpoint returns a JSON list of objects:
        [{"text": "...", "metadata": {...}, "dist": float}, ...]
    sorted by dist (lower = more similar).
    """
    url = f"http://{PATHWAY_HOST}:{PATHWAY_PORT}/v1/retrieve"
    payload = {"query": query, "k": k}

    try:
        response = requests.post(
            url,
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        results = response.json()

        if not results:
            return "No relevant documents found."

        formatted = []
        for i, result in enumerate(results, 1):
            text = result.get("text", "")
            metadata = result.get("metadata", {})
            source = metadata.get("path", "Unknown source")
            dist = result.get("dist", "N/A")
            formatted.append(f"[{i}] Source: {source} (distance: {dist})\n{text}")

        return "\n\n---\n\n".join(formatted)

    except requests.exceptions.ConnectionError:
        return "Error: Pathway server is not running or not ready yet."
    except Exception as e:
        return f"Error querying Pathway: {e}"

def main():
    """Run AG2 multi-agent RAG with Pathway real-time indexing."""

    # Validate environment
    if not os.environ.get("OPENAI_API_KEY"):
        print("Error: OPENAI_API_KEY environment variable is not set.")
        print("Set it in your shell or create a .env file.")
        sys.exit(1)

    # Validate data directory
    if not os.path.exists(DATA_DIR):
        os.makedirs(DATA_DIR)
        print(f"Created {DATA_DIR}/ directory.")
        print("Please add documents (TXT, MD, PDF) and re-run.")
        sys.exit(1)

    if not any(f for f in os.listdir(DATA_DIR) if not f.startswith(".")):
        print(f"No documents found in {DATA_DIR}/")
        print("Please add documents (TXT, MD, PDF) and re-run.")
        sys.exit(1)

    # Start Pathway server in background thread
    print("Starting Pathway VectorStoreServer...")
    server_thread = threading.Thread(target=start_pathway_server, daemon=True)
    server_thread.start()

    # Wait for server to be ready
    print("Waiting for Pathway server to initialize...")
    for attempt in range(60):
        try:
            resp = requests.post(
                f"http://{PATHWAY_HOST}:{PATHWAY_PORT}/v1/statistics",
                json={},
                timeout=10,
            )
            if resp.status_code == 200:
                stats = resp.json()
                print(
                    f"Pathway server is ready! Indexed files: {stats.get('file_count', 'N/A')}"
                )
                break
        except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
            time.sleep(2)
    else:
        print("Warning: Pathway server may not be fully ready.")

    # AG2 LLM Configuration
    llm_config = LLMConfig(
        {
            "model": "gpt-4o-mini",
            "api_key": os.environ["OPENAI_API_KEY"],
            "api_type": "openai",
        }
    )

    # Create AG2 Agents
    researcher = AssistantAgent(
        name="researcher",
        system_message=(
            "You are a research agent. When asked a question, use the "
            "search_documents tool to retrieve relevant information from "
            "the knowledge base. Present your findings clearly with source "
            "references. If initial results are insufficient, try different "
            "search queries."
        ),
        llm_config=llm_config,
    )

    analyst = AssistantAgent(
        name="analyst",
        system_message=(
            "You are an analyst. Based on the researcher's findings, "
            "synthesize the information into a comprehensive, well-structured "
            "answer. Always reference the source documents. If information is "
            "insufficient, ask the researcher to search with different terms. "
            "End with TERMINATE when the answer is complete."
        ),
        llm_config=llm_config,
    )

    user_proxy = UserProxyAgent(
        name="user_proxy",
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        code_execution_config=False,
        is_termination_msg=lambda x: x.get("content", "")
        and "TERMINATE" in x.get("content", ""),
    )

    # Register Pathway search as AG2 tool
    @user_proxy.register_for_execution()
    @researcher.register_for_llm(
        description=(
            "Search the document knowledge base powered by Pathway. "
            "Returns relevant document chunks with source citations. "
            "The knowledge base is continuously updated in real-time. "
            "Use specific, targeted search queries for best results."
        )
    )
    def search_documents(query: str, top_k: int = 5) -> str:
        """Search Pathway VectorStoreServer for relevant document chunks.

        Args:
            query: The search query string.
            top_k: Number of results to return (default: 5).

        Returns:
            Formatted string with retrieved document chunks and sources.
        """
        return query_pathway_server(query, k=top_k)

    # Set up GroupChat
    group_chat = GroupChat(
        agents=[user_proxy, researcher, analyst],
        messages=[],
        max_round=12,
    )

    manager = GroupChatManager(
        groupchat=group_chat,
        llm_config=llm_config,
    )

    # Run the conversation
    query = (
        "What are the key topics and insights described in the documents? "
        "Provide a comprehensive summary with citations."
    )

    print(f"\n{'=' * 60}")
    print("AG2 Multi-Agent RAG with Pathway Real-Time Indexing")
    print(f"{'=' * 60}")
    print(f"Query: {query}\n")

    user_proxy.run(manager, message=query).process()

    print(f"\n{'=' * 60}")
    print("Conversation complete.")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    main()
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
pathway
ag2[openai]>=0.11.4,<1.0
requests
python-dotenv
