
    i;                         d Z ddlZddlmZmZmZmZ ddlm	Z	 ddl
mZ ddlmZmZ ddlmZmZ ddlmZ  G d	 d
      Zy)a  Dynamic edge executor for edge-level Map and Tree execution.

Handles dynamic node expansion based on edge-level dynamic configuration.
When a message passes through an edge with dynamic config, the target node
is virtually expanded into multiple instances based on split results.
    N)CallableDictListOptional)Node)DynamicEdgeConfig)MessageMessageRole)create_splitter_from_configgroup_messages)
LogManagerc                   l   e Zd ZdZdedeeee   gee   f   fdZ		 ddedede
d	eee      d
ee   f
dZ	 ddedee   de
d	eee      d
ee   f
dZ	 ddedeee      de
d	eee      d
ee   f
dZ	 ddedeee      de
d	eee      d
ee   f
dZdedee   ded
ee   fdZdedee   deded
ee   f
dZy)DynamicEdgeExecutora  Execute edge-level dynamic expansion.
    
    When an edge has dynamic configuration, this executor:
    1. Splits the payload passing through the edge
    2. Executes the target node for each split unit
    3. Collects and returns results (flat for Map, reduced for Tree)
    log_managernode_executor_funcc                      || _         || _        y)zInitialize the dynamic edge executor.
        
        Args:
            log_manager: Logger instance
            node_executor_func: Function to execute a node with inputs
        N)r   r   )selfr   r   s      T/Users/bowang/.openclaw/workspace/ChatDev/workflow/executor/dynamic_edge_executor.py__init__zDynamicEdgeExecutor.__init__   s     '"4    Ntarget_nodepayloaddynamic_configstatic_inputsreturnc                    |j                   }t        |      }|j                  |g      }|s+| j                  j                  d|j                   d       g S | j                  j                  d|j                   dt        |       d       |j                         r| j                  ||||      S |j                         r| j                  ||||      S t        d|j                         )a  Execute dynamic expansion for an edge.
        
        Args:
            target_node: The node to execute (will be used as template)
            payload: The message passing through the edge
            dynamic_config: Edge dynamic configuration
            static_inputs: Optional static inputs from non-dynamic edges
            
        Returns:
            List of output messages from all executions
        Dynamic edge ->  : no execution units after splitz: splitting into z parallel unitsUnknown dynamic type: )splitr   r   debugidinfolenis_map_execute_mapis_tree_execute_tree
ValueErrortype)r   r   r   r   r   split_configsplitterexecution_unitss           r   executezDynamicEdgeExecutor.execute)   s    $ &++ /|< #..'3""";>>"22RS I{~~..?O@T?UUde	
   "$$_nm  ##%%%_nm  5n6I6I5JKLLr   inputsc                 d   |j                   }|xs g }t        |      }|j                  |      }|s?| j                  j                  d|j                   d       |r| j                  ||      S g S | j                  j                  d|j                   dt        |       dt        |       d|j                   d	|rdt        |       dnd	z          |j                         r| j                  ||||      S |j                         r| j                  ||||      S t        d
|j                         )aC  Execute dynamic expansion using all collected inputs.
        
        This method is called from _execute_node when a node has incoming edges
        with dynamic configuration. All inputs are already collected and passed here.
        
        Args:
            target_node: The node to execute
            inputs: Dynamic edge inputs to be split
            dynamic_config: Edge dynamic configuration
            static_inputs: Non-dynamic edge inputs to be replicated to all units
            
        Returns:
            List of output messages from all executions
        zDynamic node r   z: splitting z dynamic inputs into z parallel units (z mode)z, with z! static inputs replicated to each r   )r    r   r   r!   r"   r   r#   r$   r*   r%   r&   r'   r(   r)   )r   r   r/   r   r   r+   r,   r-   s           r   execute_from_inputsz'DynamicEdgeExecutor.execute_from_inputsX   sW   * &++%+ /|< #..0""//OP ..{MJJIKNN+<F}DY?#$$5n6I6I5J&RR_]+,,MNegi	
   "$$_nm  ##%%%_nm  5n6I6I5JKLLr   r-   c                 :   |j                         }|j                  }g }|xs g }t        |      dk(  r7t        |      |d   z   }| j	                  ||d      }	|j                  |	       n0t        t        |      |      }
t        j                  j                  |
      5 }i }t        |      D ]6  \  }}t        |      |z   }|j                  | j                  |||      }|||<   8 i }t        j                  j                  |      D ]U  }||   }	 |j                         }|||<   | j                  j                  d|j                    d| dt        |       d       W t'        t        |            D ]  }||v s|j                  ||           	 d	d	d	       | j                  j)                  d|j                    d
t        |       d       |S # t"        $ r4}| j                  j%                  d|j                    d| d|         d	}~ww xY w# 1 sw Y   xY w)a]  Execute in Map mode (fan-out only).
        
        Args:
            target_node: Target node template
            execution_units: Split message units
            dynamic_config: Dynamic configuration
            static_inputs: Static inputs to copy to all units
            
        Returns:
            Flat list of all output messages
           r   max_workersr   #z: completed with  outputs: failed with error: Nz: Map completed with z total outputs)as_map_configmax_parallelr$   list_execute_unitextendmin
