-
-
Notifications
You must be signed in to change notification settings - Fork 7.2k
Expand file tree
/
Copy pathConflictBasedSearch.py
More file actions
208 lines (168 loc) · 9.9 KB
/
ConflictBasedSearch.py
File metadata and controls
208 lines (168 loc) · 9.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
Conflict Based Search generates paths in 2 dimensions (x, y, time) for a set of agents. It does
so by performing searches on two levels. The top level search applies constraints that agents
must avoid, and the bottom level performs a single-agent search for individual agents given
the constraints provided by the top level search. Initially, paths are generated for each agent
with no constraints. The paths are checked for conflicts with one another. If any are found, the
top level search generates two nodes. These nodes apply a constraint at the point of conflict for
one of the two agents in conflict. This process repeats until a set of paths are found where no
agents are in conflict. The top level search chooses successor nodes based on the sum of the
cost of all paths, which guarantees optimiality of the final solution.
The full algorithm is defined in this paper: https://cdn.aaai.org/ojs/8140/8140-13-11667-1-2-20201228.pdf
"""
from copy import deepcopy
from enum import Enum
import numpy as np
from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import (
Grid,
ObstacleArrangement,
Position,
)
from typing import Optional
from PathPlanning.TimeBasedPathPlanning.BaseClasses import MultiAgentPlanner, StartAndGoal
from PathPlanning.TimeBasedPathPlanning.Node import NodePath
from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner
from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner
from PathPlanning.TimeBasedPathPlanning.SpaceTimeAStar import SpaceTimeAStar
from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePaths
from PathPlanning.TimeBasedPathPlanning.ConstraintTree import AgentId, AppliedConstraint, ConstraintTree, ConstraintTreeNode, ForkingConstraint
import time
class ConflictBasedSearch(MultiAgentPlanner):
@staticmethod
def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> tuple[list[StartAndGoal], list[NodePath]]:
"""
Generate a path from the start to the goal for each agent in the `start_and_goals` list.
Returns the re-ordered StartAndGoal combinations, and a list of path plans. The order of the plans
corresponds to the order of the `start_and_goals` list.
"""
print(f"Using single-agent planner: {single_agent_planner_class}")
initial_solution: dict[AgentId, NodePath] = {}
# Generate initial solution (no constraints)
for start_and_goal in start_and_goals:
path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, start_and_goal.agent_id, verbose)
initial_solution[AgentId(start_and_goal.agent_id)] = path
if verbose:
print("Initial solution:")
for (agent_idx, path) in initial_solution.items():
print(f"\nAgent {agent_idx} path:\n {path}")
constraint_tree = ConstraintTree(initial_solution)
attempted_constraint_combos = set()
while constraint_tree.nodes_to_expand:
constraint_tree_node = constraint_tree.get_next_node_to_expand()
ancestor_constraints = constraint_tree.get_ancestor_constraints(constraint_tree_node.parent_idx)
if verbose:
print(f"Expanding node with constraint {constraint_tree_node.constraint} and parent {constraint_tree_node.parent_idx}")
print(f"\tCOST: {constraint_tree_node.cost}")
if constraint_tree_node is None:
raise RuntimeError("No more nodes to expand in the constraint tree.")
if not constraint_tree_node.constraint:
# This means we found a solution!
print(f"\nFound a path with constraints after {constraint_tree.expanded_node_count()} expansions")
print(f"Final cost: {constraint_tree_node.cost}")
print(f"Number of constraints on solution: {len(constraint_tree_node.all_constraints)}")
return (start_and_goals, [constraint_tree_node.paths[start_and_goal.agent_id] for start_and_goal in start_and_goals])
if not isinstance(constraint_tree_node.constraint, ForkingConstraint):
raise ValueError(f"Expected a ForkingConstraint, but got: {constraint_tree_node.constraint}")
for constrained_agent in constraint_tree_node.constraint.constrained_agents:
applied_constraint = AppliedConstraint(constrained_agent.constraint, constrained_agent.agent)
all_constraints = ancestor_constraints
all_constraints.append(applied_constraint)
new_path = ConflictBasedSearch.plan_for_agent(constrained_agent, all_constraints, constraint_tree, attempted_constraint_combos, grid, single_agent_planner_class, start_and_goals, verbose)
if not new_path:
continue
# Deepcopy to update with applied constraint and new paths
applied_constraint_parent = deepcopy(constraint_tree_node)
# Copy paths for child node - we just need to update constrained agent's path
applied_constraint_parent.paths[constrained_agent.agent] = new_path
if verbose:
for (agent_idx, path) in applied_constraint_parent.paths.items():
print(f"\nAgent {agent_idx} path:\n {path}")
applied_constraint_parent.constraint = applied_constraint
parent_idx = constraint_tree.add_expanded_node(applied_constraint_parent)
new_constraint_tree_node = ConstraintTreeNode(applied_constraint_parent.paths, parent_idx, all_constraints)
if verbose:
print(f"Adding new constraint tree node with constraint: {new_constraint_tree_node.constraint}")
constraint_tree.add_node_to_tree(new_constraint_tree_node)
raise RuntimeError("No solution found")
def get_agents_start_and_goal(start_and_goal_list: list[StartAndGoal], target_index: AgentId) -> StartAndGoal:
for item in start_and_goal_list:
if item.agent_id == target_index:
return item
raise RuntimeError(f"Could not find agent with index {target_index} in {start_and_goal_list}")
def plan_for_agent(constrained_agent: ConstraintTreeNode,
all_constraints: list[AppliedConstraint],
constraint_tree: ConstraintTree,
attempted_constraint_combos: set,
grid: Grid,
single_agent_planner_class: SingleAgentPlanner,
start_and_goals: list[StartAndGoal],
verbose: False) -> Optional[tuple[list[StartAndGoal], list[NodePath]]]:
num_expansions = constraint_tree.expanded_node_count()
if num_expansions % 50 == 0:
print(f"Expanded {num_expansions} nodes so far...")
print(f"\tNumber of constraints on expanded node: {len(all_constraints)}")
# Skip if we have already tried this set of constraints
constraint_hash = hash(frozenset(all_constraints))
if constraint_hash in attempted_constraint_combos:
if verbose:
print(f"\tSkipping already attempted constraint combination: {all_constraints}")
return None
else:
attempted_constraint_combos.add(constraint_hash)
if verbose:
print(f"\tall constraints: {all_constraints}")
grid.clear_constraint_points()
grid.apply_constraint_points(all_constraints)
# Just plan for agent with new constraint
start_and_goal: StartAndGoal = ConflictBasedSearch.get_agents_start_and_goal(start_and_goals, constrained_agent.agent)
try:
if verbose:
print("\tplanning for: {}", start_and_goal)
new_path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, start_and_goal.agent_id, verbose)
return new_path
except Exception as e:
if verbose:
print(f"Error: {e}")
return None
class Scenario(Enum):
# Five robots all trying to get through a single cell choke point to reach their goals
NARROW_CORRIDOR = 0
# Three robots in a narrow hallway that requires intelligent conflict resolution
HALLWAY_CROSS = 1
scenario = Scenario.HALLWAY_CROSS
verbose = False
show_animation = True
np.random.seed(42) # For reproducibility
def main():
grid_side_length = 21
# Default: no obstacles
obstacle_arrangement = ObstacleArrangement.NONE
start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(10)]
if scenario == Scenario.NARROW_CORRIDOR:
obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR
start_and_goals = [StartAndGoal(i, Position(1, 8+i), Position(19, 19-i)) for i in range(5)]
elif scenario == Scenario.HALLWAY_CROSS:
obstacle_arrangement=ObstacleArrangement.HALLWAY
start_and_goals = [StartAndGoal(0, Position(6, 10), Position(13, 10)),
StartAndGoal(1, Position(11, 10), Position(6, 10)),
StartAndGoal(2, Position(13, 10), Position(7, 10))]
grid = Grid(
np.array([grid_side_length, grid_side_length]),
num_obstacles=250,
obstacle_avoid_points=[pos for item in start_and_goals for pos in (item.start, item.goal)],
obstacle_arrangement=obstacle_arrangement,
)
start_time = time.time()
start_and_goals, paths = ConflictBasedSearch.plan(grid, start_and_goals, SafeIntervalPathPlanner, verbose)
# start_and_goals, paths = ConflictBasedSearch.plan(grid, start_and_goals, SpaceTimeAStar, verbose)
runtime = time.time() - start_time
print(f"\nPlanning took: {runtime:.5f} seconds")
if verbose:
print(f"Paths:")
for path in paths:
print(f"{path}\n")
if not show_animation:
return
PlotNodePaths(grid, start_and_goals, paths)
if __name__ == "__main__":
main()