Source code for kodeagent.kodeagent

"""A minimalistic approach to building AI agents.
Implements ReAct and CodeAct agents, supported by Planner and Observer.
"""

import asyncio
import copy
import inspect
import json
import random
import re
import uuid
import warnings
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator, Callable
from dataclasses import KW_ONLY, dataclass, field
from datetime import datetime
from json import JSONDecodeError
from typing import Any, ClassVar

import json_repair
import litellm
import pydantic as pyd
import rich
from dotenv import load_dotenv
from tenacity import RetryError

from . import kutils as ku
from . import tracer
from .code_runner import CODE_ENV_NAMES, CodeRunner
from .file_tracker import OutputInterceptor, install_interceptor
from .history_formatter import (
    CodeActHistoryFormatter,
    HistoryFormatter,
    ReActHistoryFormatter,
)
from .models import (
    AGENT_RESPONSE_TYPES,
    AgentResponse,
    ChatMessage,
    CodeActChatMessage,
    ReActChatMessage,
    Task,
)
from .orchestrator import Observer, Planner
from .tracer import TRACING_TYPES, create_tracer_manager
from .usage_tracker import UsageTracker

# Install the global interceptor patch.
# This call runs as an import-time side effect of loading this module; what exactly
# gets patched is defined in `.file_tracker` and is not visible here.
install_interceptor()

# Load environment variables from a `.env` file, if one is found (python-dotenv).
load_dotenv()

# Report each distinct UserWarning only once, and suppress warnings whose message
# matches "Pydantic serializer warnings".
warnings.simplefilter('once', UserWarning)
warnings.filterwarnings('ignore', message='.*Pydantic serializer warnings.*')

# Module-wide logger obtained from the project's utility module.
logger = ku.get_logger()

# Prompt templates, read once at import time via `ku.read_prompt`.
REACT_SYSTEM_PROMPT = ku.read_prompt('system/react.txt')
CODE_ACT_SYSTEM_PROMPT = ku.read_prompt('system/codeact.txt')
SALVAGE_RESPONSE_PROMPT = ku.read_prompt('salvage_response.txt')


# Regexes for pulling labelled sections out of an LLM's plain-text response.
# Every pattern is case-insensitive. Patterns compiled with DOTALL let `.` match
# newlines, so a captured section may span several lines. MULTILINE is NOT set, so `$`
# anchors at the end of the whole text rather than at each line end.
# NOTE(review): presumably consumed by the `parse_text_response` implementations,
# which are outside the visible part of this file - confirm.

# Text after "Thought:" up to a newline followed by "Action:", "Answer:" or "Code:",
# or up to the end of the text.
THOUGHT_MATCH = re.compile(
    r'Thought:\s*(.+?)(?=\n(?:Action|Answer|Code):|$)', re.DOTALL | re.IGNORECASE
)
# A single word (`\w+`) following "Action:".
ACTION_MATCH = re.compile(r'Action:\s*(\w+)', re.IGNORECASE)
# A brace-delimited span following "Args:".
# NOTE(review): the non-greedy `.+?` stops at the FIRST closing brace, so a nested
# object would be captured only up to its inner `}` - confirm callers tolerate this.
ARGS_MATCH = re.compile(r'Args:\s*(\{.+?\})', re.DOTALL | re.IGNORECASE)
# Text after "Answer:" up to a newline followed by "Successful:", or to the end.
ANSWER_MATCH = re.compile(r'Answer:\s*(.+?)(?=\nSuccessful:|$)', re.DOTALL | re.IGNORECASE)
# The literal "true" or "false" following "Successful:".
SUCCESS_MATCH = re.compile(r'Successful:\s*(true|false)', re.IGNORECASE)
# Contents of a triple-backtick fence following "Code:"; the `python` tag is optional
# and surrounding whitespace inside the fence is not captured.
CODE_MATCH = re.compile(r'Code:\s*```(?:python)?\s*(.+?)\s*```', re.DOTALL | re.IGNORECASE)

# Number of attempts `Agent._chat` makes before re-raising a non-RetryError failure.
MAX_RESPONSE_PARSING_ATTEMPTS = 3
# Maximum number of files `Agent._run_init` accepts for a single task.
MAX_TASK_FILES = 10


@dataclass
class Agent(ABC):
    """Abstract base class shared by every agent type.

    Only `model_name` is positional; every other init field is keyword-only.
    Fields declared with `init=False` are internal state that is filled in by
    `__post_init__` or while a task is running.
    """

    model_name: str
    _: KW_ONLY
    name: str | None = None
    description: str | None = None
    tools: list[Callable] = field(default_factory=list)
    litellm_params: dict = field(default_factory=dict)
    persona: str | None = None
    system_prompt: str | None = None
    max_iterations: int = 20
    filter_tools_for_task: bool = False
    max_retries: int = ku.DEFAULT_MAX_LLM_RETRIES
    work_dir: str | None = None
    tracing_type: TRACING_TYPES | None = None

    # Internal state (not in __init__)
    id: uuid.UUID = field(init=False)
    tool_name_to_func: dict[str, Callable] = field(init=False)
    tool_names: set[str] = field(init=False)
    task: Task | None = field(init=False, default=None)
    chat_history: list[dict] = field(init=False, default_factory=list)
    usage_tracker: UsageTracker = field(init=False, default_factory=UsageTracker)
    tracer_manager: tracer.AbstractTracerManager = field(init=False)
    current_trace: tracer.AbstractObservation | None = field(init=False, default=None)
    planner: Planner | None = field(init=False, default=None)
    observer: Observer = field(init=False)
    is_visual_model: bool = field(init=False)
    task_output_files: list[str] = field(init=False, default_factory=list)
    msg_idx_of_new_task: int = field(init=False, default=0)
    final_answer_found: bool = field(init=False, default=False)
    _history_formatter: HistoryFormatter | None = field(init=False, default=None)
    _tool_descriptions_cache: dict[frozenset[str], str] = field(init=False, default_factory=dict)

    response_format_class: ClassVar[type[pyd.BaseModel]] = ChatMessage
    HISTORY_TRUNCATE_CHARS: ClassVar[int] = 1000

    def __post_init__(self):
        """Populate the non-init fields once the dataclass fields have been assigned."""
        self.id = uuid.uuid4()
        self.tool_name_to_func = {tool.__name__: tool for tool in self.tools}
        self.tool_names = set(self.tool_name_to_func)

        # Tracing initialization
        self.tracer_manager = create_tracer_manager(self.tracing_type)

        # Components initialization: the planner and the observer share these settings.
        shared_kwargs = {
            'model_name': self.model_name,
            'litellm_params': self.litellm_params,
            'max_retries': self.max_retries,
            'usage_tracker': self.usage_tracker,
            'tracer_manager': self.tracer_manager,
        }
        self.planner = Planner(**shared_kwargs)
        self.observer = Observer(tool_names=self.tool_names, **shared_kwargs)

        # NOTE(review): `llm_vision_support` is neither imported nor defined in the visible
        # part of this file - presumably it is defined further down; confirm.
        vision_flags = llm_vision_support([self.model_name])
        self.is_visual_model = vision_flags[0] or False

    def __str__(self):
        """Return a one-line summary with the agent's name, id, model, and tools."""
        return f'Agent: {self.name} ({self.id}); LLM: {self.model_name}; Tools: {self.tools}'

    @property
    def current_plan(self) -> str | None:
        """The planner's formatted plan, or None when there is no planner or no plan."""
        if self.planner and self.planner.plan:
            return self.planner.get_formatted_plan()
        return None

    @property
    def artifacts(self) -> list[str]:
        """The output files recorded so far for the current task (the live list, not a copy)."""
        return self.task_output_files

    async def _augment_task_with_previous(self, current_task: str) -> str:
        """Prefix the new task description with a summary of the previous task.

        Args:
            current_task: The current task description.

        Returns:
            `current_task` unchanged when no previous task exists; otherwise a text
            made of the previous task's description, its result (or a salvaged summary
            when it never finished), its status and generated files, followed by the
            current task.
        """
        previous = self.task
        if not previous:
            return current_task

        sections = [
            f'\n## Previous Task Context\n\n**Previous Task**: {previous.description}\n'
        ]

        # Outcome: a salvaged summary for an unfinished task, else the (capped) result.
        if not previous.is_finished:
            # Task was interrupted or failed before finishing
            progress = await self.salvage_response()
            sections.append(f'**Summary of Progress**: {progress}\n')
        elif previous.result:
            shown_result = str(previous.result)
            # Keep at most 2000 characters of the result
            if len(shown_result) > 2000:
                shown_result = shown_result[:2000] + '... [TRUNCATED]'
            sections.append(f'**Result**: {shown_result}\n')

        # Status indicator: failure takes precedence over completion.
        if previous.is_error:
            sections.append('**Status**: ❌ Failed\n')
        elif previous.is_finished:
            sections.append('**Status**: ✅ Completed\n')

        # Files generated by the previous task, if any.
        if previous.output_files:
            sections.append(f'**Generated Files**: {", ".join(previous.output_files)}\n')

        sections += ['\n---\n\n## Current Task\n\n', current_task]
        return ''.join(sections)

    def _run_init(
        self,
        task: str,
        files: list[str] | None = None,
        task_id: str | None = None,
        chat_history: list[dict] | None = None,
    ) -> None:
        """Validate the inputs and reset the agent's state for a new task run.

        Args:
            task: Task description.
            files: Optional files for the task.
            task_id: Optional task ID.
            chat_history: Optional pre-built OpenAI-compliant chat history to inject.
                When provided, a deep copy of it becomes the base of
                ``self.chat_history``, so the caller's object is never mutated.

        Raises:
            ValueError: If the task is empty, ``files`` is not a list or holds more
                than ``MAX_TASK_FILES`` entries, or the injected history fails
                validation.
        """
        if not task or not task.strip():
            raise ValueError('Task description cannot be empty!')
        if files:
            if not isinstance(files, list):
                raise ValueError('Task files must be a list of file paths!')
            if len(files) > MAX_TASK_FILES:
                raise ValueError(f'Too many files provided for the task (max {MAX_TASK_FILES})!')

        if chat_history is None:
            self.init_history()
        else:
            # Validate and apply the injected history
            ku.validate_chat_history(chat_history, tool_names=self.tool_names)
            injected = copy.deepcopy(chat_history)
            # Prepend the agent's own system prompt unless the history starts with one.
            starts_with_system = bool(injected) and injected[0].get('role') == 'system'
            if not starts_with_system:
                injected.insert(
                    0, {'role': 'system', 'content': self.get_system_prompt_content()}
                )
            self.chat_history = injected
        # Remember where the pre-existing context ends; new task messages go after it.
        self.msg_idx_of_new_task = len(self.chat_history)

        self.task = Task(description=task, files=files)
        self.task_output_files = []
        if task_id:
            self.task.id = task_id

        if self.planner:
            self.planner.reset()
        if self.observer:
            self.observer.reset()

        # Reset usage tracker for new task
        self.usage_tracker.reset()

        # Initialize root trace for the task
        trace_input = {'task': task, 'files': files, 'task_id': str(self.task.id)}
        self.current_trace = self.tracer_manager.start_trace(
            name=f'{self.__class__.__name__}',
            input_data=trace_input,
        )
        self.final_answer_found = False
