# Source code for kodeagent.tools

"""This module defines a set of built-in tools for KodeAgent.
All tools import necessary dependencies within their function bodies to ensure they are
self-contained and can operate in isolated environments. Similarly, all variables are declared
locally within the functions.
"""

# Modules that the built-in tools import inside their function bodies.
# `datetime` is included because `search_web` does `from datetime import datetime`;
# it is appended last so the positions of the pre-existing entries do not change.
DEFAULT_TOOLS_IMPORTS = [
    'ast',
    'operator',
    're',
    'time',
    'random',
    'ddgs',
    'pathlib',
    'tempfile',
    'requests',
    'markitdown',
    'bs4',
    'wikipedia',
    'arxiv',
    'youtube_transcript_api',
    'urllib.parse',
    'os',
    'base64',
    'litellm',
    'datetime',
]
"""List of default modules (stdlib and third-party) to be available in tools."""


[docs] def calculator(expression: str) -> float | None: """Evaluate a single arithmetic expression and return the numeric result. Call this tool once per arithmetic operation. Do NOT try to compute multi-step problems in one call; chain multiple calls instead. Supported operators: +, -, *, /, ** (exponent), parentheses. Args: expression: A single arithmetic expression, e.g. "7 + 5" or "12 * 4". Use ** for exponents, not ^. Returns None for invalid input. Returns: The numeric result as a float, or None if the expression is invalid. """ import ast import operator import re # Clean the expression expression = expression.replace("'", '').replace('^', '**') # Define a regex pattern for valid mathematical expressions calculator_regex = re.compile(r'^[\d+\-*/().\s]+$') if calculator_regex.match(expression) is None: return None try: # Parse the expression into an AST node = ast.parse(expression, mode='eval').body # Define allowed operations allowed_operators = { ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv, ast.Pow: operator.pow, ast.USub: operator.neg, ast.UAdd: operator.pos, } def eval_node(node): """Recursively evaluate the AST node.""" if isinstance(node, ast.Constant): # Python 3.8+ return node.value if isinstance(node, ast.BinOp): # Binary operation op_type = type(node.op) if op_type not in allowed_operators: raise ValueError(f'Operator {op_type} not allowed') left = eval_node(node.left) right = eval_node(node.right) return allowed_operators[op_type](left, right) if isinstance(node, ast.UnaryOp): # Unary operation op_type = type(node.op) if op_type not in allowed_operators: raise ValueError(f'Operator {op_type} not allowed') operand = eval_node(node.operand) return allowed_operators[op_type](operand) raise ValueError(f'Unsupported node type: {type(node)}') result = eval_node(node) return float(result) except Exception: return None
def search_web(query: str, max_results: int = 10) -> str:
    """Search the web using DuckDuckGo and return top results with titles and URLs.

    Use this to find current information, news, or general web content.
    To get full page content from a result URL, call `read_webpage` tool next (if available).

    Args:
        query: Search terms (2-5 words work best).
        max_results: Number of results to return (default 10, min 1, max 20).

    Returns:
        Markdown formatted search results with titles, URLs, and snippets, or an error message.
    """
    import random
    import time
    from datetime import datetime

    today = datetime.now().strftime('%Y-%m-%d')

    try:
        from ddgs import DDGS
    except ImportError:
        return 'ERROR: Required library `ddgs` not installed. Install with: `pip install ddgs`'

    # Validate inputs
    if not query or not query.strip():
        return 'ERROR: Search query cannot be empty.'

    query = query.strip()
    max_results = max(1, min(max_results, 20))  # Clamp to the documented 1..20 range

    try:
        try:
            engine = DDGS(timeout=20)
            results = engine.text(query, max_results=max_results)
        except Exception as ssl_error:
            # NOTE(security): on an SSL/certificate failure the search is retried with
            # certificate verification disabled. This keeps the tool usable behind
            # broken TLS setups but exposes the request to interception.
            if 'SSL' in str(ssl_error) or 'certificate' in str(ssl_error).lower():
                engine = DDGS(timeout=20, verify=False)
                results = engine.text(query, max_results=max_results)
            else:
                raise

        # Small random delay to be respectful
        time.sleep(random.uniform(0.5, 1.5))

        if not results:
            return (
                f"No results found for '{query}'. Try:\n"
                '- Using fewer, more common words\n'
                '- Removing special characters\n'
                '- Being less specific'
            )

        # Format results as clean Markdown
        parts = [
            f'# Search Discovery (System Date: {today})\n\n',
            f'Found {len(results)} result(s)\n\n',
        ]
        for i, result in enumerate(results, 1):
            # A field can be present but None; `or` keeps the string methods safe
            title = (result.get('title') or 'No title').replace('\n', ' ').strip()
            url = result.get('href') or ''
            body = (result.get('body') or '').replace('\n', ' ').strip()

            parts.append(f'## {i}. {title}\n')
            parts.append(f'**URL:** {url}\n')
            parts.append(f'**Snippet:** {body}\n\n')

        parts.append(
            '> **Note:** These are summaries. Use `read_webpage` with a URL to verify facts.'
        )
        return ''.join(parts)

    except ImportError:
        return 'ERROR: Required library `ddgs` not installed. Install with: `pip install ddgs`'
    except Exception as e:
        error_text = str(e)
        # Lower-case only for keyword matching; the message itself is reported verbatim
        lowered = error_text.lower()
        if 'ratelimit' in lowered:
            return 'ERROR: DuckDuckGo rate limit reached. Please wait 30s before searching again.'
        if 'timeout' in lowered:
            return 'ERROR: Search request timed out. Please try again with a simpler query.'
        return f'ERROR: Search failed - {error_text}'
