
    i,                    t   U d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m	Z	m
Z
 ddlmZmZmZ ddlZerddlmZ ddlmZ dd	lmZ  ej.                  e      Zd
ZdZdZdZdZ	 	 	 	 	 	 	 	 ddZ	 	 	 	 	 	 	 	 	 	 ddZ 	 	 	 	 	 	 	 	 	 	 	 	 ddZ! e"       Z#de$d<   i Z%de$d<   	 	 	 	 	 	 	 	 	 	 ddZ&ddZ'ddZ(y)a  Distributed notification queue for background task events (SEP-1686).

Enables distributed Docket workers to send MCP notifications to clients
without holding session references. Workers push to a Redis queue,
the MCP server process subscribes and forwards to the client's session.

Pattern: Fire-and-forward with retry
- One queue per session_id
- LPUSH/BRPOP for reliable ordered delivery
- Retry up to 3 times on delivery failure, then discard
- TTL-based expiration for stale messages

Note: Docket's execution.subscribe() handles task state/progress events via
Redis Pub/Sub. This module handles elicitation-specific notifications that
require reliable delivery (input_required prompts, cancel signals).
    )annotationsN)suppress)datetimetimezone)TYPE_CHECKINGAnycast)Docket)ServerSession)FastMCPz"fastmcp:notifications:{session_id}z)fastmcp:notifications:{session_id}:activei,        c                  K   |j                  t        j                  |             }t        j                  |dt        j                  t        j                        j                         d      }|j                         4 d{   }|j                  ||       d{    |j                  |t               d{    ddd      d{    y7 M7 57 7 # 1 d{  7  sw Y   yxY ww)a~  Push notification to session's queue (called from Docket worker).

    Used for elicitation-specific notifications (input_required, cancel)
    that need reliable delivery across distributed processes.

    Args:
        session_id: Target session's identifier
        notification: MCP notification dict (method, params, _meta)
        docket: Docket instance for Redis access
    