def add_output_file(self, path: str):
    """Remember a file produced while the task was running.

    A path that has already been recorded is ignored, so repeated calls do not
    create duplicates.

    Args:
        path: Absolute path to the generated file.
    """
    if path in self.task_output_files:
        return

    self.task_output_files.append(path)
    if self.task:
        # Rebind to a new list rather than appending to the task's list in place.
        self.task.output_files = self.task.output_files + [path]
    logger.info('Recorded output file: %s', path)
@abstractmethod
def parse_text_response(self, text: str) -> ChatMessage:
    """Turn the LLM's plain-text reply into a ChatMessage.

    Concrete agents must implement this.

    Args:
        text: The raw text returned by the LLM.

    Returns:
        The parsed chat message.
    """
async def salvage_response(self) -> str:
    """Ask the LLM to summarise what was learned when no answer could be found.

    The prompt is built from the current task's description, its files (or the
    placeholder ``[None]``), and the formatted message history.

    Returns:
        The LLM's summary of the progress made.
    """
    current = self.task
    file_listing = '\n'.join(current.files) if current.files else '[None]'
    prompt = SALVAGE_RESPONSE_PROMPT.format(
        task=current.description,
        task_files=file_listing,
        history=self.get_history(),
    )
    return await ku.call_llm(
        model_name=self.model_name,
        litellm_params=self.litellm_params,
        messages=ku.make_user_message(prompt),
        trace_id=self.task.id,
        max_retries=self.max_retries,
        usage_tracker=self.usage_tracker,
        component_name='Agent.salvage',
    )
async def pre_run(self) -> AsyncIterator[AgentResponse]:
    """Hook executed before the main agent loop; the default emits nothing.

    This is an asynchronous generator. Subclasses may override it to set up
    task-specific state, validate preconditions, or yield initial AgentResponse
    updates such as log messages.

    Returns:
        AsyncIterator[AgentResponse]: An iterator yielding agent responses
            (none in this default implementation).
    """
    return
    # Unreachable on purpose: the mere presence of `yield` makes this function an
    # async generator, which the caller needs because it uses `async for`.
    yield
async def post_run(self) -> AsyncIterator[AgentResponse]:
    """Hook executed after the main agent loop; the default emits nothing.

    This is an asynchronous generator. Subclasses may override it to release
    resources, report final metrics, or persist state, yielding AgentResponse
    updates as they go.

    Returns:
        AsyncIterator[AgentResponse]: An iterator yielding agent responses
            (none in this default implementation).
    """
    return
    # Unreachable on purpose: the mere presence of `yield` makes this function an
    # async generator, which the caller needs because it uses `async for`.
    yield
@abstractmethod
async def run(
    self,
    task: str,
    files: list[str] | None = None,
    task_id: str | None = None,
    max_iterations: int | None = None,
    recurrent_mode: bool = False,
    summarize_progress_on_failure: bool = True,
    chat_history: list[dict] | None = None,
) -> AsyncIterator[AgentResponse]:
    """Run the agent on a task, streaming its responses.

    Concrete agents must implement this.

    Args:
        task: The task description.
        files: Files associated with the task, if any.
        task_id: Optional task ID.
        max_iterations: Optional cap on the number of iterations.
        recurrent_mode: When True, the task text is augmented with the previous
            task's description and result. Cannot be combined with
            ``chat_history``.
        summarize_progress_on_failure: Whether to summarise the progress made when
            the task fails.
        chat_history: Optional pre-built OpenAI-compliant history used as the base
            context for this run. Cannot be combined with ``recurrent_mode``.

    Returns:
        AsyncIterator[AgentResponse]: An iterator yielding agent responses.

    Raises:
        ValueError: If both ``recurrent_mode`` and ``chat_history`` are provided.
    """