[docs] def download_file(url: str, save_name: str | None = None, save_dir: str | None = None) -> dict: """Download a file from the internet and save it locally. Use this for downloading images, PDFs, data files, or any binary content. For reading webpage content as text, use 'read_webpage' instead. For extracting content from PDFs/DOCX/XLSX, use 'extract_as_markdown' instead. Examples: - Download an image: url="https://example.com/photo.jpg" - Download a dataset: url="https://example.com/data.csv", save_dir="./data" - Download a PDF: url="https://example.com/paper.pdf", save_name="research.pdf" Args: url: The complete URL of the file to download (must start with http:// or https://). save_name: Optional filename to save with. If not provided, uses the filename from URL. save_dir: Optional directory (path) to save the file. If not provided, saves to a temporary file in a temporary directory. Recommended to specify absolute path. Returns: A dictionary with the following fields: - path: str or None -- Final path to the downloaded file. - orig_name: str or None -- Original filename. - size: str or None -- Formatted file size. - content_type: str or None -- Content type of the file. - error: str or None -- Error message if download fails (mutually exclusive with others). """ import re import tempfile from pathlib import Path from urllib.parse import unquote, urlparse result: dict[str, str | None] = { 'path': None, 'orig_name': None, 'size': None, 'content_type': None, 'error': None, } try: import requests except ImportError: result['error'] = ( 'ERROR: Required lib `requests` not installed. Install with: `pip install requests`' ) return result # Validate URL if not url or not url.strip(): result['error'] = 'ERROR: URL cannot be empty.' 
return result url = url.strip() if not url.startswith(('http://', 'https://')): result['error'] = 'ERROR: URL must start with http:// or https://' return result # Validate URL format try: parsed = urlparse(url) if not parsed.netloc: result['error'] = 'ERROR: Invalid URL format - missing domain name.' return result except Exception as e: result['error'] = f'ERROR: Invalid URL format - {str(e)}' return result # Determine filename if save_name: # Sanitize custom filename save_name = re.sub(r'[<>:"/\\|?*]', '_', save_name) else: # Extract from URL path = unquote(parsed.path) save_name = Path(path).name if not save_name or save_name == '/': save_name = 'downloaded_file' # Browser-like headers to avoid 403 errors headers = { 'User-Agent': ( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' ), 'Accept': '*/*', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Cache-Control': 'max-age=0', } try: # Make request with streaming for large files response = requests.get(url, headers=headers, timeout=30, stream=True, allow_redirects=True) # Check for HTTP errors if response.status_code == 403: result['error'] = ( f'ERROR: Access forbidden (403) for: {url}\n' 'The website is blocking automated access. Possible reasons:\n' '- Website requires login/authentication\n' '- Website blocks bots/scrapers\n' '- Geographic restrictions\n' 'Try accessing the URL in a browser first to verify it works.' ) return result if response.status_code == 404: result['error'] = f'ERROR: File not found (404) at {url}' return result if response.status_code == 429: result['error'] = ( 'ERROR: Too many requests (429). The server is rate limiting. Wait and retry.' 
) return result if response.status_code >= 400: result['error'] = f'ERROR: HTTP {response.status_code} - {response.reason}\nURL: {url}' return result response.raise_for_status() # Check content length content_length = response.headers.get('Content-Length') if content_length: size_mb = int(content_length) / (1024 * 1024) if size_mb > 100: result['error'] = ( f'ERROR: File too large ({size_mb:.1f} MB). Maximum supported size is 100 MB.' ) return result final_path = None f = None if save_dir: try: p = Path(save_dir) p.mkdir(parents=True, exist_ok=True) final_path = p / save_name f = open(final_path, 'wb') except Exception: # pylint: disable=broad-exception-caught final_path = None if not final_path: # Create temp file with proper extension file_ext = Path(save_name).suffix # pylint: disable=consider-using-with f = tempfile.NamedTemporaryFile(delete=False, suffix=file_ext, prefix='kodeagent_') final_path = Path(f.name) try: downloaded_size = 0 chunk_size = 8192 for chunk in response.iter_content(chunk_size=chunk_size): if chunk: f.write(chunk) downloaded_size += len(chunk) # Safety check during download if downloaded_size > 100 * 1024 * 1024: # 100 MB f.close() if final_path.exists(): final_path.unlink() result['error'] = 'ERROR: File exceeded 100 MB during download. Aborted.' return result finally: if f: f.close() # Normalize path for cross-platform compatibility final_path_str = str(final_path.as_posix()) # Get actual file size actual_size = final_path.stat().st_size size_str = ( f'{actual_size / 1024:.1f} KB' if actual_size < 1024 * 1024 else f'{actual_size / (1024 * 1024):.1f} MB' ) result['path'] = final_path_str result['orig_name'] = save_name result['size'] = size_str result['content_type'] = response.headers.get('Content-Type', 'unknown') return result except requests.exceptions.Timeout: result['error'] = ( 'ERROR: Download timed out after 30 seconds.\n' 'The file may be too large or the server is slow. Try again.' 
) return result except requests.exceptions.ConnectionError as e: result['error'] = ( f'ERROR: Connection failed - {str(e)}\nPossible causes:\n' '- No internet connection\n' '- Invalid domain name\n' '- Server is down' ) return result except requests.exceptions.RequestException as e: result['error'] = f'ERROR: Download failed - {str(e)}' return result except Exception as e: result['error'] = f'ERROR: Unexpected error - {type(e).__name__}: {str(e)}' return result
def extract_as_markdown(url_or_path: str, max_length: int = 20000) -> str:
    """Extract content from documents (PDF, DOCX, XLSX, PPTX) as Markdown text.

    Works with both URLs and local file paths.

    Supported formats:
        - PDF files (.pdf)
        - Word documents (.docx)
        - Excel spreadsheets (.xlsx)
        - PowerPoint presentations (.pptx)

    For reading HTML web pages, use 'read_webpage' instead (faster and cleaner).

    Examples:
        - Extract from PDF: "https://example.com/paper.pdf"
        - Extract from local file: "/tmp/document.docx"
        - Extract from Excel: "https://example.com/data.xlsx"

    Args:
        url_or_path: URL or file path to a PDF, DOCX, XLSX, or PPTX file.
        max_length: Optional limit on output length in characters. Use this to truncate
            very long documents (may lose information).

    Returns:
        Document content as Markdown text, or an error message if extraction fails.
    """
    import re
    from pathlib import Path
    from urllib.parse import urlparse

    try:
        from markitdown import MarkItDown
    except ImportError:
        return 'ERROR: Required lib `markitdown` is missing. Install with: `pip install markitdown`'

    # Validate input
    if not url_or_path or not url_or_path.strip():
        return 'ERROR: url_or_path cannot be empty.'

    url_or_path = url_or_path.strip()

    # Check if it's a URL or file path, validate it, and determine the file type
    is_url = url_or_path.startswith(('http://', 'https://'))
    path_obj = None

    if is_url:
        try:
            parsed = urlparse(url_or_path)
            if not parsed.netloc:
                return 'ERROR: Invalid URL format - missing domain name.'
        except Exception as e:
            return f'ERROR: Invalid URL format - {str(e)}'
        # Use only the URL path so a query string does not hide the extension
        file_ext = Path(parsed.path).suffix.lower()
    else:
        path_obj = Path(url_or_path)
        if not path_obj.exists():
            return f'ERROR: File not found at path: {url_or_path}'
        if not path_obj.is_file():
            return f'ERROR: Path is not a file: {url_or_path}'
        file_ext = path_obj.suffix.lower()

    # Check supported formats (a tuple keeps the order in the error message stable)
    supported_formats = ('.pdf', '.docx', '.xlsx', '.pptx')
    if file_ext not in supported_formats:
        return (
            f'ERROR: Unsupported file format `{file_ext}`\n'
            f'Supported formats: {", ".join(supported_formats)}\n\n'
            'For webpages use `read_webpage`. For other file types, no extraction tool'
            ' is available. Use `download_file` to download the file first.'
        )

    # Validate max_length: clamp to 100..1,000,000 characters
    max_length = max(100, min(max_length, 1000000))

    try:
        # Initialize MarkItDown
        md = MarkItDown()

        # Convert document
        try:
            result = md.convert(url_or_path)
            text = result.text_content
        except Exception as convert_error:
            error_str = str(convert_error).lower()

            # Provide helpful error messages
            if '403' in error_str or 'forbidden' in error_str:
                return (
                    'ERROR: Access forbidden (403) when trying to download from URL.\n'
                    'The server is blocking automated access.\n\n'
                    'Solution: Use `download_file` tool first to save it locally,'
                    ' then use this tool with the local file path.'
                )
            if '404' in error_str or 'not found' in error_str:
                return f'ERROR: File not found (404) at URL: {url_or_path}'
            if 'timeout' in error_str:
                return 'ERROR: Request timed out. The file may be too large or server is slow.'
            if 'pdf' in error_str and file_ext == '.pdf':
                return (
                    'ERROR: Failed to extract PDF content.\nThe PDF may be:\n'
                    '- Scanned images without OCR text\n'
                    '- Password protected\n'
                    '- Corrupted or malformed\n\n'
                    f'Original error: {str(convert_error)}'
                )
            raise  # Re-raise for generic handling below

        if not text:
            return (
                f'ERROR: No content could be extracted from the {file_ext} file.\n'
                'The file may be empty, corrupted, or contain only images.'
            )

        # Clean up the text
        text = text.strip()

        # Handle PDF-specific issues (CID characters)
        if file_ext == '.pdf':
            text = re.sub(r'\(cid:\d+\)', '', text)  # Remove CID references
            text = re.sub(r'\x00', '', text)  # Remove null bytes

        # Remove excessive whitespace
        text = re.sub(r'\n{4,}', '\n\n\n', text)  # Max 3 newlines
        text = re.sub(r' {3,}', ' ', text)  # Collapse runs of 3+ spaces

        # Get file info for output
        if is_url:
            source_info = f'URL: {url_or_path}'
        else:
            file_size = path_obj.stat().st_size
            if file_size < 1024 * 1024:
                size_str = f'{file_size / 1024:.1f} KB'
            else:
                size_str = f'{file_size / (1024 * 1024):.1f} MB'
            source_info = f'File: {path_obj.name} ({size_str})'

        # Truncate if needed
        original_length = len(text)
        truncation_msg = ''
        if original_length > max_length:
            text = text[:max_length]
            truncation_msg = (
                f'\n\n---\n**[Content truncated from {original_length:,}'
                f' to {max_length:,} characters]**'
            )

        # Format output
        return (
            f'# Extracted Content\n\n**Source:** {source_info}\n'
            f'**Format:** {file_ext.upper()}\n'
            f'**Length:** {original_length:,} characters\n\n---\n\n'
            f'{text}{truncation_msg}'
        )

    except ImportError as e:
        return (
            f'ERROR: Missing required library for {file_ext} files.\n'
            f'Install with: `pip install markitdown[pdf]`\nDetails: {str(e)}'
        )
    except MemoryError:
        return (
            'ERROR: File too large to process (out of memory).\n'
            'Try using max_length parameter to limit output size.'
        )
    except Exception as e:
        error_type = type(e).__name__
        return f'ERROR: {error_type} - {str(e)}'