session_idr   )notificationattemptenqueued_atN)keyNOTIFICATION_QUEUE_KEYformatjsondumpsr   nowr   utc	isoformatredislpushexpireNOTIFICATION_TTL_SECONDS)r   r   docketr   messager   s         r/Users/bowang/.openclaw/workspace/ChatDev/.venv/lib/python3.12/site-packages/fastmcp/server/tasks/notifications.pypush_notificationr$   0   s      **+22j2I
JCjj(#<<5??A	
G ||~~kk#w'''ll3 8999 ~~'9 ~~~sl   BC/CC/CCC<C=CC/CC/CCC/C, C#!C,(C/c                  K   |j                  t        j                  |             }|j                  t        j                  |             }t        j                  d|        	 	 |j                         4 d{   }|j                  |dt        dz         d{    t        t        |j                  |gt                     d{   }|s	 ddd      d{    |\  }}	t        j                  |	      }
|
d   }|
j                  d	d
      }	 t        ||| ||       d{    t        j                  d| |dz          ddd      d{    7 7 7 7 7 7# t         $ r}|t"        dz
  k  r`|dz   |
d	<   t%        |      |
d<   |j'                  |t        j(                  |
             d{  7   t        j                  d| |dz   |       nt        j+                  d| t"        |       Y d}~d}~ww xY w7 # 1 d{  7  sw Y   xY w# t,        j.                  $ r t        j                  d|        Y yt         $ r@}t        j                  d| |       t-        j0                  d       d{  7   Y d}~.d}~ww xY ww)a~  Subscribe to notification queue and forward to session.

    Runs in the MCP server process. Bridges distributed workers to clients.

    This loop:
    1. Maintains a heartbeat (active subscriber marker for debugging)
    2. Blocks on BRPOP waiting for notifications
    3. Forwards notifications to the client's session
    4. Retries failed deliveries, then discards (no dead-letter queue)

    Args:
        session_id: Session identifier to subscribe to
        session: MCP ServerSession for sending notifications
        docket: Docket instance for Redis access
        fastmcp: FastMCP server instance (for elicitation relay)
    r   z/Starting notification subscriber for session %sN1   )ex)timeoutr   r   r   z1Delivered notification to session %s (attempt %d)   
last_errorz5Requeued notification for session %s (attempt %d): %sz<Discarding notification for session %s after %d attempts: %sz0Notification subscriber cancelled for session %sz0Notification subscriber error for session %s: %s)r   r   r   NOTIFICATION_ACTIVE_KEYloggerdebugr   setSUBSCRIBER_TIMEOUT_SECONDSr	   r   brpopr   loadsget_send_mcp_notification	ExceptionMAX_DELIVERY_ATTEMPTSstrr   r   warningasyncioCancelledErrorsleep)r   sessionr!   fastmcp	queue_key
active_keyr   result_message_bytesr"   notification_dictr   
send_errores                  r#   notification_subscriber_looprF   L   sJ    , 

188J8OPI3::j:QRJ
LLBJO
:	#||~~ii
C4NQR4RiSSS  $i[:TU    &~~ $* =**]3$+N$;!!++i30!2J   LLK"!- &~ %S && ! !6!::-4q[	*03J-#kk)TZZ5HIIIS&#aK&	 Z&1&	7 &~~~b %% 	LLKZX 	#LLBJPQ --"""		#s  A"J%H 9E:H =HE-H
EHH EH "J#1HE!'E(E!H G?H JH HHH E!!	G<*A
G74F75=G72H7G<<H?H HH
HH )J	?JJ		/J8I;9J>JJ		Jc           	     4  K   |j                  dd      }|dk7  rt        d|       t        j                  j                  j                  d|j                  di       |j                  d      d      }t        j                  j                  |      }| j                  |       d{    |j                  di       }|j                  d      d	k(  r|j                  di       }	|	j                  d
i       }
|
j                  d      }|r|j                  d      }|st        j                  d       yddl
m} t        j                   || ||||      d|dd        }t        j                  |       |j!                  t        j"                         yyy7 w)a0  Reconstruct MCP notification from dict and send to session.

    For input_required notifications with elicitation metadata, also sends
    a standard elicitation/create request to the client and relays the
    response back to the worker via Redis.

    Args:
        session: MCP ServerSession
        notification_dict: Notification as dict (method, params, _meta)
        session_id: Session identifier (for elicitation relay)
        docket: Docket instance (for notification delivery)
        fastmcp: FastMCP server instance (for elicitation relay)
    methodznotifications/tasks/statusz0Unsupported notification method for subscriber: params_meta)rH   rI   rJ   Nstatusinput_requiredz$io.modelcontextprotocol/related-taskelicitationtaskIdz:input_required notification missing taskId, skipping relayr   )relay_elicitationzelicitation-relay-   name)r3   
ValueErrormcptypesTaskStatusNotificationmodel_validateServerNotificationsend_notificationr-   r8    fastmcp.server.tasks.elicitationrO   r9   create_task_background_tasksaddadd_done_callbackdiscard)r<   rC   r   r!   r=   rH   r   server_notificationrI   metarelated_taskrM   task_idrO   tasks                  r#   r4   r4      s    ( ""8-IJF--KF8TUU9933BB2'++Hb9&**73	
L ))66|D

#
#$7
888 ""8R0Fzz(// $$Wb1xx FK"&&}5jj*GP J&&!':wWU)'"1+7D !!$'""#4#<#<= 	 0 9s   B%F'F(C/Fzset[asyncio.Task[None]]r\   z@dict[str, tuple[asyncio.Task[None], weakref.ref[ServerSession]]]_active_subscribersc                  K   | t         v rwt         |    \  }}|j                         s |       y|j                         s<|j                          t        t        j
                        5  | d{    ddd       t         | = t	        j                  t        | |||      d| dd        }|t        j                  |      ft         | <   t        j                  d|        y7 q# 1 sw Y   pxY ww)a  Start notification subscriber if not already running (idempotent).

    Subscriber is created on first task submission and cleaned up on disconnect.
    Safe to call multiple times for the same session.

    Args:
        session_id: Session identifier
        session: MCP ServerSession
        docket: Docket instance
        fastmcp: FastMCP server instance (for elicitation relay)
    Nznotification-subscriber-rP   rQ   z.Started notification subscriber for session %s)re   donecancelr   r9   r:   r[   rF   weakrefrefr-   r.   )r   r<   r!   r=   rd   session_refs         r#   ensure_subscriber_runningrl      s     $ ((/
;kyy{{}8 yy{KKM'001

 2
+ $Z&'J'
2A'78D (,W[[-A&B
#
LLA:N  21s1   A&C.(C".C /C"3A-C. C""C+'C.c                .  K   | t         vryt         j                  |       \  }}|j                         s<|j                          t	        t
        j                        5  | d{    ddd       t        j                  d|        y7 ## 1 sw Y   "xY ww)zStop notification subscriber for a session.

    Called when session disconnects. Pending messages remain in queue
    for delivery if client reconnects (with TTL expiration).

    Args:
        session_id: Session identifier
    Nz.Stopped notification subscriber for session %s)	re   poprg   rh   r   r9   r:   r-   r.   )r   rd   rA   s      r#   stop_subscriberro     sq      ,,!%%j1GD!99;g,,-JJ .
LLA:N  .-s0   ABB	#B$B	(BB		BBc                      t        t              S )z2Get number of active subscribers (for monitoring).)lenre        r#   get_subscriber_countrt   *  s    "##rs   )r   r7   r   dict[str, Any]r!   r
   returnNone)
r   r7   r<   r   r!   r
   r=   r   rv   rw   )r<   r   rC   ru   r   r7   r!   r
   r=   r   rv   rw   )r   r7   rv   rw   )rv   int))__doc__
__future__r   r9   r   loggingri   
contextlibr   r   r   typingr   r   r	   	mcp.typesrT   r!   r
   mcp.server.sessionr   fastmcp.server.serverr   	getLogger__name__r-   r   r,   r    r6   r0   r$   rF   r4   r/   r\   __annotations__re   rl   ro   rt   rr   rs   r#   <module>r      st  " #      ' + + 0-			8	$ > E     :: : : 
	:8V#V#V# V# 	V#
 
V#r8>8>%8> 8> 	8>
 8> 
8>@ .1U * 2    
%O%O%O %O 	%O
 
%OPO($rs   