concurrentfuturesThreadPoolExecutor	enumeratesubmitas_completedresultr   r!   r"   	Exceptionerrorranger#   )r   r   r-   r   r   
map_configr;   all_outputsunit_inputsoutputseffective_workersexecutorrA   idxunitfutureresults_by_idxrF   es                      r   r&   z DynamicEdgeExecutor._execute_map   sA   $ $113
!..%'%+1$}-0BBK((k1EGw' !$C$8, G##66CT6UYa@B!*?!;IC"&}"5"<K%__**KcF '*GFO "< <>(00==gFF!&/C!'.4s+((...{~~.>au E..1&k](D G" !_!56Cn,#**>#+>? 79 V@ 	{~~. /""%k"2!3>C	

 # % ((...{~~.>au E2236 ) VUs8   A0HAGH;H	H/H		HHHc                    |j                         }|t        d|j                         |j                  }|j                  }|xs g }g }|D ]  }	|j                  |	        |sg S | j                  j                  d|j                   dt        |       d|        d}
d}t        |      dkD  r|
dz  }
t        ||      }| j                  j                  d|j                   d	|
 d
t        |       d       g }t        |      dk(  r;|d   }|rt        |      |z   }| j                  |||
d      }|j                  |       nt        t        |      |      }t        j                  j!                  |      5 }i }t#        |      D ];  \  }}|}|rt        |      |z   }|j%                  | j                  |||
|      }|||<   = i }t        j                  j'                  |      D ]  }||   }	 |j)                         }|||<    t/        t        |            D ]  }||v s|j                  ||           	 ddd       | j                  j                  d|j                   d	|
 dt        |       d       |}d}|
dkD  r*| j                  j-                  d|j                   d       nt        |      dkD  r| j                  j                  d|j                   d|
 dt        |       d       |S # t*        $ r7}| j                  j-                  d|j                   d|
 d| d|         d}~ww xY w# 1 sw Y   xY w)ay  Execute in Tree mode (fan-out + reduce).
        
        Args:
            target_node: Target node template
            execution_units: Split message units
            dynamic_config: Dynamic configuration
            static_inputs: Static inputs (used in first layer only)
            
        Returns:
            Single-element list with the final reduced result
        Nz'Invalid tree configuration for edge -> r   z: Tree starting with z inputs, group_size=r   Tr4   z layer z: processing z groupsr5   r7   -r9   z: produced r8   Fd   z: exceeded maximum layersz: Tree completed after z layers with z
 output(s))as_tree_configr)   r"   
group_sizer;   r>   r   r#   r$   r   r!   r<   _execute_groupr?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   )r   r   r-   r   r   tree_configrY   r;   current_messagesrQ   layeris_first_layergroupslayer_outputsgroup_inputsrM   rN   rO   rA   rP   grouprR   rS   rF   rT   s                            r   r(   z!DynamicEdgeExecutor._execute_tree   s   $ %335F{~~FVWXX ++
"//%+ +-#D##D) $  I{~~. /""%&6"7!88LZLZ	

  "#a'QJE $$4jAF""";>>"2'% A!&k]'3
 ,.M6{a%ay!#'#6#EL--k<PQR$$W- %(F\$B!''::GX:Y]eDFG&/&7
U',)+/+>+ML!) //lESV" +. '8 @BN","4"4"A"A'"J%fo"%+]]_F28N3/	 #K  %S[1.0)001DE  23 Z: """;>>"2'% A./x9
  -"N s{  &&&{~~&66OP E "#a'H 	{~~. /$$)7-<L8M7NjZ	

  ?  ) " ,,22"2;>>2B!E7!C5 Q667S!: ""% ZYs7   :A5L:0K7L:#L:7	L7 2L22L77L::MnoderL   
unit_indexc           
      ~   | j                   j                  d|j                   d| dt        |       d       |D cg c]  }|j	                          }}|D ]#  }t        |j                        }||d<   ||_        % | j                  ||      }|D ]#  }t        |j                        }||d<   ||_        % |S c c}w )zExecute a single map unit.r   r7   : executing with  inputsdynamic_edge_unit_index)r   r!   r"   r$   clonedictmetadatar   )r   rc   rL   rd   msgrk   rM   s          r   r=   z!DynamicEdgeExecutor._execute_unitM  s     	twwiq 5!+./w8	
 /::kssyy{k:CCLL)H2<H./#CL  ))$< CCLL)H2<H./#CL 
  ;s   B:ra   r]   group_indexc                    |j                    d| d| }| j                  j                  d| dt        |       d       |D cg c]  }|j	                          }}|D ](  }t        |j                        }||d<   ||d<   ||_        * | j                  ||      }|D ]B  }t        |j                        }||d<   ||d<   ||d<   ||_        t        j                  |_
        D |S c c}w )	zExecute a single tree group.r7   rV   r   rf   rg   dynamic_edge_tree_layerdynamic_edge_tree_groupdynamic_edge_instance_id)r"   r   r!   r$   ri   rj   rk   r   r
   USERrole)	r   rc   ra   r]   rm   instance_idrl   rk   rM   s	            r   rZ   z"DynamicEdgeExecutor._execute_groupl  s    	5';-8{m+<S=N<OwW	
 0<<|		|<CCLL)H27H./2=H./#CL	   ))$= CCLL)H27H./2=H./3>H/0#CL"''CH  ' =s   C%)N)__name__
__module____qualname____doc__r   r   r   r   r	   r   r   r   r.   r2   r&   r(   intr=   rZ    r   r   r   r      s   55 %dDM%:DM%IJ5& 26-M-M -M *	-M
  W.-M 
g-Mh 266M6M W6M *	6M
  W.6M 
g6Mz 26EE d7m,E *	E
  W.E 
gEX 26t t  d7m,t  *	t 
  W.t  
gt l '] 	
 
g>## 7m# 	#
 # 
g#r   r   )rx   concurrent.futuresr@   typingr   r   r   r   entity.configsr   'entity.configs.edge.dynamic_edge_configr   entity.messagesr	   r
   runtime.node.splitterr   r   utils.log_managerr   r   rz   r   r   <module>r      s0     1 1  E 0 M (} }r   