Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
graph_workflow_serialization.py

Demonstrates the new serialization / deserialization API added to GraphWorkflow:

workflow.to_spec() -> lightweight topology dict (no agent objects)
workflow.save_spec(path) -> write that dict to a JSON file
GraphWorkflow.from_topology_spec(spec, registry) -> rebuild from dict + agents

Round-trip:
1. Build a workflow programmatically.
2. Save its topology to "workflow_spec.json".
3. Reconstruct an identical workflow from that file + the same agent objects.
4. Run both workflows on the same task and compare outputs.
"""

import json
import os

from swarms.structs.agent import Agent
from swarms.structs.graph_workflow import GraphWorkflow

# ---------------------------------------------------------------------------
# 1. Build agents
# ---------------------------------------------------------------------------
researcher = Agent(
agent_name="Researcher",
model_name="claude-sonnet-4-5",
agent_description="Gathers and summarises relevant information on the topic.",
max_loops=1,
temperature=1,
)

analyst = Agent(
agent_name="Analyst",
model_name="claude-sonnet-4-5",
agent_description="Analyses the researcher's findings and draws insights.",
max_loops=1,
temperature=1,
)

writer = Agent(
agent_name="Writer",
model_name="claude-sonnet-4-5",
agent_description="Turns the analyst's insights into a clear, concise report.",
max_loops=1,
temperature=1,
)

# ---------------------------------------------------------------------------
# 2. Build the original workflow: Researcher -> Analyst -> Writer
# ---------------------------------------------------------------------------
original = GraphWorkflow(
name="Research Pipeline",
description="A three-stage research, analysis, and writing pipeline.",
max_loops=1,
verbose=False,
)

original.add_nodes([researcher, analyst, writer])
original.add_edge("Researcher", "Analyst")
original.add_edge("Analyst", "Writer")
original.compile()

# ---------------------------------------------------------------------------
# 3. Serialize the topology to a JSON file (no agent objects needed)
# ---------------------------------------------------------------------------
spec_path = os.path.join(
os.path.dirname(__file__), "workflow_spec.json"
)
original.save_spec(spec_path)
print(f"Spec saved to: {spec_path}")

# Inspect what was saved
with open(spec_path) as f:
saved = json.load(f)
print("\n--- Saved spec ---")
print(json.dumps(saved, indent=2))

# ---------------------------------------------------------------------------
# 4. Reconstruct the workflow from the spec file + an agent registry
# ---------------------------------------------------------------------------
# The registry maps agent_name strings to live Agent objects.
# In a real scenario these could be freshly constructed from config.
agent_registry = {
"Researcher": researcher,
"Analyst": analyst,
"Writer": writer,
}

with open(spec_path) as f:
spec = json.load(f)

reconstructed = GraphWorkflow.from_topology_spec(
spec,
agent_registry,
verbose=False,
)
reconstructed.compile()

print("\n--- Reconstructed workflow nodes ---")
for node_id in reconstructed.nodes:
print(f" {node_id}")

print("\n--- Reconstructed workflow edges ---")
for edge in reconstructed.edges:
print(f" {edge.source} -> {edge.target}")

# ---------------------------------------------------------------------------
# 5. Run both workflows and show results
# ---------------------------------------------------------------------------
TASK = "Summarise the main benefits of multi-agent AI systems in three bullet points."

print("\n--- Running original workflow ---")
original_result = original.run(TASK)
print(original_result)

print("\n--- Running reconstructed workflow ---")
reconstructed_result = reconstructed.run(TASK)
print(reconstructed_result)
149 changes: 149 additions & 0 deletions swarms/structs/graph_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2301,6 +2301,155 @@ def visualize_simple(self) -> str:
)
raise e

def to_spec(self) -> Dict[str, Any]:
"""
Serialize the workflow topology to a lightweight plain-dict spec.

Unlike ``to_json()``, this method does **not** attempt to serialize the
Agent objects themselves — it only records each agent's ``agent_name``
so that the spec can be version-controlled, diffed, and shared without
requiring agent implementation details.

