Skip to main content
The FileWatcher and FileChangeHandler utilities provide file system monitoring for GAIA agents. Use them to watch directories for changes and trigger callbacks when files are created, modified, deleted, or moved.

Quick Start

from gaia import FileWatcher

def process_new_file(path: str):
    print(f"New file: {path}")

# Watch for new PDFs and images
watcher = FileWatcher(
    directory="./intake_forms",
    on_created=process_new_file,
    extensions=[".pdf", ".png", ".jpg"],
)

watcher.start()
# ... do work ...
watcher.stop()

Two Approaches

1. FileWatcher (Simple)

Use FileWatcher for simple directory watching with automatic lifecycle management:
import time

from gaia import FileWatcher

def on_new_intake(path: str):
    """Report a newly created file; registered below as the on_created callback."""
    print(f"Processing: {path}")

# Context manager handles start/stop
with FileWatcher("./data", on_created=on_new_intake) as watcher:
    while True:
        # Watcher is running in background
        time.sleep(1)

2. FileChangeHandler (Advanced)

Use FileChangeHandler directly when you need more control:
from gaia import FileChangeHandler
from watchdog.observers import Observer

handler = FileChangeHandler(
    on_created=process_file,
    on_modified=process_file,
    on_deleted=cleanup_file,
    extensions=[".pdf"],
    debounce_seconds=2.0,
)

observer = Observer()
observer.schedule(handler, "./data", recursive=True)
observer.start()

Integration with Agents

FileWatcher works seamlessly with GAIA agents:
from gaia import Agent, DatabaseMixin, FileWatcher

class IntakeAgent(Agent, DatabaseMixin):
    """Agent that stores data extracted from files arriving in a watched directory."""

    def __init__(self, watch_dir: str = "./intake_forms", **kwargs):
        """Open the intake database and start watching ``watch_dir``.

        Args:
            watch_dir: Directory monitored for new ``.pdf``, ``.png`` and
                ``.jpg`` files.
            **kwargs: Forwarded unchanged to the parent initializer.
        """
        super().__init__(**kwargs)
        self.init_db("data/intake.db")

        # Set up file watching with callbacks
        self._watcher = FileWatcher(
            directory=watch_dir,
            on_created=self._process_new_form,
            extensions=[".pdf", ".png", ".jpg"],
        )
        self._watcher.start()

    def _process_new_form(self, path: str):
        """Called automatically when new files arrive."""
        result = self.extract_data(path)
        self.insert("forms", result)

    def close(self):
        """Stop the watcher, then close the database.

        The database is closed even if stopping the watcher raises, so a
        failing watcher cannot leave the database connection open.
        """
        try:
            self._watcher.stop()
        finally:
            self.close_db()

API Reference

FileWatcher

| Parameter | Type | Default | Description |
|---|---|---|---|
| directory | str | required | Directory to watch |
| on_created | Callable | None | Callback for new files |
| on_modified | Callable | None | Callback for modified files |
| on_deleted | Callable | None | Callback for deleted files |
| on_moved | Callable | None | Callback for moved files (src, dest) |
| extensions | List[str] | defaults | File extensions to watch |
| filter_func | Callable | None | Custom filter predicate |
| debounce_seconds | float | 2.0 | Debounce time |
| recursive | bool | False | Watch subdirectories |

Methods

| Method | Description |
|---|---|
| start() | Start watching (safe to call multiple times) |
| stop() | Stop watching (safe to call multiple times) |
| is_running | Property: True if watching |
| telemetry | Property: Event statistics |

FileChangeHandler

| Parameter | Type | Default | Description |
|---|---|---|---|
| on_created | Callable[[str], None] | None | Called with file path |
| on_modified | Callable[[str], None] | None | Called with file path |
| on_deleted | Callable[[str], None] | None | Called with file path |
| on_moved | Callable[[str, str], None] | None | Called with (src, dest) |
| extensions | List[str] | defaults | Extensions to watch |
| filter_func | Callable[[str], bool] | None | Custom filter |
| debounce_seconds | float | 2.0 | Min time between events |
| ignore_directories | bool | True | Skip directory events |

Examples

Watch All Files

# Empty extensions list watches all files
watcher = FileWatcher(
    directory="./data",
    on_created=process,
    extensions=[],  # Watch everything
)

