Advanced Patterns
21.1 Multi-Mixin Agent
Copy
from gaia.agents.base.agent import Agent
from gaia.agents.base.api_agent import ApiAgent
from gaia.agents.chat.tools.file_tools import FileToolsMixin
from gaia.agents.chat.tools.rag_tools import RAGToolsMixin
from gaia.agents.chat.tools.shell_tools import ShellToolsMixin
class MultiToolAgent(
ApiAgent,
Agent,
FileToolsMixin,
RAGToolsMixin,
ShellToolsMixin
):
"""Agent combining multiple tool sets."""
def _get_system_prompt(self) -> str:
return """You are a versatile assistant with access to:
- File operations (read, write, list)
- Document search (RAG)
- Shell commands
Choose the right tool for each task."""
def _create_console(self):
from gaia.agents.base.console import AgentConsole
return AgentConsole()
def _register_tools(self):
# Tools from all mixins are available
# Add custom tools here if needed
pass
# API integration
def get_model_id(self) -> str:
return "multi-tool-agent"
21.2 Agent with File Watching
Copy
from gaia.agents.base.agent import Agent
from gaia.agents.base.tools import tool
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
class DirectoryMonitorAgent(Agent):
"""Agent that monitors a directory and processes new files."""
def __init__(self, watch_dir: str = "./watched", **kwargs):
self.watch_dir = Path(watch_dir)
self.watch_dir.mkdir(exist_ok=True)
self._observer = None
super().__init__(**kwargs)
self._start_watching()
def _start_watching(self):
"""Start watching directory."""
class Handler(FileSystemEventHandler):
def __init__(self, agent):
self.agent = agent
def on_created(self, event):
if not event.is_directory:
print(f"New file: {event.src_path}")
# Process automatically
self.agent._process_file(event.src_path)
handler = Handler(self)
self._observer = Observer()
self._observer.schedule(handler, str(self.watch_dir), recursive=False)
self._observer.start()
print(f"Watching: {self.watch_dir}")
def _process_file(self, file_path: str):
"""Process new file automatically."""
# Your processing logic
print(f"Processing: {file_path}")
def _get_system_prompt(self) -> str:
return "You monitor files and process them automatically."
def _create_console(self):
from gaia.agents.base.console import AgentConsole
return AgentConsole()
def _register_tools(self):
@tool
def get_recent_files(count: int = 5) -> dict:
"""Get recently added files."""
files = sorted(
self.watch_dir.glob("*"),
key=lambda p: p.stat().st_mtime,
reverse=True
)[:count]
return {
"files": [str(f) for f in files],
"count": len(files)
}
def __del__(self):
"""Stop watching on cleanup."""
if self._observer:
self._observer.stop()
self._observer.join(timeout=5)
Related Topics
- Core Agent System - Agent architecture
- Tool Mixins - Reusable tool collections
- API Server - API integration
- Examples - Complete working examples