
    i#                         d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZmZmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z"m#Z#  G d d      Z$y)z!Batch workflow execution helpers.    N)Path)AnyDictListOptional)load_config)LogLevel)GraphConfig)ValidationError)TaskInputBuilder)GraphExecutor)GraphContext)	BatchTask)validate_workflow_filename)WARE_HOUSE_DIRYAML_DIRc                       e Zd ZdZddZddddded	ed
ee   dedede	e
   ddfdZdedeeeef      ddfdZded	ededede	e
   deeef   fdZedefd       Zededefd       Zededefd       Zy)BatchRunServicez9Runs batch workflows and reports progress over WebSocket.returnNc                 @    t        j                  t              | _        y N)logging	getLogger__name__logger)selfs    N/Users/bowang/.openclaw/workspace/ChatDev/server/services/batch_run_service.py__init__zBatchRunService.__init__   s    ''1       batch)max_parallel	file_base	log_level
session_id	yaml_filetasksr"   r#   r$   c                   
K   }t        |      }	j                  d||	dd       d {    t        j                  |      dd
g t        j                         dt
        dd f
 f
ddt
        dd ffdt        j                  fd	|D          d {     j                         j                  d
||	
dd       d {    y 7 7 ;7 	w)Nbatch_started)batch_idtotaltypedatar   taskr   c                   
K   | j                   xs t        t        j                               }
j	                   d|       }j                  d| j                  ||dd       d {    	 t        j                  
j                  | |       d {   }dz  4 d {    	j                  | j                  ||d|d   |d   |d	   |d
   dd	       d d d       d {    j                  d| j                  |||d
   |d   |d   dd       d {    y 7 7 7 7 B# 1 d {  7  sw Y   RxY w7 "# t        $ r}dz  4 d {  7   	j                  | j                  ||dd d dd t        |      d	       d d d       d {  7   n# 1 d {  7  sw Y   nxY wj                  d| j                  ||t        |      dd       d {  7   Y d }~y d }~ww xY ww)N-batch_task_started)	row_indextask_idtask_dirr,      successduration_mstoken_usagegraph_outputresults )	r3   r4   r5   statusr8   r9   r:   r;   errorbatch_task_completed)r3   r4   r5   r;   r9   r8   failedbatch_task_failed)r3   r4   r5   r>   )r4   struuiduuid4_sanitize_labelsend_messager3   asyncio	to_thread_run_single_taskappend	Exception)r/   r4   r5   resultexcfailure_countr#   r$   result_lockresult_rowsr   r%   success_countwebsocket_managerr&   s        r   run_taskz+BatchRunService.run_batch.<locals>.run_task9   s    ll7c$**,&7G++yk7),DEH#000%)^^#*$,
 
 
A&00))   "&;&&)-'.(0&/+1-+@+1-+@,2>,B'-i'8%'
 '; (44 6)-'.(0'-i'8+1-+@+1-+@!
  I
 ';;;;  "&;&&)-'.(0&.+/+/,.'+%(X
 ';;;; (44 3)-'.(0%(X	!  !s   A(G7+D&,G71'E D(E (D*)E ,2D.E )D,*6E  E!E %G7(E *E ,E .E 4D75E <E 	G4G/EG//FG/FG/F1	%F(&F1	-7G/$G'%G/*G7/G44G7c                    K   4 d {     |        d {    d d d       d {    y 7 %7 7 	# 1 d {  7  sw Y   y xY wwr    )r/   rS   	semaphores    r   run_with_limitz1BatchRunService.run_batch.<locals>.run_with_limit   s3      ytn$$ !yy$ !yyysE   A0A626A4A6AA?AAc              3   .   K   | ]  } |        y wr   rU   ).0r/   rW   s     r   	<genexpr>z,BatchRunService.run_batch.<locals>.<genexpr>   s     F~d3s   batch_completed)r*   r+   	succeededr@   )lenrF   rG   	SemaphoreLockr   gather_write_batch_outputs)r   r%   r&   r'   rR   r"   r#   r$   r*   r+   rN   rO   rP   rS   rW   rV   rQ   s   ``` ` ``  @@@@@@@r   	run_batchzBatchRunService.run_batch    s     E
,,$8e.TU
 	
 	

 %%l3	,.llnR	 R	t R	 R	h	%y 	%T 	% nnFFGGG!!*k:,,) ("!.+	
 	
 	
O	
F 	H	
s4   )C0C*A7C00C,13C0$C.%C0,C0.C0rP   c                 B   t         d| z  }|j                  dd       |dz  }|dz  }g d}|j                  ddd	
      5 }t        j                  ||d      }|j                          |D ]Z  }	t        |	      }
t        j                  |
j                  d            |
d<   |
j                  dd      |
d<   |j                  |
       \ 	 d d d        |j                  dd	      5 }t        j                  ||dd       d d d        y # 1 sw Y   ?xY w# 1 sw Y   y xY w)Nsession_T)parentsexist_okzbatch_results.csvzbatch_manifest.json)r3   r4   r5   r=   r8   r9   r;   r>   wr<   zutf-8)newlineencodingignore)
fieldnamesextrasactionr9   r:   r;   )ri      )ensure_asciiindent)r   mkdiropencsv
DictWriterwriteheaderdictjsondumpsgetwriterowdump)r   r%   rP   output_rootcsv_path	json_pathrk   handlewriterrowrow_copys              r   ra   z$BatchRunService._write_batch_outputs   s   $*'>>$6!44"77		

 ]]3W]=^^FzPXYF "9*.**X\\-5P*Q'&.ll>2&F#)	 # > ^^C'^2fIIk6QG 32 >= 32s   BD	&D	DDr/   r5   c                    | j                  |      }t        ||j                  xs d       }t        d |j                  j
                  D              rt        dd|i      t        d| z  }t        j                  |j                  ||t        |      |j                        }	d|	j                  d	<   |r||	_        ||	j                  _        t        |	
      }
t!        j"                         }t%        |
|      }| j'                  |j(                  |      }|j+                  |       t-        t!        j"                         |z
  dz        }|j.                  |j0                  j3                         ||j5                         dS )N)vars_overridec              3   :   K   | ]  }|j                   d k(    yw)humanN)r-   )rY   nodes     r   rZ   z3BatchRunService._run_single_task.<locals>.<genexpr>   s     C0BtyyG#0Bs   z,Batch execution does not support human nodesr&   detailsrd   )namer{   source_pathvarsTfixed_output_dir)config)r%   i  )r;   r9   r8   r:   )_resolve_yaml_pathr   r   anygraphnodesr   r   r
   from_definitionrB   r   metadatar$   
definitionr   timeperf_counterr   _build_task_inputattachment_store_executeintoutputstoken_trackerget_token_usageget_final_output)r   r%   r&   r/   r5   r$   	yaml_pathdesignr{   graph_configgraph_context
start_timeexecutor
task_inputr8   s                  r   rI   z BatchRunService._run_single_task   sb    ++I6	Yd6H6H6PDQC0B0BCC!>$i0 
 %*'>>"22LL#I
 5901%.L"09L##-$L9&&(
 :F++H,E,EtL
*%4,,.;tCD  ''#11AAC&$557	
 	
r   c                     |j                   r1t        |       }|j                  |j                  |j                         S |j                  S r   )attachment_pathsr   build_from_file_pathstask_prompt)r   r/   builders      r   r   z!BatchRunService._build_task_input   sA      &'78G001A1A4CXCXYYr   valuec                 Z    t        j                  dd|       }|j                  d      xs dS )Nz[^a-zA-Z0-9._-]+_r/   )resubstrip)r   cleaneds     r   rE   zBatchRunService._sanitize_label   s)    &&,c59}}S!+V+r   yaml_filenamec                 p    t        | d      }t        |z  }|j                         st        dd|i      |S )NT)require_yaml_extensionzYAML file not foundr&   r   )r   r   existsr   )r   	safe_namer   s      r   r   z"BatchRunService._resolve_yaml_path   s>    .}UYZ	y(	!!"7+yAYZZr   )r   N)r   
__module____qualname____doc__r   rB   r   r   r   r   r	   rb   r   r   ra   rI   staticmethodr   rE   r   r   rU   r   r   r   r      sI   C2  (,@
@
 @
 I	@
 @
 @
 H%@
 
@
DHs Hd3PS8nAU HZ^ H<+
+
 +
 	+

 +
 H%+
 
c3h+
Z  )     ,s ,s , , # $  r   r   )%r   rG   rr   rv   r   r   r   rC   pathlibr   typingr   r   r   r   check.checkr   entity.enumsr	   entity.graph_configr
   utils.exceptionsr   utils.task_inputr   workflow.graphr   workflow.graph_contextr   server.services.batch_parserr    server.services.workflow_storager   server.settingsr   r   r   rU   r   r   <module>r      sP    '  
   	    , , # ! + , - ( / 2 G 4e er   