# Source code for kodeagent.fca

"""Function Calling Agent. This module is optimized for Small Language Models (SLMs).
The FC agent uses native function calling to solve tasks. Some of the data structures are
reproduced here to keep this module self-contained and optimized.
"""

import asyncio
import copy
import inspect
import json
import logging
import re
from collections.abc import AsyncIterator, Callable
from typing import Any, Literal

import litellm

from . import kutils as ku
from . import tools as dtools
from .models import AgentResponse, Task
from .orchestrator import Planner

# Logger named after this module.
logger: logging.Logger = logging.getLogger(__name__)


# Default system prompt for the agent; read once when the module is imported.
FCA_SYSTEM_PROMPT: str = ku.read_prompt('system/function_calling.txt')
# Name of the tool whose invocation ends the agent's run.
FINAL_ANSWER_TOOL_NAME: str = 'final_answer'


def final_answer(result: str) -> str:
    """Provide the final answer to the user's task and end the conversation.

    Always call this tool when you have enough information to answer.

    Args:
        result: The final answer or result of the task.

    Returns:
        The final answer text in user-readable format.
    """
    # NOTE: the docstring above is turned into this tool's schema by `ku.build_tool_schema`,
    # so its wording is what the model sees; edit it with care.
    if isinstance(result, dict):
        # Keys are dropped; only the values are kept, one paragraph each.
        pieces = [str(value) for value in result.values()]
    elif isinstance(result, list):
        pieces = [str(entry) for entry in result]
    else:
        # Strings pass through untouched; anything else is stringified.
        return result if isinstance(result, str) else str(result)
    return '\n\n'.join(pieces)
