Component: ShellToolsMixin
Module: gaia.agents.chat.tools.shell_tools
Import: from gaia.agents.chat.tools.shell_tools import ShellToolsMixin
Overview
ShellToolsMixin provides secure shell command execution with comprehensive rate limiting, command whitelisting, and path validation to prevent abuse while enabling file operations and system queries.
Key Features:
- Whitelist-based command security (only safe, read-only commands)
- Dual-tier rate limiting (burst + sustained)
- Path traversal prevention with argument validation
- Git read-only operations support
- Output truncation for large results
Security Model:
- WHITELIST approach (explicitly allow safe commands only)
- Path validation for all file-like arguments
- Git subcommand filtering (read-only only)
- Rate limits mitigate denial-of-service (DoS) abuse
Requirements
Functional Requirements
-
Safe Command Execution
- Whitelist: ls, cat, grep, find, git status, etc.
- Reject dangerous commands (rm, chmod, sudo, etc.): anything not on the whitelist is refused, so no separate blacklist is maintained
- Git read-only subcommands only
- Working directory support
- Timeout protection (default: 30s)
-
Rate Limiting
- Max 10 commands per minute (sustained)
- Max 3 commands per 10 seconds (burst)
- Clear wait time messaging
- Timestamp tracking with deque
-
Path Security
- Validate working directory access
- Check arguments for path traversal (.., separators)
- Resolve relative paths to absolute
- PathValidator integration
-
Output Management
- Capture stdout and stderr separately
- Truncate large outputs (max 10,000 chars)
- Return execution duration
- Return code reporting
Non-Functional Requirements
-
Security
- Zero tolerance for dangerous commands
- Path validation prevents escape attacks
- No shell interpolation vulnerabilities
- Safe parsing with shlex
-
Performance
- Configurable timeouts
- Efficient rate limit checking (constant-bounded: each check scans a deque capped at 100 timestamps)
- Non-blocking execution
-
Usability
- Clear error messages with hints
- Rate limit guidance (wait times)
- Allowed command examples on rejection
API Specification
File Location
src/gaia/agents/chat/tools/shell_tools.py
Public Interface
from typing import Any, Dict, Optional
from collections import deque
class ShellToolsMixin:
    """
    Mixin that adds rate-limited shell command execution to an agent.

    Attributes:
        shell_command_times: Deque of command timestamps (capped at 100 entries)
        max_commands_per_minute: Sustained rate limit (default: 10)
        max_commands_per_10_seconds: Burst rate limit (default: 3)

    Tools provided:
        - run_shell_command: Execute safe shell commands with security checks
    """

    def __init__(self, *args, **kwargs):
        """Pass construction arguments up the MRO, then set up rate-limit state."""
        super().__init__(*args, **kwargs)
        # Burst and sustained ceilings used by the rate limiter.
        self.max_commands_per_10_seconds = 3
        self.max_commands_per_minute = 10
        # Bounded history of execution timestamps.
        self.shell_command_times = deque(maxlen=100)

    def register_shell_tools(self) -> None:
        """Register shell command execution tools."""
        ...

    def _check_rate_limit(self) -> tuple:
        """
        Decide whether another command may run under the rate limits.

        Returns:
            (allowed: bool, reason: str, wait_time: float)
        """
        ...

    def _record_command_execution(self):
        """Store the timestamp of a command execution for rate limiting."""
        ...
# Interface stub for the single tool this mixin exposes.
# NOTE(review): declared without `self`; presumably it is defined inside
# register_shell_tools so it can close over the agent instance -- confirm.
# NOTE(review): the docstring lists `which`, `basename` and `dirname` as allowed,
# but they are absent from the ALLOWED_COMMANDS set shown under
# "Command Whitelisting" -- reconcile the two lists.
# NOTE(review): rejection payloads shown elsewhere in this spec carry an "error"
# key (plus "hint"/"examples"/"allowed_git_commands"), which the Returns section
# below does not list -- confirm the full error schema.
@tool
def run_shell_command(
command: str,
working_directory: Optional[str] = None,
timeout: int = 30
) -> Dict[str, Any]:
"""
Execute a shell command and return output.
Security Layers:
1. Rate limit check (burst + sustained)
2. Working directory validation
3. Command parsing (shlex for safety)
4. Command whitelist check
5. Git subcommand filtering
6. Argument path validation
Args:
command: Shell command to execute (e.g., 'ls -la')
working_directory: Optional directory to run in
timeout: Max execution time in seconds (default: 30)
Returns:
{
"status": "success" | "error",
"command": str,
"stdout": str,
"stderr": str,
"return_code": int,
"has_errors": bool,
"duration_seconds": float,
"timeout": int,
"cwd": str,
"output_truncated": bool,
"rate_limited": bool, # If rate limited
"wait_time_seconds": float, # Time to wait
"timed_out": bool # If command timed out
}
Allowed Commands:
File operations: ls, cat, head, tail, grep, find
File info: file, stat, du, df
System info: whoami, hostname, uname, date
Git (read-only): status, log, show, diff, branch
Path utils: which, basename, dirname
"""
pass
Implementation Details
Rate Limiting
def _check_rate_limit(self) -> tuple:
    """
    Check whether the rate limits allow another command right now.

    Two sliding windows over ``self.shell_command_times`` are evaluated:
    the 10-second burst window first, then the 60-second sustained window.

    Returns:
        (allowed: bool, reason: str, wait_time: float). ``wait_time`` is the
        number of seconds until the oldest timestamp inside the violated
        window expires; it is 0.0 when the command is allowed.
    """
    current_time = time.time()
    minute_ago = current_time - 60
    ten_sec_ago = current_time - 10

    # Keep the in-window timestamps themselves, not just their counts: the
    # oldest one determines how long the caller has to wait.
    recent_minute = [t for t in self.shell_command_times if t > minute_ago]
    recent_10_sec = [t for t in self.shell_command_times if t > ten_sec_ago]

    # Check burst limit first
    if len(recent_10_sec) >= self.max_commands_per_10_seconds:
        # default= covers a limit of 0 with an empty window (wait a full window).
        oldest = min(recent_10_sec, default=current_time)
        wait_time = max(0.0, 10 - (current_time - oldest))
        return (
            False,
            f"Rate limit: max {self.max_commands_per_10_seconds} commands per 10 seconds",
            wait_time,
        )

    # Check sustained limit
    if len(recent_minute) >= self.max_commands_per_minute:
        oldest = min(recent_minute, default=current_time)
        wait_time = max(0.0, 60 - (current_time - oldest))
        return (
            False,
            f"Rate limit: max {self.max_commands_per_minute} commands per minute",
            wait_time,
        )

    return True, "", 0.0
