"""Function Calling Agent. This module is optimized for Small Language Models (SLMs).
The FC agent uses native function calling to solve tasks. Some of the data structures are
reproduced here to keep this module self-contained and optimized.
"""
import asyncio
import copy
import json
import logging
import re
from collections.abc import AsyncIterator, Callable
from typing import Any, Literal
import litellm
from . import kutils as ku
from . import tools as dtools
from .models import AgentResponse, Task
from .orchestrator import Planner
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Default system prompt for the agent, loaded through the project's prompt reader.
FCA_SYSTEM_PROMPT = ku.read_prompt('system/function_calling.txt')
# Name of the tool the model calls to deliver its final answer.
FINAL_ANSWER_TOOL_NAME = 'final_answer'
[docs]
def final_answer(result: str) -> str:
    """Provide the final answer to the user's task and end the conversation.
    Always call this tool when you have enough information to answer.
    Args:
        result: The final answer or result of the task.
    Returns:
        The final answer text in user-readable format.
    """
    # NOTE: the agent builds a tool schema from this function, so the docstring
    # above is kept verbatim -- presumably it is part of what the model sees.
    if isinstance(result, str):
        return result
    # Models sometimes pass structured data despite the `str` annotation:
    # flatten containers into paragraphs (dict keys are dropped).
    if isinstance(result, dict):
        parts = result.values()
    elif isinstance(result, list):
        parts = result
    else:
        return str(result)
    return '\n\n'.join(map(str, parts))
[docs]
class FunctionCallingAgent:
"""An agent that uses native function calling to solve tasks,
optimized for Small Language Models (SLMs). If you're using Ollama, make sure to select
a model that supports function calling, such as 'ollama/qwen3:8b-q8_0'.
"""
[docs]
def __init__(
    self,
    model_name: str,
    tools: list[Callable] | None = None,
    system_prompt: str = FCA_SYSTEM_PROMPT,
    loop_detection_threshold: int = 3,
    litellm_params: dict | None = None,
    max_tool_result_chars: int = 3000,
    one_line_doc: bool = True,
    tool_timeout: float = 30.0,
):
    """Initialize the FunctionCallingAgent.

    Args:
        model_name: Model identifier for LiteLLM.
        tools: Optional list of callable tools. The list is copied, so the
            caller's list is never modified.
        system_prompt: System prompt for the agent.
        loop_detection_threshold: Number of consecutive same tool calls
            before triggering loop detection. Default is 3.
        litellm_params: Optional dictionary of parameters to pass to LiteLLM calls.
        max_tool_result_chars: Maximum number of characters to store in chat history
            for each tool result. Longer results are truncated with a note. Full results
            are preserved separately for final answer preparation.
        one_line_doc: If True, use only the first line of the tool's docstring for the schema.
            This can help reduce token usage for SLMs. If False, the full docstring is used,
            which may provide more context but at the cost of more tokens.
        tool_timeout: Seconds to wait for a single tool call before cancelling it and
            returning an error result. Default is 30.0.
    """
    self.model_name = model_name
    # Work on a private copy: appending `final_answer` below must not leak
    # into a list the caller may reuse (e.g. for another agent).
    self.tools = list(tools) if tools else []
    # Ensure final_answer is always available as a tool
    if all(fn.__name__ != FINAL_ANSWER_TOOL_NAME for fn in self.tools):
        self.tools.append(final_answer)
    self.tool_schemas = [
        ku.build_tool_schema(fn, just_first_line=one_line_doc, as_text=False)
        for fn in self.tools
    ]
    # Name -> callable lookup for every tool, including final_answer.
    # _execute_tool special-cases final_answer, and the run loop extracts its
    # result separately at the end.
    self.tool_map = {fn.__name__: fn for fn in self.tools}
    self.litellm_params = litellm_params
    self.tool_timeout = tool_timeout
    self.system_prompt = system_prompt
    self.chat_history: list[dict[str, Any]] = []
    self.task: Task | None = None
    self.final_answer_found = False
    self.max_tool_result_chars = max_tool_result_chars
    # Keyed by tool_call_id (unique per invocation) for exact per-call lookup
    self.full_tool_results: dict[str, str] = {}
    self.loop_detection_threshold = loop_detection_threshold
    self.nudge_count = 0