class FunctionCallingAgent:
    """An agent that uses native function calling to solve tasks, optimized for Small Language
    Models (SLMs).

    If you're using Ollama, make sure to select a model that supports function calling, such as
    'ollama/qwen3:8b-q8_0'.
    """

    # Argument keys accepted for `final_answer`, checked in this order. SLMs sometimes
    # hallucinate the parameter name, so common alternatives to `result` are tolerated.
    _FINAL_ANSWER_ARG_KEYS = ('result', 'reason', 'answer', 'response', 'output', 'reply')
    # Error-prefix matcher used by `_is_error` ("Error:", "*** ERROR:", "errrror", ...).
    _ERROR_RE = re.compile(r'^\s*(?:\*+\s*)?err+or\b', re.IGNORECASE)
    # URL matcher used by `_extract_urls`.
    _URL_RE = re.compile(r'https?://[^\s<>"\')\],]+')

    def __init__(
        self,
        model_name: str,
        tools: list[Callable] | None = None,
        system_prompt: str = FCA_SYSTEM_PROMPT,
        loop_detection_threshold: int = 3,
        litellm_params: dict | None = None,
        max_tool_result_chars: int = 3000,
        one_line_doc: bool = True,
        tool_timeout: float = 30.0,
    ):
        """Initialize the FunctionCallingAgent.

        Args:
            model_name: Model identifier for LiteLLM.
            tools: Optional list of callable tools. The list is copied, so the caller's list
                is never modified (``final_answer`` is added to the copy if missing).
            system_prompt: System prompt for the agent.
            loop_detection_threshold: Number of consecutive same tool calls before
                triggering loop detection. Default is 3.
            litellm_params: Optional dictionary of parameters to pass to LiteLLM calls.
            max_tool_result_chars: Maximum number of characters to store in chat history
                for each tool result. Longer results are truncated with a note. Full results
                are preserved separately for final answer preparation.
            one_line_doc: If True, use only the first line of the tool's docstring for the
                schema. This can help reduce token usage for SLMs. If False, the full
                docstring is used, which may provide more context but at the cost of more
                tokens.
            tool_timeout: Seconds to wait for a single tool call before cancelling it and
                returning an error result. Default is 30.0.
        """
        self.model_name = model_name
        # Copy the caller's list: `final_answer` may be appended below.
        self.tools: list[Callable] = list(tools) if tools else []
        # Ensure final_answer is always available as a tool
        if FINAL_ANSWER_TOOL_NAME not in {fn.__name__ for fn in self.tools}:
            self.tools.append(final_answer)
        self.tool_schemas = [
            ku.build_tool_schema(fn, just_first_line=one_line_doc, as_text=False)
            for fn in self.tools
        ]
        # Every tool, final_answer included, is registered here. `_execute_tool` checks
        # membership first, then handles final_answer specially by reading the answer from
        # the call arguments instead of invoking the function.
        self.tool_map: dict[str, Callable] = {fn.__name__: fn for fn in self.tools}
        self.litellm_params = litellm_params
        self.tool_timeout = tool_timeout
        self.system_prompt = system_prompt
        self.chat_history: list[dict[str, Any]] = []
        self.task: Task | None = None
        self.final_answer_found = False
        self.max_tool_result_chars = max_tool_result_chars
        # Full (untruncated) tool results keyed by tool_call_id (unique per invocation).
        self.full_tool_results: dict[str, str] = {}
        self.loop_detection_threshold = loop_detection_threshold
        # Loop-breaking nudges sent so far for the current task (reset in `_run_init`).
        self.nudge_count = 0

    def response(
        self,
        rtype: Literal['step', 'final', 'log'],
        value: Any,
        channel: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> AgentResponse:
        """Prepare a response to be sent by the agent.

        A ``'final'`` response additionally records ``value`` as the current task's result.

        Args:
            rtype: Response type emitted by the agent.
            value: The current update from the agent.
            channel: The response channel.
            metadata: Any metadata associated with the update.

        Returns:
            A response from the agent.
        """
        if rtype == 'final' and self.task:
            self.task.result = value
        return {'type': rtype, 'channel': channel, 'value': value, 'metadata': metadata}

    @staticmethod
    def _tool_call_name(tool_call: Any) -> str | None:
        """Return the function name of a tool call given as a dict or as an object."""
        if isinstance(tool_call, dict):
            return (tool_call.get('function') or {}).get('name')
        return getattr(getattr(tool_call, 'function', None), 'name', None)

    @staticmethod
    def _stringify_result(value: Any) -> str:
        """Render a tool/answer value as text; dicts and lists become pretty JSON if possible."""
        if isinstance(value, (dict, list)):
            try:
                return json.dumps(value, ensure_ascii=False, indent=2)
            except (TypeError, ValueError):
                return str(value)
        return str(value)

    @staticmethod
    def _parse_tool_args(raw: Any) -> Any:
        """Decode raw tool-call arguments.

        Some models send an empty string (or ``None``) when calling a tool without
        arguments; that is treated as ``{}`` rather than as malformed JSON.

        Raises:
            json.JSONDecodeError: If ``raw`` is a non-empty string that is not valid JSON.
        """
        if raw is None or (isinstance(raw, str) and not raw.strip()):
            return {}
        return json.loads(raw)

    def _validate_tool_args(self, name: str, args: Any) -> str | None:
        """Validate tool arguments against the schema.

        Args:
            name: Name of the tool being called.
            args: Decoded arguments provided for the tool call.

        Returns:
            An error message if validation fails, or None if validation succeeds.
        """
        if not isinstance(args, dict):
            # `fn(**args)` needs a mapping; report this clearly instead of a TypeError.
            return (
                f'Error: Arguments for `{name}` must be a JSON object, '
                f'got {type(args).__name__}.'
            )
        schema = next((s for s in self.tool_schemas if s['function']['name'] == name), None)
        if schema is None:
            return None  # Can't validate, let execution handle it
        parameters = schema['function']['parameters']
        required = parameters.get('required', [])
        properties = parameters.get('properties', {})
        missing = [r for r in required if r not in args]
        if missing:
            return (
                f'Error: Missing required arguments for `{name}`: {missing}. '
                f'Required arguments are: {required}.'
            )
        unexpected = [k for k in args if k not in properties]
        if unexpected:
            return (
                f'Error: Unexpected arguments for `{name}`: {unexpected}. '
                f'Valid arguments are: {list(properties.keys())}.'
            )
        return None

    def _extract_final_answer_arg(self, args: Any) -> str:
        """Pull the answer text out of `final_answer` call arguments (relaxed for SLMs).

        Accepts a JSON object with any key from ``_FINAL_ANSWER_ARG_KEYS`` or a bare,
        non-empty JSON string. Returns an ``Error: ...`` message otherwise.
        """
        if isinstance(args, str) and args.strip():
            return args  # A bare JSON string is an unambiguous answer.
        if not isinstance(args, dict):
            return 'Error: `final_answer` arguments must be a JSON object with the key `result`.'
        answer = next(
            (args[key] for key in self._FINAL_ANSWER_ARG_KEYS if args.get(key) is not None),
            None,
        )
        if answer is None:
            return 'Error: `final_answer` called without a result.'
        return self._stringify_result(answer)

    async def _invoke_tool(self, name: str, args: Any) -> str:
        """Validate arguments, run a regular (non-final_answer) tool and stringify its result.

        Exceptions raised by the tool, ``TypeError`` from bad arguments and the timeout
        error propagate to `_execute_tool`, which converts them into error messages.
        """
        validation_error = self._validate_tool_args(name, args)
        if validation_error:
            return validation_error
        fn = self.tool_map[name]
        if inspect.iscoroutinefunction(fn):
            pending = fn(**args)
        else:
            # Sync tools run in the default executor so they don't block the event loop.
            # NOTE: on timeout, wait_for stops waiting but cannot interrupt the worker thread.
            loop = asyncio.get_running_loop()
            pending = loop.run_in_executor(None, lambda: fn(**args))
        tool_result = await asyncio.wait_for(pending, timeout=self.tool_timeout)
        if tool_result is None:
            return (
                f'Error: Tool `{name}` returned no result. '
                'Verify your input arguments and try a different approach.'
            )
        if isinstance(tool_result, str) and not tool_result.strip():
            return (
                f'Error: Tool `{name}` returned an empty result. '
                'The query may have returned no data.'
            )
        return self._stringify_result(tool_result)

    async def _execute_tool(self, tool_call: Any) -> dict[str, str]:
        """Safely execute one tool call and return the tool-result message.

        Problems with the call (malformed JSON, bad arguments, timeouts, exceptions raised
        by the tool) are converted into an ``Error...`` string in the message content so
        the model can react to them, rather than being raised.

        Args:
            tool_call: The tool call to execute (exposes ``id``, ``function.name`` and
                ``function.arguments``).

        Returns:
            A dictionary representing the tool result message.
        """
        name = tool_call.function.name
        try:
            args = self._parse_tool_args(tool_call.function.arguments)
            logger.info('Agent executing tool: %s with args: %s', name, args)
            if name not in self.tool_map:
                result = f'Error: Tool `{name}` is not defined.'
            elif name == FINAL_ANSWER_TOOL_NAME:
                result = self._extract_final_answer_arg(args)
            else:
                result = await self._invoke_tool(name, args)
        except json.JSONDecodeError:
            result = 'Error: Model provided malformed JSON arguments.'
        except TypeError as e:
            result = f'Error: Wrong arguments passed to `{name}`: {str(e)}'
        except asyncio.TimeoutError:
            result = f'Error: Tool `{name}` timed out after {self.tool_timeout}s.'
        except Exception as e:
            result = f'Error executing `{name}`: {str(e)}'
        return {
            'tool_call_id': tool_call.id,
            'role': 'tool',
            'name': name,
            'content': result,
        }

    def _detect_tool_loop(self, nudge_hard_stop: int = 2) -> bool:
        """Detect if the agent is stuck in a tool calling loop.

        Collects tool-call names from the chat history newest-first and checks whether the
        latest ``loop_detection_threshold`` of them are all the same tool. The first
        detection appends a gentle nudge; later detections append a stronger one. Once
        ``nudge_hard_stop`` nudges have been sent, a further detection returns True WITHOUT
        appending a nudge, which signals hard termination. Callers can tell the two cases
        apart by checking whether ``nudge_count`` changed.

        Args:
            nudge_hard_stop: Number of nudges after which hard termination is signalled.
                Callers should pass their ``loop_threshold`` value here.

        Returns:
            True if a loop was detected (nudge added or hard termination signalled),
            False otherwise.
        """
        threshold = self.loop_detection_threshold
        recent_tool_calls: list[str] = []
        for msg in reversed(self.chat_history):
            if msg.get('role') != 'assistant' or not msg.get('tool_calls'):
                continue
            # Walk this message's calls newest-first too, so index 0 is the latest call.
            for tool_call in reversed(msg['tool_calls']):
                tool_name = self._tool_call_name(tool_call)
                if tool_name:
                    recent_tool_calls.append(tool_name)
            if len(recent_tool_calls) >= threshold:
                break  # Only the newest `threshold` calls matter.

        latest = recent_tool_calls[:threshold]
        if len(recent_tool_calls) < threshold or len(set(latest)) != 1:
            return False

        loop_tool = latest[0]
        # Hard termination once nudge limit has been reached
        if self.nudge_count >= nudge_hard_stop:
            return True

        available_tools = [t for t in self.tool_map if t != loop_tool]
        if self.nudge_count == 0:
            # Stage 1: gentle nudge
            nudge_message = (
                f'Loop detected: The tool "{loop_tool}" has been called'
                f' {self.loop_detection_threshold} consecutive times without progress.'
                ' This approach is not working.'
            )
        else:
            # Stage 2: strong nudge
            nudge_message = (
                f'[CRITICAL: STOP REPEATING] You are still calling "{loop_tool}"'
                ' despite the previous warning. You MUST change your strategy or call'
                ' final_answer with your best answer now.'
            )
        if available_tools:
            nudge_message += (
                f' Consider using one of these tools instead: {", ".join(available_tools)}.'
            )
        nudge_message += ' If you have gathered enough information, call final_answer now.'
        self.nudge_count += 1
        self.chat_history.append({'role': 'user', 'content': nudge_message})
        return True

    @staticmethod
    def _is_error(text: str) -> bool:
        """Robustly detect if a string represents an error message.

        Matches variations like "Error:", "*** ERROR:", "errrror", etc.

        Args:
            text: Input string to check.

        Returns:
            True if it starts with an error indicator, False otherwise.
        """
        if not text:
            return False
        return bool(FunctionCallingAgent._ERROR_RE.match(text))

    @staticmethod
    def _extract_urls(text: str) -> list[str]:
        """Extract all URLs from a text string.

        Args:
            text: The input text to search for URLs.

        Returns:
            A list of URLs found in the text.
        """
        return FunctionCallingAgent._URL_RE.findall(text)

    async def _run_init(
        self,
        task_desc: str,
        task_files: list[str] | None = None,
        task_id: str | None = None,
        use_planning: bool = True,
        recurrent_mode: bool = False,
        chat_history: list[dict] | None = None,
    ) -> None:
        """Initialize the running of a task.

        Builds the task message, creates a fresh ``Task``, resets ``nudge_count``, rebuilds
        ``chat_history`` (system prompt first) and optionally appends a plan.

        Args:
            task_desc: Task description.
            task_files: Optional files for the task.
            task_id: Optional task ID.
            use_planning: If True, generate a simple plan at the beginning of the task.
            recurrent_mode: If True, the previous task and its result are included in the
                new task message, allowing for more dynamic interactions.
            chat_history: Optional pre-built OpenAI-compliant chat history to inject as the
                base context. When provided, the new task message is appended at the end of
                this history. Mutually exclusive with ``recurrent_mode``.

        Raises:
            ValueError: If the task description is empty or ``task_files`` is not a list.
        """
        if not task_desc or not task_desc.strip():
            raise ValueError('Task description cannot be empty!')
        if task_files and not isinstance(task_files, list):
            raise ValueError('Task files must be a list of file paths!')

        # Must read the previous task before `self.task` is replaced below.
        if recurrent_mode and self.task is not None:
            task_description = (
                f'## Previous Task\n{self.task.description}\n\n'
                f'## Previous Task Result\n{self.task.result}\n\n'
                f'## New Task:\n{task_desc}'
            )
        else:
            task_description = f'## New Task:\n{task_desc}'
        user_task_msg = ku.combine_user_messages(ku.make_user_message(task_description, task_files))

        self.nudge_count = 0
        task_kwargs: dict = {'description': task_desc}
        if task_id is not None:
            task_kwargs['id'] = task_id
        self.task = Task(**task_kwargs)

        if chat_history is not None:
            # Validate and deep-copy the injected history so the caller's copy is untouched.
            ku.validate_chat_history(chat_history, tool_names=set(self.tool_map))
            base_history = copy.deepcopy(chat_history)
            # Ensure a system message is present at index 0
            if not (base_history and base_history[0].get('role') == 'system'):
                base_history.insert(0, {'role': 'system', 'content': self.system_prompt})
            self.chat_history = base_history + list(user_task_msg)
        else:
            self.chat_history = [
                {'role': 'system', 'content': self.system_prompt},
                *user_task_msg,
            ]

        if use_planning:
            planner = Planner(model_name=self.model_name, litellm_params=self.litellm_params)
            await planner.create_plan(task=self.task, agent_type='fca')
            formatted_plan = planner.get_formatted_plan()
            self.chat_history.append(
                {'role': 'user', 'content': f'Here is a plan for this task:\n{formatted_plan}'}
            )
            logger.info('Task plan:\n%s\n', formatted_plan)

    @staticmethod
    def _content_to_text(content: Any) -> str:
        """Flatten message content into plain text.

        OpenAI-style content may be a string or a list of parts such as
        ``{'type': 'text', 'text': ...}``. Text parts are kept. Other parts are replaced by a
        short ``[type]`` placeholder so that non-text payloads (which may include inline
        image data) are not copied into the summarisation prompt.
        """
        if content is None:
            return ''
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            pieces: list[str] = []
            for part in content:
                if isinstance(part, dict):
                    if part.get('type') == 'text':
                        pieces.append(str(part.get('text', '')))
                    else:
                        pieces.append(f"[{part.get('type', 'attachment')}]")
                else:
                    pieces.append(str(part))
            return '\n'.join(piece for piece in pieces if piece)
        return str(content)

    def _format_history_as_text(self) -> str:
        """Format chat history as readable text.

        Converts the chat history (excluding the system prompt at index 0) into a
        human-readable transcript, omitting tool call IDs and other non-essential metadata.
        Tool messages use the untruncated result from ``full_tool_results`` when available.

        Returns:
            Formatted chat history as a string.
        """
        formatted: list[str] = []
        for msg in self.chat_history[1:]:
            role = msg.get('role', 'unknown')
            content = self._content_to_text(msg.get('content'))
            if role == 'user':
                if content:
                    formatted.append(f'User: {content}')
            elif role == 'assistant':
                if content:
                    formatted.append(f'Assistant: {content}')
                tool_names = [
                    tool_name
                    for tool_name in map(self._tool_call_name, msg.get('tool_calls') or [])
                    if tool_name
                ]
                if tool_names:
                    formatted.append(f'Assistant: [Called tools: {", ".join(tool_names)}]')
            elif role == 'tool':
                tool_name = msg.get('name', 'unknown')
                tool_call_id = msg.get('tool_call_id')
                # Exact lookup by tool_call_id — avoids wrong-result for repeated same-tool calls
                full_content = (
                    self.full_tool_results.get(tool_call_id) if tool_call_id else None
                ) or content
                if full_content:
                    formatted.append(f'Tool ({tool_name}): {full_content}')
        return '\n'.join(formatted)

    def _maybe_truncate(self, content: str) -> str:
        """Truncate a tool result for storage in chat history.

        Only the history-facing copy is shortened. ``run`` keeps the full result in
        ``self.full_tool_results`` for use when preparing the final answer.

        Args:
            content: Full tool result content.

        Returns:
            ``content`` unchanged if it fits in ``max_tool_result_chars``, otherwise its
            prefix followed by a truncation note.
        """
        if len(content) <= self.max_tool_result_chars:
            return content
        truncated = content[: self.max_tool_result_chars]
        return (
            f'{truncated}\n\n'
            f'[Truncated — {len(content)} total chars. '
            f'Showing first {self.max_tool_result_chars}.]'
        )

    def _last_assistant_text(self, start: int = 0) -> str | None:
        """Return the newest non-empty assistant text at or after index ``start``, if any."""
        for msg in reversed(self.chat_history[start:]):
            if msg.get('role') == 'assistant' and msg.get('content'):
                return msg['content']
        return None

    def _final_answer_from_history(self, start: int = 0) -> str | None:
        """Return the newest usable `final_answer` result at or after index ``start``.

        The untruncated copy in ``full_tool_results`` is preferred over the (possibly
        truncated) history content. Empty results and results that look like errors
        (per ``_is_error``) are skipped.
        """
        for msg in reversed(self.chat_history[start:]):
            if msg.get('role') != 'tool' or msg.get('name') != FINAL_ANSWER_TOOL_NAME:
                continue
            tool_call_id = msg.get('tool_call_id')
            content = (
                self.full_tool_results.get(tool_call_id) if tool_call_id else None
            ) or msg.get('content')
            if content and not self._is_error(content):
                return content
        return None

    async def run(
        self,
        task: str,
        files: list[str] | None = None,
        max_iterations: int = 10,
        refine_final_answer: bool = True,
        use_planning: bool = True,
        recurrent_mode: bool = False,
        loop_threshold: int = 3,
        chat_history: list[dict] | None = None,
    ) -> AsyncIterator[AgentResponse]:
        """Main loop for the agent to process input and execute tools until finished.

        If an SLM call fails, it tries up to 3 times with a backoff before terminating.

        Args:
            task: Task description to process. Add URLs or file contents in the task
                description.
            files: An optional list of files related to the task.
            max_iterations: Maximum number of iterations to run. Failed SLM retries do not
                count against this budget — only successful reasoning steps do.
            refine_final_answer: If True, calls an additional SLM step to produce a clean
                final answer when the model exits without a successful final_answer call.
                Recommended for models <=4B that may not use final_answer reliably. Note:
                not invoked after a hard-stop due to repeated SLM failures.
            use_planning: If True, generate a simple plan at the beginning of the task.
            recurrent_mode: If True, the agent continues from the previous task result,
                allowing for multi-turn workflows. Mutually exclusive with ``chat_history``.
            loop_threshold: Maximum number of loop-breaking nudges to send. If a tool loop
                is still detected after this many nudges, the run terminates. (The number of
                consecutive same-tool calls that counts as a loop is the constructor's
                ``loop_detection_threshold``.)
            chat_history: Optional pre-built OpenAI-compliant history to inject as the base
                context for this run. The new task message is appended at the end. Mutually
                exclusive with ``recurrent_mode``.

        Yields:
            AgentResponse: Streaming log, step, and final responses.

        Raises:
            ValueError: If both ``recurrent_mode`` and ``chat_history`` are set, or if the
                provided history fails OpenAI compliance validation.
        """
        if recurrent_mode and chat_history is not None:
            raise ValueError(
                'recurrent_mode and chat_history are mutually exclusive. '
                'Use one or the other, not both.'
            )
        await self._run_init(
            task,
            files,
            use_planning=use_planning,
            recurrent_mode=recurrent_mode,
            chat_history=chat_history,
        )
        # Messages before this index belong to the system prompt, injected history, the task
        # message and the plan. Answers are only looked up in messages produced by this run,
        # so an answer from an earlier conversation is never returned for the new task.
        run_start = len(self.chat_history)

        n_turns = 0
        self.final_answer_found = False
        consecutive_errors = 0
        # Deduplication cache: (tool name, raw JSON args) -> raw (untruncated) result.
        executed_tool_calls: dict[tuple[str, str], str] = {}
        self.full_tool_results = {}  # keyed by tool_call_id for exact lookup
        # Hard-stop flag set when SLM fails 3 consecutive times. Prevents
        # _prepare_final_answer from issuing a further LLM call against an already-failing
        # model.
        llm_hard_stop = False

        yield self.response(
            rtype='log', value=f'Solving task: `{self.task.description}`', channel='run'
        )

        for turn in range(max_iterations):
            logger.info('Turn %d/%d for model %s', turn + 1, max_iterations, self.model_name)
            n_turns += 1
            yield self.response(rtype='log', channel='run', value=f'* Executing step {n_turns}')

            # --- Inner retry loop for transient SLM failures ---
            # Failed attempts do NOT consume outer-loop iterations.
            response = None
            consecutive_llm_errors = 0
            while consecutive_llm_errors < 3:
                try:
                    response = await litellm.acompletion(
                        model=self.model_name,
                        messages=self.chat_history,
                        tools=self.tool_schemas,
                        tool_choice='auto',
                        **(self.litellm_params or {}),
                    )
                    break  # Success — exit the retry loop
                except Exception as e:
                    consecutive_llm_errors += 1
                    logger.error(
                        'SLM call failed (attempt %d/3) on turn %d: %s',
                        consecutive_llm_errors,
                        turn + 1,
                        e,
                    )
                    yield self.response(
                        rtype='log',
                        value=f'SLM call failed ({consecutive_llm_errors}/3): {e}',
                        channel='run',
                    )
                    if consecutive_llm_errors >= 3:
                        logger.error('Too many consecutive SLM failures. Terminating.')
                        yield self.response(
                            rtype='log',
                            value='Too many consecutive SLM failures. Terminating.',
                            channel='run',
                        )
                        llm_hard_stop = True
                        break
                    # Non-blocking backoff before next attempt
                    await asyncio.sleep(5 * consecutive_llm_errors)

            # Hard-stop: the model is not responding.
            if llm_hard_stop:
                break

            message = response.choices[0].message
            self.chat_history.append(message.model_dump())

            # Model chose to respond without a tool call — treat as done
            if not message.tool_calls:
                self.final_answer_found = True
                break

            # Process all tool calls in this turn
            for tool_call in message.tool_calls:
                name = tool_call.function.name
                args_str = tool_call.function.arguments
                call_key = (name, args_str)

                if call_key in executed_tool_calls:
                    # Deduplicate: return the cached result (truncated like any other
                    # history entry) instead of re-executing.
                    prev_result = self._maybe_truncate(executed_tool_calls[call_key])
                    tool_result_message = {
                        'tool_call_id': tool_call.id,
                        'role': 'tool',
                        'name': name,
                        'content': (
                            f'You already called `{name}` with these arguments '
                            f'and got: {prev_result}. Use that result.'
                        ),
                    }
                else:
                    tool_result_message = await self._execute_tool(tool_call)
                    raw_content = tool_result_message['content']
                    if not self._is_error(raw_content):
                        executed_tool_calls[call_key] = raw_content
                    # Store full result keyed by tool_call_id for exact per-call lookup
                    self.full_tool_results[tool_call.id] = raw_content
                    tool_result_message = {
                        **tool_result_message,
                        'content': self._maybe_truncate(raw_content),
                    }

                # Track consecutive tool errors for early exit
                is_error = self._is_error(tool_result_message['content'])
                consecutive_errors = consecutive_errors + 1 if is_error else 0

                self.chat_history.append(tool_result_message)
                yield self.response(
                    rtype='log',
                    value=(
                        f'Executed tool: {name}. Result: {tool_result_message["content"][:100]}...'
                    ),
                    channel='tool',
                )
                # Only a successful final_answer ends the run. A failed call (e.g. missing
                # `result`) stays in history as an error so the model can retry.
                if name == FINAL_ANSWER_TOOL_NAME and not is_error:
                    self.final_answer_found = True

            if self.final_answer_found:
                logger.info('Final answer tool called, ending loop.')
                break

            if consecutive_errors >= 3:
                logger.error('Too many consecutive tool errors. Terminating loop early.')
                yield self.response(
                    rtype='log',
                    value='Too many consecutive tool errors. Terminating loop early.',
                    channel='run',
                )
                break

            nudges_before = self.nudge_count
            if self._detect_tool_loop(nudge_hard_stop=loop_threshold):
                # A detection that added no nudge means the nudge budget is exhausted. This
                # lets the model respond to the last nudge before the run is stopped.
                if self.nudge_count == nudges_before:
                    logger.error('Loop persisted after nudges. Terminating for safety.')
                    yield self.response(
                        rtype='log',
                        value='Loop persisted after nudges. Terminating for safety.',
                        channel='run',
                    )
                    break
                logger.info('Loop detection triggered, nudging agent.')
                yield self.response(
                    rtype='log', value='Loop detected, nudging agent...', channel='run'
                )

        self.task.steps_taken = n_turns

        # Prefer a successful final_answer tool result from this run.
        result = self._final_answer_from_history(run_start)
        if result is None:
            # Do not invoke _prepare_final_answer after an LLM hard-stop; the model is
            # already unresponsive and a further call would also fail.
            if not llm_hard_stop and refine_final_answer:
                result = await self._prepare_final_answer(start=run_start)
            else:
                result = self._last_assistant_text(run_start) or 'No response generated.'
        yield self.response(rtype='final', value=result, channel='run')

    async def _prepare_final_answer(self, start: int = 0) -> str:
        """Summarise the conversation into a clean final answer via a separate SLM call.

        Used as a fallback when the model exits the loop without a successful final_answer
        call, which is common for models <= 4B. Formats the conversation history as plain
        text and asks the model to produce a concise response.

        Args:
            start: History index from which the fallback "last assistant message" is
                searched. ``run`` passes the index where its own messages begin.

        Returns:
            The model's summary. If the call fails or returns nothing, the last assistant
            text message at or after ``start``. If there is none, a generic message.
        """
        formatted_history = self._format_history_as_text()
        try:
            response = await litellm.acompletion(
                model=self.model_name,
                messages=[
                    {
                        'role': 'system',
                        'content': (
                            'You are a helpful assistant. Based on the conversation history '
                            "below, provide a clear and concise final answer to the user's task."
                        ),
                    },
                    {
                        'role': 'user',
                        'content': f'Conversation history:\n{formatted_history}',
                    },
                ],
                **(self.litellm_params or {}),
            )
            final_answer_text = (response.choices[0].message.content or '').strip()
            logger.debug('Raw _prepare_final_answer response: %s', final_answer_text)
        except Exception as e:
            logger.error('_prepare_final_answer SLM call failed: %s', e)
            # Fall back to the last assistant text message already in history
            return self._last_assistant_text(start) or 'No response generated due to SLM failure.'
        if final_answer_text:
            return final_answer_text
        # An empty completion is not a usable answer; fall back to what we already have.
        return self._last_assistant_text(start) or 'No response generated.'
async def main():
    """Example usage of the FunctionCallingAgent.

    Runs a few sample tasks one after another on a single agent instance. For each task,
    prints every streamed response, then the task result and the number of steps taken.
    """
    model_name = 'gemini/gemini-2.0-flash-lite'
    # Smaller local models (8-bit quantization or better) can also do function calling:
    # model_name = 'ollama/qwen3:8b-q8_0'
    # model_name = 'ollama/qwen3:4b-instruct-2507-fp16'
    # model_name = 'ollama/granite4:7b-a1b-h'
    # model_name = 'ollama/phi4-mini:3.8b-q8_0'

    toolbox = [
        dtools.search_web,
        dtools.calculator,
        dtools.read_webpage,
        dtools.transcribe_youtube,
        dtools.search_wikipedia,
    ]
    # A high temperature (2) is used here for Gemini; use 0 for SLMs.
    agent = FunctionCallingAgent(
        model_name=model_name,
        tools=toolbox,
        litellm_params={'temperature': 2, 'timeout': 90},
    )
    print(f'Using model: {model_name}')

    image_link = (
        'https://cdn.prod.website-files.com/61a05ff14c09ecacc06eec05'
        '/66e8522cbe3d357b8434826a_ai-agents.jpg'
    )
    stock_question = (
        'Find the current stock price of NVIDIA & calculate how many shares'
        ' I can buy with $5000.'
    )
    youtube_question = (
        'Get the transcript of this YouTube video: https://www.youtube.com/watch?v=aircAruvnKk'
        '\nIdentify the main topic, then search Wikipedia for that topic and give me'
        ' a brief summary of what Wikipedia says about it (give Wikipedia page link).'
    )
    sample_tasks: list[tuple[str, list[str] | None]] = [
        ('What is 5 times 7?', None),
        ('What is this page about?', ['https://en.wikipedia.org/wiki/Artificial_intelligence']),
        ('Caption this image', [image_link]),
        (stock_question, None),
        (youtube_question, None),
    ]

    for number, (task, files) in enumerate(sample_tasks, 1):
        print(f'\nTask #{number}: {task}')
        async for response in agent.run(task, files, max_iterations=10, use_planning=False):
            print(response)
        # The `agent.task...=` self-documenting f-strings print the expression text as-is.
        print(f'>>> {agent.task.result=}')
        print(f'>>> {agent.task.steps_taken=}')
        print('-' * 80)
if __name__ == '__main__':
    # Executed directly (not imported): run the example tasks in a fresh event loop.
    asyncio.run(main())