
    i,                         d Z ddlZddlmZ ddlmZmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z&m'Z'  G d d      Z(y)zCService responsible for executing workflows for WebSocket sessions.    N)Path)ListOptionalUnion)load_config)GraphConfig)Message)LogLevel)ValidationErrorWorkflowCancelledError)get_server_loggerLogType)TaskInputBuilder)GraphContext)AttachmentService)SessionExecutionController)SessionStatusWorkflowSessionStore)WebSocketGraphExecutor)validate_workflow_filename)WARE_HOUSE_DIRYAML_DIRc                       e Zd ZdedededdfdZddded	ee   de	fd
Z
ddddedededeee      dee   ddfdZdedededee   deddfdZdedededee   deee   ef   f
dZdedefdZy)WorkflowRunServicesession_storesession_controllerattachment_servicereturnNc                 j    || _         || _        || _        t        j                  t
              | _        y )N)r   r   r   logging	getLogger__name__logger)selfr   r   r   s       Q/Users/bowang/.openclaw/workspace/ChatDev/server/services/workflow_run_service.py__init__zWorkflowRunService.__init__   s.     +"4"4''1    )reason
session_idr(   c                   | j                   j                  |      }|sy|xs d}||_        |j                  j	                         s6|j                  j                          | j                  j                  d|       |j                  r	 |j                  j                  |       | j                   j                  |t        j                  |       y# t        $ r'}| j                  j                  d||       Y d }~Xd }~ww xY w)NFCancellation requestedz%Cancellation requested for session %sz7Failed to propagate cancellation to executor for %s: %serror_messageT)r   get_sessioncancel_reasoncancel_eventis_setsetr#   infoexecutorrequest_cancel	Exceptionwarningupdate_session_statusr   	CANCELLED)r$   r)   r(   sessioncancel_messageexcs         r%   r5   z!WorkflowRunService.request_cancel$   s    $$00<;#; .##**,  $$&KKDjQp  //? 	00]=T=Tdr0s	  p##$]_iknoops   	C 	DC<<D)attachments	log_level	yaml_filetask_promptr=   r>   c                  K   |xs dj                         }	 | j                  |      }|j                  }|xs g }|r|j                         s|st        ddt	        |      i      | j
                  j                  |       | j                  j                  ||||       | j                  j                  |t        j                         |j                  |d||dd       d {    | j                  ||||||       d {    y 7 #7 # t        $ r}	| j                  j                  t!        |	             t#               }
|
j                  d	t$        j&                  ||t)        |	d
d              | j                  j+                  |t!        |	             |j                  |ddt!        |	      id       d {  7   Y d }	~	y d }	~	wt,        $ r}	| j                  j                  d| d|	        t#               }
|
j/                  |	d||       | j                  j+                  |t!        |	             |j                  |ddd|	 id       d {  7   Y d }	~	y d }	~	ww xY ww)N zTask prompt cannot be emptytask_prompt_provideddetails)r?   r@   r)   r=   workflow_started)r?   r@   typedataWorkflow validation errorrE   )log_typer)   r?   validation_detailserrormessagez$Error starting workflow for session z: zError starting workflow)r)   r?   zFailed to start workflow: )strip_resolve_yaml_pathnamer   boolr   prepare_session_workspacer   create_sessionr8   r   RUNNINGsend_message_execute_workflow_asyncr#   rM   strr   r   WORKFLOWgetattrset_session_errorr6   log_exception)r$   r)   r?   r@   websocket_managerr=   r>   normalized_yaml_name	yaml_pathr<   r#   s              r%   start_workflowz!WorkflowRunService.start_workflow8   sx     !*R668C	//0DEI#,>> %+K{'8'8':K%13T+5FG 
 ##==jI--.'%'	 .  44ZAVAVW#00.*>{[   ..!    	KKc#h'&(FLL+ ))%.#*3	4#@   00SXF#00 9c#h*?@    	KK DZLPRSVRWXY&(F  )%.	 !  00SXF#00#&*DSE(JK  	s   I(CD &D
'D DD 	I(
D D 	I%B%G<F?=GI(I%BI II I( I%%I(r_   c                 
  K   | j                   j                  |      }|r|j                  nd }	 t        |      }	t	        j
                  |	j                  d| t        t        |      |	j                        }
|r||
_
        ||
j                  _
        t        |
      }t        ||| j                  | j                  || j                   |      }|rG||_        ||_        |j                  j#                         r|j%                  |j&                  xs d       | j)                  |||||j*                        }|j-                  |       d {    |r7|j#                         r'|r|j&                  nd}t/        ||j0                        |j3                         }| j                   j5                  ||       |j7                  |d||j9                         |j:                  j=                         dd	       d {    t?               }|jA                  d