def response(
    self,
    rtype: AGENT_RESPONSE_TYPES,
    value: Any,
    channel: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> AgentResponse:
    """Build a response for the agent to emit.

    As a side effect, a ``'final'`` response is also stored as the current task's
    result (when a task exists).

    Args:
        rtype: Response type emitted by the agent.
        value: The current update from the agent.
        channel: The response channel.
        metadata: Any metadata associated with the update.

    Returns:
        A mapping with the keys ``type``, ``channel``, ``value`` and ``metadata``.
    """
    is_final = rtype == 'final'
    if is_final and self.task:
        self.task.result = value

    payload = {
        'type': rtype,
        'channel': channel,
        'value': value,
        'metadata': metadata,
    }
    return payload
async def _chat(self, response_format: type[pyd.BaseModel] | None = None) -> str | None:
    """Send the agent's message history to the LLM and return its reply.

    A failing call is retried up to ``MAX_RESPONSE_PARSING_ATTEMPTS`` times. Before
    each retry, a short feedback message describing the error is appended to a
    working copy of the messages; ``self.chat_history`` itself is never modified.

    Args:
        response_format: Optional structured response format for the LLM.

    Returns:
        The chat response, or an empty string when the LLM returned nothing.

    Raises:
        RetryError: If the LLM call fails after max retries (not retried here).
        Exception: The last error, once all attempts are exhausted.
    """
    # Work on a copy of the list so feedback messages do not pollute the real history.
    working_messages = list(self.chat_history)
    last_attempt = MAX_RESPONSE_PARSING_ATTEMPTS - 1

    for attempt in range(MAX_RESPONSE_PARSING_ATTEMPTS):
        try:
            chat_response: str = await ku.call_llm(
                model_name=self.model_name,
                litellm_params=self.litellm_params,
                messages=working_messages,
                response_format=response_format,
                trace_id=self.task.id if self.task else None,
                max_retries=self.max_retries,
                usage_tracker=self.usage_tracker,
                component_name='Agent',
            )
            return chat_response or ''
        except RetryError:
            # LLM call failed after max retries; do not retry at this level
            raise
        except Exception as e:
            logger.warning(
                'LLM call failed (attempt %d/%d): %s',
                attempt + 1,
                MAX_RESPONSE_PARSING_ATTEMPTS,
                str(e),
            )
            if attempt >= last_attempt:
                raise

            # Pause briefly, then tell the LLM what went wrong so it can correct itself.
            await asyncio.sleep(random.uniform(0.5, 1.5))
            feedback = (
                f'Error: Previous response had issues: {str(e)}.'
                ' Please ensure your response follows the exact JSON schema provided.'
                f' [Timestamp={datetime.now()}]'
            )
            working_messages.append({'role': 'user', 'content': feedback})
def add_to_history(self, message: ChatMessage | dict):
    """Add a chat message to the agent's message history.

    A ChatMessage is converted to a dict first (through the history formatter when
    one is set, otherwise via ``model_dump()``); a dict is used as given.

    Consecutive user messages are merged into one, since most APIs expect a single
    user message followed by an assistant response. When merging, both contents are
    converted to lists of parts and concatenated, and any ``files`` of the new
    message are appended to those of the previous one. Note that the merge updates
    the previous history entry in place.

    Args:
        message: The chat message to add, either as a ChatMessage object or a
            dictionary.
    """
    if isinstance(message, ChatMessage):
        # Use formatter if available
        if self._history_formatter:
            message_dict = self._history_formatter.pydantic_to_dict(message)
        else:
            message_dict = message.model_dump()
    else:
        message_dict = message

    previous = self.chat_history[-1] if self.chat_history else None
    is_consecutive_user = (
        previous is not None
        and previous.get('role') == 'user'
        and message_dict.get('role') == 'user'
    )
    if not is_consecutive_user:
        self.chat_history.append(message_dict)
        return

    prev_content = previous.get('content', '')
    curr_content = message_dict.get('content', '')

    # Convert to lists for merging
    if not isinstance(prev_content, list):
        prev_content = [{'type': 'text', 'text': str(prev_content)}]
    if not isinstance(curr_content, list):
        curr_content = [{'type': 'text', 'text': str(curr_content)}]
    previous['content'] = prev_content + curr_content

    # Update files metadata if present
    new_files = message_dict.get('files')
    if new_files:
        prev_files = previous.get('files') or []
        if not isinstance(prev_files, list):
            prev_files = [prev_files]
        # Normalise the incoming files the same way as the previous ones; without
        # this, a non-list value (e.g. a tuple or a lone path) breaks `list + x`.
        if not isinstance(new_files, list):
            new_files = [new_files] if isinstance(new_files, str) else list(new_files)
        previous['files'] = prev_files + new_files
[docs] def get_tools_description(self, tools: list[Any] | None = None) -> str: """Generate a description of all the tools available to the agent. Required args are marked. Args: tools: Optional list of tools to describe. If None, describes all available tools. Returns: A formatted string describing each tool, its parameters, and example usage. """ # Create a cache key based on the tools being described tools_to_describe = tools if tools is not None else self.tools cache_key = frozenset(t.__name__ for t in tools_to_describe) if cache_key in self._tool_descriptions_cache: return self._tool_descriptions_cache[cache_key] description = '' filtered_tool_names = {t.__name__ for t in tools_to_describe} for t in self.tools: if t.__name__ in filtered_tool_names: description += ku.build_tool_schema(t, just_first_line=False, as_text=True) description += '\n---\n' self._tool_descriptions_cache[cache_key] = description return description
@property
def purpose(self) -> str:
    """A text summary of the agent: its name, its description, and its tools."""
    lines = [
        f'Name: {self.name}',
        f'Description: {self.description or "N/A"}',
        f'Tools available to this agent (`{self.name}`):',
        f'{self.get_tools_description()}',
    ]
    return '\n'.join(lines)
@staticmethod
def normalize_content(content: Any) -> str:
    """Convert message content to a readable string for Observer.

    Args:
        content: The content to normalize, which can be of various types
            (str, list, dict).

    Returns:
        An empty string for None; a string unchanged; for a list, its items joined
        by newlines (the ``text`` of each ``{'type': 'text', ...}`` part, and
        ``str()`` of anything else); otherwise ``str(content)``.
    """
    if content is None:
        return ''
    if isinstance(content, str):
        return content

    # OpenAI-style multimodal list
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, dict) and item.get('type') == 'text':
                text = item.get('text')
                # A missing or None text contributes an empty line and any other
                # non-string value is stringified, so the join below cannot fail.
                parts.append('' if text is None else str(text))
            else:
                parts.append(str(item))
        return '\n'.join(parts)

    return str(content)
def _get_last_tool_call_id(self) -> str | None: """Extract the tool_call_id from the last message in chat_history. Returns: The tool call ID if present, otherwise None. """ if not self.chat_history: return None last_msg = self.chat_history[-1] tool_calls = last_msg.get('tool_calls', []) return tool_calls[0].get('id') if tool_calls else None
def get_history(self) -> str:
    """Get a formatted string of the agent's message history.

    The leading system prompt is left out, and each message's content is cut to
    ``HISTORY_TRUNCATE_CHARS`` characters. The first message is skipped only when
    its role really is ``'system'``, so a history that has no system prompt does
    not lose its first real message.

    Returns:
        One ``[role]: content`` line per message, joined by newlines, for logging
        and observation.
    """
    history = self.chat_history
    starts_with_system = bool(history) and history[0].get('role') == 'system'
    visible_messages = history[1:] if starts_with_system else history

    limit = self.HISTORY_TRUNCATE_CHARS
    segments = []
    for msg in visible_messages:
        # Called through `self` so that the static helper resolves on the instance.
        content = self.normalize_content(msg.get('content', ''))
        if len(content) > limit:
            content = content[:limit] + '... [TRUNCATED]'
        segments.append(f'[{msg.get("role")}]: {content}')
    return '\n'.join(segments)
def get_system_prompt_content(self) -> str:
    """Fill in the agent's system prompt template.

    The template receives two variables: ``persona`` (empty when unset) and
    ``tools`` (the tools description). Subclasses should override this to supply
    additional format variables.

    Returns:
        The formatted system prompt, or an empty string when no template is set.
    """
    template = self.system_prompt
    if template:
        variables = {
            'persona': self.persona or '',
            'tools': self.get_tools_description(),
        }
        return template.format(**variables)
    return ''
def clear_history(self):
    """Drop every stored message by rebinding ``chat_history`` to a new empty list."""
    self.chat_history = list()
def init_history(self):
    """Reset the message history to its starting state.

    The base implementation simply empties the history; a subclass can override it
    to seed the history, e.g., with a system prompt.
    """
    self.clear_history()
def get_usage_report(self, include_breakdown: bool = True) -> str:
    """Produce a formatted LLM usage report from the agent's usage tracker.

    Args:
        include_breakdown: Whether to include per-component breakdown.

    Returns:
        Formatted string with usage statistics.
    """
    tracker = self.usage_tracker
    return tracker.format_report(include_breakdown=include_breakdown)
