Source code for kodeagent.code_runner

"""Run Python code generated by an LLM in a given environment. Currently, supports local host and
E2B sandbox execution. Also, defines custom exceptions for security and environment errors.

User Code

[1] Syntax Check (AST parse)

[2] Import Whitelist

[3] Pattern Detection - Catches obfuscation

[4] LLM Review

[5] Execute Code (locally or in Sandbox)
"""

import os
import re
import shutil
import subprocess as sp
import sys
import tempfile
import warnings
from abc import ABC, abstractmethod
from typing import Literal, NamedTuple

from . import kutils as ku
from .code_reviewer import CodeSecurityReviewer
from .pattern_detector import analyze_code_patterns
from .usage_tracker import UsageTracker

CODE_ENV_NAMES = Literal['host', 'docker', 'e2b']
"""Allowed code execution environment names."""

DEFAULT_ALLOWED_IMPORTS = {
    'ast',
    'operator',
    're',  # calculator tool
    'time',
    'random',
}
""" Default allowed imports for code execution."""

# Check for the use of dangerous builtins
DANGEROUS_BUILTINS = {'exec', 'eval', '__import__', 'compile'}
"""Dangerous built-in functions that are not allowed in code execution."""

logger = ku.get_logger()


[docs] class CodeSecurityError(Exception): """Exception raised for security violations in code execution."""
[docs] class UnknownCodeEnvError(Exception): """Exception raised for unknown code execution environments."""
[docs] class CodeRunResult(NamedTuple): """Named tuple for code run results.""" stdout: str stderr: str return_code: int generated_files: list[str]
[docs] class CodeRunnerEnv(ABC): """Abstract base class for code execution environments."""
[docs] def __init__(self, work_dir: str | None = None): """Initialize the code runner environment. Args: work_dir: Optional local workspace directory. Output files from code execution will be stored here. It will be used only if the path already exists. Otherwise, a temporary directory will be created. """ if work_dir: if os.path.isabs(work_dir): if os.path.exists(work_dir): self.work_dir = os.path.abspath(work_dir) else: self.work_dir = tempfile.mkdtemp(prefix='kodeagent_run_') else: abs_path = os.path.abspath(work_dir) if os.path.exists(abs_path): self.work_dir = abs_path else: self.work_dir = tempfile.mkdtemp(prefix='kodeagent_run_') else: self.work_dir = tempfile.mkdtemp(prefix='kodeagent_run_') logger.info('Local workspace dir for copying files: %s', self.work_dir) self.local_modules_to_copy = []
@property def effective_work_dir(self) -> str: """Return the effective working directory specified or creating a temporary one if needed. Returns: The effective working directory path. """ return self.work_dir
[docs] @abstractmethod async def run(self, source_code: str, task_id: str, timeout: int) -> CodeRunResult: """Execute Python code in the environment. Args: source_code: The Python source code to execute. task_id: Unique task identifier. timeout: Timeout for code execution. Returns: A tuple of (stdout, stderr, return_code, generated_files). """
[docs] @abstractmethod async def download_files_from_remote(self, remote_paths: list[str]) -> list[str]: """Download files from the environment to the local work_dir."""
[docs] def cleanup(self): """Clean up environment resources. Implementation is left to the subclass if required. Cleaning up resources may have a side effect of removing the files produced by the code. """ pass
[docs] class HostCodeRunnerEnv(CodeRunnerEnv): """Execution environment for the local host."""
[docs] async def run(self, source_code: str, task_id: str, timeout: int) -> CodeRunResult: """Execute Python code on the local host. Args: source_code: The Python source code to execute. task_id: Unique task identifier. timeout: Timeout for code execution. Returns: A tuple of (stdout, stderr, return_code, generated_files). """ warnings.warn( 'You are running LLM-generated code on your host. This could be potentially' ' dangerous! Please consider using a different code runner environment.', UserWarning, ) code_file_path = os.path.join(self.work_dir, 'task_code.py') with open(code_file_path, mode='w+', encoding='utf-8') as code_file: code_file.write(source_code) for a_file in self.local_modules_to_copy: shutil.copy2(os.path.join(os.path.dirname(__file__), a_file), self.work_dir) # List files before to identify NEW files files_before = set(os.listdir(self.work_dir)) result = sp.run( [sys.executable, 'task_code.py'], shell=False, capture_output=True, text=True, timeout=timeout, cwd=self.work_dir, check=False, encoding='utf-8', ) files_after = set(os.listdir(self.work_dir)) new_files = list(files_after - files_before) # Identify generated files (anything in temp_dir that wasn't there originally) # excluded_files = {'task_code.py'} | set(self.local_modules_to_copy) generated_files = [] for item in new_files: full_path = os.path.join(self.work_dir, item) if os.path.isfile(full_path): generated_files.append(full_path) return CodeRunResult(result.stdout, result.stderr, result.returncode, generated_files)
[docs] async def download_files_from_remote(self, remote_paths: list[str]) -> list[str]: """On host, files are already local. Args: remote_paths: List of absolute paths in the host environment. Returns: The same list of paths, as they are already local. """ return remote_paths
[docs] class E2BCodeRunnerEnv(CodeRunnerEnv): """Execution environment for the E2B sandbox."""
[docs] def __init__( self, work_dir: str | None = None, env_vars: dict[str, str] | None = None, pip_packages_str: str | None = None, ): """Initialize the E2B code runner environment. Args: work_dir: Optional local workspace directory. Output files from code execution will be stored here. It will be used only if the path already exists. Otherwise, a temporary directory will be created. env_vars: Optional environment variables to set in the E2B sandbox. pip_packages_str: Optional string of pip packages to install in the E2B sandbox. """ super().__init__(work_dir) self.env_vars = env_vars or {} self.pip_packages_str = pip_packages_str self._sbx = None
async def _get_sandbox(self, task_id: str, timeout: int): """Create or return the existing sandbox. Args: task_id: Unique task identifier. timeout: Timeout for code execution. Returns: An E2B Sandbox instance. """ if self._sbx: return self._sbx try: import e2b_code_interpreter as e2b except ModuleNotFoundError: logger.critical( 'The module `e2b_code_interpreter` was not found. Please install E2B as:' ' `pip install e2b-code-interpreter`\nExecution will halt now.' ) sys.exit(-1) self._sbx = e2b.Sandbox.create( timeout=timeout + 15, envs=self.env_vars, metadata={'task_id': task_id} ) if self.pip_packages_str: self._sbx.commands.run(f'pip install {self.pip_packages_str}') for a_file in self.local_modules_to_copy: with open(os.path.join(os.path.dirname(__file__), a_file), encoding='utf-8') as py_file: self._sbx.files.write(f'/home/user/{a_file}', py_file.read()) logger.info('Copied file %s...', a_file) return self._sbx
[docs] async def run(self, source_code: str, task_id: str, timeout: int) -> CodeRunResult: """Execute Python code in the E2B sandbox. Args: source_code: The Python source code to execute. task_id: Unique task identifier. timeout: Timeout for code execution. Returns: A tuple of (stdout, stderr, return_code, generated_files). """ sbx = await self._get_sandbox(task_id, timeout) logger.debug('E2B sandbox: %s', sbx.get_info()) # List files before to identify NEW files files_before = set(f.path for f in sbx.files.list('/home/user')) execution = sbx.run_code(code=source_code, timeout=timeout) # List files after files_after = set(f.path for f in sbx.files.list('/home/user')) new_files = list(files_after - files_before) std_out: str = '\n'.join(execution.logs.stdout) std_err: str = '\n'.join(execution.logs.stderr) if execution.error: std_err += f'\n{execution.error.name}\n{execution.error.value}' ret_code: int = -1 if execution.error else 0 return CodeRunResult(std_out, std_err, ret_code, new_files)
[docs] async def download_files_from_remote(self, remote_paths: list[str]) -> list[str]: """Download files from the E2B sandbox to the local work_dir. Args: remote_paths: List of absolute paths in the E2B sandbox. Returns: List of local absolute paths for the downloaded files. """ if not remote_paths or not self._sbx: return [] local_dir = self.effective_work_dir local_files = [] for remote_path in remote_paths: try: # CRITICAL: Always read as bytes from E2B to prevent corruption of binary files # E2B's files.read() returns a string by default, which corrupts binary data # like images during UTF-8 decode/encode. Using format='bytes' ensures we get # the raw binary content. content = self._sbx.files.read(remote_path, format='bytes') filename = os.path.basename(remote_path) local_path = os.path.join(local_dir, filename) # Since we always read as bytes, always write in binary mode # This works for both binary files (images, PDFs) and text files with open(local_path, 'wb') as f: f.write(content) local_files.append(local_path) logger.info('Downloaded file from E2B: %s -> %s', remote_path, local_path) except Exception as e: logger.error('Failed to download file %s from E2B: %s', remote_path, e) return local_files
[docs] def cleanup(self): """Close the sandbox and clean up.""" if self._sbx: try: self._sbx.kill() except Exception as e: logger.warning('Failed to close E2B sandbox: %s', e) self._sbx = None super().cleanup()
[docs] class CodeRunner: """Run Python code generated by an LLM in a given environment."""
[docs] def __init__( self, env: CODE_ENV_NAMES, allowed_imports: list[str], model_name: str, pip_packages: str | None = None, timeout: int = 30, env_vars_to_set: dict[str, str] | None = None, litellm_params: dict | None = None, work_dir: str | None = None, usage_tracker: UsageTracker | None = None, tool_names: set[str] | None = None, ): """Create an environment to run Python code. Args: env: The code execution environment. Must be a string from `CODE_ENV_NAMES`. allowed_imports: A list of Python modules that are allowed to be imported. model_name: The LLM model name to use for security review. pip_packages: Optional Python libs to be installed by `pip` [E2B]. timeout: Code execution timeout (default 30s). env_vars_to_set: Optional environment variables to set in the code execution environment (E2B only). litellm_params: Optional parameters for LiteLLM. work_dir: Optional local workspace directory. usage_tracker: Optional UsageTracker instance. tool_names: Optional set of whitelisted tool names provided by the user. """ self.allowed_imports: set[str] = set(allowed_imports).union(DEFAULT_ALLOWED_IMPORTS) self.env_name: CODE_ENV_NAMES = env self.work_dir = work_dir if pip_packages and len(pip_packages.strip()) > 0: self.pip_packages: list[str] = re.split('[,;]', pip_packages) else: self.pip_packages = [] self.default_timeout = timeout self.pip_packages_str = ' '.join(self.pip_packages) if self.pip_packages else None self.code_reviewer = CodeSecurityReviewer( model_name=model_name, litellm_params=litellm_params, usage_tracker=usage_tracker, tool_names=tool_names, ) # Initialize the implementation if self.env_name == 'host': self.env_impl = HostCodeRunnerEnv(work_dir=self.work_dir) elif self.env_name == 'e2b': self.env_impl = E2BCodeRunnerEnv( work_dir=self.work_dir, env_vars=env_vars_to_set, pip_packages_str=self.pip_packages_str, ) else: raise UnknownCodeEnvError(f'Unsupported code execution env: {self.env_name}')
@property def local_modules_to_copy(self) -> list[str]: """Get the list of local modules to copy to the execution environment. Returns: A list of local module filenames. """ return self.env_impl.local_modules_to_copy @local_modules_to_copy.setter def local_modules_to_copy(self, value: list[str]): """Set the list of local modules to copy to the execution environment. Args: value: A list of local module filenames. """ self.env_impl.local_modules_to_copy = value
[docs] def check_imports(self, code: str) -> set[str]: """Check for disallowed imports in code, allowing submodules. Args: code: The Python source code to check. Returns: A set of disallowed import module names found in the code. Raises: CodeSecurityError: If dangerous builtins are used. """ import ast tree = ast.parse(code) imported_modules = set() for node in ast.walk(tree): if isinstance(node, ast.Name) and node.id in DANGEROUS_BUILTINS: raise CodeSecurityError(f'Forbidden builtin: {node.id}') if isinstance(node, ast.Import): for alias in node.names: imported_modules.add(alias.name) elif isinstance(node, ast.ImportFrom): if node.module: imported_modules.add(node.module) # Filter imports: allow if exact match or if it starts with 'allowed_name.' disallowed = { imp for imp in imported_modules if not any( imp == allowed or imp.startswith(f'{allowed}.') for allowed in self.allowed_imports ) } return disallowed
[docs] async def run(self, tools_code: str, generated_code: str, task_id: str) -> CodeRunResult: """Run Python code in pre-specified environment after security review. Args: tools_code: The Python source code for agent tools. generated_code: The Python source code generated to solve a task. task_id: Unique task identifier for tracking. Returns: A tuple of (stdout, stderr, return_code, generated_files). Raises: UnknownCodeEnvError: If the specified environment is unsupported. CodeSecurityError: If the code fails security review. """ import ast source_code = f'{tools_code}\n\n{generated_code}' try: ast.parse(source_code) except SyntaxError as se: return CodeRunResult( '', f'Code parsing failed due to: {type(se).__name__}\n{se.text}\nError: {str(se)}', -1, [], ) disallowed_imports: set = self.check_imports(source_code) if len(disallowed_imports) > 0: modules = '\n'.join(list(disallowed_imports)) logger.error('CodeRunner found disallowed imports: %s', modules) return CodeRunResult( '', f'The following imports are disallowed:{modules}' '\nPlease only use the allowed modules for importing.', -1, [], ) # Security review before execution logger.debug('Performing static analysis of code...') is_safe, reason, _ = analyze_code_patterns(generated_code) if not is_safe: raise CodeSecurityError(f'Pattern detection blocked: {reason}') logger.debug('Performing security review of code...') review_result = await self.code_reviewer.review(generated_code) if not review_result.is_secure: logger.error('Code failed security review: %s', review_result.reason) raise CodeSecurityError( f'Code execution blocked due to security concerns: {review_result.reason}' ) logger.info('Code security review passed: %s', review_result.reason) return await self.env_impl.run(source_code, task_id, self.default_timeout)
[docs] async def download_files_from_remote(self, remote_paths: list[str]) -> list[str]: """Download files from the remote environment to the local workspace. Args: remote_paths: List of absolute paths in the remote environment. Returns: List of local absolute paths for the downloaded files. """ return await self.env_impl.download_files_from_remote(remote_paths)
[docs] def cleanup(self): """Clean up resources in the environment.""" self.env_impl.cleanup()