-
-
Notifications
You must be signed in to change notification settings - Fork 939
Expand file tree
/
Copy pathorchestrator-workers.py
More file actions
109 lines (95 loc) · 2.67 KB
/
orchestrator-workers.py
File metadata and controls
109 lines (95 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from praisonaiagents import Agent, Task, AgentTeam
import time
def get_time_check() -> int:
    """Return 1, 2 or 3 depending on the current Unix time.

    The current time is truncated to whole seconds and reduced modulo 3;
    remainders 0, 1 and 2 yield 1, 2 and 3 respectively. The timestamp and
    the chosen value are also printed.

    Returns:
        The integer 1, 2 or 3.
    """
    current_time = int(time.time())
    # Shift the remainder (0..2) into the 1..3 range.
    result = current_time % 3 + 1
    print(f"Time: {current_time}, Result: {result}")
    return result
# Create orchestrator and worker agents
# The router is the only agent that is handed the get_time_check tool.
router = Agent(
    name="Router",
    role="Task router",
    # Goal text is sent to the model, so it needs to be a complete sentence.
    goal="Distribute tasks to workers based on the response from get_time_check",
    tools=[get_time_check],
)
# The three workers differ only in the number used in their name and goal,
# so they are built from one template, in order 1, 2, 3.
worker1, worker2, worker3 = (
    Agent(
        name=f"Worker {kind}",
        role="Specialized worker",
        goal=f"Handle specific subtask type {kind}",
    )
    for kind in (1, 2, 3)
)

# Agent whose goal is to merge the workers' outputs.
synthesizer = Agent(
    name="Synthesizer",
    role="Result synthesizer",
    goal="Combine and synthesize worker outputs",
)
# Create orchestrated workflow tasks
# Routing table: decision value -> names of the task(s) to run for it.
_worker_routes = {
    "1": ["worker1_task"],
    "2": ["worker2_task"],
    "3": ["worker3_task"],
}

# Entry point of the workflow: a decision task owned by the router agent.
router_task = Task(
    name="route_task",
    description="Analyze input from get_time_check and route to appropriate workers",
    expected_output="Task routing decision, 1 , 2 or 3",
    agent=router,
    is_start=True,
    task_type="decision",
    # Every task reachable from the routing table, in table order.
    next_tasks=[
        task_name
        for targets in _worker_routes.values()
        for task_name in targets
    ],
    condition=_worker_routes,
)
# One task per worker agent; each one names synthesize_task as its successor.
worker1_task, worker2_task, worker3_task = (
    Task(
        name=f"worker{kind}_task",
        description=f"Process type {kind} operation",
        expected_output=f"Worker {kind} result",
        agent=worker,
        next_tasks=["synthesize_task"],
    )
    for kind, worker in enumerate((worker1, worker2, worker3), start=1)
)

# Final task: receives all three worker tasks as its context.
synthesize_task = Task(
    name="synthesize_task",
    description="Synthesize worker results into final output",
    expected_output="Final synthesized result",
    agent=synthesizer,
    context=[worker1_task, worker2_task, worker3_task],
)
# Create workflow manager
workflow = AgentTeam(
    agents=[router, worker1, worker2, worker3, synthesizer],
    tasks=[
        router_task,
        worker1_task,
        worker2_task,
        worker3_task,
        synthesize_task,
    ],
    process="workflow",
    output="verbose",
)

# Run orchestrated workflow
results = workflow.start()

# Print results
print("\nOrchestrator-Workers Results:")
# NOTE(review): the per-task listing expects start() to return a mapping with
# a "task_results" entry. Anything else (e.g. a plain final-answer string) is
# printed as-is instead of raising on the subscript - confirm the return shape
# for the installed praisonaiagents version.
if isinstance(results, dict):
    for task_id, result in results.get("task_results", {}).items():
        # Skip tasks with no result (presumably the workers not selected).
        if result:
            print(f"Task {task_id}: {result.raw}")
else:
    print(results)