def get_usage_metrics(self) -> dict:
    """Collect the raw usage metrics from the usage tracker.

    Returns:
        A dictionary with two keys: ``total`` (the dumped overall usage) and
        ``by_component`` (the dumped usage of each component, keyed by component).
    """
    tracker = self.usage_tracker
    overall = tracker.get_total_usage()
    per_component = tracker.get_usage_by_component()

    metrics = {'total': overall.model_dump()}
    metrics['by_component'] = {
        component: usage.model_dump() for component, usage in per_component.items()
    }
    return metrics
@dataclass
class ReActAgent(Agent):
    """Reasoning and Acting agent with thought-action-observation loop."""

    _: KW_ONLY
    system_prompt: str = REACT_SYSTEM_PROMPT

    response_format_class: ClassVar[type[pyd.BaseModel]] = ReActChatMessage

    def __post_init__(self):
        """Run the base setup, log the agent's tools, and install the ReAct history formatter."""
        super().__post_init__()

        if self.tools:
            tool_names = [t.__name__ for t in self.tools]
            logger.info('Created agent: %s; tools: %s', self.name, tool_names)

        self._history_formatter = ReActHistoryFormatter()

    async def _update_plan(self):
        """Update the plan from the most recent thought and observation in the history.

        The history is scanned from newest to oldest. The observation is the content
        of the latest tool message; the thought is the ``_thought`` of the latest
        assistant message that has one. When that assistant message has content but
        no tool calls, it is a final answer and its content replaces the observation.
        The plan is left untouched unless both a thought and an observation are found.
        """
        thought_text = None
        observation_text = None

        for entry in reversed(self.chat_history):
            role = entry.get('role')

            # Observation: the most recent tool response
            if observation_text is None and role == 'tool':
                observation_text = entry.get('content')

            # Thought: the most recent assistant message carrying a '_thought'.
            # Plain key lookups keep this loosely coupled to the message type.
            if thought_text is None and role == 'assistant':
                candidate = entry.get('_thought')
                if candidate:
                    thought_text = candidate
                    # A final answer has content but no tool calls. Passing it on as
                    # the observation lets the Planner see the final result and mark
                    # "Output..." steps as done.
                    # NOTE(review): this check is applied only to the message that
                    # supplies the thought - confirm against the upstream file.
                    if not entry.get('tool_calls'):
                        answer = entry.get('content')
                        if answer:
                            observation_text = f'Final Answer: {answer}'

            if thought_text and observation_text:
                break

        # Only update plan if both thought and observation are found
        if not (thought_text and observation_text):
            logger.debug(
                'Skipping plan update: missing thought=%s or observation=%s',
                bool(thought_text),
                bool(observation_text),
            )
            return

        await self.planner.update_plan(
            thought=thought_text,
            observation=observation_text,
            task_id=self.task.id,
            parent_trace=self.current_trace,
        )
def init_history(self):
    """Start a fresh history holding only the system prompt, and clear the final-answer flag."""
    system_message = {'role': 'system', 'content': self.get_system_prompt_content()}
    self.chat_history = [system_message]
    self.final_answer_found = False
async def _create_initial_plan(self): """Helper method to create the initial plan. Raises: RuntimeError: If plan creation fails. """ try: await self.planner.create_plan( self.task, self.__class__.__name__, parent_trace=self.current_trace ) except RetryError as err: logger.warning('Max retries reached during plan creation.') error_msg = ( 'Unable to start solving the task: Rate limit exceeded while ' 'creating the initial plan. Please try again later.' ) self.add_to_history(ChatMessage(role='assistant', content=error_msg)) raise RuntimeError(error_msg) from err
async def pre_run(self) -> AsyncIterator[AgentResponse]:
    """Announce the task, create the initial plan, and seed the history with both.

    Steps, in order:
    - Clear the final-answer flag and emit a log response naming the task.
    - Create the initial plan. If that fails, emit a final error response and stop.
    - Emit the plan as a log response.
    - Add the task (with its files) and the plan to the history as user messages.

    Returns:
        AsyncIterator[AgentResponse]: Iterator of agent responses.
    """
    self.final_answer_found = False
    yield self.response(
        rtype='log', value=f'Solving task: `{self.task.description}`', channel='run'
    )

    try:
        await self._create_initial_plan()
    except RuntimeError as e:
        error_metadata = {'final_answer_found': False, 'is_error': True}
        yield self.response(
            rtype='final', value=str(e), channel='run', metadata=error_metadata
        )
        return

    yield self.response(
        rtype='log', value=f'Plan:\n{self.planner.get_formatted_plan()}', channel='run'
    )

    task_message = ChatMessage(
        role='user', content=f'New Task:\n{self.task.description}', files=self.task.files
    )
    self.add_to_history(task_message)
    plan_message = ChatMessage(
        role='user', content=f'Plan:\n{self.planner.get_formatted_plan()}'
    )
    self.add_to_history(plan_message)
async def post_run(self) -> AsyncIterator[AgentResponse]:
    """Record usage on the task, close the root trace, and flush the tracer.

    Steps, in order:
    - Copy the usage metrics onto the task and log a usage summary.
    - End the current root trace with the task's result and summary metadata.
    - Flush the tracer manager.
    - Emit a closing log response.

    Returns:
        AsyncIterator[AgentResponse]: Iterator of agent responses.
    """
    task = self.task
    if task:
        usage = self.get_usage_metrics()
        totals = usage['total']
        task.total_llm_calls = totals['call_count']
        task.total_prompt_tokens = totals['total_prompt_tokens']
        task.total_completion_tokens = totals['total_completion_tokens']
        task.total_tokens = totals['total_tokens']
        task.total_cost = totals['total_cost']
        task.usage_by_component = usage['by_component']

        # Log usage summary
        summary = self.get_usage_report(include_breakdown=False)
        logger.info('Task %s usage: %s', task.id, summary)

        # End the root trace
        root_trace = self.current_trace
        if root_trace:
            trace_metadata = {
                'is_error': getattr(task, 'is_error', False),
                'total_tokens': task.total_tokens,
                'total_cost': float(task.total_cost),
                'steps_taken': getattr(task, 'steps_taken', 0),
            }
            root_trace.end(result=task.result, metadata=trace_metadata)

    # Flush tracer manager
    if self.tracer_manager:
        self.tracer_manager.flush()

    yield self.response(rtype='log', value='Task execution finished', channel='run')