Custom Filter

from pathlib import Path

def is_valid_file(path: str) -> bool:
    """Return True unless the file is hidden or a temp file.

    Only the final path component is inspected: names starting with "."
    (hidden files) or ending with ".tmp" (temp files) are rejected.
    """
    name = Path(path).name
    return not name.startswith(".") and not name.endswith(".tmp")

watcher = FileWatcher(
    directory="./data",
    on_created=process,
    filter_func=is_valid_file,
)

Handle Moves/Renames

def handle_move(src: str, dest: str):
    print(f"File moved: {src} -> {dest}")
    # Update database references
    db.update("files", {"path": dest}, "path = :old", {"old": src})

watcher = FileWatcher(
    directory="./data",
    on_moved=handle_move,
)

Access Telemetry

watcher = FileWatcher("./data", on_created=process)
watcher.start()

# Later, check statistics
stats = watcher.telemetry
print(f"Files processed: {stats['files_created']}")
print(f"Total events: {stats['total_events']}")

Default Extensions

When extensions is not specified, FileChangeHandler watches these file types:
  • Documents: .pdf, .txt, .md, .markdown, .rst
  • Data: .csv, .json, .xml, .yaml, .yml
  • Code: .py, .js, .ts, .java, .cpp, .c
  • Web: .html, .css
  • Logs: .log

Best Practices

Use Debouncing

Debouncing prevents duplicate events when files are modified multiple times quickly:
# 2 second debounce (default)
watcher = FileWatcher(
    directory="./data",
    on_modified=process,
    debounce_seconds=2.0,  # Ignore events within 2 seconds
)

Handle Errors in Callbacks

Callbacks run in the observer thread. Errors are caught and logged:
def safe_process(path: str):
    try:
        process_file(path)
    except Exception as e:
        logger.error(f"Failed to process {path}: {e}")
        # Consider adding to retry queue

watcher = FileWatcher("./data", on_created=safe_process)

Clean Up Resources

Always stop the watcher when done:
# Use context manager (recommended)
with FileWatcher("./data", on_created=process) as watcher:
    run_application()

# Or manual cleanup
watcher = FileWatcher("./data", on_created=process)
try:
    watcher.start()
    run_application()
finally:
    watcher.stop()

Requirements

FileWatcher requires the watchdog package:
pip install "watchdog>=2.1.0"

# Or with GAIA
pip install "amd-gaia[dev]"
Check if watchdog is available:
from gaia.utils.file_watcher import check_watchdog_available

if check_watchdog_available():
    watcher = FileWatcher(...)
else:
    print("Install watchdog for file watching support")

Complete Example

A complete working agent that watches a directory and tracks processed files using FileWatcherMixin:
import sys
import time
from pathlib import Path

from gaia import Agent, FileWatcherMixin, tool