def read_webpage(url: str, max_length: int = 20000) -> str:
    """Fetch and return the main text content from an HTML webpage as clean Markdown.

    Use this after search_web to read articles, blogs, or documentation.
    For PDF, DOCX, or XLSX files, use `extract_as_markdown` tool instead.

    Args:
        url: The complete URL of the webpage (must start with http:// or https://).
        max_length: Maximum characters to return (default 20000).

    Returns:
        Clean webpage content as Markdown text, or an error message.
    """
    import re
    from urllib.parse import urlparse

    try:
        import requests
        from bs4 import BeautifulSoup
    except ImportError as e:
        missing_lib = 'requests' if 'requests' in str(e) else 'beautifulsoup4'
        return (
            f'ERROR: Required library `{missing_lib}` not installed.\n'
            f'Install with: `pip install {missing_lib}`'
        )

    # Validate URL
    if not url or not url.strip():
        return 'ERROR: URL cannot be empty.'

    url = url.strip()
    if not url.startswith(('http://', 'https://')):
        return 'ERROR: URL must start with http:// or https://'

    try:
        parsed = urlparse(url)
        if not parsed.netloc:
            return 'ERROR: Invalid URL format - missing domain name.'
    except Exception as e:
        return f'ERROR: Invalid URL format - {str(e)}'

    # Check if URL points to a document file
    path_lower = parsed.path.lower()
    doc_extensions = ('.pdf', '.docx', '.xlsx', '.pptx', '.doc', '.xls', '.ppt')
    ext = next((candidate for candidate in doc_extensions if path_lower.endswith(candidate)), None)
    if ext is not None:
        return (
            f'ERROR: URL points to a document file ({ext}), not a webpage.\n'
            'Use `extract_as_markdown` tool instead for document files. Try `download_file`'
            ' to verify the file can be accessed and downloaded.'
        )

    # Validate max_length: clamp to 100..50,000 characters
    max_length = max(100, min(max_length, 50000))

    # Browser-like headers to avoid 403 errors. `Accept-Encoding` is deliberately left to
    # `requests` so that only encodings the installed stack can decode are advertised
    # (claiming `br` without a brotli decoder would hand compressed bytes to the parser).
    headers = {
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        ),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Cache-Control': 'max-age=0',
        'Referer': 'https://www.google.com/',
    }

    try:
        response = requests.get(url, headers=headers, timeout=20, allow_redirects=True)

        # Handle HTTP errors
        if response.status_code == 403:
            return (
                f'ERROR: Access forbidden (403) for {url}\n\n'
                'The website is blocking automated access. This could be because:\n'
                '1. The site requires login/authentication\n'
                '2. The site uses anti-bot protection (Cloudflare, etc.)\n'
                '3. Geographic restrictions apply\n'
                '4. The site blocks all automated tools\n\n'
                'Suggestions:\n'
                '- Verify the URL works in your browser\n'
                '- Check if the site has an API\n'
                '- Try a different source for the same information'
            )
        if response.status_code == 404:
            return f'ERROR: Page not found (404). The URL may be incorrect:\n{url}'
        if response.status_code == 429:
            return 'ERROR: Rate limited (429). Too many requests. Wait before retrying.'
        if response.status_code == 503:
            return 'ERROR: Service unavailable (503). The website may be down. Try again later.'
        if response.status_code >= 400:
            return f'ERROR: HTTP {response.status_code} - {response.reason}\nURL: {url}'

        # Check content type
        content_type = response.headers.get('Content-Type', '').lower()
        if 'text/html' not in content_type and 'application/xhtml' not in content_type:
            if 'application/pdf' in content_type:
                return (
                    'ERROR: URL points to a PDF file, not a webpage.\n'
                    'Use `extract_as_markdown` instead.'
                )
            return (
                f'ERROR: URL does not point to a webpage (Content-Type: {content_type}).\n'
                'For non-HTML content, use `download_file` or `extract_as_markdown`.'
            )

        # Parse HTML
        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove unwanted elements. Tag names and CSS selectors must be handled
        # separately: `find_all` with a list of strings matches tag names only, so
        # attribute/class selectors passed to it would silently match nothing.
        noise_tags = [
            'script',
            'style',
            'nav',
            'footer',
            'header',
            'aside',
            'iframe',
            'noscript',
            'svg',
            'form',
            'button',
        ]
        noise_selectors = [
            '[role="navigation"]',
            '[role="banner"]',
            '[role="complementary"]',
            '.advertisement',
            '.ad',
            '.sidebar',
            '.menu',
            '.navigation',
        ]
        for element in soup.find_all(noise_tags):
            element.decompose()
        for element in soup.select(', '.join(noise_selectors)):
            element.decompose()

        # Try to find main content area; first matching selector wins
        main_content = None
        for selector in [
            'main',
            'article',
            '[role="main"]',
            '.main-content',
            '#main-content',
            '#content',
            '.post-content',
            '.entry-content',
            '.article-content',
            '.page-content',
        ]:
            main_content = soup.select_one(selector)
            if main_content:
                break

        if not main_content:
            main_content = soup.body if soup.body else soup

        # Extract text
        text = main_content.get_text(separator='\n', strip=True)

        # Clean up text: drop blank lines, separate the rest by one empty line
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        text = '\n\n'.join(lines)

        # Remove excessive whitespace
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)

        if not text:
            return (
                f'ERROR: Could not extract meaningful content from {url}\nThe page may be:\n'
                '- Dynamically loaded with JavaScript (not supported)\n'
                '- Behind a login wall\n'
                '- Empty or malformed'
            )

        # Get page title. `.string` is None for an empty <title> or one with nested
        # markup, so guard before stripping.
        title = 'No title'
        if soup.title and soup.title.string:
            title = soup.title.string.strip() or 'No title'

        # Truncate if needed
        original_length = len(text)
        truncated_msg = ''
        if original_length > max_length:
            text = text[:max_length]
            truncated_msg = (
                f'\n\n---\n**[Content truncated from {original_length:,}'
                f' to {max_length:,} characters]**'
            )

        # Format output
        return (
            f'# {title}\n\n**Source:** {url}\n'
            f'**Length:** {original_length:,} characters\n\n---\n\n'
            f'{text}{truncated_msg}'
        )

    except requests.exceptions.Timeout:
        return 'ERROR: Request timed out after 20s. The website may be slow or unresponsive.'
    except requests.exceptions.ConnectionError:
        return (
            f'ERROR: Could not connect to {url}\nPossible causes:\n'
            '- No internet connection\n'
            '- Invalid domain name\n'
            '- Server is down'
        )
    except requests.exceptions.RequestException as e:
        return f'ERROR: Request failed - {str(e)}'
    except Exception as e:
        return f'ERROR: Unexpected error - {type(e).__name__}: {str(e)}'