async def run(
    self,
    task: str,
    files: list[str] | None = None,
    task_id: str | None = None,
    max_iterations: int | None = None,
    recurrent_mode: bool = False,
    summarize_progress_on_failure: bool = True,
    chat_history: list[dict] | None = None,
) -> AsyncIterator[AgentResponse]:
    """Solve a task using ReAct's TAO loop (or CodeAct's TCO loop).

    Once the run has been initialised, the post-run hook always executes - also
    when the pre-run hook reports an error or raises - so the root trace started
    during initialisation is always ended and flushed.

    Args:
        task: A task to be solved by the agent.
        files: An optional list of files related to the task.
        task_id: Optional task ID.
        max_iterations: Optional max iterations for the task.
        recurrent_mode: If True, augment task with previous task context.
            Mutually exclusive with ``chat_history``.
        summarize_progress_on_failure: Whether to summarize progress if the agent
            fails to solve the task in max iterations.
        chat_history: Optional pre-built OpenAI-compliant history to inject as the
            base context for this run. The new task message is appended on top.
            Mutually exclusive with ``recurrent_mode``.

    Returns:
        Step updates on the task and the final response.

    Raises:
        ValueError: If task is empty, too many files provided, both
            ``recurrent_mode`` and ``chat_history`` are set, or the provided
            history fails OpenAI compliance validation.
        RetryError: If LLM calls fail after max retries.
    """
    if recurrent_mode and chat_history is not None:
        raise ValueError(
            'recurrent_mode and chat_history are mutually exclusive.'
            ' Use one or the other, not both.'
        )

    if recurrent_mode and self.task is not None:
        task = await self._augment_task_with_previous(task)
        logger.debug('Recurrent mode enabled: augmented task with previous context')

    # _run_init() validates the inputs, prepares the history (fresh or injected),
    # creates the Task and starts the root trace; pre_run() then does the
    # logging/planning.
    self._run_init(task, files, task_id, chat_history=chat_history)

    max_iterations = max_iterations or self.max_iterations
    steps_taken = 0

    try:
        # Execute pre-run hook. It runs inside the try block so that the `finally`
        # clause below still runs post_run() when the hook fails.
        async for response in self.pre_run():
            yield response
            # Check if pre_run encountered specific error signal. `metadata` may be
            # None on responses that carry no metadata.
            if response['type'] == 'final' and (response.get('metadata') or {}).get(
                'is_error'
            ):
                return

        try:
            # Main Loop
            for idx in range(max_iterations):
                steps_taken = idx + 1
                logger.debug('ITERATION %d/%d', steps_taken, max_iterations)
                yield self.response(
                    rtype='log', channel='run', value=f'* Executing step {steps_taken}'
                )

                try:
                    async for update in self._think():
                        yield update
                    async for update in self._act():
                        yield update
                except asyncio.CancelledError:
                    logger.info('Task cancelled by consumer')
                    raise
                except RetryError as e:
                    logger.warning('Max retries reached for LLM call: %s', e)
                    error_msg = 'Rate limit exceeded. Unable to proceed.'
                    self.add_to_history(ChatMessage(role='assistant', content=error_msg))
                    yield self.response(
                        rtype='final',
                        value=error_msg,
                        channel='run',
                        metadata={'final_answer_found': False, 'is_error': True},
                    )
                    return

                if self.final_answer_found:
                    break

                plan_before_update = None
                if self.planner.plan:
                    plan_before_update = self.current_plan
                    try:
                        await self._update_plan()
                    except RetryError:
                        logger.warning('Max retries reached during plan update.')
                        error_msg = 'Rate limit exceeded during plan update. Unable to proceed.'
                        self.add_to_history(ChatMessage(role='assistant', content=error_msg))
                        yield self.response(
                            rtype='final',
                            value=error_msg,
                            channel='run',
                            metadata={'final_answer_found': False, 'is_error': True},
                        )
                        return

                    self.add_to_history(
                        ChatMessage(
                            role='user',
                            content=f'Plan progress:\n{self.planner.get_formatted_plan()}',
                        )
                    )

                try:
                    correction_msg = await self.observer.observe(
                        task=self.task,
                        history=self.get_history(),
                        plan_before=plan_before_update,
                        plan_after=self.current_plan,
                        iteration=idx + 1,
                        parent_trace=self.current_trace,
                    )
                except RetryError:
                    # The observer is optional guidance; carry on without it.
                    logger.warning('Observer failed due to rate limit. Skipping observation.')
                    correction_msg = None

                if correction_msg:
                    self.add_to_history(
                        ChatMessage(role='user', content=f'Observation: {correction_msg}')
                    )
                    yield self.response(rtype='log', value=correction_msg, channel='observer')

            # Loop iteration over
            if not self.final_answer_found:
                failure_msg = (
                    f'Sorry, I failed to get a complete answer even after {steps_taken} steps!'
                )

                if summarize_progress_on_failure:
                    try:
                        progress_summary = await self.salvage_response()
                        failure_msg += (
                            f"\n\nHere's a summary of progress for the task:\n{progress_summary}"
                        )
                    except RetryError:
                        logger.warning(
                            'Failed to salvage response due to rate limit.'
                            ' Skipping progress summary.'
                        )

                # Record the failure in the history BEFORE yielding it (as the error
                # paths above do), so it is kept even when the consumer stops
                # iterating at the final response.
                self.add_to_history(ChatMessage(role='assistant', content=failure_msg))
                yield self.response(
                    rtype='final',
                    value=failure_msg,
                    channel='run',
                    metadata={'final_answer_found': False},
                )
            else:
                if self.planner.plan:
                    try:
                        await self._update_plan()
                    except RetryError:
                        logger.warning(
                            'Failed to update plan on final iteration due to '
                            'rate limit. Skipping final plan update.'
                        )

        except asyncio.CancelledError:
            # As before, cancellation of the main loop is logged, not propagated.
            logger.warning('Iteration cancelled')
    finally:
        if self.task:
            self.task.steps_taken = steps_taken

        # Execute post-run hook
        async for response in self.post_run():
            yield response
async def _think(self) -> AsyncIterator[AgentResponse]:
    """Think about the next step using the structured response format.

    Opens a tracing generation span named 'think', asks `_record_thought()` for
    the next parsed message, records the outcome on the span, and yields a single
    'step' response whose value is the parsed message (or None on parse failure).

    Returns:
        AsyncIterator[AgentResponse]: An async iterator of AgentResponse objects.
    """
    # Create a generation span for the LLM call
    with self.tracer_manager.start_generation(
        parent=self.current_trace,
        name='think',
        input_data={
            'model': self.model_name,
            'messages_count': len(self.chat_history),
        },
    ) as gen_span:
        thought = await self._record_thought(ReActChatMessage)

        if thought:
            gen_span.update(
                status='success',
                has_thought=bool(thought.thought),
                has_action=bool(getattr(thought, 'action', None)),
                has_final_answer=bool(getattr(thought, 'final_answer', None)),
                output={
                    'thought': thought.thought,
                    'action': getattr(thought, 'action', None),
                },
            )
        else:
            gen_span.update(
                status='error',
                error='Failed to parse response',
                output='parse_failure',
                is_error=True,
            )

        yield self.response(rtype='step', value=thought, channel='_think')

async def _record_thought(
    self, response_format_class: type[pyd.BaseModel]
) -> ReActChatMessage | CodeActChatMessage | None:
    """Query the LLM and parse its reply into a chat message, with fallbacks.

    Each attempt first tries structured parsing (JSON, with one repair pass via
    `json_repair`), then falls back to `parse_text_response()` on the raw reply.
    A successfully parsed message is appended to the history and returned. When
    an attempt fails, an error note is appended to the history as a user message
    and the call is retried, up to three attempts in total.

    Args:
        response_format_class (type[pyd.BaseModel]): The response format class
            used both as the LLM response format and for validation.

    Returns:
        Optional[Union[ReActChatMessage, CodeActChatMessage]]: The parsed message,
        or None when all three attempts fail.

    Raises:
        RetryError: Propagated unchanged so that the caller can handle it.
    """
    for attempt in range(3):
        try:
            thought_response: str = await self._chat(response_format=response_format_class)

            try:
                thought_response_cleaned = ku.clean_json_string(thought_response)
                try:
                    parsed_json = json.loads(thought_response_cleaned)
                except JSONDecodeError as e:
                    logger.warning(
                        'Initial JSON parse failed: %s. Attempting repair...', str(e)
                    )
                    thought_response_cleaned = json_repair.repair_json(thought_response_cleaned)
                    parsed_json = json.loads(thought_response_cleaned)

                # Valid JSON that is not an object (list, string, number, ...) cannot
                # be a structured response. Treat it as a structured-parsing failure
                # so that the text fallback below still gets a chance; otherwise the
                # item assignment further down raises TypeError and skips the fallback.
                if not isinstance(parsed_json, dict):
                    raise JSONDecodeError(
                        f'Expected a JSON object, got {type(parsed_json).__name__}',
                        thought_response_cleaned,
                        0,
                    )

                if 'args' in parsed_json:
                    if isinstance(parsed_json['args'], str):
                        parsed_json['args'] = ku.clean_json_string(parsed_json['args'])
                    elif isinstance(parsed_json['args'], dict):
                        # Ensure deeply nested args are converted to string for Pydantic
                        parsed_json['args'] = json.dumps(parsed_json['args'])

                # Validate and create message directly
                # CRITICAL: Always force role to 'assistant' for model responses.
                # Some models (like Gemini 2.5) may hallucinate "role": "user" in
                # structured JSON, which breaks subsequent tool response pairings.
                parsed_json['role'] = 'assistant'
                msg = response_format_class.model_validate(parsed_json)
                logger.debug('Successfully parsed structured JSON response')

            except (JSONDecodeError, pyd.ValidationError) as parse_error:
                logger.warning(
                    'Structured parsing failed: %s: %s. Falling back to text parsing...',
                    type(parse_error).__name__,
                    parse_error,
                )
                try:
                    msg = self.parse_text_response(thought_response)
                    msg.role = 'assistant'
                    logger.info('Successfully parsed response using text fallback')
                except Exception as text_error:
                    logger.error('Text parsing also failed: %s', str(text_error))
                    raise ValueError(
                        f'Both structured and text parsing failed. '
                        f'Structured error: {parse_error}. '
                        f'Text parsing error: {text_error}'
                    ) from text_error

            self.add_to_history(msg)
            return msg

        except RetryError:
            # Must come before the generic handlers below, which would swallow it
            raise

        except ValueError as ex:
            logger.error(
                'Parsing error in _record_thought (attempt %d/3): %s', attempt + 1, str(ex)
            )
            if attempt < 2:
                await asyncio.sleep(random.uniform(0.5, 1.0))
                feedback_message = (
                    f'!Parsing Error: {str(ex)}. '
                    'Please ensure your response follows the required format. '
                    f'[Timestamp={datetime.now()}]'
                )
                self.add_to_history(ChatMessage(role='user', content=feedback_message))
            else:
                logger.error('Failed to parse response after 3 attempts')
                return None

        except Exception as ex:
            logger.exception(
                'Unexpected error in _record_thought (attempt %d/3): %s', attempt + 1, str(ex)
            )
            if attempt < 2:
                await asyncio.sleep(random.uniform(0.5, 1.0))
                feedback_message = (
                    f'!Error: {type(ex).__name__}: {str(ex)}. [Timestamp={datetime.now()}]'
                )
                self.add_to_history(ChatMessage(role='user', content=feedback_message))
            else:
                logger.error('Failed after 3 attempts due to unexpected errors')
                return None

    return None