[docs]
def response(
self,
rtype: Literal['step', 'final', 'log'],
value: Any,
channel: str | None = None,
metadata: dict[str, Any] | None = None,
) -> AgentResponse:
"""Prepare a response to be sent by the agent.
Args:
rtype: Response type emitted by the agent.
value: The current update from the agent.
channel: The response channel.
metadata: Any metadata associated with the update.
Returns:
A response from the agent.
"""
if rtype == 'final' and self.task:
self.task.result = value
return {'type': rtype, 'channel': channel, 'value': value, 'metadata': metadata}
def _validate_tool_args(self, name: str, args: dict[str, Any]) -> str | None:
"""Validate tool arguments against the schema.
Args:
name: Name of the tool being called.
args: Arguments provided for the tool call.
Returns:
An error message if validation fails, or None if validation succeeds.
"""
schema = next((s for s in self.tool_schemas if s['function']['name'] == name), None)
if schema is None:
return None # Can't validate, let execution handle it
parameters = schema['function']['parameters']
required = parameters.get('required', [])
properties = parameters.get('properties', {})
missing = [r for r in required if r not in args]
if missing:
return (
f'Error: Missing required arguments for `{name}`: {missing}. '
f'Required arguments are: {required}.'
)
unexpected = [k for k in args if k not in properties]
if unexpected:
return (
f'Error: Unexpected arguments for `{name}`: {unexpected}. '
f'Valid arguments are: {list(properties.keys())}.'
)
return None
async def _execute_tool(self, tool_call: Any) -> dict[str, str]:
    """Safely execute one tool call and return the tool result message.

    Failures never propagate: malformed arguments, unknown tools, validation
    errors, timeouts and exceptions raised by the tool are all converted into
    an `Error...` string placed in the message content.

    Args:
        tool_call: The tool call to execute. Must expose `id`,
            `function.name` and `function.arguments`.

    Returns:
        A dictionary representing the tool result message, with the keys
        `tool_call_id`, `role`, `name` and `content`.
    """
    name = tool_call.function.name
    raw_args = tool_call.function.arguments

    def _as_text(value: Any) -> str:
        """Render a tool result as text; containers become pretty JSON when possible."""
        if isinstance(value, (dict, list)):
            try:
                return json.dumps(value, ensure_ascii=False, indent=2)
            except (TypeError, ValueError):
                return str(value)
        return str(value)

    try:
        if isinstance(raw_args, dict):
            # Arguments were already decoded upstream
            args = raw_args
        elif raw_args is None or (isinstance(raw_args, str) and not raw_args.strip()):
            # Missing/blank arguments mean "no arguments", not malformed JSON;
            # validation below still reports any missing required parameter.
            args = {}
        else:
            args = json.loads(raw_args)
        logger.info('Agent executing tool: %s with args: %s', name, args)
        if name not in self.tool_map:
            result = f'Error: Tool `{name}` is not defined.'
        elif name == FINAL_ANSWER_TOOL_NAME:
            # Robust extraction for SLMs that might hallucinate arg keys (relaxed validation)
            if not isinstance(args, dict):
                result = (
                    'Error: `final_answer` arguments must be a JSON object'
                    ' with the key `result`.'
                )
            else:
                tool_result = None
                for key in ('result', 'reason', 'answer', 'response', 'output', 'reply'):
                    value = args.get(key)
                    if value is not None:
                        tool_result = value
                        break
                if tool_result is None:
                    result = 'Error: `final_answer` called without a result.'
                else:
                    result = _as_text(tool_result)
        else:
            validation_error = self._validate_tool_args(name, args)
            if validation_error:
                result = validation_error
            else:
                fn = self.tool_map[name]
                loop = asyncio.get_running_loop()
                if asyncio.iscoroutinefunction(fn):
                    coro = fn(**args)
                else:
                    # Synchronous tools run in the default executor so they do
                    # not block the event loop.
                    coro = loop.run_in_executor(None, lambda: fn(**args))
                tool_result = await asyncio.wait_for(coro, timeout=self.tool_timeout)
                if tool_result is None:
                    result = (
                        f'Error: Tool `{name}` returned no result. '
                        'Verify your input arguments and try a different approach.'
                    )
                elif isinstance(tool_result, str) and not tool_result.strip():
                    result = (
                        f'Error: Tool `{name}` returned an empty result. '
                        'The query may have returned no data.'
                    )
                else:
                    result = _as_text(tool_result)
    except json.JSONDecodeError:
        result = 'Error: Model provided malformed JSON arguments.'
    except TypeError as e:
        result = f'Error: Wrong arguments passed to `{name}`: {str(e)}'
    except asyncio.TimeoutError:
        result = f'Error: Tool `{name}` timed out after {self.tool_timeout}s.'
    except Exception as e:
        result = f'Error executing `{name}`: {str(e)}'
    return {
        'tool_call_id': tool_call.id,
        'role': 'tool',
        'name': name,
        'content': result,
    }