def search_wikipedia(query: str, max_results: int = 3) -> str:
    """Search Wikipedia (only) and return the top search results as Markdown text.

    The input should be a search query. The output will contain the title, summary,
    and link to the Wikipedia page.

    Args:
        query: The search query string.
        max_results: The max. no. of search results to consider (default 3).

    Returns:
        The search results in Markdown format.
    """
    try:
        import wikipedia
    except ImportError:
        # Same `ERROR:` prefix as the other tools so callers can detect failures uniformly
        return 'ERROR: `wikipedia` was not found! Please run `pip install wikipedia`'

    # Validate inputs before contacting the API
    if not query or not query.strip():
        return 'ERROR: Search query cannot be empty.'
    query = query.strip()
    max_results = max(1, max_results)

    try:
        results = wikipedia.search(query, results=max_results)

        if not results:
            return 'No results found! Try a less restrictive/shorter query.'

        markdown_results = []
        disambiguation_options: list[str] | None = None

        for title in results:
            try:
                page = wikipedia.page(title, auto_suggest=False)
                markdown_results.append(f'### [{page.title}]({page.url})\n{page.summary}')
            except wikipedia.exceptions.DisambiguationError as de:
                # Capture options from first disambiguation hit for fallback message
                if disambiguation_options is None:
                    disambiguation_options = de.options[:5]
                continue
            except wikipedia.exceptions.PageError:
                # The search hit has no retrievable page; skip it
                continue

        if not markdown_results:
            if disambiguation_options:
                options_str = ', '.join(disambiguation_options)
                return (
                    f'Ambiguous query. Wikipedia suggests these specific topics: {options_str}.'
                    ' Please retry with one of these exact terms as the search query.'
                )
            return (
                'No unambiguous results found. Try a more specific query, e.g.,'
                ' "artificial neural network" instead of "neural networks".'
            )

        return '\n\n'.join(markdown_results)

    except wikipedia.exceptions.DisambiguationError as de:
        # Outer catch: triggered when wikipedia.search() itself raises disambiguation
        # before returning a results list; less common but possible
        options_str = ', '.join(de.options[:5])
        return (
            f'ERROR: Ambiguous query. Wikipedia suggests these specific topics: {options_str}.'
            ' Please retry with one of these exact terms as the search query.'
        )
    except Exception as e:
        return f'ERROR: While searching Wikipedia: {str(e)}'
