|
1 | 1 | from beanie import PydanticObjectId |
| 2 | +from app.models.executed_models import ExecutedRequestModel, ExecutedResponseModel |
| 3 | + |
2 | 4 | from fastapi import HTTPException, status, BackgroundTasks |
3 | 5 |
|
4 | | -from app.models.executed_models import ExecutedRequestModel, ExecutedResponseModel |
5 | 6 | from app.models.db.state import State |
6 | 7 | from app.models.state_status_enum import StateStatusEnum |
7 | 8 | from app.singletons.logs_manager import LogsManager |
8 | 9 | from app.tasks.create_next_states import create_next_states |
9 | 10 |
|
10 | 11 | logger = LogsManager().get_logger() |
11 | 12 |
|
| 13 | +async def executed_state(namespace_name: str, state_id: PydanticObjectId, body: ExecutedRequestModel, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> ExecutedResponseModel: |
12 | 14 |
|
13 | | -async def executed_state( |
14 | | - namespace_name: str, |
15 | | - state_id: PydanticObjectId, |
16 | | - body: ExecutedRequestModel, |
17 | | - x_exosphere_request_id: str, |
18 | | - background_tasks: BackgroundTasks, |
19 | | -) -> ExecutedResponseModel: |
20 | 15 | try: |
21 | | - logger.info( |
22 | | - f"Executed state {state_id} for namespace {namespace_name}", |
23 | | - x_exosphere_request_id=x_exosphere_request_id, |
24 | | - ) |
| 16 | + logger.info(f"Executed state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id) |
25 | 17 |
|
26 | 18 | state = await State.find_one(State.id == state_id) |
27 | 19 | if not state or not state.id: |
28 | | - raise HTTPException( |
29 | | - status_code=status.HTTP_404_NOT_FOUND, |
30 | | - detail="State not found", |
31 | | - ) |
| 20 | + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found") |
32 | 21 |
|
33 | 22 | if state.status != StateStatusEnum.QUEUED: |
34 | | - raise HTTPException( |
35 | | - status_code=status.HTTP_400_BAD_REQUEST, |
36 | | - detail="State is not queued", |
37 | | - ) |
38 | | - |
39 | | - next_state_ids: list[PydanticObjectId] = [] |
40 | | - |
41 | | - # ---- Handle outputs ---- |
| 23 | + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="State is not queued") |
| 24 | + |
| 25 | + next_state_ids = [] |
42 | 26 | if len(body.outputs) == 0: |
43 | 27 | state.status = StateStatusEnum.EXECUTED |
44 | 28 | state.outputs = {} |
45 | 29 | await state.save() |
46 | 30 |
|
47 | 31 | next_state_ids.append(state.id) |
48 | 32 |
|
49 | | - else: |
50 | | - # First output updates the current state |
| 33 | + else: |
51 | 34 | state.outputs = body.outputs[0] |
52 | 35 | state.status = StateStatusEnum.EXECUTED |
53 | 36 | await state.save() |
54 | | - |
55 | 37 | next_state_ids.append(state.id) |
56 | 38 |
|
57 | | - # Remaining outputs create new states |
58 | 39 | new_states = [] |
59 | 40 | for output in body.outputs[1:]: |
60 | | - new_states.append( |
61 | | - State( |
62 | | - node_name=state.node_name, |
63 | | - namespace_name=state.namespace_name, |
64 | | - identifier=state.identifier, |
65 | | - graph_name=state.graph_name, |
66 | | - run_id=state.run_id, |
67 | | - status=StateStatusEnum.EXECUTED, |
68 | | - inputs=state.inputs, |
69 | | - outputs=output, |
70 | | - error=None, |
71 | | - parents=state.parents, |
72 | | - ) |
73 | | - ) |
74 | | - |
75 | | - if new_states: |
76 | | - inserted_ids = ( |
77 | | - await State.insert_many(new_states) |
78 | | - ).inserted_ids |
| 41 | + new_states.append(State( |
| 42 | + node_name=state.node_name, |
| 43 | + namespace_name=state.namespace_name, |
| 44 | + identifier=state.identifier, |
| 45 | + graph_name=state.graph_name, |
| 46 | + run_id=state.run_id, |
| 47 | + status=StateStatusEnum.EXECUTED, |
| 48 | + inputs=state.inputs, |
| 49 | + outputs=output, |
| 50 | + error=None, |
| 51 | + parents=state.parents |
| 52 | + )) |
| 53 | + |
| 54 | + if len(new_states) > 0: |
| 55 | + inserted_ids = (await State.insert_many(new_states)).inserted_ids |
79 | 56 | next_state_ids.extend(inserted_ids) |
80 | 57 |
|
81 | | - # ---- Create next states ---- |
82 | | - background_tasks.add_task( |
83 | | - create_next_states, |
84 | | - next_state_ids, |
85 | | - state.identifier, |
86 | | - state.namespace_name, |
87 | | - state.graph_name, |
88 | | - state.parents, |
89 | | - ) |
| 58 | + background_tasks.add_task(create_next_states, next_state_ids, state.identifier, state.namespace_name, state.graph_name, state.parents) |
90 | 59 |
|
91 | | - return ExecutedResponseModel( |
92 | | - status=StateStatusEnum.EXECUTED |
93 | | - ) |
| 60 | + return ExecutedResponseModel(status=StateStatusEnum.EXECUTED) |
94 | 61 |
|
95 | 62 | except Exception as e: |
96 | | - logger.error( |
97 | | - f"Error executing state {state_id} for namespace {namespace_name}", |
98 | | - x_exosphere_request_id=x_exosphere_request_id, |
99 | | - error=e, |
100 | | - ) |
101 | | - raise |
| 63 | + logger.error(f"Error executing state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id, error=e) |
| 64 | + raise e |
0 commit comments