def _detect_tool_loop(self, nudge_hard_stop: int = 2) -> bool:
"""Detect if the agent is stuck in a tool calling loop.
Analyzes chat history to identify when the same tool is being called
consecutively without progress. Supports two-stage nudge escalation
before signalling hard termination.
Args:
nudge_hard_stop: Number of nudges after which hard termination is signalled.
Callers should pass their ``loop_threshold`` value here.
Returns:
True if a loop was detected (nudge added or hard termination signalled),
False otherwise.
"""
recent_tool_calls: list[str] = []
for msg in reversed(self.chat_history):
if msg.get('role') == 'assistant' and msg.get('tool_calls'):
for tool_call in msg.get('tool_calls', []):
tool_name = (
tool_call.get('function', {}).get('name')
if isinstance(tool_call, dict)
else getattr(getattr(tool_call, 'function', None), 'name', None)
)
if tool_name:
recent_tool_calls.append(tool_name)
if (
len(recent_tool_calls) >= self.loop_detection_threshold
and len(set(recent_tool_calls[: self.loop_detection_threshold])) == 1
):
loop_tool = recent_tool_calls[0]
# Hard termination once nudge limit has been reached
if self.nudge_count >= nudge_hard_stop:
return True
available_tools = [t for t in self.tool_map if t != loop_tool]
# Stage 1: gentle nudge
if self.nudge_count == 0:
nudge_message = (
f'Loop detected: The tool "{loop_tool}" has been called'
f' {self.loop_detection_threshold} consecutive times without progress.'
' This approach is not working.'
)
# Stage 2: strong nudge
else:
nudge_message = (
f'[CRITICAL: STOP REPEATING] You are still calling "{loop_tool}"'
' despite the previous warning. You MUST change your strategy or call'
' final_answer with your best answer now.'
)
if available_tools:
nudge_message += (
f' Consider using one of these tools instead: {", ".join(available_tools)}.'
)
nudge_message += ' If you have gathered enough information, call final_answer now.'
self.nudge_count += 1
self.chat_history.append({'role': 'user', 'content': nudge_message})
return True
return False
@staticmethod
def _is_error(text: str) -> bool:
"""Robustly detect if a string represents an error message.
Matches variations like "Error:", "*** ERROR:", "errrror", etc.
Args:
text: Input string to check.
Returns:
True if it starts with an error indicator, False otherwise.
"""
if not text:
return False
return bool(re.match(r'^\s*(?:\*+\s*)?err+or\b', text, re.IGNORECASE))
@staticmethod
def _extract_urls(text: str) -> list[str]:
"""Extract all URLs from a text string.
Args:
text: The input text to search for URLs.
Returns:
A list of URLs found in the text.
"""
return re.findall(r'https?://[^\s<>"\')\],]+', text)
async def _run_init(
    self,
    task_desc: str,
    task_files: list[str] | None = None,
    task_id: str | None = None,
    use_planning: bool = True,
    recurrent_mode: bool = False,
    chat_history: list[dict] | None = None,
) -> None:
    """Prepare agent state for a new task.

    Resets the nudge counter, creates the `Task`, rebuilds `self.chat_history`
    (system prompt, optional injected history, then the task message) and,
    if requested, appends a generated plan as an extra user message.

    Args:
        task_desc: Task description.
        task_files: Optional files for the task.
        task_id: Optional task ID.
        use_planning: If True, generate a simple plan at the beginning of the task.
        recurrent_mode: If True and a previous task exists, its description and
            result are prepended to the new task message.
        chat_history: Optional pre-built OpenAI-compliant chat history to inject
            as the base context. It is validated and deep-copied, and the new task
            message is appended at the end. Mutually exclusive with ``recurrent_mode``.

    Raises:
        ValueError: If the task description is blank or `task_files` is not a list.
    """
    if not task_desc or not task_desc.strip():
        raise ValueError('Task description cannot be empty!')
    if task_files and not isinstance(task_files, list):
        raise ValueError('Task files must be a list of file paths!')

    # Assemble the task prompt; in recurrent mode the previous task leads
    sections = []
    if recurrent_mode and self.task is not None:
        sections.append(f'## Previous Task\n{self.task.description}\n\n')
        sections.append(f'## Previous Task Result\n{self.task.result}\n\n')
    sections.append(f'## New Task:\n{task_desc}')
    prompt_text = ''.join(sections)
    task_messages = ku.combine_user_messages(ku.make_user_message(prompt_text, task_files))

    self.nudge_count = 0
    if task_id is None:
        self.task = Task(description=task_desc)
    else:
        self.task = Task(description=task_desc, id=task_id)

    if chat_history is None:
        self.chat_history = [
            {'role': 'system', 'content': self.system_prompt},
            *task_messages,
        ]
    else:
        # Validate first, then copy so the caller's history is never touched
        ku.validate_chat_history(chat_history, tool_names=set(self.tool_map))
        injected = copy.deepcopy(chat_history)
        starts_with_system = bool(injected) and injected[0].get('role') == 'system'
        if not starts_with_system:
            # A system message must sit at index 0
            injected.insert(0, {'role': 'system', 'content': self.system_prompt})
        self.chat_history = injected + list(task_messages)

    if not use_planning:
        return
    planner = Planner(model_name=self.model_name, litellm_params=self.litellm_params)
    await planner.create_plan(task=self.task, agent_type='fca')
    plan_text = planner.get_formatted_plan()
    plan_message = {
        'role': 'user',
        'content': f'Here is a plan for this task:\n{plan_text}',
    }
    self.chat_history.append(plan_message)
    logger.info('Task plan:\n%s\n', plan_text)