def search_arxiv(query: str, max_results: int = 5) -> str:
    """Search for academic papers on arXiv.org.

    The input is a search query. This tool is highly specialized and should be used
    exclusively for finding scientific and academic papers. It returns the top search
    results with the title, authors, summary, and a link to the PDF.

    Args:
        query: The search query string for the paper.
        max_results: The maximum number of search results to return (default is 5).

    Returns:
        The search results in Markdown format or a message indicating no results were found.
    """
    try:
        import arxiv

        # Query through the default API client, ordered by relevance
        papers = list(
            arxiv.Client().results(
                arxiv.Search(
                    query=query,
                    max_results=max_results,
                    sort_by=arxiv.SortCriterion.Relevance,
                )
            )
        )

        if len(papers) == 0:
            return f'No results found for the query: {query}'

        # One Markdown section per paper, appended after the heading
        sections = [f'## ArXiv Search Results for: {query}\n\n']
        for paper in papers:
            author_names = ', '.join(person.name for person in paper.authors)
            published_on = paper.published.strftime('%Y-%m-%d')
            sections.append(
                f'### [{paper.title}]({paper.pdf_url})\n'
                f'**Authors:** {author_names}\n'
                f'**Abstract:** {paper.summary}\n'
                f'**Published:** {published_on}\n\n'
            )

        return ''.join(sections)

    except ImportError:
        # NOTE(review): the original comment here read "Patch to 0.12.1" -- presumably a
        # library release; confirm what it refers to.
        missing_dependency_msg = (
            'ERROR: An error occurred: the required library `arxiv` not installed.'
            ' Install with: `pip install arxiv`'
            '\nYou may need to relax the version of requests lib and reinstall.'
        )
        # The message is echoed to stdout as well as returned
        print(missing_dependency_msg)
        return missing_dependency_msg
    except Exception as exc:
        return f'ERROR: An error occurred during the arXiv search: {str(exc)}'