Command Whitelisting
# Executables the tool may run. Set membership is the only gate at this step;
# anything absent from the set is rejected by the check below.
# NOTE(review): membership alone does not make a command read-only -- e.g.
# `find` accepts -exec/-delete and `sort` accepts -o <file>. Confirm that
# argument-level filtering happens elsewhere.
ALLOWED_COMMANDS = {
# File listing (READ-ONLY)
"ls", "dir", "pwd", "cd",
# File content (READ-ONLY)
"cat", "head", "tail", "more", "less",
# Text processing (READ-ONLY)
"grep", "find", "wc", "sort", "uniq", "diff",
# File info (READ-ONLY)
"file", "stat", "du", "df",
# System info (READ-ONLY)
"whoami", "hostname", "uname", "date", "uptime",
# Git (filtered separately)
"git"
}
# The executable name is lower-cased before the membership test.
# NOTE(review): cmd_parts[0] raises IndexError when the parsed command is
# empty -- confirm the caller rejects empty input before this point.
cmd_base = cmd_parts[0].lower()
if cmd_base not in ALLOWED_COMMANDS:
# Rejection payload: an error message plus guidance on what is allowed.
return {
"status": "error",
"error": f"Command '{cmd_base}' is not in the allowed list",
"hint": "Only read-only, informational commands are allowed",
"examples": "ls, cat, grep, find, git status, etc."
}
Git Command Filtering
if cmd_base == "git":
safe_git_commands = {
"status", "log", "show", "diff", "branch",
"remote", "ls-files", "ls-tree", "describe",
"rev-parse", "config", "help"
}
git_subcmd = cmd_parts[1].lower()
if git_subcmd not in safe_git_commands:
return {
"status": "error",
"error": f"Git command '{git_subcmd}' is not allowed",
"allowed_git_commands": list(safe_git_commands)
}
Path Traversal Prevention
# Validate all file-like arguments (everything after the executable name).
# Only arguments containing a path separator or ".." are checked; a bare
# name with neither is not passed to the validator by this loop.
for arg in cmd_parts[1:]:
candidate_path = arg
# For "-opt=value" / "--opt=value" flags, only the value part is examined.
if arg.startswith("-") and "=" in arg:
_, candidate_path = arg.split("=", 1)
# Check for path indicators
if os.sep in candidate_path or "/" in candidate_path or ".." in candidate_path:
# Ignore URLs
if candidate_path.startswith(("http://", "https://", "git://")):
continue
# Resolve and validate path: join onto cwd and resolve so that ".."
# segments are collapsed before the validator sees the path.
resolved_path = str(Path(cwd).joinpath(candidate_path).resolve())
if not self.path_validator.is_path_allowed(resolved_path):
# The error reports the original argument, not the resolved path.
return {
"status": "error",
"error": f"Access denied: '{arg}' resolves to forbidden path"
}
Testing Requirements
File: tests/agents/chat/test_shell_tools_mixin.py
def test_run_shell_command_success(agent):
    """A whitelisted command runs and reports success with exit code 0."""
    outcome = agent.run_shell_command("ls -la")
    assert outcome["status"] == "success"
    assert outcome["return_code"] == 0
def test_run_shell_command_rate_limit_burst(agent):
    """Burst limiting: a 4th command inside 10 seconds is refused."""
    # The first three calls fit inside the burst allowance
    for _ in range(3):
        accepted = agent.run_shell_command("pwd")
        assert accepted["status"] == "success"
    # One more call right away must be rate limited
    refused = agent.run_shell_command("pwd")
    assert refused["status"] == "error"
    assert refused["rate_limited"] is True