def _handle_final_answer(
    self,
    final_answer: str,
    task_successful: bool,
    act_span: Any,
) -> None:
    """Mark the task as finished and update the tracing span for a final answer.

    Args:
        final_answer: The final answer text.
        task_successful: Whether the task was completed successfully.
        act_span: The active tracing span to update.
    """
    self.final_answer_found = True
    self.task.is_finished = True
    self.task.is_error = not task_successful

    act_span.update(
        status='success',
        operation='final_answer',
        task_successful=task_successful,
        output=final_answer,
        metadata={'task_successful': task_successful},
    )

def _handle_missing_thought(self, act_span: Any) -> None:
    """Flag a response without a thought on the span and in the history.

    Marks the span as an error and appends a user message asking the model to
    follow the schema.

    Args:
        act_span: The active tracing span to update.
    """
    act_span.update(
        status='error',
        error='Missing or empty thought field',
        output='malformed_response',
    )
    self.add_to_history(
        ChatMessage(
            role='user',
            content=(
                '* Error: Response must have a valid `thought` field. '
                'Please respond strictly following the schema.'
            ),
        )
    )

async def _act(self) -> AsyncIterator[AgentResponse]:
    """Take action based on the agent's previous thought.

    Reads the last history entry. If it carries a final answer (action 'FINISH'
    with non-empty content), the task is marked finished and a 'final' response
    is yielded. Otherwise the named tool is called with the parsed arguments and
    the result (or an error description) is appended to the history with
    role='tool' and yielded as a 'step' response. The whole operation is traced
    under an 'act' span, with a nested span per tool execution.

    Returns:
        AsyncIterator[AgentResponse]: An async iterator of AgentResponse objects.
    """
    prev_msg_dict = self.chat_history[-1]

    # Extract fields directly from dictionary to avoid strict Pydantic validation failures
    # on malformed or incomplete messages
    thought = prev_msg_dict.get('_thought')
    action = prev_msg_dict.get('_action')
    args = prev_msg_dict.get('_args')
    task_successful = prev_msg_dict.get('_task_successful', True)

    # Determine final answer based on action or content presence
    final_answer = prev_msg_dict.get('content') if action == 'FINISH' else None

    # Start root span for the entire act operation
    with self.tracer_manager.start_span(
        parent=self.current_trace,
        name='act',
        input_data={'thought': thought},
    ) as act_span:
        # Check for malformed response
        if not thought:
            self._handle_missing_thought(act_span)
            return

        if final_answer:
            self._handle_final_answer(final_answer, task_successful, act_span)
            yield self.response(
                rtype='final',
                value=final_answer,
                channel='_act',
                metadata={'final_answer_found': task_successful},
            )
        else:
            # Tool execution
            tool_name = action
            tool_args = args
            tool_args_dict = {}

            # Validate tool call has required fields
            if not tool_name or not tool_args:
                error_msg = 'Error: Both action and args must be provided for tool calls.'
                act_span.update(
                    status='error',
                    operation='tool_validation_failed',
                    error=error_msg,
                    output='validation_error',
                    is_error=True,
                )
                # CRITICAL: Use role='tool' instead of role='user' to maintain
                # conversation format
                self.add_to_history(ChatMessage(role='tool', content=error_msg))
                yield self.response(
                    rtype='step',
                    value=error_msg,
                    channel='_act',
                    metadata={'is_error': True},
                )
                return

            try:
                # CRITICAL: Parse the JSON string into a dictionary
                # The args field is a JSON string, not a dict
                if isinstance(tool_args, str):
                    # Drop surrounding backticks and a leading 'json' language tag
                    tool_args = tool_args.strip().strip('`').strip()
                    if tool_args.startswith('json'):
                        tool_args = tool_args[4:].strip()

                    try:
                        tool_args_dict = json.loads(tool_args)
                    except JSONDecodeError:
                        logger.warning('JSON decode failed, attempting repair...')
                        tool_args_dict = json_repair.loads(tool_args)
                elif isinstance(tool_args, dict):
                    # Handle case where args might already be a dict (defensive)
                    tool_args_dict = tool_args

                # Validate it's actually a dictionary
                if not isinstance(tool_args_dict, dict):
                    error_msg = f'Tool args must be a dict, got {type(tool_args_dict).__name__}'
                    act_span.update(
                        status='error',
                        operation='args_validation_failed',
                        error=error_msg,
                        output='args_error',
                        is_error=True,
                    )
                    self.add_to_history(ChatMessage(role='tool', content=error_msg))
                    yield self.response(
                        rtype='step',
                        value=error_msg,
                        channel='_act',
                        metadata={'is_error': True},
                    )
                    return

                # Execute tool
                if tool_name in self.tool_names:
                    logger.debug(
                        '🛠 Running tool: %s with args: %s',
                        tool_name,
                        tool_args_dict,
                    )

                    # Create nested span for tool execution
                    with self.tracer_manager.start_span(
                        parent=act_span,
                        name=tool_name,
                        input_data=tool_args_dict,
                    ) as tool_span:
                        # Intercept file creation during tool execution
                        with OutputInterceptor() as interceptor:
                            result = self.tool_name_to_func[tool_name](**tool_args_dict)

                        # Record any files captured by the interceptor
                        generated_files = interceptor.get_manifest()
                        for f in generated_files:
                            self.add_output_file(f)

                        tool_span.update(
                            status='success',
                            file_count=len(generated_files),
                            output=str(result),
                            generated_files=generated_files,
                        )

                    # Get tool call ID for correct pairing in history
                    tool_call_id = self._get_last_tool_call_id()

                    # Always use role='tool' for tool results
                    self.add_to_history(
                        {
                            'role': 'tool',
                            'content': str(result),
                            'tool_call_id': tool_call_id,
                        }
                    )

                    act_span.update(
                        status='success',
                        operation='tool_execution',
                        tool=tool_name,
                        output=str(result),
                        metadata={
                            'tool': tool_name,
                            'args': tool_args_dict,
                            'generated_files': generated_files,
                        },
                    )

                    yield self.response(
                        rtype='step',
                        value=result,
                        channel='_act',
                        metadata={
                            'tool': tool_name,
                            'args': tool_args_dict,
                            'generated_files': generated_files,
                        },
                    )
                else:
                    result = (
                        f'Error: Tool "{tool_name}" not found! '
                        f'Available tools: {", ".join(sorted(self.tool_names))}. '
                        'Please use an exact tool name from the list.'
                    )
                    act_span.update(
                        status='error',
                        operation='tool_not_found',
                        tool=tool_name,
                        error=result,
                        output='tool_not_found',
                        is_error=True,
                    )

                    # Get tool call ID for correct pairing in history
                    tool_call_id = self._get_last_tool_call_id()

                    # Use role='tool' for tool errors too
                    self.add_to_history(
                        {
                            'role': 'tool',
                            'content': result,
                            'tool_call_id': tool_call_id,
                        }
                    )
                    yield self.response(
                        rtype='step',
                        value=result,
                        channel='_act',
                        metadata={'is_error': True},
                    )

            except Exception as ex:
                error_msg = (
                    f'*** Error: Tool execution failed: {type(ex).__name__}: '
                    f'{str(ex)}\n'
                    f'Tool: {tool_name}\n'
                    f'Args provided: {tool_args_dict}\n'
                    f'Please check the tool signature and try again with '
                    f'correct arguments.'
                )
                logger.error(error_msg)

                act_span.update(
                    status='error',
                    operation='tool_execution_exception',
                    tool=tool_name,
                    error_type=type(ex).__name__,
                    error_message=str(ex),
                    output='exception',
                    is_error=True,
                    error=error_msg,
                )

                # Use role='tool' to maintain proper conversation structure
                self.add_to_history(ChatMessage(role='tool', content=error_msg))
                yield self.response(
                    rtype='step',
                    value=error_msg,
                    channel='_act',
                    metadata={'is_error': True},
                )
