Source code for noxrunner.backend.local

"""
Local filesystem backend for NoxRunner sandbox execution.

WARNING: This backend executes commands in the local environment.
Use with extreme caution as it can cause data loss or security risks.
"""

import os
import shutil
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Union

from noxrunner.backend.base import SandboxBackend
from noxrunner.fileops.tar_handler import TarHandler
from noxrunner.security.command_validator import CommandValidator
from noxrunner.security.path_sanitizer import PathSanitizer


[docs] class LocalBackend(SandboxBackend): """ Local filesystem backend for offline testing. WARNING: This backend executes commands in the local environment using temporary directories. It should ONLY be used for testing purposes. Using this in production can cause severe data loss or security risks. """
[docs] def __init__(self, base_dir: str = "/tmp"): """ Initialize local sandbox backend. Args: base_dir: Base directory for sandbox storage (default: /tmp) """ self.base_dir = Path(base_dir) self._sandboxes: Dict[str, Dict] = {} # session_id -> sandbox info # Initialize security and file operation utilities self.validator = CommandValidator() self.sanitizer = PathSanitizer() self.tar_handler = TarHandler() # Print warning on initialization self._print_warning( "Local sandbox mode is enabled. This executes commands in your local environment.", "⚠️ Using local sandbox can cause SEVERE DATA LOSS or SECURITY RISKS! ⚠️", )
def _print_warning(self, message: str, critical: Optional[str] = None): """Print a warning message to stderr.""" warning_prefix = "\033[91m\033[1m⚠️ WARNING\033[0m\033[91m" if critical: warning_prefix = "\033[91m\033[1m🚨 CRITICAL WARNING\033[0m\033[91m" # Print with clear formatting print("", file=sys.stderr) # Empty line for visibility print(f"{warning_prefix}: {message}\033[0m", file=sys.stderr) if critical: print(f"\033[91m\033[1m{critical}\033[0m", file=sys.stderr) print("", file=sys.stderr) # Empty line for visibility def _get_sandbox_path(self, session_id: str) -> Path: """Get the sandbox directory path for a session.""" # Sanitize session_id to prevent path traversal safe_id = "".join(c for c in session_id if c.isalnum() or c in ("-", "_")) if not safe_id: safe_id = "default" return self.base_dir / f"noxrunner_sandbox_{safe_id}" def _ensure_sandbox(self, session_id: str) -> Path: """Ensure sandbox directory exists and return its path.""" sandbox_path = self._get_sandbox_path(session_id) sandbox_path.mkdir(parents=True, exist_ok=True) # Create workspace directory workspace = sandbox_path / "workspace" workspace.mkdir(exist_ok=True) return sandbox_path
[docs] def health_check(self) -> bool: """Check if the local sandbox backend is healthy.""" return True
[docs] def create_sandbox( self, session_id: str, ttl_seconds: int = 900, image: Optional[str] = None, cpu_limit: Optional[str] = None, memory_limit: Optional[str] = None, ephemeral_storage_limit: Optional[str] = None, ) -> dict: """ Create or ensure a sandbox exists. Args: session_id: Unique session identifier ttl_seconds: Time to live in seconds image: Container image (ignored in local mode) cpu_limit: CPU limit (ignored in local mode) memory_limit: Memory limit (ignored in local mode) ephemeral_storage_limit: Storage limit (ignored in local mode) Returns: Dict with 'podName' and 'expiresAt' """ sandbox_path = self._ensure_sandbox(session_id) expires_at = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds) self._sandboxes[session_id] = { "path": sandbox_path, "created_at": datetime.now(timezone.utc), "expires_at": expires_at, "ttl_seconds": ttl_seconds, } return {"podName": f"local-{session_id}", "expiresAt": expires_at.isoformat() + "Z"}
[docs] def touch(self, session_id: str) -> bool: """Extend the TTL of a sandbox.""" if session_id not in self._sandboxes: # Create if doesn't exist self.create_sandbox(session_id) return True sandbox = self._sandboxes[session_id] ttl = sandbox.get("ttl_seconds", 900) sandbox["expires_at"] = datetime.now(timezone.utc) + timedelta(seconds=ttl) return True
[docs] def exec( self, session_id: str, cmd: List[str], workdir: str = "/workspace", env: Optional[Dict[str, str]] = None, timeout_seconds: int = 30, ) -> dict: """ Execute a command in the sandbox. WARNING: This executes commands in the local environment! """ # Print warning for every exec self._print_warning( f"Executing command in LOCAL environment: {' '.join(cmd)}", "⚠️ This may cause DATA LOSS or SECURITY RISKS! ⚠️", ) if session_id not in self._sandboxes: # Auto-create sandbox if doesn't exist self.create_sandbox(session_id) sandbox = self._sandboxes[session_id] sandbox_path = sandbox["path"] # Validate command using CommandValidator if not self.validator.validate(cmd): return { "exitCode": 1, "stdout": "", "stderr": f"Command not allowed: {cmd[0] if cmd else 'empty'}", "durationMs": 0, } # Sanitize workdir using PathSanitizer workdir_path = self.sanitizer.sanitize(workdir, sandbox_path) workdir_path.mkdir(parents=True, exist_ok=True) # Prepare environment exec_env = os.environ.copy() if env: exec_env.update(env) # Change to sandbox workspace for safety original_cwd = os.getcwd() try: os.chdir(str(workdir_path)) start_time = time.time() # Execute command with timeout try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout_seconds, env=exec_env, cwd=str(workdir_path), # Security: Don't allow shell injection shell=False, ) exit_code = result.returncode stdout = result.stdout stderr = result.stderr except subprocess.TimeoutExpired: exit_code = 124 # Standard timeout exit code stdout = "" stderr = f"Command timed out after {timeout_seconds} seconds" except FileNotFoundError: exit_code = 127 # Command not found stdout = "" stderr = f"Command not found: {cmd[0]}" except Exception as e: exit_code = 1 stdout = "" stderr = f"Execution error: {str(e)}" duration_ms = int((time.time() - start_time) * 1000) return { "exitCode": exit_code, "stdout": stdout, "stderr": stderr, "durationMs": duration_ms, } finally: os.chdir(original_cwd)
[docs] def upload_files( self, session_id: str, files: Dict[str, Union[str, bytes]], dest: str = "/workspace" ) -> bool: """Upload files to the sandbox.""" if session_id not in self._sandboxes: self.create_sandbox(session_id) sandbox = self._sandboxes[session_id] sandbox_path = sandbox["path"] dest_path = self.sanitizer.sanitize(dest, sandbox_path) dest_path.mkdir(parents=True, exist_ok=True) for filepath, content in files.items(): # Sanitize file path - handle relative paths with subdirectories # First check if it's a relative path with subdirectories if "/" in filepath or "\\" in filepath: # Path contains directory separators, sanitize as relative path # Remove any leading slashes and path traversal attempts clean_path = filepath.lstrip("/").replace("\\", "/") if ".." in clean_path or clean_path.startswith("/"): # Path traversal detected, use filename only safe_path = Path(self.sanitizer.sanitize_filename(filepath)) else: # Safe relative path, use it safe_path = Path(clean_path) else: # Simple filename, use sanitize_filename safe_path = Path(self.sanitizer.sanitize_filename(filepath)) target = dest_path / safe_path # Create parent directories target.parent.mkdir(parents=True, exist_ok=True) # Write file if isinstance(content, str): target.write_text(content, encoding="utf-8") else: target.write_bytes(content) return True
[docs] def download_files(self, session_id: str, src: str = "/workspace") -> bytes: """Download files from the sandbox as a tar archive.""" if session_id not in self._sandboxes: raise ValueError(f"Sandbox {session_id} does not exist") sandbox = self._sandboxes[session_id] sandbox_path = sandbox["path"] src_path = self.sanitizer.sanitize(src, sandbox_path) if not src_path.exists(): raise ValueError(f"Source path does not exist: {src}") # Use TarHandler to create tar archive from directory return self.tar_handler.create_tar_from_directory(src_path, src_path)
[docs] def delete_sandbox(self, session_id: str) -> bool: """ Delete a sandbox. This removes the entire /tmp/{sandbox_id} directory. """ if session_id not in self._sandboxes: return False sandbox = self._sandboxes[session_id] sandbox_path = sandbox["path"] # Remove entire sandbox directory if sandbox_path.exists(): shutil.rmtree(sandbox_path) del self._sandboxes[session_id] return True
[docs] def wait_for_pod_ready(self, session_id: str, timeout: int = 30, interval: int = 2) -> bool: """Wait for sandbox to be ready.""" if session_id not in self._sandboxes: self.create_sandbox(session_id) # Local sandbox is always ready immediately return True