
    i[                         d Z ddlZddlZddlmZmZmZmZmZm	Z	 ddl
mZmZ ddlmZ ddlmZ ddlmZ ddlmZ  G d	 d
      Zy)z:Cycle executor that runs workflow graphs containing loops.    N)DictListCallableAnySetOptional)NodeEdgeLink)
LogManager)CycleManager)ParallelExecutor)GraphTopologyBuilderc                   L   e Zd ZdZdedeeef   deeee	f      de
deegdf   f
dZd)d
Zdeeee	f      d	dfdZdeeee	f      d	dfdZdeee	f   d	dfdZded	dfdZdeee	f   d	dfdZdedee   d	edz  fdZdedee   deded	ee   f
dZdee   ded	eee      fdZ	 d*dee   dee   d	eeef   fdZ	 d+dee   dedeee      ded	eeee	f      f
d Z	 	 d,d!eeeee	f         d"ed#ee   dee   ded	ee   fd$Z	 d+ded%ee   d&ed	ee   fd'Zdedee   d	efd(Zy)-CycleExecutorzExecute workflow graphs that contain cycles.
    
    Features:
    - Scheduling is based on "super nodes"
    - Parallel execution inside cycles
    - Automatic detection of exit conditions
    log_managernodescycle_execution_ordercycle_managerexecute_node_funcNc                 l    || _         || _        || _        || _        || _        t        ||      | _        y)af  Initialize the cycle executor.
        
        Args:
            log_manager: Logger instance
            nodes: Mapping of node ids to nodes
            cycle_execution_order: Super-node execution order with cycles
            cycle_manager: Cycle manager coordinating iterations
            execute_node_func: Callable that executes a single node
        N)r   r   r   r   r   r   parallel_executor)selfr   r   r   r   r   s         M/Users/bowang/.openclaw/workspace/ChatDev/workflow/executor/cycle_executor.py__init__zCycleExecutor.__init__   s:    " '
%:"*!2!1+u!E    returnc           	          | j                   j                  d       t        | j                        D ]A  \  }}| j                   j                  d| dt	        |       d       | j                  |       C y)z&Run the workflow that contains cycles.z6Executing graph with cycles using super-node schedulerzExecuting super-node layer z with z itemsN)r   debug	enumerater   len_execute_super_layer)r   	layer_idxlayer_itemss      r   executezCycleExecutor.execute/   sm    WX&/0J0J&K"I{""%@6RUVaRbQcci#jk%%k2 'Lr   r#   c                 &    | j                  |       y)z"Execute a single super-node layer.N)_execute_super_layer_parallel)r   r#   s     r   r!   z"CycleExecutor._execute_super_layer7   s    **;7r   c                     dt         t        t        f   dt        fd}| j                  j	                  || j
                  |       y)z'Execute a super-node layer in parallel.itemr   c                 X    | d   dk(  rd| d    S | d   dk(  rd| d    S d| d   d	    S )
Ntypecyclecycle cycle_idnodenode node_idr   r    r(   s    r   item_desc_funczCCycleExecutor._execute_super_layer_parallel.<locals>.item_desc_func=   sX    F|w&Z 0122f'tI/00 tG}Q/011r   N)r   strr   r   execute_items_parallel_execute_super_item)r   r#   r3   s      r   r&   z+CycleExecutor._execute_super_layer_parallel;   sB    	2c3h 	2C 	2 	55$$	
r   r(   c                     |d   dk(  r| j                  |d   d          y|d   dk(  r| j                  |d          y|d   dk(  r| j                  |       yy)	z1Execute a single super-node item (node or cycle).r*   layerr   r   r.   r0   r+   N)_execute_single_node_execute_cycle)r   r(   s     r   r6   z!CycleExecutor._execute_super_itemM   sb    <7"%%d7mA&67&\V#%%d9o6&\W$% %r   r0   c                     | j                   j                  d|        | j                  |   }|j                         r| j	                  |       y| j                   j                  d| d       y)zExecute a non-cycle node.zExecuting non-cycle node: Node z% is not triggered, skipping executionN)r   r   r   is_triggeredr   warning)r   r0   r.   s      r   r9   z"CycleExecutor._execute_single_nodeX   sd    !;G9EFzz'"""4($$uWI5Z%[\r   
cycle_infoc                    |d   }|d   }| j                   j                  d| d|        	 | j                  ||      }| | j                   j                  d| d       y|| j                  j                  |   _        | j                   j                  d| d|        | j                  j                  |       | j                  |||| j                  j                  |   j                         	       | j                  j                  |       | j                   j                  d| d
       y# t        $ r*}| j                   j	                  t        |              d}~ww xY w)z0Execute a cycle using the multi-iteration logic.r-   r   zExecuting cycle z with nodes: NCycle z= has no triggered entry node in this pass; skipping executionz initial node: max_iterationsz
 completed)r   r   _validate_cycle_entry