def _format_history_as_text(self) -> str:
"""Format chat history as readable text.
Converts the chat history into a human-readable format, excluding
tool call IDs and other non-essential metadata.
Returns:
Formatted chat history as a string.
"""
formatted = []
for msg in self.chat_history[1:]:
role = msg.get('role', 'unknown')
content = msg.get('content')
if role == 'user':
if content:
formatted.append(f'User: {content}')
elif role == 'assistant':
if content:
formatted.append(f'Assistant: {content}')
tool_calls = msg.get('tool_calls')
if tool_calls:
tool_names = []
for tool_call in tool_calls:
if isinstance(tool_call, dict):
tool_name = tool_call.get('function', {}).get('name')
else:
tool_name = getattr(getattr(tool_call, 'function', None), 'name', None)
if tool_name:
tool_names.append(tool_name)
if tool_names:
formatted.append(f'Assistant: [Called tools: {", ".join(tool_names)}]')
elif role == 'tool':
tool_name = msg.get('name', 'unknown')
tool_call_id = msg.get('tool_call_id')
# Exact lookup by tool_call_id — avoids wrong-result for repeated same-tool calls
full_content = (
self.full_tool_results.get(tool_call_id) if tool_call_id else None
) or content
if full_content:
formatted.append(f'Tool ({tool_name}): {full_content}')
return '\n'.join(formatted)
def _maybe_truncate(self, content: str) -> str:
"""Truncate or summarise a tool result for storage in chat history.
Full results are always preserved in self.full_tool_results for use
by _prepare_final_answer. Only the history-facing version is reduced.
Note: summarise_long_results=True requires an await — callers must
use _maybe_truncate_async instead when that flag is set.
Args:
content: Full tool result content.
Returns:
Possibly truncated content with a note if cut.
"""
if len(content) <= self.max_tool_result_chars:
return content
truncated = content[: self.max_tool_result_chars]
return (
f'{truncated}\n\n'
f'[Truncated — {len(content)} total chars. '
f'Showing first {self.max_tool_result_chars}.]'
)
[docs]
async def run(
    self,
    task: str,
    files: list[str] | None = None,
    max_iterations: int = 10,
    refine_final_answer: bool = True,
    use_planning: bool = True,
    recurrent_mode: bool = False,
    loop_threshold: int = 3,
    chat_history: list[dict] | None = None,
) -> AsyncIterator[AgentResponse]:
    """Main loop for the agent to process input and execute tools until finished.

    If an SLM call fails, it tries up to 3 times with a backoff before terminating.
    The final answer is taken only from messages produced during this run, so
    content of an injected ``chat_history`` is never returned as the new result.

    Args:
        task: Task description to process. Add URLs or file contents in the task description.
        files: An optional list of files related to the task.
        max_iterations: Maximum number of reasoning steps (model calls that are
            attempted). Retries of a failed SLM call within a step do not consume
            additional iterations.
        refine_final_answer: If True, calls an additional SLM step to produce a
            clean final answer when the model exits without calling final_answer.
            Recommended for models <=4B that may not use final_answer reliably.
            Note: not invoked after a hard-stop due to repeated SLM failures.
        use_planning: If True, generate a simple plan at the beginning of the task.
        recurrent_mode: If True, the agent continues from the previous task result,
            allowing for multi-turn workflows. Mutually exclusive with ``chat_history``.
        loop_threshold: Number of loop nudges after which the run is terminated.
            (How many identical consecutive tool calls trigger a nudge is set by
            the constructor's ``loop_detection_threshold``.)
        chat_history: Optional pre-built OpenAI-compliant history to inject as the
            base context for this run. The new task message is appended at the end.
            Mutually exclusive with ``recurrent_mode``.

    Yields:
        AgentResponse: Streaming log, step, and final responses.

    Raises:
        ValueError: If both ``recurrent_mode`` and ``chat_history`` are set, or if
            the provided history fails OpenAI compliance validation.
    """
    if recurrent_mode and chat_history is not None:
        raise ValueError(
            'recurrent_mode and chat_history are mutually exclusive. '
            'Use one or the other, not both.'
        )
    await self._run_init(
        task,
        files,
        use_planning=use_planning,
        recurrent_mode=recurrent_mode,
        chat_history=chat_history,
    )
    # Everything at or after this index is produced by this run. The history is
    # append-only from here on, so the index stays valid until the run ends.
    run_start = len(self.chat_history)
    n_turns = 0
    self.final_answer_found = False
    consecutive_errors = 0
    # (tool name, raw argument string) -> raw result of the first successful call
    executed_tool_calls: dict[tuple[str, str], str] = {}
    self.full_tool_results = {}  # keyed by tool_call_id for exact lookup
    # Hard-stop flag set when SLM fails 3 consecutive times.
    # Prevents _prepare_final_answer from issuing a further LLM call
    # against an already-failing model.
    llm_hard_stop = False
    yield self.response(
        rtype='log', value=f'Solving task: `{self.task.description}`', channel='run'
    )
    for turn in range(max_iterations):
        logger.info('Turn %d/%d for model %s', turn + 1, max_iterations, self.model_name)
        n_turns += 1
        yield self.response(rtype='log', channel='run', value=f'* Executing step {n_turns}')
        # --- Inner retry loop for transient SLM failures ---
        # Failed attempts do NOT consume outer-loop iterations (n_turns is not
        # incremented here; it was incremented once above for this reasoning step).
        response = None
        consecutive_llm_errors = 0
        while consecutive_llm_errors < 3:
            try:
                response = await litellm.acompletion(
                    model=self.model_name,
                    messages=self.chat_history,
                    tools=self.tool_schemas,
                    tool_choice='auto',
                    **(self.litellm_params or {}),
                )
                # Success — exit the retry loop
                break
            except Exception as e:
                consecutive_llm_errors += 1
                logger.error(
                    'SLM call failed (attempt %d/3) on turn %d: %s',
                    consecutive_llm_errors,
                    turn + 1,
                    e,
                )
                yield self.response(
                    rtype='log',
                    value=f'SLM call failed ({consecutive_llm_errors}/3): {e}',
                    channel='run',
                )
                if consecutive_llm_errors >= 3:
                    logger.error('Too many consecutive SLM failures. Terminating.')
                    yield self.response(
                        rtype='log',
                        value='Too many consecutive SLM failures. Terminating.',
                        channel='run',
                    )
                    llm_hard_stop = True
                    break
                # Non-blocking backoff before next attempt
                await asyncio.sleep(5 * consecutive_llm_errors)
        # Hard-stop: exit the outer loop without falling through to
        # _prepare_final_answer — the model is not responding.
        if llm_hard_stop:
            break
        message = response.choices[0].message
        self.chat_history.append(message.model_dump())
        # Model chose to respond without a tool call — treat as done
        if not message.tool_calls:
            self.final_answer_found = True
            break
        # Process all tool calls in this turn
        for tool_call in message.tool_calls:
            name = tool_call.function.name
            args_str = tool_call.function.arguments
            call_key = (name, args_str)
            # Deduplicate: return cached result instead of re-executing
            if call_key in executed_tool_calls:
                prev_result = executed_tool_calls[call_key]
                tool_result_message = {
                    'tool_call_id': tool_call.id,
                    'role': 'tool',
                    'name': name,
                    # The cache holds the raw result; apply the same history
                    # size limit as for freshly executed calls.
                    'content': (
                        f'You already called `{name}` with these arguments '
                        f'and got: {self._maybe_truncate(prev_result)}. Use that result.'
                    ),
                }
            else:
                tool_result_message = await self._execute_tool(tool_call)
                raw_content = tool_result_message['content']
                if not self._is_error(raw_content):
                    # Populate deduplication cache with the raw result
                    executed_tool_calls[call_key] = raw_content
                # Store full result keyed by tool_call_id for exact per-call lookup
                self.full_tool_results[tool_call.id] = raw_content
                # Truncate for history
                tool_result_message = {
                    **tool_result_message,
                    'content': self._maybe_truncate(raw_content),
                }
            # Track consecutive tool errors for early exit
            if self._is_error(tool_result_message['content']):
                consecutive_errors += 1
            else:
                consecutive_errors = 0
            self.chat_history.append(tool_result_message)
            yield self.response(
                rtype='log',
                value=(
                    f'Executed tool: {name}. Result: {tool_result_message["content"][:100]}...'
                ),
                channel='tool',
            )
            if name == FINAL_ANSWER_TOOL_NAME:
                self.final_answer_found = True
        if self.final_answer_found:
            logger.info('Final answer tool called, ending loop.')
            break
        if consecutive_errors >= 3:
            logger.error('Too many consecutive tool errors. Terminating loop early.')
            yield self.response(
                rtype='log',
                value='Too many consecutive tool errors. Terminating loop early.',
                channel='run',
            )
            break
        loop_detected = self._detect_tool_loop(nudge_hard_stop=loop_threshold)
        if loop_detected:
            if self.nudge_count >= loop_threshold:
                logger.error('Loop persisted after nudges. Terminating for safety.')
                yield self.response(
                    rtype='log',
                    value='Loop persisted after nudges. Terminating for safety.',
                    channel='run',
                )
                break
            logger.info('Loop detection triggered, nudging agent.')
            yield self.response(
                rtype='log', value='Loop detected, nudging agent...', channel='run'
            )
    self.task.steps_taken = n_turns
    # Only look at what this run produced: an injected history may itself
    # contain an old final_answer message that must not be reported again.
    run_messages = self.chat_history[run_start:]
    # Prefer the final_answer tool result if present.
    # for...else: the else block runs only if no break was hit, i.e. no
    # final_answer tool message was found among this run's messages.
    result = 'No response generated.'
    for msg in reversed(run_messages):
        if msg.get('role') == 'tool' and msg.get('name') == FINAL_ANSWER_TOOL_NAME:
            # Use the full (untruncated) stored result if available
            tool_call_id = msg.get('tool_call_id')
            result = (
                self.full_tool_results.get(tool_call_id) if tool_call_id else None
            ) or msg['content']
            break
    else:
        # Do not invoke _prepare_final_answer after an LLM hard-stop —
        # the model is already unresponsive and a further call would also fail.
        if not llm_hard_stop and refine_final_answer:
            result = await self._prepare_final_answer()
        else:
            for msg in reversed(run_messages):
                if msg.get('role') == 'assistant' and msg.get('content'):
                    result = msg['content']
                    break
    yield self.response(rtype='final', value=result, channel='run')