tB        jD                  |t        |      tG        |tH              rtK        |      nd       | j                   j                  |      }|rd |_        d |_        | j                  j_                  |       ||j`                  vr| j                   jc                  |       y y 7 t7 # t.        $ r}t        |      }| j                   jM                  |tN        jP                  |       |j7                  |dd|id	       d {  7   t?               }|jA                  dtB        jD                  |t        |      |       Y d }~d }~wtR        $ r}| j                   jU                  |t        |             |j7                  |ddt        |      id	       d {  7   t?               }|jW                  dtB        jD                  |t        |      tY        |dd              Y d }~d }~wtZ        $ r~}| j                   jU                  |t        |             |j7                  |ddd| id	       d {  7   t?               }|j]                  |d| |t        |             Y d }~:d }~ww xY w# | j                   j                  |      }|rd |_        d |_        | j                  j_                  |       ||j`                  vr| j                   jc                  |       w w xY ww)Nsession_)rQ   output_rootsource_pathvars)config)r0   r+   )workflow_idworkflow_completed)resultssummarytoken_usagerG   z)Workflow execution completed successfullyr   )rK   r)   r_   result_countr,   workflow_cancelledrN   zWorkflow execution cancelled)rK   r)   r_   cancellation_reasonrM   rJ   rE   )rK   r)   r_   rL   zWorkflow execution error: z%Error executing workflow for session )r)   r_   )2r   r.   r0   r   r   from_definitiongraphr   rX   re   r>   
definitionr   r   r   r   r4   r1   r5   r/   _build_initial_task_inputattachment_storeexecute_graph_asyncr   rQ   get_resultscomplete_sessionrV   final_messagetoken_trackerget_token_usager   r3   r   rY   
isinstancedictlenr8   r   r9   r   r[   rM   rZ   r6   r\   cleanup_sessionactive_connectionspop_session)r$   r)   r_   r@   r]   r=   r>   r:   r0   designgraph_configgraph_contextr4   
task_inputr(   ri   r#   r<   session_refs                      r%   rW   z*WorkflowRunService._execute_workflow_async   s     $$00</6w++Dy	; +F&66
|,*	N[[L )2&4=''1(=M-''''!"")H  -#+ ''..0++G,A,A,]E]^77))J ..z:::  3 3 529..?W,VASASTT**,G//
GD#000#*#0#>#>#@'/'='='M'M'O
 
 
 '(FKK; ))%i.-7-FS\A  j ,,88DK'+$$(!##33J?!2!E!EE""..z: Fe ;
( & 	XF44ZAXAXhn4o#000&/   '(FKK. ))%i.$*     	00SXF#00 9c#h*?@   '(FLL+ ))%i.#*3	4#@     	00SXF#00 98RSVRW6X*YZ   '(F  7
|D%i.	 !  	 ,,88DK'+$$(!##33J?!2!E!EE""..z: Fs   ,TDK 
J>B+K 6K7AK A1T>K K 	RAM L <M R  R,AP4N75AP<R RARQ0RR RR A2TTr   promptattachment_idsc                 |    |s|S | j                   j                  |||      }t        |      j                  ||      S )N)target_store)r   build_attachment_blocksr   build_from_blocks)r$   r)   r   r   r   storeblockss          r%   rr   z,WorkflowRunService._build_initial_task_input  sK     M((@@ A 

  &88HHr'   yaml_filenamec                 p    t        |d      }t        |z  }|j                         st        dd|i      |S )z@Validate and resolve YAML paths inside the configured directory.T)require_yaml_extensionzYAML file not foundr?   rD   )r   r   existsr   )r$   r   	safe_namer_   s       r%   rP   z%WorkflowRunService._resolve_yaml_path   s@     /}UYZ	y(	!!"7+yAYZZr'   )r"   
__module____qualname__r   r   r   r&   rX   r   rR   r5   r   r
   r`   r   rW   r   r   r	   rr   rP    r'   r%   r   r      s[   	2+	2 7	2 .		2
 
	2 JN  # RV 6 ,0(,NN N 	N d3i(N H%N 
N`D;D; D; 	D; #YD; D; 
D;LII $I 	I
 S	I 
tG}c!	"I$  r'   r   ))__doc__r    pathlibr   typingr   r   r   check.checkr   entity.graph_configr   entity.messagesr	   entity.enumsr
   utils.exceptionsr   r   utils.structured_loggerr   r   utils.task_inputr   workflow.graph_contextr   "server.services.attachment_servicer   !server.services.session_executionr   server.services.session_storer   r   "server.services.websocket_executorr    server.services.workflow_storager   server.settingsr   r   r   r   r'   r%   <module>r      sJ    I   ( ( # + # ! D > - / @ H M E G 4O Or'   