def transcribe_youtube(video_id: str) -> str:
    """Get the transcript or subtitles for a YouTube video by its video ID.

    The video ID is the part after '?v=' in the URL.
    For example: https://www.youtube.com/watch?v=aBc4E has video ID 'aBc4E'.

    Args:
        video_id: YouTube video ID (not the full URL).

    Returns:
        The transcript text of the video, or an error message if unavailable.
    """
    # Guard the import so a missing dependency is reported as an error string, like the
    # other tools do, instead of raising out of the tool call.
    try:
        from youtube_transcript_api import YouTubeTranscriptApi
        from youtube_transcript_api import _errors as yt_errors
    except ImportError:
        return (
            '*** ERROR: Required library `youtube-transcript-api` not installed.'
            ' Install with: `pip install youtube-transcript-api`'
        )

    if not video_id or not video_id.strip():
        return '*** ERROR: video_id cannot be empty.'
    video_id = video_id.strip()

    try:
        transcript = YouTubeTranscriptApi().fetch(video_id)
        # Join the individual caption snippets into one block of text
        transcript_text = ' '.join([item.text for item in transcript.snippets])
    except yt_errors.TranscriptsDisabled:
        return '*** ERROR: Subtitles are disabled for this video. Cannot retrieve transcript.'
    except yt_errors.NoTranscriptFound:
        return '*** ERROR: No transcript found for this video.'
    except Exception as e:
        return f'*** ERROR: YouTube transcript retrieval failed: {e}'

    return transcript_text