async def _prepare_final_answer(self) -> str:
    """Summarise the conversation into a clean final answer via a separate SLM call.

    Used as a fallback when the model exits the loop without calling final_answer,
    which is common for models <= 4B. Formats the full conversation history as
    plain text and asks the model to produce a concise response.

    If the call fails or yields an empty answer, the most recent assistant
    message with text content is returned instead; if there is none, a generic
    message is returned. This method does not raise for SLM failures.

    Returns:
        A user-readable string summarising the result.
    """
    formatted_history = self._format_history_as_text()
    try:
        response = await litellm.acompletion(
            model=self.model_name,
            messages=[
                {
                    'role': 'system',
                    'content': (
                        'You are a helpful assistant. Based on the conversation history '
                        "below, provide a clear and concise final answer to the user's task."
                    ),
                },
                {
                    'role': 'user',
                    'content': f'Conversation history:\n{formatted_history}',
                },
            ],
            **(self.litellm_params or {}),
        )
        final_answer_text = (response.choices[0].message.content or '').strip()
        logger.debug('Raw _prepare_final_answer response: %s', final_answer_text)
        if final_answer_text:
            return final_answer_text
        # A blank summary is useless as a final answer; use the fallback below
        logger.warning('_prepare_final_answer SLM call returned an empty answer.')
    except Exception as e:
        logger.error('_prepare_final_answer SLM call failed: %s', e)
    # Fall back to the last assistant text message already in history
    for msg in reversed(self.chat_history):
        if msg.get('role') == 'assistant' and msg.get('content'):
            return msg['content']
    return 'No response generated due to SLM failure.'