def parse_text_response(self, text: str) -> ReActChatMessage:
    """Parse text-based response when structured output fails.

    Uses regex to extract components from free-form text. This is a FALLBACK for
    when JSON parsing completely fails. It only handles text format like:

        Thought: ...
        Action: ...
        Args: {...}

    OR

        Thought: ...
        Answer: ...
        Successful: true/false

    When both an Answer and an Action are present, the Answer takes precedence.

    Args:
        text: The raw text response from the LLM.

    Returns:
        Parsed structured response.

    Raises:
        ValueError: If the thought is missing, or if neither a usable
            action (with valid JSON args) nor a final answer can be extracted.
    """
    logger.info('Falling back to text-based regex parsing')

    # Extract thought - REQUIRED
    thought_match = THOUGHT_MATCH.search(text)
    if not thought_match:
        raise ValueError(
            f"Could not extract 'Thought:' field from response. "
            f'Response must start with reasoning. Text: {text[:200]}...'
        )
    thought = thought_match.group(1).strip()

    # Parse ReAct response
    action_match = ACTION_MATCH.search(text)
    action = action_match.group(1).strip() if action_match else None
    args = None

    # Try to extract and validate args
    args_match = ARGS_MATCH.search(text)
    if args_match:
        # The regex capture is non-greedy and ends at the first '}', which cuts
        # nested objects (or values containing '}') short. Decode the complete
        # JSON object starting at the opening brace instead.
        start = args_match.start(1)
        try:
            decoded, end = json.JSONDecoder().raw_decode(text, start)
        except JSONDecodeError:
            decoded = None

        if isinstance(decoded, dict):
            args = text[start:end]
        else:
            # Fall back to the regex capture after cleaning it up
            args = ku.clean_json_string(args_match.group(1).strip())
            # Validate it's actually valid JSON
            try:
                json.loads(args)
            except (JSONDecodeError, TypeError) as e:
                logger.warning('Args extraction failed, invalid JSON: %s', str(e))
                args = None

    # Extract final answer and success status
    answer_match = ANSWER_MATCH.search(text)
    success_match = SUCCESS_MATCH.search(text)
    final_answer = answer_match.group(1).strip() if answer_match else None
    task_successful = success_match.group(1).lower() == 'true' if success_match else False

    # Validation: Must have either (action + args) OR final_answer
    if final_answer:
        # This is a final answer
        return ReActChatMessage(
            role='assistant',
            thought=thought,
            action='FINISH',
            args=None,
            final_answer=final_answer,
            task_successful=task_successful,
        )

    if action:
        # This is a tool call
        if action == 'FINISH':
            raise ValueError(
                f"Action is 'FINISH' but no Answer field found. Text: {text[:200]}..."
            )
        if not args:
            raise ValueError(
                f"Action '{action}' specified but no valid Args found. Text: {text[:200]}..."
            )
        return ReActChatMessage(
            role='assistant',
            thought=thought,
            action=action,
            args=args,
            final_answer=None,
            task_successful=False,
        )

    raise ValueError(
        f'Could not extract valid Action or Answer from response. Text: {text[:300]}...'
    )
