@@ -623,79 +623,75 @@ def _run(
623623 based on the flow syntax. It also supports custom task injection
624624 and multiple execution loops as configured.
625625 """
626- try :
627- self .conversation .add ("User" , task )
626+ self .conversation .add ("User" , task )
628627
629- if not self .validate_flow ():
630- logger .error ("Flow validation failed" )
631- return "Invalid flow configuration."
628+ if not self .validate_flow ():
629+ logger .error ("Flow validation failed" )
630+ return "Invalid flow configuration."
632631
633- tasks = self .flow .split ("->" )
634- response_dict = {}
632+ tasks = self .flow .split ("->" )
633+ response_dict = {}
635634
636- logger .info (
637- f"Starting task execution with { len (tasks )} steps"
638- )
635+ logger .info (
636+ f"Starting task execution with { len (tasks )} steps"
637+ )
639638
640- # Handle custom tasks
641- if custom_tasks is not None :
642- logger .info ("Processing custom tasks" )
643- c_agent_name , c_task = next (
644- iter (custom_tasks .items ())
645- )
646- position = tasks .index (c_agent_name )
639+ # Handle custom tasks
640+ if custom_tasks is not None :
641+ logger .info ("Processing custom tasks" )
642+ c_agent_name , c_task = next (
643+ iter (custom_tasks .items ())
644+ )
645+ position = tasks .index (c_agent_name )
647646
648- if position > 0 :
649- tasks [position - 1 ] += "->" + c_task
650- else :
651- tasks .insert (position , c_task )
647+ if position > 0 :
648+ tasks [position - 1 ] += "->" + c_task
649+ else :
650+ tasks .insert (position , c_task )
652651
653- loop_count = 0
654- while loop_count < self .max_loops :
655- logger .info (
656- f"Starting loop { loop_count + 1 } /{ self .max_loops } "
657- )
652+ loop_count = 0
653+ while loop_count < self .max_loops :
654+ logger .info (
655+ f"Starting loop { loop_count + 1 } /{ self .max_loops } "
656+ )
658657
659- for task_idx , task in enumerate (tasks ):
660- agent_names = [
661- name .strip () for name in task .split ("," )
662- ]
658+ for task_idx , task in enumerate (tasks ):
659+ agent_names = [
660+ name .strip () for name in task .split ("," )
661+ ]
663662
664- if len (agent_names ) > 1 :
665- # Concurrent processing - comma detected
666- concurrent_results = (
667- self ._run_concurrent_workflow (
668- agent_names = agent_names ,
669- img = img ,
670- * args ,
671- ** kwargs ,
672- )
673- )
674- response_dict .update (concurrent_results )
675-
676- else :
677- # Sequential processing
678- agent_name = agent_names [0 ]
679- result = self ._run_sequential_workflow (
680- agent_name = agent_name ,
681- tasks = tasks ,
663+ if len (agent_names ) > 1 :
664+ # Concurrent processing - comma detected
665+ concurrent_results = (
666+ self ._run_concurrent_workflow (
667+ agent_names = agent_names ,
682668 img = img ,
683669 * args ,
684670 ** kwargs ,
685671 )
686- response_dict [agent_name ] = result
672+ )
673+ response_dict .update (concurrent_results )
687674
688- loop_count += 1
675+ else :
676+ # Sequential processing
677+ agent_name = agent_names [0 ]
678+ result = self ._run_sequential_workflow (
679+ agent_name = agent_name ,
680+ tasks = tasks ,
681+ img = img ,
682+ * args ,
683+ ** kwargs ,
684+ )
685+ response_dict [agent_name ] = result
689686
690- logger . info ( "Task execution completed" )
687+ loop_count += 1
691688
692- return history_output_formatter (
693- conversation = self .conversation ,
694- type = self .output_type ,
695- )
689+ logger .info ("Task execution completed" )
696690
697- except Exception as e :
698- self ._catch_error (e )
691+ return history_output_formatter (
692+ conversation = self .conversation ,
693+ type = self .output_type ,
694+ )
699695
700696 def _catch_error (self , e : Exception ):
701697 """
@@ -722,7 +718,7 @@ def _catch_error(self, e: Exception):
722718 f"AgentRearrange: Id: { self .id } , Name: { self .name } . An error occurred with your agent '{ self .name } ': Error: { e } . Traceback: { e .__traceback__ } "
723719 )
724720
725- return e
721+ raise e
726722
727723 def run (
728724 self ,
@@ -791,11 +787,7 @@ def __call__(self, task: str, *args, **kwargs):
791787 >>> rearrange_system = AgentRearrange(agents=[agent1, agent2], flow="agent1 -> agent2")
792788 >>> result = rearrange_system("Process this data")
793789 """
794- try :
795- return self .run (task = task , * args , ** kwargs )
796- except Exception as e :
797- logger .error (f"An error occurred: { e } " )
798- return e
790+ return self .run (task = task , * args , ** kwargs )
799791
800792 def batch_run (
801793 self ,
@@ -830,31 +822,28 @@ def batch_run(
830822 Each batch is processed sequentially, but individual tasks within
831823 a batch may run concurrently depending on the flow configuration.
832824 """
833- try :
834- results = []
835- for i in range (0 , len (tasks ), batch_size ):
836- batch_tasks = tasks [i : i + batch_size ]
837- batch_imgs = (
838- img [i : i + batch_size ]
839- if img
840- else [None ] * len (batch_tasks )
825+ results = []
826+ for i in range (0 , len (tasks ), batch_size ):
827+ batch_tasks = tasks [i : i + batch_size ]
828+ batch_imgs = (
829+ img [i : i + batch_size ]
830+ if img
831+ else [None ] * len (batch_tasks )
832+ )
833+
834+ # Process batch using concurrent execution
835+ batch_results = [
836+ self .run (
837+ task = task ,
838+ img = img_path ,
839+ * args ,
840+ ** kwargs ,
841841 )
842+ for task , img_path in zip (batch_tasks , batch_imgs )
843+ ]
844+ results .extend (batch_results )
842845
843- # Process batch using concurrent execution
844- batch_results = [
845- self .run (
846- task = task ,
847- img = img_path ,
848- * args ,
849- ** kwargs ,
850- )
851- for task , img_path in zip (batch_tasks , batch_imgs )
852- ]
853- results .extend (batch_results )
854-
855- return results
856- except Exception as e :
857- self ._catch_error (e )
846+ return results
858847
859848 def concurrent_run (
860849 self ,
@@ -889,24 +878,21 @@ def concurrent_run(
889878 The number of concurrent executions is limited by max_workers parameter.
890879 Each task runs independently through the full agent workflow.
891880 """
892- try :
893- with ThreadPoolExecutor (
894- max_workers = max_workers
895- ) as executor :
896- imgs = img if img else [None ] * len (tasks )
897- futures = [
898- executor .submit (
899- self .run ,
900- task = task ,
901- img = img_path ,
902- * args ,
903- ** kwargs ,
904- )
905- for task , img_path in zip (tasks , imgs )
906- ]
907- return [future .result () for future in futures ]
908- except Exception as e :
909- self ._catch_error (e )
881+ with ThreadPoolExecutor (
882+ max_workers = max_workers
883+ ) as executor :
884+ imgs = img if img else [None ] * len (tasks )
885+ futures = [
886+ executor .submit (
887+ self .run ,
888+ task = task ,
889+ img = img_path ,
890+ * args ,
891+ ** kwargs ,
892+ )
893+ for task , img_path in zip (tasks , imgs )
894+ ]
895+ return [future .result () for future in futures ]
910896
911897 async def run_async (
912898 self ,
0 commit comments