
    i*                     H    d Z ddlZddlmZmZmZmZ ddlm	Z	  G d d      Z
y)z:Parallel execution helpers that eliminate duplicated code.    N)AnyCallableListTuple)
LogManagerc                       e Zd ZdZdedefdZ	 ddee   de	de	ege
f   d	e	egef   dz  d
df
dZdee
   de	e
gdf   d
dfdZdee   d	e	egef   dz  d
eee   ee   f   fdZdee   de	de	ege
f   d
dfdZdee   de	de	ege
f   d
dfdZy)ParallelExecutorzManage parallel execution for workflow nodes.
    
    Provides shared logic for parallel batches and serializes Human nodes when needed.
    log_manager
nodes_dictc                      || _         || _        y)zInitialize the parallel executor.
        
        Args:
            log_manager: Logger instance
            nodes_dict: Mapping of ``node_id`` to ``Node``
        N)r
   r   )selfr
   r   s      P/Users/bowang/.openclaw/workspace/ChatDev/workflow/executor/parallel_executor.py__init__zParallelExecutor.__init__   s     '$    Nitemsexecutor_funcitem_desc_funchas_blocking_funcreturnc                     | j                  ||      \  }}|r| j                  |||       |r| j                  |||       yy)aa  Execute a list of items in parallel when possible.
        
        Args:
            items: Items to execute
            executor_func: Callable that executes a single item
            item_desc_func: Callable for logging a human-readable description
            has_blocking_func: Optional callable to decide if an item requires serialization
        N)_partition_blocking_items_execute_parallel_batch_execute_sequential_batch)r   r   r   r   r   blocking_itemsparallel_itemss          r   execute_items_parallelz'ParallelExecutor.execute_items_parallel   sK     *.)G)GO`)a&((W**>=.Y r   node_idsc                 l    dt         dt         fd}dt         dt        fd}| j                  ||||       y)a  Execute a list of nodes in parallel.
        
        Convenience wrapper around ``execute_items_parallel`` specialized for nodes.
        
        Args:
            node_ids: List of node identifiers
            executor_func: Callable that executes a single node
        node_idr   c                     d|  S )Nznode  r   s    r   r   z?ParallelExecutor.execute_nodes_parallel.<locals>.item_desc_func=   s    7)$$r   c                      y)NFr!   r"   s    r   r   zBParallelExecutor.execute_nodes_parallel.<locals>.has_blocking_func@   s    r   N)strboolr   )r   r   r   r   r   s        r   execute_nodes_parallelz'ParallelExecutor.execute_nodes_parallel0   sD    	%C 	%C 	%	s 	t 	 	##		
r   c                 z    g }g }|D ]/  }|r ||      r|j                  |       |j                  |       1 ||fS )z3Split items into blocking and parallelizable lists.)append)r   r   r   r   r   items         r   r   z*ParallelExecutor._partition_blocking_itemsJ   sL     D %6t%<%%d+%%d+	  ~--r   c           
      .   | j                   j                  dt        |       d       t        j                  j                  t        |            5 }g }|D ]'  }|j                  ||      }|j                  ||f       ) |D ]:  \  }}	 |j                          | j                   j                   ||       d       < 	 ddd       y# t        $ r5}| j                   j                   ||       dt        |               d}~ww xY w# 1 sw Y   yxY w)zExecute a batch of items in parallel.
        
        Args:
            items: Items to execute
            executor_func: Callable per item
            item_desc_func: Callable returning a readable description
        
Executing z items in parallel)max_workers completed successfully	 failed: N)r
   debuglen
concurrentfuturesThreadPoolExecutorsubmitr(   result	Exceptionerrorr$   )	r   r   r   r   executorr2   r)   futurees	            r   r   z(ParallelExecutor._execute_parallel_batch[   s    	CJ<7IJK22s5z2JhG!=f~. 
 !(fMMO$$**nT.B-CCZ+[\ !( KJ ! $$**nT.B-C9SQRVH+UV KJs0   7D
4C
>D
	D0DDDDc           	      8   |D ]T  }| j                   j                  d ||       d       	  ||       | j                   j                   ||       d       V y# t        $ r5}| j                   j                   ||       dt	        |               d}~ww xY w)zExecute a batch of items sequentially.
        
        Args:
            items: Items to execute
            executor_func: Callable per item
            item_desc_func: Callable returning a readable description
        r+   z (sequential)r-   r.   N)r
   r/   r6   r7   r$   )r   r   r   r   r)   r:   s         r   r   z*ParallelExecutor._execute_sequential_batchy   s     D""Zt0D/E]#STd#  &&.*>)??V'WX	 
    &&.*>)?yQ'QRs   ,A	B$0BB)N)__name__
__module____qualname____doc__r   dictr   r   r   r   r$   r%   r   r&   r   r   r   r   r!   r   r   r	   r	   	   sd   
%J %D % ;?ZCyZ  Z !#,	Z
 $SE4K047Z 
Z.
s)
  t,
 
	
4.Cy. $SE4K047. 
tCy$s)#	$	."Cy   !#,	
 
<Cy   !#,	
 
r   r	   )r?   concurrent.futuresr1   typingr   r   r   r   utils.log_managerr   r	   r!   r   r   <module>rD      s     @  - - (D Dr   