Skip to main content

Advanced Patterns

21.1 Multi-Mixin Agent

from gaia.agents.base.agent import Agent
from gaia.agents.base.api_agent import ApiAgent
from gaia.agents.chat.tools.file_tools import FileToolsMixin
from gaia.agents.chat.tools.rag_tools import RAGToolsMixin
from gaia.agents.chat.tools.shell_tools import ShellToolsMixin

class MultiToolAgent(
    ApiAgent,
    Agent,
    FileToolsMixin,
    RAGToolsMixin,
    ShellToolsMixin
):
    """API-exposed agent that inherits the file, RAG and shell tool mixins.

    The class itself only supplies the system prompt, the console, the
    (empty) tool-registration hook and the model identifier; everything
    else comes from the base classes listed above.
    """

    # --- API integration -------------------------------------------------
    def get_model_id(self) -> str:
        """Return the identifier under which this agent is exposed."""
        model_id = "multi-tool-agent"
        return model_id

    # --- Agent hooks -----------------------------------------------------
    def _get_system_prompt(self) -> str:
        """Return the system prompt describing the three tool families."""
        prompt = """You are a versatile assistant with access to:
        - File operations (read, write, list)
        - Document search (RAG)
        - Shell commands

        Choose the right tool for each task."""
        return prompt

    def _create_console(self):
        """Build and return a fresh ``AgentConsole`` for this agent."""
        # Imported lazily, inside the hook, exactly as the example intends.
        from gaia.agents.base.console import AgentConsole

        console = AgentConsole()
        return console

    def _register_tools(self):
        """Hook for agent-specific tools; intentionally registers nothing.

        The example's author states that the tools of all mixins are
        already available, so only custom additions belong here.
        NOTE(review): confirm whether the mixins require an explicit
        registration call in the installed framework version.
        """
        return None

21.2 Agent with File Watching

from gaia.agents.base.agent import Agent
from gaia.agents.base.tools import tool
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path

class DirectoryMonitorAgent(Agent):
    """Agent that monitors a directory and processes new files.

    A watchdog ``Observer`` is started at construction time and calls
    ``_process_file`` for every non-directory entry created directly
    inside ``watch_dir`` (the watch is not recursive).

    Call ``stop_watching()`` when the agent is no longer needed. The
    ``__del__`` hook is only a best-effort fallback: while the observer is
    running it holds the handler, which holds this agent, so the agent is
    unlikely to be finalized on its own.
    """

    def __init__(self, watch_dir: str = "./watched", **kwargs):
        """Create the watch directory, initialize the agent, start watching.

        Args:
            watch_dir: Directory to monitor; created (including missing
                parents) when it does not exist yet.
            **kwargs: Forwarded unchanged to ``Agent.__init__``.
        """
        # Assigned before anything that can raise so that __del__ and
        # stop_watching() always find the attribute, even on a partially
        # constructed instance.
        self._observer = None
        self.watch_dir = Path(watch_dir)
        # parents=True lets nested paths such as "data/inbox" work.
        self.watch_dir.mkdir(parents=True, exist_ok=True)
        super().__init__(**kwargs)
        self._start_watching()

    def _start_watching(self):
        """Start watching directory.

        Does nothing when an observer is already active, so a second call
        cannot leak a running observer thread or deliver duplicate events.
        """
        if self._observer is not None:
            return

        class Handler(FileSystemEventHandler):
            """Forwards file-creation events to the owning agent."""

            def __init__(self, agent):
                super().__init__()
                self.agent = agent

            def on_created(self, event):
                # Directory creations are not files to process.
                if event.is_directory:
                    return
                print(f"New file: {event.src_path}")
                # This callback runs on the observer's thread. An exception
                # escaping it would propagate into that thread and could
                # end the watching silently, so report it and keep going.
                try:
                    self.agent._process_file(event.src_path)
                except Exception as exc:
                    print(f"Failed to process {event.src_path}: {exc}")

        observer = Observer()
        observer.schedule(Handler(self), str(self.watch_dir), recursive=False)
        observer.start()
        # Published only after a successful start, so cleanup never has to
        # deal with an observer that was created but never started.
        self._observer = observer
        print(f"Watching: {self.watch_dir}")

    def stop_watching(self, timeout: float = 5) -> None:
        """Stop the directory observer, if one is running.

        Safe to call repeatedly and on partially constructed instances.

        Args:
            timeout: Maximum number of seconds to wait for the observer
                thread to finish.
        """
        observer = getattr(self, "_observer", None)
        if observer is None:
            return
        self._observer = None
        observer.stop()
        # Joining a thread that was never started raises RuntimeError.
        if observer.is_alive():
            observer.join(timeout=timeout)

    def _process_file(self, file_path: str):
        """Process new file automatically.

        Called from the observer's thread for each newly created file.
        NOTE(review): a creation event may arrive before the writer has
        finished the file - confirm before reading its contents here.

        Args:
            file_path: Path of the created file as reported by the event.
        """
        # Your processing logic
        print(f"Processing: {file_path}")

    def _get_system_prompt(self) -> str:
        """Return the system prompt for this agent."""
        return "You monitor files and process them automatically."

    def _create_console(self):
        """Create the console used by this agent."""
        from gaia.agents.base.console import AgentConsole
        return AgentConsole()

    def _register_tools(self):
        """Register the ``get_recent_files`` tool via the ``@tool`` decorator."""
        # The tool docstring is left untouched: presumably the framework
        # uses it as the tool description - verify before rewording.
        @tool
        def get_recent_files(count: int = 5) -> dict:
            """Get recently added files."""
            stamped = []
            for entry in self.watch_dir.glob("*"):
                try:
                    # Only regular files: directories are not "files", and
                    # on_created ignores them as well.
                    if entry.is_file():
                        stamped.append((entry.stat().st_mtime, entry))
                except OSError:
                    # The entry vanished or became unreadable between the
                    # directory listing and the stat call; skip it.
                    continue
            stamped.sort(key=lambda pair: pair[0], reverse=True)
            # A negative count would slice from the wrong end; clamp to 0.
            files = [entry for _, entry in stamped[:max(count, 0)]]
            return {
                "files": [str(f) for f in files],
                "count": len(files)
            }

    def __del__(self):
        """Stop watching on cleanup (best-effort; prefer stop_watching())."""
        self.stop_watching()