def test_run_shell_command_rate_limit_sustained(agent):
    """Test sustained rate limiting (10 per minute)."""
    import time

    # Seed the history instead of sleeping: ten executions 41-50 seconds ago
    # sit outside the 10-second burst window but inside the 60-second one,
    # so only the sustained limit can trigger.
    now = time.time()
    agent.shell_command_times.clear()
    agent.shell_command_times.extend(now - 50 + i for i in range(10))

    result = agent.run_shell_command("pwd")
    assert result["status"] == "error"
    assert result["rate_limited"] is True
    assert 0 < result["wait_time_seconds"] <= 60
def test_run_shell_command_dangerous_blocked(agent):
    """Commands outside the whitelist are rejected."""
    for dangerous_cmd in ("rm -rf /", "chmod 777", "sudo su", "dd if=/dev/zero"):
        outcome = agent.run_shell_command(dangerous_cmd)
        assert outcome["status"] == "error"
        assert "not in the allowed list" in outcome["error"]
def test_run_shell_command_git_write_blocked(agent):
    """A mutating git subcommand is refused."""
    outcome = agent.run_shell_command("git commit -m test")
    assert outcome["status"] == "error"
    assert "not allowed" in outcome["error"]
def test_run_shell_command_git_read_allowed(agent):
    """A read-only git subcommand gets past the filter."""
    outcome = agent.run_shell_command("git status")
    # Outside a repository git itself fails, which still proves the command ran
    assert outcome["status"] == "success" or "not a git repository" in outcome["stderr"]
def test_run_shell_command_path_traversal_blocked(agent, monkeypatch):
    """An argument that the validator rejects yields an access-denied error."""
    # Replace the validator check with one that rejects every path
    def deny_all(path):
        return False

    monkeypatch.setattr(agent.path_validator, "is_path_allowed", deny_all)
    outcome = agent.run_shell_command("cat ../secret.txt")
    assert outcome["status"] == "error"
    assert "denied" in outcome["error"].lower()
def test_run_shell_command_timeout(agent, tmp_path, monkeypatch):
    """Test command timeout handling."""
    # `sleep` is not on the command whitelist, so it would be rejected before
    # it ever ran and "timed_out" would never be set. `tail -f` is whitelisted
    # and blocks until it is killed, which exercises the real timeout path.
    monkeypatch.setattr(agent.path_validator, "is_path_allowed", lambda path: True)
    (tmp_path / "follow.log").write_text("line\n")

    result = agent.run_shell_command(
        "tail -f follow.log",
        working_directory=str(tmp_path),
        timeout=1,
    )
    assert result["status"] == "error"
    assert result["timed_out"] is True
def test_run_shell_command_output_truncation(agent, tmp_path, monkeypatch):
    """Test large output truncation."""
    # The previous command (`yes | head -n 1000`) could not exercise truncation:
    # `yes` is not whitelisted, a pipe needs a shell, and 1000 lines of "y" is
    # only about 2,000 characters. Read a 20,000-character file with a
    # whitelisted command instead.
    monkeypatch.setattr(agent.path_validator, "is_path_allowed", lambda path: True)
    (tmp_path / "big.txt").write_text("x" * 20_000)

    result = agent.run_shell_command("cat big.txt", working_directory=str(tmp_path))
    assert result["output_truncated"] is True
    assert len(result["stdout"]) <= 10100  # 10,000 + truncation message
Dependencies
import logging
import os
import shlex
import subprocess
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
Usage Examples
Example 1: Safe File Operations
# List directory (the captured listing is returned under "stdout")
result = agent.run_shell_command("ls -la /home/user/Documents")
print(result["stdout"])
# Search for files; the pattern is quoted so it reaches `find` as one argument
result = agent.run_shell_command("find . -name '*.pdf'")
# One matched path per output line
files = result["stdout"].splitlines()
# Read file
result = agent.run_shell_command("cat README.md")
content = result["stdout"]
Example 2: Git Operations
# Allowed: git status ("status" is in the read-only git subcommand set)
result = agent.run_shell_command("git status")
print(result["stdout"])
# Allowed: git log (options after the subcommand are passed through)
result = agent.run_shell_command("git log --oneline -10")
# Blocked: git commit ("commit" is not in the read-only git subcommand set)
result = agent.run_shell_command("git commit -m 'test'")
# Returns error: "Git command 'commit' is not allowed"
Example 3: Rate Limiting
# Quick burst of commands: five back-to-back calls exceed the burst limit
# of 3 per 10 seconds. (Requires `import time` for the sleep below.)
for i in range(5):
result = agent.run_shell_command("pwd")
# A rate-limited response sets "rate_limited" and reports how long to wait
if result.get("rate_limited"):
wait_time = result["wait_time_seconds"]
print(f"Rate limited. Wait {wait_time:.1f}s")
# Back off for the advised duration, then retry the same command
time.sleep(wait_time)
result = agent.run_shell_command("pwd")
Acceptance Criteria
ShellToolsMixin Technical Specification