From 849c47d8b874d4b01db43dd682a1f839a2481b31 Mon Sep 17 00:00:00 2001 From: adichaudhary Date: Mon, 23 Mar 2026 18:59:13 -0400 Subject: [PATCH 1/3] add topology serialization and deserialization support --- .../graph_workflow_serialization.py | 120 ++++++++++++++ swarms/structs/graph_workflow.py | 149 ++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 examples/multi_agent/graphworkflow_examples/graph_workflow_serialization.py diff --git a/examples/multi_agent/graphworkflow_examples/graph_workflow_serialization.py b/examples/multi_agent/graphworkflow_examples/graph_workflow_serialization.py new file mode 100644 index 000000000..5993e96db --- /dev/null +++ b/examples/multi_agent/graphworkflow_examples/graph_workflow_serialization.py @@ -0,0 +1,120 @@ +""" +graph_workflow_serialization.py + +Demonstrates the new serialization / deserialization API added to GraphWorkflow: + + workflow.to_spec() -> lightweight topology dict (no agent objects) + workflow.save_spec(path) -> write that dict to a JSON file + GraphWorkflow.from_topology_spec(spec, registry) -> rebuild from dict + agents + +Round-trip: + 1. Build a workflow programmatically. + 2. Save its topology to "workflow_spec.json". + 3. Reconstruct an identical workflow from that file + the same agent objects. + 4. Run both workflows on the same task and compare outputs. +""" + +import json +import os + +from swarms.structs.agent import Agent +from swarms.structs.graph_workflow import GraphWorkflow + +# --------------------------------------------------------------------------- +# 1. Build agents +# --------------------------------------------------------------------------- +researcher = Agent( + agent_name="Researcher", + model_name="claude-sonnet-4-5", + agent_description="Gathers and summarises relevant information on the topic.", + max_loops=1, + temperature=1, +) + +analyst = Agent( + agent_name="Analyst", + model_name="claude-sonnet-4-5", + agent_description="Analyses the researcher's findings and draws insights.", + max_loops=1, + temperature=1, +) + +writer = Agent( + agent_name="Writer", + model_name="claude-sonnet-4-5", + agent_description="Turns the analyst's insights into a clear, concise report.", + max_loops=1, + temperature=1, +) + +# --------------------------------------------------------------------------- +# 2. Build the original workflow: Researcher -> Analyst -> Writer +# --------------------------------------------------------------------------- +original = GraphWorkflow( + name="Research Pipeline", + description="A three-stage research, analysis, and writing pipeline.", + max_loops=1, + verbose=False, +) + +original.add_nodes([researcher, analyst, writer]) +original.add_edge("Researcher", "Analyst") +original.add_edge("Analyst", "Writer") +original.compile() + +# --------------------------------------------------------------------------- +# 3. Serialize the topology to a JSON file (no agent objects needed) +# --------------------------------------------------------------------------- +spec_path = os.path.join( + os.path.dirname(__file__), "workflow_spec.json" +) +original.save_spec(spec_path) +print(f"Spec saved to: {spec_path}") + +# Inspect what was saved +with open(spec_path) as f: + saved = json.load(f) +print("\n--- Saved spec ---") +print(json.dumps(saved, indent=2)) + +# --------------------------------------------------------------------------- +# 4. Reconstruct the workflow from the spec file + an agent registry +# --------------------------------------------------------------------------- +# The registry maps agent_name strings to live Agent objects. +# In a real scenario these could be freshly constructed from config. +agent_registry = { + "Researcher": researcher, + "Analyst": analyst, + "Writer": writer, +} + +with open(spec_path) as f: + spec = json.load(f) + +reconstructed = GraphWorkflow.from_topology_spec( + spec, + agent_registry, + verbose=False, +) +reconstructed.compile() + +print("\n--- Reconstructed workflow nodes ---") +for node_id in reconstructed.nodes: + print(f" {node_id}") + +print("\n--- Reconstructed workflow edges ---") +for edge in reconstructed.edges: + print(f" {edge.source} -> {edge.target}") + +# --------------------------------------------------------------------------- +# 5. Run both workflows and show results +# --------------------------------------------------------------------------- +TASK = "Summarise the main benefits of multi-agent AI systems in three bullet points." + +print("\n--- Running original workflow ---") +original_result = original.run(TASK) +print(original_result) + +print("\n--- Running reconstructed workflow ---") +reconstructed_result = reconstructed.run(TASK) +print(reconstructed_result) diff --git a/swarms/structs/graph_workflow.py b/swarms/structs/graph_workflow.py index f237825d8..c80e27220 100644 --- a/swarms/structs/graph_workflow.py +++ b/swarms/structs/graph_workflow.py @@ -2301,6 +2301,155 @@ def visualize_simple(self) -> str: ) raise e + def to_spec(self) -> Dict[str, Any]: + """ + Serialize the workflow topology to a lightweight plain-dict spec. + + Unlike ``to_json()``, this method does **not** attempt to serialize the + Agent objects themselves — it only records each agent's ``agent_name`` + so that the spec can be version-controlled, diffed, and shared without + requiring agent implementation details. + + Returns: + Dict[str, Any]: A dictionary containing: + - ``name``, ``description``, ``max_loops`` — workflow metadata. + - ``nodes`` — list of ``{"id": ..., "agent_name": ...}`` dicts. + - ``edges`` — list of ``{"source": ..., "target": ..., "metadata": ...}`` dicts. + - ``entry_points`` — list of entry-point node IDs. + - ``end_points`` — list of end-point node IDs. + + Example:: + + spec = workflow.to_spec() + # version-control or share `spec` + reconstructed = GraphWorkflow.from_topology_spec(spec, agent_registry) + """ + return { + "name": self.name, + "description": self.description, + "max_loops": self.max_loops, + "nodes": [ + { + "id": node_id, + "agent_name": getattr( + node.agent, "agent_name", node_id + ), + "metadata": node.metadata, + } + for node_id, node in self.nodes.items() + ], + "edges": [ + { + "source": e.source, + "target": e.target, + "metadata": e.metadata, + } + for e in self.edges + ], + "entry_points": list(self.entry_points), + "end_points": list(self.end_points), + } + + def save_spec(self, path: str) -> None: + """ + Save the workflow topology spec produced by :meth:`to_spec` to a JSON file. + + This is the recommended way to persist a workflow definition for + version control, sharing, or later reconstruction via + :meth:`from_topology_spec`. + + Args: + path (str): Filesystem path to write the JSON file to. + + Example:: + + workflow.save_spec("my_workflow.json") + """ + with open(path, "w") as f: + json.dump(self.to_spec(), f, indent=2) + if self.verbose: + logger.info(f"Workflow spec saved to {path}") + + @classmethod + def from_topology_spec( + cls, + spec: Dict[str, Any], + agent_registry: Dict[str, "Agent"], + **kwargs: Any, + ) -> "GraphWorkflow": + """ + Reconstruct a :class:`GraphWorkflow` from a topology spec and an agent registry. + + This is the counterpart to :meth:`to_spec` / :meth:`save_spec`. The + spec describes *which* agents exist and how they are connected; the + registry supplies the live ``Agent`` objects that implement each node. + + Args: + spec (Dict[str, Any]): A topology spec as returned by :meth:`to_spec` + or loaded from a file written by :meth:`save_spec`. + agent_registry (Dict[str, Agent]): Mapping from ``agent_name`` to + the corresponding ``Agent`` instance. Every agent referenced in + ``spec["nodes"]`` must appear in the registry. + **kwargs: Additional keyword arguments forwarded to the + :class:`GraphWorkflow` constructor (e.g. ``verbose``, ``backend``). + + Returns: + GraphWorkflow: A fully initialised workflow with the topology + described by *spec* and agents resolved from *agent_registry*. + + Raises: + KeyError: If a node's ``agent_name`` is not found in *agent_registry*. + ValueError: If *spec* is missing required keys. + + Example:: + + with open("my_workflow.json") as f: + spec = json.load(f) + + registry = {"Researcher": researcher_agent, "Writer": writer_agent} + workflow = GraphWorkflow.from_topology_spec(spec, registry) + workflow.run("Write a report on AI trends") + """ + missing = [ + n["agent_name"] + for n in spec.get("nodes", []) + if n["agent_name"] not in agent_registry + ] + if missing: + raise KeyError( + f"The following agent names are referenced in the spec but not " + f"found in agent_registry: {missing}" + ) + + nodes = { + n["id"]: Node( + id=n["id"], + agent=agent_registry[n["agent_name"]], + metadata=n.get("metadata") or {}, + ) + for n in spec.get("nodes", []) + } + + edges = [ + Edge( + source=e["source"], + target=e["target"], + metadata=e.get("metadata") or {}, + ) + for e in spec.get("edges", []) + ] + + return cls( + name=spec.get("name", "Loaded-Workflow"), + description=spec.get("description", ""), + max_loops=spec.get("max_loops", 1), + nodes=nodes, + edges=edges, + entry_points=spec.get("entry_points") or [], + end_points=spec.get("end_points") or [], + **kwargs, + ) + def to_json( self, fast: bool = True, From f5fec709c68e772bccaef71082e4a480613e1451 Mon Sep 17 00:00:00 2001 From: Adi Chaudhary <72666358+adichaudhary@users.noreply.github.com> Date: Mon, 23 Mar 2026 19:08:25 -0400 Subject: [PATCH 2/3] Fixed docstring and aligned save_spec() behavior with other methods Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- swarms/structs/graph_workflow.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/swarms/structs/graph_workflow.py b/swarms/structs/graph_workflow.py index c80e27220..a3cc71113 100644 --- a/swarms/structs/graph_workflow.py +++ b/swarms/structs/graph_workflow.py @@ -2313,7 +2313,7 @@ def to_spec(self) -> Dict[str, Any]: Returns: Dict[str, Any]: A dictionary containing: - ``name``, ``description``, ``max_loops`` — workflow metadata. - - ``nodes`` — list of ``{"id": ..., "agent_name": ...}`` dicts. + - ``nodes`` — list of ``{"id": ..., "agent_name": ..., "metadata": ...}`` dicts. - ``edges`` — list of ``{"source": ..., "target": ..., "metadata": ...}`` dicts. - ``entry_points`` — list of entry-point node IDs. - ``end_points`` — list of end-point node IDs. @@ -2365,8 +2365,11 @@ def save_spec(self, path: str) -> None: workflow.save_spec("my_workflow.json") """ - with open(path, "w") as f: - json.dump(self.to_spec(), f, indent=2) + dir_name = os.path.dirname(path) + if dir_name: + os.makedirs(dir_name, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(self.to_spec(), f, indent=2, default=str) if self.verbose: logger.info(f"Workflow spec saved to {path}") From 2ebced2c7523cbaffb3648b00acdc2f3b673cc6e Mon Sep 17 00:00:00 2001 From: adichaudhary Date: Mon, 23 Mar 2026 20:04:07 -0400 Subject: [PATCH 3/3] Applied Copilot changes - deterministic ordering, consistent ValueError, test coverage --- swarms/structs/graph_workflow.py | 47 +++++++-- tests/structs/test_graph_workflow.py | 151 +++++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 9 deletions(-) diff --git a/swarms/structs/graph_workflow.py b/swarms/structs/graph_workflow.py index a3cc71113..bfa97fd2d 100644 --- a/swarms/structs/graph_workflow.py +++ b/swarms/structs/graph_workflow.py @@ -2328,6 +2328,8 @@ def to_spec(self) -> Dict[str, Any]: "name": self.name, "description": self.description, "max_loops": self.max_loops, + # Sorted for deterministic output — two equivalent workflows + # built in different insertion orders produce identical specs. "nodes": [ { "id": node_id, @@ -2336,7 +2338,7 @@ def to_spec(self) -> Dict[str, Any]: ), "metadata": node.metadata, } - for node_id, node in self.nodes.items() + for node_id, node in sorted(self.nodes.items()) ], "edges": [ { @@ -2344,10 +2346,12 @@ def to_spec(self) -> Dict[str, Any]: "target": e.target, "metadata": e.metadata, } - for e in self.edges + for e in sorted( + self.edges, key=lambda e: (e.source, e.target) + ) ], - "entry_points": list(self.entry_points), - "end_points": list(self.end_points), + "entry_points": sorted(self.entry_points), + "end_points": sorted(self.end_points), } def save_spec(self, path: str) -> None: @@ -2401,8 +2405,9 @@ def from_topology_spec( described by *spec* and agents resolved from *agent_registry*. Raises: - KeyError: If a node's ``agent_name`` is not found in *agent_registry*. - ValueError: If *spec* is missing required keys. + ValueError: If *spec* is missing required keys, any node/edge dict + is malformed, or an ``agent_name`` is absent from + *agent_registry*. Example:: @@ -2413,13 +2418,37 @@ def from_topology_spec( workflow = GraphWorkflow.from_topology_spec(spec, registry) workflow.run("Write a report on AI trends") """ + if not isinstance(spec, dict): + raise ValueError( + f"spec must be a dict, got {type(spec).__name__}" + ) + if "nodes" not in spec: + raise ValueError("spec is missing required key 'nodes'") + + # Validate per-node required keys + for i, n in enumerate(spec.get("nodes", [])): + for key in ("id", "agent_name"): + if key not in n: + raise ValueError( + f"Node at index {i} is missing required key '{key}'" + ) + + # Validate per-edge required keys + for i, e in enumerate(spec.get("edges", [])): + for key in ("source", "target"): + if key not in e: + raise ValueError( + f"Edge at index {i} is missing required key '{key}'" + ) + + # Check all referenced agents exist in the registry missing = [ n["agent_name"] - for n in spec.get("nodes", []) + for n in spec["nodes"] if n["agent_name"] not in agent_registry ] if missing: - raise KeyError( + raise ValueError( f"The following agent names are referenced in the spec but not " f"found in agent_registry: {missing}" ) @@ -2430,7 +2459,7 @@ def from_topology_spec( agent=agent_registry[n["agent_name"]], metadata=n.get("metadata") or {}, ) - for n in spec.get("nodes", []) + for n in spec["nodes"] } edges = [ diff --git a/tests/structs/test_graph_workflow.py b/tests/structs/test_graph_workflow.py index 8820620f0..cf6acad90 100644 --- a/tests/structs/test_graph_workflow.py +++ b/tests/structs/test_graph_workflow.py @@ -528,6 +528,157 @@ def test_graph_workflow_rustworkx_agent_objects(): assert result is not None +def test_graph_workflow_to_spec_round_trip(): + """to_spec / from_topology_spec round-trip preserves topology and metadata.""" + a = create_test_agent("Alpha", "First agent") + b = create_test_agent("Beta", "Second agent") + c = create_test_agent("Gamma", "Third agent") + + wf = GraphWorkflow( + name="RoundTrip", description="Test pipeline", max_loops=2 + ) + wf.add_nodes([a, b, c]) + wf.add_edge("Alpha", "Beta", weight=1) + wf.add_edge("Beta", "Gamma", weight=2) + wf.compile() + + spec = wf.to_spec() + + # Top-level fields + assert spec["name"] == "RoundTrip" + assert spec["description"] == "Test pipeline" + assert spec["max_loops"] == 2 + + # Nodes are sorted by id + node_ids = [n["id"] for n in spec["nodes"]] + assert node_ids == sorted(node_ids) + assert set(node_ids) == {"Alpha", "Beta", "Gamma"} + for n in spec["nodes"]: + assert n["agent_name"] == n["id"] + + # Edges are sorted by (source, target) + edge_pairs = [(e["source"], e["target"]) for e in spec["edges"]] + assert edge_pairs == sorted(edge_pairs) + assert ("Alpha", "Beta") in edge_pairs + assert ("Beta", "Gamma") in edge_pairs + + # entry / end points are sorted lists + assert spec["entry_points"] == sorted(spec["entry_points"]) + assert spec["end_points"] == sorted(spec["end_points"]) + + # Reconstruct + registry = {"Alpha": a, "Beta": b, "Gamma": c} + wf2 = GraphWorkflow.from_topology_spec(spec, registry) + + assert set(wf2.nodes.keys()) == {"Alpha", "Beta", "Gamma"} + assert len(wf2.edges) == 2 + assert wf2.name == "RoundTrip" + assert wf2.max_loops == 2 + assert set(wf2.entry_points) == set(wf.entry_points) + assert set(wf2.end_points) == set(wf.end_points) + + # Agent objects are resolved correctly + assert wf2.nodes["Alpha"].agent is a + assert wf2.nodes["Beta"].agent is b + assert wf2.nodes["Gamma"].agent is c + + +def test_graph_workflow_to_spec_deterministic_order(): + """to_spec output is identical regardless of insertion order.""" + a = create_test_agent("Zebra") + b = create_test_agent("Apple") + c = create_test_agent("Mango") + + wf1 = GraphWorkflow(name="Order-Test") + wf1.add_nodes([a, b, c]) + wf1.add_edge("Apple", "Mango") + wf1.add_edge("Mango", "Zebra") + wf1.compile() + + wf2 = GraphWorkflow(name="Order-Test") + wf2.add_nodes([c, a, b]) # different insertion order + wf2.add_edge("Mango", "Zebra") + wf2.add_edge("Apple", "Mango") + wf2.compile() + + assert wf1.to_spec()["nodes"] == wf2.to_spec()["nodes"] + assert wf1.to_spec()["edges"] == wf2.to_spec()["edges"] + + +def test_graph_workflow_to_spec_node_metadata(): + """Node metadata is preserved through the spec round-trip.""" + a = create_test_agent("Alpha") + b = create_test_agent("Beta") + + from swarms.structs.graph_workflow import Node + + wf = GraphWorkflow(name="Meta-Test") + wf.nodes["Alpha"] = Node( + id="Alpha", agent=a, metadata={"role": "lead", "priority": 1} + ) + wf.nodes["Beta"] = Node( + id="Beta", agent=b, metadata={"role": "support"} + ) + wf.add_edge("Alpha", "Beta") + wf.compile() + + spec = wf.to_spec() + alpha_spec = next(n for n in spec["nodes"] if n["id"] == "Alpha") + assert alpha_spec["metadata"] == {"role": "lead", "priority": 1} + + registry = {"Alpha": a, "Beta": b} + wf2 = GraphWorkflow.from_topology_spec(spec, registry) + assert wf2.nodes["Alpha"].metadata == { + "role": "lead", + "priority": 1, + } + assert wf2.nodes["Beta"].metadata == {"role": "support"} + + +def test_graph_workflow_from_topology_spec_missing_agent(): + """from_topology_spec raises ValueError when an agent is absent from the registry.""" + a = create_test_agent("Alpha") + b = create_test_agent("Beta") + + wf = GraphWorkflow(name="Missing-Agent-Test") + wf.add_nodes([a, b]) + wf.add_edge("Alpha", "Beta") + wf.compile() + + spec = wf.to_spec() + + # Registry is missing "Beta" + with pytest.raises(ValueError, match="Beta"): + GraphWorkflow.from_topology_spec(spec, {"Alpha": a}) + + +def test_graph_workflow_from_topology_spec_malformed_node(): + """from_topology_spec raises ValueError when a node dict is missing required keys.""" + spec = { + "nodes": [{"id": "Alpha"}], # missing "agent_name" + "edges": [], + } + with pytest.raises(ValueError, match="agent_name"): + GraphWorkflow.from_topology_spec(spec, {}) + + +def test_graph_workflow_from_topology_spec_malformed_edge(): + """from_topology_spec raises ValueError when an edge dict is missing required keys.""" + a = create_test_agent("Alpha") + spec = { + "nodes": [{"id": "Alpha", "agent_name": "Alpha"}], + "edges": [{"source": "Alpha"}], # missing "target" + } + with pytest.raises(ValueError, match="target"): + GraphWorkflow.from_topology_spec(spec, {"Alpha": a}) + + +def test_graph_workflow_from_topology_spec_not_a_dict(): + """from_topology_spec raises ValueError when spec is not a dict.""" + with pytest.raises(ValueError, match="dict"): + GraphWorkflow.from_topology_spec("not-a-dict", {}) + + def test_graph_workflow_backend_fallback(): """Test backend fallback when rustworkx unavailable""" workflow = GraphWorkflow(