class FileWatcherAgent(Agent, FileWatcherMixin):
    """Agent that watches a directory and processes new files.

    Every file event is reported on ``self.console``. Created files are also
    recorded in ``self.processed_files`` so the registered tools can list
    them and look them up by name.
    """

    def __init__(self, watch_dir: str = "./watched_files", **kwargs):
        """Create the watch directory if needed and start watching it.

        Args:
            watch_dir: Directory to monitor; created (with parents) if it
                does not exist.
            **kwargs: Forwarded unchanged to the parent initializer.
        """
        # Set attributes BEFORE super().__init__() since it may call _get_system_prompt()
        self._watch_dir = Path(watch_dir)
        # One dict per created file with the keys:
        # name, path, size, extension, processed_at.
        self.processed_files = []
        super().__init__(**kwargs)

        # Create watch directory if it doesn't exist
        self._watch_dir.mkdir(parents=True, exist_ok=True)

        # Use mixin to watch directory
        self.watch_directory(
            self._watch_dir,
            on_created=self._on_file_created,
            on_modified=self._on_file_modified,
            on_deleted=self._on_file_deleted,
            on_moved=self._on_file_moved,
            extensions=[],  # Watch all file types
            debounce_seconds=1.0,
        )

    def _get_system_prompt(self) -> str:
        """Return the system prompt, which names the watched directory and the tools."""
        return f"""You are a file processing assistant watching: {self._watch_dir}

When files are added, you automatically process them and can answer questions about them.

Available actions:
- list_processed(): Show all files that have been processed
- get_file_info(filename): Get details about a specific file
- get_stats(): Get watching statistics
"""

    def _register_tools(self):
        """Define the tools advertised in the system prompt."""
        # The tool functions are plain closures, so keep a reference to the
        # agent instead of relying on ``self`` being passed to them.
        agent = self

        @tool
        def list_processed() -> dict:
            """List all files that have been processed."""
            return {
                "files": agent.processed_files,
                "count": len(agent.processed_files),
            }

        @tool
        def get_file_info(filename: str) -> dict:
            """Get information about a processed file."""
            # Linear search by file name; the first match wins.
            for f in agent.processed_files:
                if f["name"] == filename:
                    return {"found": True, "file": f}
            return {"found": False, "message": f"File '{filename}' not found"}

        @tool
        def get_stats() -> dict:
            """Get file watching statistics."""
            return {
                "directory": str(agent._watch_dir),
                "watching": [str(d) for d in agent.watching_directories],
                "telemetry": agent.watcher_telemetry,
                "processed_count": len(agent.processed_files),
            }

    def _on_file_created(self, path: str) -> None:
        """Called when a new file is created.

        Records the file's name, absolute path, size, extension and the
        current local time in ``self.processed_files`` and reports it on the
        console.
        """
        file_path = Path(path)
        try:
            size = file_path.stat().st_size
        except OSError:
            # The file can disappear (or become unreadable) between the
            # event firing and this callback running. Checking exists()
            # first would still race with stat(), so ask for the size
            # directly and fall back to 0 instead of raising.
            size = 0
        file_info = {
            "name": file_path.name,
            "path": str(file_path.absolute()),
            "size": size,
            "extension": file_path.suffix,
            "processed_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        self.processed_files.append(file_info)
        self.console.print_file_created(
            filename=file_path.name,
            size=size,
            # Files without a suffix are displayed as "none".
            extension=file_info["extension"] or "none",
        )

    def _on_file_modified(self, path: str) -> None:
        """Called when a file is modified; only reports the change."""
        file_path = Path(path)
        self.console.print_file_modified(file_path.name)

    def _on_file_deleted(self, path: str) -> None:
        """Called when a file is deleted.

        Reports the deletion and drops every record with the same file name
        from ``self.processed_files``.
        """
        file_path = Path(path)
        self.console.print_file_deleted(file_path.name)
        # Remove from processed list
        self.processed_files = [
            f for f in self.processed_files if f["name"] != file_path.name
        ]

    def _on_file_moved(self, src_path: str, dest_path: str) -> None:
        """Called when a file is renamed/moved.

        Reports the move and rewrites the first record whose name matches
        the source file so it points at the destination.
        """
        src_file = Path(src_path)
        dest_file = Path(dest_path)
        self.console.print_file_moved(src_file.name, dest_file.name)
        # Update the filename in processed list
        for f in self.processed_files:
            if f["name"] == src_file.name:
                f["name"] = dest_file.name
                f["path"] = str(dest_file.absolute())
                f["extension"] = dest_file.suffix
                break

def main():
    """Run the File Watcher Agent.

    Watches the directory given as the first command-line argument (default
    ``./watched_files``) and forwards each line typed at the prompt to the
    agent. The loop ends when the user enters ``quit``, ``exit`` or ``q``,
    presses Ctrl+C, or closes stdin. Watchers are always stopped on the way
    out.
    """
    watch_dir = sys.argv[1] if len(sys.argv) > 1 else "./watched_files"

    agent = FileWatcherAgent(watch_dir=watch_dir)
    try:
        print(f"Watching: {watch_dir}")
        print("Ready! Waiting for files...\n")

        # Interactive loop
        while True:
            try:
                user_input = input("You: ").strip()
                if user_input.lower() in ("quit", "exit", "q"):
                    break
                agent.process_query(user_input)
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed (Ctrl+D / end of piped input);
                # treat it like Ctrl+C and leave the loop cleanly.
                break
    finally:
        # Stop the watchers even if the loop exits through an unexpected
        # exception.
        agent.stop_all_watchers()

if __name__ == "__main__":
    main()
Usage:
# Run the example
python examples/file_watcher_agent.py ./my_documents

# In another terminal, create a test file
echo "Hello World" > my_documents/test.txt