Returns:
Dict[str, Any]: A dictionary containing:
- ``name``, ``description``, ``max_loops`` — workflow metadata.
- ``nodes`` — list of ``{"id": ..., "agent_name": ...}`` dicts.
- ``edges`` — list of ``{"source": ..., "target": ..., "metadata": ...}`` dicts.
- ``entry_points`` — list of entry-point node IDs.
- ``end_points`` — list of end-point node IDs.

Example::

spec = workflow.to_spec()
# version-control or share `spec`
reconstructed = GraphWorkflow.from_topology_spec(spec, agent_registry)
"""
return {
"name": self.name,
"description": self.description,
"max_loops": self.max_loops,
"nodes": [
{
"id": node_id,
"agent_name": getattr(
node.agent, "agent_name", node_id
),
"metadata": node.metadata,
}
for node_id, node in self.nodes.items()
],
"edges": [
{
"source": e.source,
"target": e.target,
"metadata": e.metadata,
}
for e in self.edges
],
"entry_points": list(self.entry_points),
"end_points": list(self.end_points),
}

def save_spec(self, path: str) -> None:
"""
Save the workflow topology spec produced by :meth:`to_spec` to a JSON file.

This is the recommended way to persist a workflow definition for
version control, sharing, or later reconstruction via
:meth:`from_topology_spec`.

Args:
path (str): Filesystem path to write the JSON file to.

Example::

workflow.save_spec("my_workflow.json")
"""
with open(path, "w") as f:
json.dump(self.to_spec(), f, indent=2)
if self.verbose:
logger.info(f"Workflow spec saved to {path}")

@classmethod
def from_topology_spec(
cls,
spec: Dict[str, Any],
agent_registry: Dict[str, "Agent"],
**kwargs: Any,
) -> "GraphWorkflow":
"""
Reconstruct a :class:`GraphWorkflow` from a topology spec and an agent registry.

This is the counterpart to :meth:`to_spec` / :meth:`save_spec`. The
spec describes *which* agents exist and how they are connected; the
registry supplies the live ``Agent`` objects that implement each node.

Args:
spec (Dict[str, Any]): A topology spec as returned by :meth:`to_spec`
or loaded from a file written by :meth:`save_spec`.
agent_registry (Dict[str, Agent]): Mapping from ``agent_name`` to
the corresponding ``Agent`` instance. Every agent referenced in
``spec["nodes"]`` must appear in the registry.
**kwargs: Additional keyword arguments forwarded to the
:class:`GraphWorkflow` constructor (e.g. ``verbose``, ``backend``).

Returns:
GraphWorkflow: A fully initialised workflow with the topology
described by *spec* and agents resolved from *agent_registry*.

Raises:
KeyError: If a node's ``agent_name`` is not found in *agent_registry*.
ValueError: If *spec* is missing required keys.

Example::

with open("my_workflow.json") as f:
spec = json.load(f)

registry = {"Researcher": researcher_agent, "Writer": writer_agent}
workflow = GraphWorkflow.from_topology_spec(spec, registry)
workflow.run("Write a report on AI trends")
"""
missing = [
n["agent_name"]
for n in spec.get("nodes", [])
if n["agent_name"] not in agent_registry
]
if missing:
raise KeyError(
f"The following agent names are referenced in the spec but not "
f"found in agent_registry: {missing}"
)

nodes = {
n["id"]: Node(
id=n["id"],
agent=agent_registry[n["agent_name"]],
metadata=n.get("metadata") or {},
)
for n in spec.get("nodes", [])
}

edges = [
Edge(
source=e["source"],
target=e["target"],
metadata=e.get("metadata") or {},
)
for e in spec.get("edges", [])
]

return cls(
name=spec.get("name", "Loaded-Workflow"),
description=spec.get("description", ""),
max_loops=spec.get("max_loops", 1),
nodes=nodes,
edges=edges,
entry_points=spec.get("entry_points") or [],
end_points=spec.get("end_points") or [],
**kwargs,
)

def to_json(
self,
fast: bool = True,
Expand Down
Loading