def transcribe_audio(file_path: str) -> str:
    """Convert audio files to text using Fireworks AI Whisper API.

    The input should be a path to an audio file (e.g., .mp3, .wav, .flac).

    Args:
        file_path: Local file system path to the audio file.

    Returns:
        The transcript of the audio file as text, or an error message.
    """
    import os

    api_key = os.getenv('FIREWORKS_API_KEY')
    if not api_key:
        return 'ERROR: FIREWORKS_API_KEY environment variable is not set.'

    try:
        import requests
    except ImportError:
        return (
            'ERROR: Audio transcription error: `requests` library not found.'
            ' Please install it with `pip install requests`.'
        )

    try:
        # The file stays open only for the duration of the upload
        with open(file_path, 'rb') as f:
            response = requests.post(
                'https://audio-turbo.api.fireworks.ai/v1/audio/transcriptions',
                headers={'Authorization': f'Bearer {api_key}'},
                files={'file': f},
                data={
                    'model': 'whisper-v3-turbo',
                    'temperature': '0',
                    'vad_model': 'silero',
                },
                timeout=30,
            )

        if response.status_code == 200:
            # `text` may be absent or null in the payload; treat both as empty
            text = (response.json().get('text') or '').strip()
            if not text:
                return 'ERROR: Transcription succeeded but returned empty text.'
            return text

        return f'ERROR: Audio transcription error: {response.status_code}: {response.text}'

    except FileNotFoundError:
        return f'ERROR: Audio file not found: {file_path}'
    except requests.exceptions.Timeout:
        return 'ERROR: Request timed out after 30s. Audio file may be too large.'
    except Exception as e:
        return f'ERROR: {type(e).__name__}: {str(e)}'
