﻿"""Graph orchestration adapted to ChatDev design_0.4.0 workflows."""

import threading
from typing import Any, Callable, Dict, List, Optional

from runtime.node.agent.memory import MemoryBase, MemoryFactory, MemoryManager
from runtime.node.agent.thinking import ThinkingManagerBase, ThinkingManagerFactory
from entity.configs import Node, EdgeLink, AgentConfig, ConfigError
from entity.configs.edge import EdgeConditionConfig
from entity.configs.node.memory import SimpleMemoryConfig
from entity.messages import Message, MessageRole
from runtime.node.executor.base import ExecutionContext
from runtime.node.executor.factory import NodeExecutorFactory
from utils.logger import WorkflowLogger
from utils.exceptions import ValidationError, WorkflowExecutionError, WorkflowCancelledError
from utils.structured_logger import get_server_logger
from utils.human_prompt import (
    CliPromptChannel,
    HumanPromptService,
    resolve_prompt_channel,
)
from workflow.cycle_manager import CycleManager
from workflow.graph_context import GraphContext
from workflow.graph_manager import GraphManager
from workflow.executor.resource_manager import ResourceManager
from workflow.runtime import (
    RuntimeBuilder,
    ResultArchiver,
    DagExecutionStrategy,
    CycleExecutionStrategy,
    MajorityVoteStrategy,
)
from workflow.runtime.runtime_context import RuntimeContext
from runtime.edge.conditions import (
    ConditionFactoryContext,
    build_edge_condition_manager,
)
from runtime.edge.processors import (
    ProcessorFactoryContext as PayloadProcessorFactoryContext,
    build_edge_processor as build_edge_payload_processor,
)
from workflow.executor.dynamic_edge_executor import DynamicEdgeExecutor


# ------------------------------------------------------------------
#  Executor class (includes all Memory and Thinking logic)
# ------------------------------------------------------------------

class ExecutionError(RuntimeError):
    """Raised when the workflow graph cannot be executed."""