[docs]
async def main():
    """Run a handful of demo tasks through a FunctionCallingAgent and print the output."""
    # Some smaller models with 8-bit quantization or higher can also perform well
    # with function calling, e.g. 'ollama/qwen3:8b-q8_0',
    # 'ollama/qwen3:4b-instruct-2507-fp16', 'ollama/granite4:7b-a1b-h' or
    # 'ollama/phi4-mini:3.8b-q8_0'.
    model_name = 'gemini/gemini-2.0-flash-lite'
    demo_tools = [
        dtools.search_web,
        dtools.calculator,
        dtools.read_webpage,
        dtools.transcribe_youtube,
        dtools.search_wikipedia,
    ]
    agent = FunctionCallingAgent(
        model_name=model_name,
        tools=demo_tools,
        # Temp = 2 to make Gemini "creative"; set to 0 for SLMs
        litellm_params={'temperature': 2, 'timeout': 90},
    )
    print(f'Using model: {model_name}')

    image_url = (
        'https://cdn.prod.website-files.com/61a05ff14c09ecacc06eec05'
        '/66e8522cbe3d357b8434826a_ai-agents.jpg'
    )
    stock_task = (
        'Find the current stock price of NVIDIA & calculate how many shares'
        ' I can buy with $5000.'
    )
    video_task = (
        'Get the transcript of this YouTube video: https://www.youtube.com/watch?v=aircAruvnKk'
        '\nIdentify the main topic, then search Wikipedia for that topic and give me'
        ' a brief summary of what Wikipedia says about it (give Wikipedia page link).'
    )
    # Each entry pairs a task description with its optional list of files/URLs
    demo_tasks = [
        ('What is 5 times 7?', None),
        ('What is this page about?', ['https://en.wikipedia.org/wiki/Artificial_intelligence']),
        ('Caption this image', [image_url]),
        (stock_task, None),
        (video_task, None),
    ]

    for number, (description, attachments) in enumerate(demo_tasks, start=1):
        print(f'\nTask #{number}: {description}')
        updates = agent.run(description, attachments, max_iterations=10, use_planning=False)
        async for response in updates:
            print(response)
        print(f'>>> {agent.task.result=}')
        print(f'>>> {agent.task.steps_taken=}')
        print('-' * 80)
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())