def generate_image(prompt: str, model_name: str) -> str:
    """Generate an image based on a text prompt using the specified model.

    It returns the image URL or the file path of the generated image.

    Args:
        prompt: Text description of the desired image.
        model_name: The name of the image generation model to use.

    Returns:
        The file path or URL of the generated image or error message.
    """
    import base64
    import os

    # Guard the import so a missing dependency is reported as an error string, like the
    # other tools do, instead of raising out of the tool call.
    try:
        import litellm
    except ImportError:
        return (
            'ERROR: Required library `litellm` not installed. Install with: `pip install litellm`'
        )

    try:
        response = litellm.image_generation(prompt=prompt, model=model_name)

        # Check for empty data list
        if not response.data:
            return (
                'ERROR: Image generation returned no data. The API may have rejected '
                'the prompt or encountered an error.'
            )

        image_data = response.data[0]

        # 1. If a URL is provided, return it
        image_url = getattr(image_data, 'url', None)
        if image_url:
            return image_url

        # 2. If URL is None, check for b64_json and save it locally
        encoded_image = getattr(image_data, 'b64_json', None)
        if encoded_image:
            # NOTE(review): the output name is fixed, so each call overwrites the image
            # written by the previous one in the current working directory.
            file_path = 'generated_image.png'
            with open(file_path, 'wb') as f:
                f.write(base64.b64decode(encoded_image))
            return os.path.abspath(file_path)

        return 'ERROR: No image data (URL or Base64) found in response.'

    except Exception as ex:
        return f'ERROR: Image generation failed: {ex}'
if __name__ == '__main__':
    # Manual smoke test: generate one sample image and print where it ended up
    demo_prompt = 'A futuristic cityscape at sunset, with flying cars and neon lights'
    demo_model = 'gemini/imagen-4.0-generate-001'
    img_url = generate_image(prompt=demo_prompt, model_name=demo_model)
    print(f'Generated image URL: {img_url}')