class GraphExecutor:
    """Executes ChatDev_new graph workflows with integrated memory and thinking management."""

    def __init__(
        self,
        graph: GraphContext,
        *,
        session_id: Optional[str] = None,
        workspace_hook_factory: Optional[Callable[[RuntimeContext], Any]] = None,
        cancel_event: Optional[threading.Event] = None,
    ) -> None:
        """Initialize executor with graph context instance."""
        self.majority_result = None
        self.graph: GraphContext = graph
        self.outputs = {}
        self.logger = self._create_logger()
        self._cancel_event = cancel_event or threading.Event()
        self._cancel_reason: Optional[str] = None
        runtime = RuntimeBuilder(graph).build(logger=self.logger, session_id=session_id)
        if workspace_hook_factory:
            runtime.workspace_hook = workspace_hook_factory(runtime)
        self.runtime_context = runtime
        self.tool_manager = runtime.tool_manager
        self.function_manager = runtime.function_manager
        self.edge_processor_function_manager = runtime.edge_processor_function_manager
        self.log_manager = runtime.log_manager
        self.resource_manager = ResourceManager(self.log_manager)

        # Memory and Thinking management (moved from Graph)
        self.thinking_managers: Dict[str, ThinkingManagerBase] = {}
        self.global_memories: Dict[str, MemoryBase] = {}
        self.agent_memory_managers: Dict[str, MemoryManager] = {}

        # Token tracking
        self.token_tracker = runtime.token_tracker

        # Workspace roots
        self.code_workspace = runtime.code_workspace
        self.attachment_store = runtime.attachment_store
        
        # Cycle management
        self.cycle_manager: Optional[CycleManager] = None
        
        # Node executors (new strategy pattern implementation)
        self.__execution_context: Optional[ExecutionContext] = None
        self.node_executors: Dict[str, Any] = {}
        self._human_prompt_service: Optional[HumanPromptService] = None

        # for majority voting mode
        self.initial_task_messages: List[Message] = []

    def request_cancel(self, reason: Optional[str] = None) -> None:
        """Signal the executor to stop as soon as possible."""
        if reason:
            self._cancel_reason = reason
        elif not self._cancel_reason:
            self._cancel_reason = "Workflow execution cancelled"
        self._cancel_event.set()
        self.logger.info(f"Cancellation requested for workflow {self.graph.name}")

    def is_cancelled(self) -> bool:
        return self._cancel_event.is_set()

    def _raise_if_cancelled(self) -> None:
        if self.is_cancelled():
            message = self._cancel_reason or "Workflow execution cancelled"
            raise WorkflowCancelledError(message, workflow_id=self.graph.name)

    def _create_logger(self) -> WorkflowLogger:
        """Create and return a logger instance."""
        return WorkflowLogger(self.graph.name, self.graph.log_level)

    @classmethod
    def execute_graph(
        cls,
        graph: GraphContext,
        task_prompt: Any,
        *,
        cancel_event: Optional[threading.Event] = None,
    ) -> "GraphExecutor":
        """Convenience method to execute a graph with a task prompt."""
        executor = cls(graph, cancel_event=cancel_event)
        executor._execute(task_prompt)
        return executor

    def _execute(self, task_prompt: Any):
        self._raise_if_cancelled()
        results = self.run(task_prompt)
        self.graph.record(results)

    def _build_memories_and_thinking(self) -> None:
        """Initialize all memory and thinking managers before execution."""
        self._build_global_memories()
        self._build_thinking_managers()
        self._build_agent_memories()
        self._build_node_executors()

    def _build_global_memories(self) -> None:
        """Build global memories from config."""
        memory_config = self.graph.config.get_memory_config()
        if not memory_config:
            return

        for store in memory_config:
            if store.name in self.global_memories:
                error_msg = f"Duplicated memory name detected: {store.name}"
                self.log_manager.error(error_msg)
                raise ValidationError(error_msg, details={"memory_name": store.name})

            simple_cfg = store.as_config(SimpleMemoryConfig)
            if simple_cfg and (not simple_cfg.memory_path or simple_cfg.memory_path == "auto"):
                path = self.graph.directory / f"memory_{store.name}.json"
                simple_cfg.memory_path = str(path)

            try:
                memory_instance = MemoryFactory.create_memory(store)
                self.global_memories[store.name] = memory_instance
                memory_instance.load()
                self.log_manager.info(
                    f"Global memory '{store.name}' built successfully",
                    details={"memory_name": store.name},
                )
            except Exception as e:
                error_msg = f"Failed to create memory '{store.name}': {str(e)}"
                self.log_manager.error(error_msg, details={"memory_name": store.name})
                logger = get_server_logger()
                logger.log_exception(e, error_msg, memory_name=store.name)
                raise WorkflowExecutionError(error_msg, details={"memory_name": store.name})

    def _build_thinking_managers(self) -> None:
        """Build thinking managers for nodes that require them."""
        for node_id, node in self.graph.nodes.items():
            agent_config = node.as_config(AgentConfig)
            if agent_config and agent_config.thinking:
                self.thinking_managers[node_id] = ThinkingManagerFactory.get_thinking_manager(
                    agent_config.thinking
                )

    def _build_agent_memories(self) -> None:
        """Build memory managers for agent nodes referencing global stores."""
        for node_id, node in self.graph.nodes.items():
            agent_config = node.as_config(AgentConfig)
            if not (agent_config and agent_config.memories):
                continue
            try:
                self.agent_memory_managers[node_id] = MemoryManager(agent_config.memories, self.global_memories)
                self.log_manager.info(
                    f"Memory manager built for node {node_id}",
                    node_id=node_id,
                    details={"memory_refs": [mem.name for mem in agent_config.memories]},
                )
            except Exception as e:
                error_msg = f"Failed to create memory manager for node {node_id}: {str(e)}"
                self.log_manager.error(error_msg, node_id=node_id)
                logger = get_server_logger()
                logger.log_exception(e, error_msg, node_id=node_id)
                raise WorkflowExecutionError(error_msg, node_id=node_id)

    def _get_execution_context(self) -> ExecutionContext:
        if self.__execution_context is None:
            global_state = dict(self.runtime_context.global_state)
            global_state.setdefault("attachment_store", self.attachment_store)
            prompt_service = self._ensure_human_prompt_service()
            global_state.setdefault("human_prompt", prompt_service)
            self.__execution_context = ExecutionContext(
                tool_manager=self.tool_manager,
                function_manager=self.function_manager,
                log_manager=self.log_manager,
                memory_managers=self.agent_memory_managers,
                thinking_managers=self.thinking_managers,
                token_tracker=self.token_tracker,
                global_state=global_state,
                workspace_hook=self.runtime_context.workspace_hook,
                human_prompt_service=prompt_service,
                cancel_event=self._cancel_event,
            )
        return self.__execution_context
    
    def _build_node_executors(self) -> None:
        """Build node executors using strategy pattern."""

        # Create node executors
        self.node_executors = NodeExecutorFactory.create_executors(
            self._get_execution_context(),
            self.graph.subgraphs
        )

    def _ensure_human_prompt_service(self) -> HumanPromptService:
        if self._human_prompt_service:
            return self._human_prompt_service

        channel = resolve_prompt_channel(self.runtime_context.workspace_hook)
        if channel is None:
            channel = CliPromptChannel()

        self._human_prompt_service = HumanPromptService(
            log_manager=self.log_manager,
            channel=channel,
            session_id=self.runtime_context.session_id,
        )
        return self._human_prompt_service

    def _save_memories(self) -> None:
        """Save all memories after execution."""
        for memory in self.global_memories.values():
            memory.save()

    def run(self, task_prompt: Any) -> Dict[str, Any]:
        """Execute the graph based on topological layers structure or cycle-aware execution."""
        self._raise_if_cancelled()
        graph_manager = GraphManager(self.graph)
        try:
            graph_manager.build_graph()
        except ConfigError as err:
            error_msg = f"Graph configuration error: {str(err)}"
            self.log_manager.logger.error(error_msg)
            raise err

        self._prepare_edge_conditions()

        if not self.graph.layers:
            raise ExecutionError("Graph not built. Call GraphManager.build_graph() first.")

        # Record workflow start
        self.log_manager.record_workflow_start(self.graph.metadata)

        # Initialize memory and thinking before execution
        self._build_memories_and_thinking()

        # Initialize cycle manager if graph has cycles
        if self.graph.has_cycles:
            self.cycle_manager = graph_manager.get_cycle_manager()

        self.initial_task_messages = [msg.clone() for msg in self._normalize_task_input(task_prompt)]

        start_node_ids = set(self.graph.start_nodes)

        # Reset all trigger states and initialize configured start nodes
        for node_id, node in self.graph.nodes.items():
            self._raise_if_cancelled()
            node.reset_triggers()
            if node_id in start_node_ids:
                node.start_triggered = True
                node.clear_input()
                for message in self.initial_task_messages:
                    node.append_input(message.clone())

        # Execute based on graph type (using strategy objects)
        if self.graph.is_majority_voting:
            strategy = MajorityVoteStrategy(
                log_manager=self.log_manager,
                nodes=self.graph.nodes,
                initial_messages=self.initial_task_messages,
                execute_node_func=self._execute_node,
                payload_to_text_func=self._payload_to_text,
            )
            self.majority_result = strategy.run()
        elif self.graph.has_cycles:
            strategy = CycleExecutionStrategy(
                log_manager=self.log_manager,
                nodes=self.graph.nodes,
                cycle_execution_order=self.graph.cycle_execution_order,
                cycle_manager=self.cycle_manager,
                execute_node_func=self._execute_node,
            )
            strategy.run()
        else:
            strategy = DagExecutionStrategy(
                log_manager=self.log_manager,
                nodes=self.graph.nodes,
                layers=self.graph.layers,
                execute_node_func=self._execute_node,
            )
            strategy.run()

        self._raise_if_cancelled()

        # Collect final outputs and save memories
        self._collect_all_outputs()
        
        # Get the final result according to the new logic
        final_result = self.get_final_output()
        
        self._save_memories()

        # Export runtime artifacts
        archiver = ResultArchiver(self.graph, self.log_manager, self.token_tracker)
        archiver.export(final_result)

        return self.outputs
    
    def _prepare_edge_conditions(self) -> None:
        """Compile registered edge condition types into callable evaluators."""
        context = ConditionFactoryContext(function_manager=self.function_manager, log_manager=self.log_manager)
        processor_context = PayloadProcessorFactoryContext(
            function_manager=self.edge_processor_function_manager,
            log_manager=self.log_manager,
        )
        for node in self.graph.nodes.values():
            for edge_link in node.iter_outgoing_edges():
                condition_config = edge_link.condition_config
                if not isinstance(condition_config, EdgeConditionConfig):
                    raw_value = edge_link.config.get("condition", "true")
                    condition_config = EdgeConditionConfig.from_dict(raw_value, path=f"{node.path}.edges")
                    edge_link.condition_config = condition_config
                try:
                    manager = build_edge_condition_manager(condition_config, context, self._get_execution_context())
                except Exception as exc:  # pragma: no cover - defensive logging
                    error_msg = f"Failed to prepare condition '{condition_config.display_label()}': {exc}"
                    self.log_manager.error(error_msg)
                    logger = get_server_logger()
                    logger.log_exception(exc, error_msg, condition_type=condition_config.type)
                    raise WorkflowExecutionError(error_msg) from exc
                edge_link.condition_manager = manager
                label = getattr(manager, "label", None) or condition_config.display_label()
                metadata = getattr(manager, "metadata", {}) or {}
                edge_link.condition = label
                edge_link.condition_metadata = metadata
                edge_link.condition_type = condition_config.type

                process_config = edge_link.process_config
                if process_config:
                    try:
                        processor = build_edge_payload_processor(process_config, processor_context)
                    except Exception as exc:  # pragma: no cover
                        error_msg = (
                            f"Failed to prepare processor '{process_config.display_label()}': {exc}"
                        )
                        self.log_manager.error(error_msg)
                        logger = get_server_logger()
                        logger.log_exception(exc, error_msg, processor_type=process_config.type)
                        raise WorkflowExecutionError(error_msg) from exc
                    edge_link.payload_processor = processor
                    edge_link.process_type = process_config.type
                    edge_link.process_metadata = getattr(processor, "metadata", {}) or {}
                    processor_label = getattr(processor, "label", None)
                    if processor_label:
                        edge_link.config["process_label"] = processor_label
                else:
                    edge_link.payload_processor = None
                    edge_link.process_metadata = {}
                    edge_link.process_type = None

    def _process_edge_output(
            self,
            edge_link: EdgeLink,
            source_result: Message,
            from_node: Node
    ) -> None:
        """Perform edge instantiation behavior.
        
        Edges with dynamic configuration still pass messages normally to the target
        node's input queue. Dynamic execution happens when the target node executes.
        """
        # All edges (including dynamic ones) use standard processing to pass messages
        # Dynamic execution will happen in _execute_node when the target node runs
        
        # Standard edge processing (no dynamic config)
        manager = edge_link.condition_manager
        if manager is None:
            raise WorkflowExecutionError(
                f"Edge {from_node.id}->{edge_link.target.id} is missing a condition manager"
            )
        try:
            manager.process(
                edge_link,
                source_result,
                from_node,
                self.log_manager,
            )
        except Exception as exc:  # pragma: no cover - defensive logging
            error_msg = (
                f"Edge manager failed for {from_node.id} -> {edge_link.target.id}: {exc}"
            )
            self.log_manager.error(
                error_msg,
                details={
                    "condition_type": edge_link.condition_type,
                    "condition_metadata": edge_link.condition_metadata,
                },
            )
            logger = get_server_logger()
            logger.log_exception(
                exc,
                error_msg,
                condition_type=edge_link.condition_type,
                condition_metadata=edge_link.condition_metadata,
            )
            raise WorkflowExecutionError(error_msg) from exc


    def _get_dynamic_config_for_node(self, node: Node):
        """Get the dynamic configuration for a node from its incoming edges.
        
        If multiple incoming edges have dynamic config, they must be identical
        (same type and parameters). Otherwise raises an error.
        
        Returns the dynamic config if found, or None.
        """
        from entity.configs.edge.dynamic_edge_config import DynamicEdgeConfig
        
        found_configs = []  # List of (source_node_id, dynamic_config)
        
        for predecessor in node.predecessors:
            for edge_link in predecessor.iter_outgoing_edges():
                if edge_link.target is node and edge_link.dynamic_config is not None:
                    found_configs.append((predecessor.id, edge_link.dynamic_config))
        
        if not found_configs:
            return None
        
        if len(found_configs) == 1:
            return found_configs[0][1]
        
        # Multiple dynamic configs found - verify they are consistent
        first_source, first_config = found_configs[0]
        for source_id, config in found_configs[1:]:
            # Check type consistency
            if config.type != first_config.type:
                raise WorkflowExecutionError(
                    f"Node '{node.id}' has inconsistent dynamic configurations on incoming edges: "
                    f"edge from '{first_source}' has type '{first_config.type}', "
                    f"but edge from '{source_id}' has type '{config.type}'. "
                    f"All dynamic edges to the same node must use the same configuration."
                )
            # Check split config consistency
            if (config.split.type != first_config.split.type or
                config.split.pattern != first_config.split.pattern or
                config.split.json_path != first_config.split.json_path):
                raise WorkflowExecutionError(
                    f"Node '{node.id}' has inconsistent split configurations on incoming edges: "
                    f"edges from '{first_source}' and '{source_id}' have different split settings. "
                    f"All dynamic edges to the same node must use the same configuration."
                )
            # Check mode-specific config consistency
            if config.max_parallel != first_config.max_parallel:
                raise WorkflowExecutionError(
                    f"Node '{node.id}' has inconsistent max_parallel on incoming edges: "
                    f"edge from '{first_source}' has max_parallel={first_config.max_parallel}, "
                    f"but edge from '{source_id}' has max_parallel={config.max_parallel}."
                )
            if config.type == "tree" and config.group_size != first_config.group_size:
                raise WorkflowExecutionError(
                    f"Node '{node.id}' has inconsistent group_size on incoming edges: "
                    f"edge from '{first_source}' has group_size={first_config.group_size}, "
                    f"but edge from '{source_id}' has group_size={config.group_size}."
                )
        
        return first_config

    def _execute_with_dynamic_config(
        self,
        node: Node,
        inputs: List[Message],
        dynamic_config,
    ) -> List[Message]:
        """Execute a node with dynamic configuration from incoming edges.
        
        Args:
            node: Target node to execute
            inputs: All input messages collected for this node
            dynamic_config: Dynamic configuration from the incoming edge
            
        Returns:
            Output messages from dynamic execution
        """
        # Separate inputs: dynamic edge inputs vs static (non-dynamic) edge inputs
        # Dynamic edge inputs will be split, static inputs will be replicated to all units
        dynamic_inputs: List[Message] = []
        static_inputs: List[Message] = []
        
        for msg in inputs:
            if msg.metadata.get("_from_dynamic_edge"):
                dynamic_inputs.append(msg)
            else:
                static_inputs.append(msg)
        
        self.log_manager.info(
            f"Executing node {node.id} with edge dynamic config ({dynamic_config.type} mode): "
            f"{len(dynamic_inputs)} dynamic inputs, {len(static_inputs)} static inputs"
        )
        
        # Create node executor function
        def node_executor_func(n: Node, inp: List[Message]) -> List[Message]:
            return self._process_result(n, inp)
        
        # Execute with dynamic edge executor
        dynamic_executor = DynamicEdgeExecutor(self.log_manager, node_executor_func)
        
        # Pass dynamic inputs for splitting, static inputs for replication
        return dynamic_executor.execute_from_inputs(
            node, dynamic_inputs, dynamic_config, static_inputs=static_inputs
        )

    def _execute_node(self, node: Node) -> None:
        """Execute a single node."""
        self._raise_if_cancelled()
        with self.resource_manager.guard_node(node):
            input_results = node.input

            # Clear incoming triggers so future iterations wait for fresh signals
            node.reset_triggers()

            serialized_inputs = [message.to_dict(include_data=False) for message in input_results]

            # Record node start
            self.log_manager.record_node_start(node.id, serialized_inputs, node.node_type, {
                "input_count": len(input_results),
                "predecessors": [p.id for p in node.predecessors],
                "successors": [s.id for s in node.successors]
            })

            self.log_manager.debug(f"Processing {len(input_results)} inputs together for node {node.id}")

            # Check if any incoming edge has dynamic configuration
            dynamic_config = self._get_dynamic_config_for_node(node)
            
            # Process all inputs together in a single executor call
            with self.log_manager.node_timer(node.id):
                if dynamic_config is not None:
                    raw_outputs = self._execute_with_dynamic_config(node, input_results, dynamic_config)
                else:
                    raw_outputs = self._process_result(node, input_results)

            # Process all output messages
            output_messages: List[Message] = []
            for raw_output in raw_outputs:
                msg = self._ensure_source_output(raw_output, node.id)
                node.append_output(msg)
                output_messages.append(msg)

            # Use first output for context trace handling (backward compat)
            unified_output = output_messages[0] if output_messages else None

            context_trace_payload = None
            context_restored = False
            if unified_output is not None and isinstance(unified_output.metadata, dict):
                context_trace_payload = unified_output.metadata.get("context_trace")
            if node.context_window != 0 and context_trace_payload:
                context_restored = self._restore_context_trace(node, context_trace_payload)

            if node.context_window != -1:
                preserved_inputs = node.clear_input(preserve_kept=True, context_window=node.context_window)
                if preserved_inputs:
                    self.log_manager.debug(
                        f"Node {node.id} cleaned up its input context after execution (preserved {preserved_inputs} keep-marked inputs)"
                    )
                else:
                    self.log_manager.debug(
                        f"Node {node.id} cleaned up its input context after execution"
                    )

            if output_messages:
                self.log_manager.debug(
                    f"Node {node.id} processed {len(input_results)} inputs into {len(output_messages)} output(s)"
                )
            else:
                self.log_manager.debug(
                    f"Node {node.id} produced no output; downstream edges suppressed"
                )

            # Record node end
            output_text = ""
            if output_messages:
                if len(output_messages) == 1:
                    output_text = unified_output.text_content()
                else:
                    for idx, msg in enumerate(output_messages):
                        output_text += f"===== OUTPUT {idx} =====\n\n" + msg.text_content() + "\n\n"
                output_role = unified_output.role.value
                output_source = unified_output.metadata.get("source")
            else:
                output_text = ""
                output_role = "none"
                output_source = None

            self.log_manager.record_node_end(node.id, output_text if node.log_output else "", {
                "output_size": len(output_text),
                "output_count": len(output_messages),
                "output_role": output_role,
                "output_source": output_source
            })

            # Pass results to successor nodes via edges
            # For each output message, process all edges
            for output_msg in output_messages:
                for edge_link in node.iter_outgoing_edges():
                    self._process_edge_output(edge_link, output_msg, node)
            
            if output_messages and node.context_window != 0 and not context_restored:
                # Use first output for pseudo edge
                pseudo_condition = EdgeConditionConfig.from_dict("true", path=f"{node.path}.pseudo_edge")
                pseudo_link = EdgeLink(target=node, trigger=False)
                pseudo_link.condition_config = pseudo_condition
                pseudo_context = ConditionFactoryContext(
                    function_manager=self.function_manager,
                    log_manager=self.log_manager,
                )
                pseudo_link.condition_manager = build_edge_condition_manager(pseudo_condition, pseudo_context, self._get_execution_context())
                pseudo_link.condition = pseudo_condition.display_label()
                pseudo_link.condition_type = pseudo_condition.type
                for output_msg in output_messages:
                    self._process_edge_output(pseudo_link, output_msg, node)

    def _process_result(self, node: Node, input_payload: List[Message]) -> List[Message]:
        """Process a single input result using strategy pattern executors.

        This method delegates to specific node executors based on node type.
        Returns a list of messages (maybe empty if node suppresses output).
        """
        if not self.node_executors:
            raise RuntimeError("Node executors not initialized. Call _build_memories_and_thinking() first.")
        
        if node.type not in self.node_executors:
            raise ValueError(f"Unsupported node type: {node.type}")
        
        executor = self.node_executors[node.type]
        hook = self.runtime_context.workspace_hook
        workspace = self.runtime_context.code_workspace
        if hook:
            try:
                hook.before_node(node, workspace)
            except Exception:
                self.log_manager.warning("workspace hook before_node failed for %s", node.id)
        success = False
        try:
            result = executor.execute(node, input_payload)
            success = True
            return result
        finally:
            if hook:
                try:
                    hook.after_node(node, workspace, success=success)
                except Exception:
                    self.log_manager.warning("workspace hook after_node failed for %s", node.id)


    def _collect_all_outputs(self) -> None:
        """Collect final outputs from all nodes, especially sink nodes."""
        all_outputs = {}

        # For majority voting, we might want to collect differently
        if self.graph.is_majority_voting:
            # In majority voting mode, collect all outputs and the final majority result
            for node_id, node in self.graph.nodes.items():
                if node.output:
                    node_output = {
                        "node_id": node_id,
                        "node_type": node.node_type,
                        "predecessors_num": len(node.predecessors),
                        "successors_num": len(node.successors),
                        "results": [self._serialize_output_payload(item) for item in node.output]
                    }
                    all_outputs[f"node_{node_id}"] = node_output
            
            # Add the majority result
            if hasattr(self, 'majority_result'):
                all_outputs["majority_result"] = self.majority_result
        else:
            # Collect outputs from all nodes normally
            for node_id, node in self.graph.nodes.items():
                if node.output:
                    node_output = {
                        "node_id": node_id,
                        "node_type": node.node_type,
                        "predecessors_num": len(node.predecessors),
                        "successors_num": len(node.successors),
                        "results": [self._serialize_output_payload(item) for item in node.output]
                    }
                    all_outputs[f"node_{node_id}"] = node_output

        # Add graph summary
        all_outputs["graph_summary"] = {
            "total_nodes": len(self.graph.nodes),
            "total_edges": len(self.graph.edges),
            "total_transmissions": len([k for k in self.outputs.keys() if "->" in k]),
            "layers": len(self.graph.layers),
            "execution_completed": True,
            "is_majority_voting": self.graph.is_majority_voting
        }

        self.outputs.update(all_outputs)

    def get_final_output(self) -> str:
        final_message = self.get_final_output_message()
        return final_message.text_content() if final_message else ""

    def get_final_output_message(self) -> Message | None:
        if self.graph.is_majority_voting:
            if self.majority_result is None:
                return None
            if isinstance(self.majority_result, Message):
                return self.majority_result.clone()
            return self._create_message(MessageRole.ASSISTANT, str(self.majority_result), "MAJORITY_VOTE")

        final_node = self._get_final_node()
        if not final_node:
            return None
        if final_node.output:
            value = final_node.output[-1]
            if isinstance(value, Message):
                return value.clone()
            return self._create_message(MessageRole.ASSISTANT, str(value), final_node.id)
        return None

    def get_final_output_messages(self) -> List[Message]:
        """Return all messages from the final node."""
        if self.graph.is_majority_voting:
            msg = self.get_final_output_message()
            return [msg] if msg else []

        final_node = self._get_final_node()
        if not final_node:
            return []
        
        results = []
        for value in final_node.output:
            if isinstance(value, Message):
                results.append(value.clone())
            else:
                results.append(self._create_message(MessageRole.ASSISTANT, str(value), final_node.id))
        return results

    def _get_final_node(self) -> Node:
        """Return the explicitly configured end node, or sink node as fallback."""
        end_node_ids = self.graph.config.definition.end_nodes

        if end_node_ids:
            for end_node_id in end_node_ids:
                if end_node_id in self.graph.nodes:
                    node = self.graph.nodes[end_node_id]
                    # Check if node has output
                    if node.output:
                        return node

        # Fallback to default behavior - return sink node
        sink_node = [node for node in self.graph.nodes.values() if not node.successors]
        return sink_node[0] if sink_node else None

    def _restore_context_trace(self, node: Node, trace_payload: Any) -> bool:
        if not isinstance(trace_payload, list):
            return False

        restored = 0
        for entry in trace_payload:
            if not isinstance(entry, dict):
                continue
            try:
                message = Message.from_dict(entry)
                if message.role not in [MessageRole.USER, MessageRole.ASSISTANT]:
                    continue
            except Exception as exc:
                self.log_manager.warning(
                    f"Failed to deserialize context trace for node {node.id}: {exc}"
                )
                continue
            node.append_input(self._ensure_source(message, node.id))
            restored += 1

        if restored:
            self.log_manager.debug(
                f"Node {node.id} preserved {restored} messages from its tool execution trace"
            )
        return restored > 0

    def _payload_to_text(self, payload: Any) -> str:
        if isinstance(payload, Message):
            return payload.text_content()
        if payload is None:
            return ""
        return str(payload)

    def _serialize_output_payload(self, payload: Any) -> Any:
        if isinstance(payload, Message):
            return {"type": "message", "payload": payload.to_dict(include_data=False)}
        return {"type": "text", "payload": str(payload)}

    def _normalize_task_input(self, raw_input: Any) -> List[Message]:
        if isinstance(raw_input, list):
            messages: List[Message] = []
            for item in raw_input:
                if isinstance(item, Message):
                    messages.append(self._ensure_source(item, "TASK"))
                elif isinstance(item, str):
                    messages.append(self._create_message(MessageRole.USER, item, "TASK"))
            return messages or [self._create_message(MessageRole.USER, "", "TASK")]
        if isinstance(raw_input, Message):
            return [self._ensure_source(raw_input, "TASK")]
        return [self._create_message(MessageRole.USER, str(raw_input), "TASK")]

    def _ensure_source(self, message: Message, default_source: str) -> Message:
        cloned = message.clone()
        metadata = dict(cloned.metadata)
        metadata.setdefault("source", default_source)
        cloned.metadata = metadata
        return cloned

    def _create_message(self, role: MessageRole, content: str, source: str) -> Message:
        return Message(role=role, content=content, metadata={"source": source})

    def _ensure_source_output(self, message: Any, node_id: str) -> Message:
        if not isinstance(message, Message):
            return self._create_message(MessageRole.ASSISTANT, str(message), node_id)
        cloned = message.clone()
        metadata = dict(message.metadata)
        metadata.setdefault("source", node_id)
        cloned.metadata = metadata
        return cloned