@dataclass
class CodeActAgent(ReActAgent):
    """CodeAct agent using Thought-Code-Observation loop.

    Instead of calling a single tool per step, the agent emits Python code that
    is executed by a `CodeRunner`; the combined stdout/stderr becomes the
    observation for the next step.
    """

    _: KW_ONLY
    # Execution environment name, forwarded to CodeRunner as `env`
    run_env: CODE_ENV_NAMES = 'host'
    # Extra importable modules; 'datetime', 'typing', and 'mimetypes' are always appended
    allowed_imports: list[str] | None = None
    # Forwarded unchanged to CodeRunner
    pip_packages: str | None = None
    # Forwarded unchanged to CodeRunner
    timeout: int = 30
    # Forwarded unchanged to CodeRunner
    env_vars_to_set: dict[str, str] | None = None
    system_prompt: str = CODE_ACT_SYSTEM_PROMPT
    response_format_class: ClassVar[type[pyd.BaseModel]] = CodeActChatMessage

    def __post_init__(self):
        """Initialize CodeAct agent.

        Collects the source code of all tools (with the first `@tool` decorator
        line removed) so it can be shipped along with the generated code, extends
        the list of allowed imports, and creates the code runner.
        """
        super().__post_init__()
        self._history_formatter = CodeActHistoryFormatter()

        tool_sources = [
            inspect.getsource(t).replace('@tool\n', '', 1) + '\n' for t in (self.tools or [])
        ]
        self.tools_source_code: str = 'from typing import *\n\n' + ''.join(tool_sources)

        # Build a new list rather than mutating the one passed in by the caller
        self.allowed_imports = [
            *(self.allowed_imports or []),
            'datetime',
            'typing',
            'mimetypes',
        ]

        self.code_runner = CodeRunner(
            env=self.run_env,
            allowed_imports=self.allowed_imports,
            model_name=self.model_name,
            pip_packages=self.pip_packages,
            timeout=self.timeout,
            env_vars_to_set=self.env_vars_to_set,
            litellm_params=self.litellm_params,
            work_dir=self.work_dir,
            usage_tracker=self.usage_tracker,
            tool_names=self.tool_names,
        )

    @staticmethod
    def _strip_code_fences(code: str | None) -> str:
        """Remove Markdown code-fence markers from a code string.

        Args:
            code: The code text, possibly wrapped in fence markers, or None.

        Returns:
            The code without fence markers and surrounding whitespace; an empty
            string when no code is given.
        """
        if not code:
            return ''

        cleaned = code.strip()
        # Longest marker first: removing '```py' before '```python' would leave a
        # stray 'thon' at the start of the code.
        for fence in ('```python', '```py', '```'):
            cleaned = cleaned.replace(fence, '')
        return cleaned.strip()

    def parse_text_response(self, text: str) -> CodeActChatMessage:
        """Uses regex to extract components from free-form text and parse into messages.

        When both an Answer and Code are present, the Answer takes precedence.

        Args:
            text: The raw text response from the LLM.

        Returns:
            Parsed structured response.

        Raises:
            ValueError: If the thought is missing, or if neither code nor a final
                answer can be extracted.
        """
        logger.info('Falling back to text-based regex parsing for CodeAct')

        # NOTE: We intentionally do NOT call super().parse_text_response(text) here.
        # The parent ReActAgent.parse_text_response() enforces the presence of either
        # "Action" (for tools) or "Answer" (final response).
        #
        # A valid CodeAct response might contain "Code" but NO "Action" or "Answer".
        # If we called super(), it would recognize the "Thought" but fail to find
        # "Action"/"Answer" and raise a ValueError before we could check for "Code".
        # Therefore, we independently parse Thought + Code + Answer here.

        # Extract thought - REQUIRED
        thought_match = THOUGHT_MATCH.search(text)
        if not thought_match:
            raise ValueError(
                f"Could not extract 'Thought:' field from response. "
                f'Response must start with reasoning. Text: {text[:200]}...'
            )
        thought = thought_match.group(1).strip()

        # Parse CodeAct response
        code_match = CODE_MATCH.search(text)
        code = code_match.group(1).strip() if code_match else None

        # Also try to find code without markdown blocks
        if not code:
            code_alt_match = re.search(
                r'Code:\s*(.+?)(?=\n(?:Thought|Answer):|$)', text, re.DOTALL | re.IGNORECASE
            )
            if code_alt_match:
                code_raw = code_alt_match.group(1).strip()
                code = code_raw.strip('`').strip()

        # Extract final answer and success status
        answer_match = ANSWER_MATCH.search(text)
        success_match = SUCCESS_MATCH.search(text)
        final_answer = answer_match.group(1).strip() if answer_match else None
        task_successful = success_match.group(1).lower() == 'true' if success_match else False

        # Validation: Must have either code OR final_answer
        if final_answer:
            return CodeActChatMessage(
                role='assistant',
                thought=thought,
                code=None,
                final_answer=final_answer,
                task_successful=task_successful,
            )

        if code:
            return CodeActChatMessage(
                role='assistant',
                thought=thought,
                code=code,
                final_answer=None,
                task_successful=False,
            )

        raise ValueError(
            f'Could not extract valid Code or Answer from response. Text: {text[:300]}...'
        )

    def get_system_prompt_content(self) -> str:
        """Return the formatted system prompt string for CodeAct agent."""
        return self.system_prompt.format(
            persona=self.persona or '',
            tools=self.get_tools_description(),
            authorized_imports='\n'.join([f'- {imp}' for imp in (self.allowed_imports or [])]),
        )

    def init_history(self):
        """Initialize message history with system prompt for CodeAct agent."""
        super().init_history()

    async def _think(self) -> AsyncIterator[AgentResponse]:
        """Think step for CodeAct agent.

        Opens a generation span named 'think_code', records the next message via
        `_record_thought()`, and yields a single 'step' response whose value is the
        last history entry (or None when parsing failed).

        Returns:
            AsyncIterator[AgentResponse]: An async iterator of AgentResponse objects.
        """
        # Create generation span for code generation
        with self.tracer_manager.start_generation(
            parent=self.current_trace,
            name='think_code',
            input_data={
                'model': self.model_name,
                'messages_count': len(self.chat_history),
            },
        ) as gen_span:
            msg = await self._record_thought(CodeActChatMessage)
            # On success, the message just recorded is the last history entry
            msg_dict = self.chat_history[-1] if (msg and self.chat_history) else None

            if msg_dict:
                gen_span.update(
                    status='success',
                    has_thought=bool(msg_dict.get('_thought')),
                    has_code=bool(msg_dict.get('_code')),
                    has_final_answer=bool(msg_dict.get('content'))
                    if not msg_dict.get('_code')
                    else False,
                )
            else:
                gen_span.update(
                    status='error',
                    error='Failed to parse response',
                    output='parse_failure',
                    is_error=True,
                )

            yield self.response(rtype='step', value=msg_dict, channel='_think')

    async def _act(self) -> AsyncIterator[AgentResponse]:
        """Execute code based on CodeActAgent's previous thought.

        Reads the last history entry. Without code, its content is treated as the
        final answer. Otherwise the code is stripped of Markdown fences and run via
        the code runner; the combined stdout/stderr is appended to the history with
        role='tool' and yielded as a 'step' response. Execution is traced under an
        'act' span with a nested 'code_execution' span.

        Returns:
            AsyncIterator[AgentResponse]: An async iterator of AgentResponse objects.
        """
        prev_msg_dict = self.chat_history[-1]

        # Extract fields directly from dictionary
        thought = prev_msg_dict.get('_thought')
        code = prev_msg_dict.get('_code')
        task_successful = prev_msg_dict.get('_task_successful', True)

        # Final answer is content if no code block
        final_answer = prev_msg_dict.get('content') if not code else None

        # Start root span for the entire act operation
        with self.tracer_manager.start_span(
            parent=self.current_trace,
            name='act',
            input_data={'thought': thought},
        ) as act_span:
            if not thought:
                self._handle_missing_thought(act_span)
                return

            if final_answer:
                self._handle_final_answer(final_answer, task_successful, act_span)
                yield self.response(
                    rtype='final',
                    value=final_answer,
                    channel='_act',
                    metadata={'final_answer_found': task_successful},
                )
            else:
                try:
                    code_to_run = self._strip_code_fences(code)

                    logger.debug(
                        '🛠 Running code [truncated]: ... %s',
                        code_to_run[-100:] if code_to_run else '',
                    )

                    # Create nested span for code execution
                    with self.tracer_manager.start_span(
                        parent=act_span,
                        name='code_execution',
                        input_data={'code_length': len(code_to_run)},
                    ) as code_span:
                        stdout, stderr, exit_status, generated_files = await self.code_runner.run(
                            self.tools_source_code, code_to_run, self.task.id
                        )

                        # Download files from remote environment if necessary
                        if generated_files:
                            files = await self.code_runner.download_files_from_remote(
                                generated_files
                            )
                            for f in files:
                                self.add_output_file(f)

                        code_span.update(
                            status='success' if exit_status == 0 else 'error',
                            exit_status=exit_status,
                            has_stdout=bool(stdout),
                            has_stderr=bool(stderr),
                            file_count=len(generated_files),
                            output={
                                'exit_status': exit_status,
                                'stdout_lines': len(stdout.split('\n')) if stdout else 0,
                                'stderr_lines': len(stderr.split('\n')) if stderr else 0,
                            },
                            generated_files=generated_files,
                            is_error=exit_status != 0,
                        )

                    observation = f'{stdout}\n{stderr}'.strip()

                    # Get tool call ID for correct pairing in history
                    tool_call_id = self._get_last_tool_call_id()
                    msg = {'role': 'tool', 'content': observation, 'tool_call_id': tool_call_id}
                    self.add_to_history(msg)

                    act_span.update(
                        status='success' if exit_status == 0 else 'warning',
                        operation='code_execution',
                        exit_status=exit_status,
                        output=observation[:500],
                        metadata={
                            'is_error': exit_status != 0,
                            'generated_files': generated_files,
                            'exit_status': exit_status,
                        },
                    )

                    yield self.response(
                        rtype='step',
                        value=observation,
                        channel='_act',
                        metadata={
                            'is_error': exit_status != 0,
                            'generated_files': generated_files,
                        },
                    )

                except Exception as ex:
                    error_msg = f'*** Error running code: {type(ex).__name__}: {str(ex)}'
                    logger.error(error_msg)

                    act_span.update(
                        status='error',
                        operation='code_execution_exception',
                        error_type=type(ex).__name__,
                        error_message=str(ex),
                        output='exception',
                        is_error=True,
                        error=error_msg,
                    )

                    # Respond as the pseudo "tool"
                    # Get tool call ID for correct pairing in history
                    tool_call_id = self._get_last_tool_call_id()
                    tool_msg = {
                        'role': 'tool',
                        'content': error_msg,
                        'tool_call_id': tool_call_id,
                    }
                    self.add_to_history(tool_msg)
                    yield self.response(
                        rtype='step',
                        value=error_msg,
                        channel='_act',
                        metadata={'is_error': True},
                    )
def llm_vision_support(model_names: list[str]) -> list[bool]:
    """Report, per model, whether it accepts image inputs.

    Every model is queried first; afterwards one line per model is printed.

    Args:
        model_names (list[str]): List of LLM names.

    Returns:
        list[bool]: Vision-support flags, in the same order as `model_names`.
    """
    # Query all models up front, before any output is printed
    status = []
    for name in model_names:
        status.append(litellm.supports_vision(model=name))

    for name, supported in zip(model_names, status):
        line = f'- Vision supported by {name}: {supported}'
        print(line)

    return status