ValueErrorerrorr4   r   cyclesinitial_nodeactivate_cycle_execute_cycle_with_iterationsget_max_iterationsdeactivate_cycle)r   r?   r-   r   initial_node_ides         r   r:   zCycleExecutor._execute_cycleb   sl   j)7#!1(=PQ	"885IO
 """
"_`  <K!!(+8z@QRS 	))(3 	++--44X>QQS	 	, 	
 	++H5z<=7  	""3q6*	s   D0 0	E#9%EE#r-   c           	         g }|D ]q  }| j                   |   }|j                  D ]Q  }|j                  |vs|j                  |      }|s&|j                  s3|j
                  s@|j                  |        q s | j                  j                  j                  |      }|r|j                  nd}	t        |      dk(  r|	r|	S yt        |      dkD  rt        d| d| d      |d   }
|	r|
|	k7  rt        d| d|	 d|
 d	      |
S )
aT  
        Validate that exactly one node in the cycle is triggered by external edges.

        Args:
            cycle_id: The cycle ID
            nodes: List of node IDs in the cycle

        Returns:
            The ID of the unique initial node

        Raises:
            ValueError: If no node or multiple nodes are triggered
        Nr      rA   z% has multiple triggered entry nodes: z>. Only one entry node must be triggered when entering a cycle.z entry mismatch: configured 'z' but triggered '')r   predecessorsidfind_outgoing_edgetrigger	triggeredappendr   rG   getconfigured_entry_noder    rE   )r   r-   r   triggered_nodesr0   r.   predecessoredger?   configured_entry
entry_nodes              r   rD   z#CycleExecutor._validate_cycle_entry   s8    &(G::g&D#00>>.&99'BD'..w7  1  ''..228<
?I:;;t1$''!A%
"GGX YO O  %Q'

.> >
"?@P?Q R"",Q0 
 r   cycle_nodesrM   rC   c                    d}||k  r| j                   j                  d| d|dz    d|        | j                  ||      }| j                  ||||dk(        }| j	                  |||||dk(        }|r,| j                   j                  d| dt        |              |S | j                  ||      s | j                   j                  d| d	       n|dz  }||k  r||k\  r"| j                   j                  d| d
| d       t               S )a  
        Execute a cycle with multiple iterations.

        Args:
            cycle_id: Cycle ID
            cycle_nodes: List of all nodes in the cycle
            initial_node_id: Initial node ID
            max_iterations: Maximum number of iterations

        Returns:
            A tuple of two sets:
                - exit_nodes: nodes triggered outside the *current* cycle scope
                - external_nodes: subset of exit_nodes that are also outside the
                  provided parent_cycle_nodes scope
        r   rA   z iteration rP   /)is_first_iteration)rM   rb   z$ exited - external nodes triggered: z) completed - initial node not retriggeredz reached max iterations ())	r   r   _detect_cycles_in_scope"_build_topological_layers_in_scope_execute_scope_layerssorted_is_initial_node_retriggeredr>   set)	r   r-   r_   rM   rC   	iterationinner_cyclesexecution_layersexternal_nodess	            r   rJ   z,CycleExecutor._execute_cycle_with_iterations   sf   , 	.(""
+i!m_An=MN
  77_UL  $FF_l$-N  G   "77  /$-N 8 N   &&XJ&J6R`KaJbc &% 44_kR  &&XJ&OP NII .(L &$$
";N;K1M ur   scope_nodesc                     | j                  ||      }t        j                  |      }|D cg c]  }t        |      dkD  r| }}|S c c}w )a#  
        Detect nested cycles within the scoped subgraph.

        Constructs a subgraph containing only:
        1. Nodes in scope_nodes
        2. Edges where both source and target are in scope_nodes
        3. Initial node's incoming edges are REMOVED (to break the outer cycle)

        Args:
            scope_nodes: List of node IDs in the current scope
            initial_node_id: Initial node ID (whose incoming edges are removed)

        Returns:
            List of detected nested cycles (excluding the current cycle itself)
        clear_entry_noderP   )_build_scoped_nodesr   detect_cyclesr    )r   rn   rM   scoped_nodes
all_cyclesr+   nested_cycless          r   rd   z%CycleExecutor._detect_cycles_in_scope   sd    * //o/^ *77E
  *
)e5zA~ z 	 

 
s   Arq   c           	         i }t        |      }|D ]U  }| j                  |   }t        j                  |      }|j                         D cg c]A  }|j                  j
                  |v r'|j                  r|j                  j
                  |k7  r|C }	}|	|_        ||k(  rg |_        n[g }
|j                  D ]C  }|j
                  |v s|j                  |      }|s&|j                  s3|
j                  |       E |
|_        |j                  D cg c]C  j
                  |v r3j
                  |k7  r$t        fd|j                         D              rE }}||_        |||<   X |S c c}w c c}w )a  
        Build a scoped subgraph containing only nodes and edges within the scope.

        Args:
            scope_nodes: List of node IDs in the scope
            clear_entry_node: If specified, this node's incoming edges will be removed
                            (used to break the outer cycle when detecting nested cycles)

        Returns:
            Dictionary of scoped nodes
        c              3      K   | ]5  }|j                   j                  j                  k(  xr |j                   7 y wN)targetrS   rU   ).0	edge_linksuccs     r   	<genexpr>z4CycleExecutor._build_scoped_nodes.<locals>.<genexpr>U  s<      %H	 $$''4772Hy7H7HH%Hs   ;>)ri   r   copyiter_outgoing_edgesrz   rS   rU   _outgoing_edgesrR   rT   rW   
successorsany)r   rn   rq   rt   scope_nodes_setr0   original_nodescoped_noder|   scoped_edgesscoped_predecessorspredr\   r}   scoped_successorss                ` r   rr   z!CycleExecutor._build_scoped_nodes  s     k*"G JJw/M))M2K
 ,9+L+L+N+Ni##&&/9%%$$''+;; +N   +7K' **+-(&(#)66Dww/1#66w?DLL/66t< 7 ,?(
 "/!9!9!!977o-GG// %2%F%F%H  !9  ! &7K"$/L!U #X K0!s   AE,AE1rk   rb   c           	         | j                  |d      }|r||v r:g ||   _        n/|D ]*  }| j                  |   j                         s!g ||   _        , g }t	               }|r|j                  |       n6|D ]1  }| j                  |   j                         s!|j                  |       3 |D ]  }|j                  |      }	|	s|	j                         D ]Y  }
|
j                  j                  |v r|j                  ||
j                  j                  |
j                  |
j                  d       [  |st        j                  |      }|S t        j                  |||      }t        j                   ||      }|S )a  
        Build topological execution order for the scoped subgraph.

        Args:
            scope_nodes: List of node IDs in the scope
            initial_node_id: Initial node ID
            inner_cycles: List of nested cycles detected in the scope
            is_first_iteration: Whether this is the first iteration (affects initial node handling)

        Returns:
            List of execution layers, each containing execution items
        Nrp   )fromtorU   	condition)rr   rR   r   r=   ri   addrX   r   rz   rS   rW   rU   r   r   build_dag_layerscreate_super_node_graphtopological_sort_super_nodes)r   rn   rM   rk   rb   rt   r0   r   exclude_targetsr   r|   layerssuper_graphs                r   re   z0CycleExecutor._build_topological_layers_in_scope`  s   * //d/S
 ,.=?_-: '::g&3359;L)6 '  %0 '::g&335#''0 ' #G&**73K!,!@!@!BI ''**o=  '' ''..11#,#4#4%.%8%8	) 	 "C	 #  )::<HFM /FFlLK *FF\F Mr   rl   parent_cycle_idparent_cycle_nodesc           	         
 t        |      t               
t        j                         t        j                         dt        t
           ddf
fddt        t
        t        f   dt
        fd}|D ]d  }j                         r nRdt        t
        t        f   ddf fd} j                  j                  |||       j                         sd n 
r$D ]  }	 j                  |	   j                          ! 
S )a  
        Execute scoped layers with parallelism, supporting nested cycles.

        Args:
            execution_layers: List of execution layers
            parent_cycle_id: Parent cycle ID
            parent_cycle_nodes: List of nodes in the parent cycle
            initial_node_id: Initial node ID (for first iteration special handling)
            is_first_iteration: Whether this is the first iteration

        Returns:
            external_nodes: subset of exit_nodes outside parent_cycle_nodes_set
        r   r   Nc                     | sy 5  | r!j                  |        j                          d d d        y # 1 sw Y   y xY wry   )updateri   )r   rm   result_lock
stop_events    r   record_externalz<CycleExecutor._execute_scope_layers.<locals>.record_external  s1    "))%0NN$ s   $4=r(   c                 D    | d   dk(  rd| d    S | d   dk(  rd| d    S y)	Nr*   r.   r/   r0   r+   r,   r-   
layer_itemr1   r2   s    r   	item_descz6CycleExecutor._execute_scope_layers.<locals>.item_desc  sB    F|v%tI/00F|w&Z 0122r   c                 H   j                         ry | d   dk(  r.| d   }xr |k(  }j                  ||      }|r	 |       y y | d   dk(  r| d   }| d   }j                  j                  d| d	        	 j	                  ||      }| j                  j                  d
| d       y j                  |||d      }|r|D 	ch c]  }	|	vr|	
 }
}	|
r	 |
       y y y y # t
        $ r*}j                  j                  t        |              d }~ww xY wc c}	w )Nr*   r.   r0   )force_executer+   r   r-   zExecuting nested cycle z within cycle zNested cycle z! has no triggered entry; skippingd   rB   )	is_set#_execute_single_cycle_node_in_scoper   r   rD   rE   rF   r4   rJ   )r(   _node_idr   targetsinner_cycle_nodesinner_cycle_idinner_initial_noderN   inner_external_nodesr.   filteredrM   rb   r   r   scope_node_setr   r   s              r   executor_funcz:CycleExecutor._execute_scope_layers.<locals>.executor_func  s   $$&<6)#IH$6$XH<WM"FF &&3 G G
 '0  &\W,(,W%%)*%5N$$**1.1AP_O`a-1-G-G*,=.* *1((..+N+;;\] +/+N+N&)*'*	 ,O ,( , )=$(<#>9 !(< ! $
 $+H5 $ ,; - & ((..s1v6$$s   =C) D)	D2%DD)ri   	threadingEventLockr   r4   r   r   r   r   r5   r   reset_triggers)r   rl   r   r   rM   rb   r   r8   r   r0   rm   r   r   r   r   s   ` ` ``    @@@@@r   rf   z#CycleExecutor._execute_scope_layers  s    * /0#&5__&
nn&	%3s8 	% 	%	 DcN 	 s 	  &E  "36DcN 36t 36 36j ""99   "A &D )

7#224 * r   r   r   c                    | j                   |   }|s|j                         s
t               S |j                         D ]	  }d|_         | j                  |       t               }|j                         D ]  }|j                  j                  |vs|j                  s)| j                  j                  d| d|j                  j                          |j                  |j                  j                          |S )a  
        Execute a single node within a cycle scope.

        Args:
            node_id: Node ID to execute
            scope_node_set: Nodes that belong to the current scoped cycle
            force_execute: If True, execute even if not triggered (for initial node in first iteration)

        Returns:
            Set of node IDs triggered outside the current scoped cycle
        Fr<   z triggered external node )r   r=   ri   r   rV   r   rz   rS   r   r   r   )r   r0   r   r   r.   r|   external_targetss          r   r   z1CycleExecutor._execute_single_cycle_node_in_scope$  s    " zz'" $$&u 113I"'I 4 	t$ &)U113I"".8Y=P=P  &&G9$=i>N>N>Q>Q=RS !$$Y%5%5%8%89 4  r   c                     | j                   |   }|j                  D ]@  }|j                  |v s|j                  |      }|s&|j                  s3|j
                  s@ y y)a2  
        Check if the initial node is retriggered by any internal edge (from within the cycle).

        Args:
            initial_node_id: Initial node ID
            cycle_nodes: List of nodes in the cycle

        Returns:
            True if the initial node is retriggered by an internal edge
        TF)r   rR   rS   rT   rU   rV   )r   rM   r_   rH   r[   r\   s         r   rh   z*CycleExecutor._is_initial_node_retriggeredN  sU     zz/2'44K~~,"55oFDLLT^^ 5 r   )r   Nry   )F)NF)__name__
__module____qualname____doc__r   r   r4   r	   r   r   r   r   r   r$   r!   r&   r6   r9   r:   rD   intr   rJ   rd   r   rr   boolre   rf   r   rh   r1   r   r   r   r      s   FF CIF  $DcN3	F
 $F $TFDL1F038T#s(^0D 8 8
d38n9M 
RV 
$	&S#X 	&4 	&]C ]D ]%>c3h %>D %>R-c -$s) -d
 -^BB #YB 	B
 B 
SBF #Y    
c#h	 J +/?#Y? #3-? 
c4i	?L $)P#YP P 3s8n	P
 !P 
d38n	Pn *.#(ptDcN34p p !I	p
 "#p !p 
Spl $	( (  C(  	( 
 
S( T #Y 
	r   r   )r   r   r   typingr   r   r   r   r   r   entity.configsr	   r
   utils.log_managerr   workflow.cycle_managerr   #workflow.executor.parallel_executorr   workflow.topology_builderr   r   r1   r   r   <module>r      s/    @   ; ; ) ( / @ :X	 X	r   