
    iz                     T    d Z ddlmZmZmZ ddlmZ ddlmZ ddl	m
Z
  G d d      Zy)	z4Executor for DAG (Directed Acyclic Graph) workflows.    )DictListCallable)Node)
LogManager)ParallelExecutorc            
       h    e Zd ZdZdedeeef   deee      de	egdf   fdZ
dd	Zd
ee   ddfdZy)DAGExecutorzExecute DAG workflows.
    
    Features:
    - Execute layer by layer following the topology
    - Support parallel execution inside a layer
    - Serialize Human nodes automatically
    log_managernodeslayersexecute_node_funcNc                 ^    || _         || _        || _        || _        t	        ||      | _        y)a	  Initialize the executor.
        
        Args:
            log_manager: Logger instance
            nodes: Mapping of node ids to ``Node`` objects
            layers: Topological layers
            execute_node_func: Callable used to execute a single node
        N)r   r   r   r   r   parallel_executor)selfr   r   r   r   s        K/Users/bowang/.openclaw/workspace/ChatDev/workflow/executor/dag_executor.py__init__zDAGExecutor.__init__   s1     '
!2!1+u!E    returnc                     t        | j                        D ]7  \  }}| j                  j                  d| d|        | j	                  |       9 y)zExecute the DAG workflow.zExecuting Layer z with nodes: N)	enumerater   r   debug_execute_layer)r   	layer_idxlayer_nodess      r   executezDAGExecutor.execute(   sJ    &/&<"I{""%5i[k]#[\, '=r   r   c                 Z     dt         ddf fd} j                  j                  ||       y)z#Execute a single topological layer.node_idr   Nc                     j                   |    }|j                         rj                  |       y j                  j	                  d|  d       y )NzNode z skipped - not triggered)r   is_triggeredr   r   r   )r   noder   s     r   execute_if_triggeredz8DAGExecutor._execute_layer.<locals>.execute_if_triggered0   sI    ::g&D  "&&t,  &&wi7O'PQr   )strr   execute_nodes_parallel)r   r   r"   s   `  r   r   zDAGExecutor._execute_layer.   s2    	R# 	R$ 	R 	55kCWXr   )r   N)__name__
__module____qualname____doc__r   r   r#   r   r   r   r   r   r    r   r   r
   r
   
   sq    FF CIF T#Y	F
 $TFDL1F*-	Y$s) 	Y 	Yr   r
   N)r(   typingr   r   r   entity.configsr   utils.log_managerr   #workflow.executor.parallel_executorr   r
   r)   r   r   <module>r.      s#    : ' '  ( @-Y -Yr   