Component: FileChangeHandler
Module: gaia.utils.file_watcher (planned location)
Import: from gaia import FileChangeHandler (after v1.0.0 restructure)

Overview

FileChangeHandler provides a reusable file system watcher for GAIA agents. It monitors directories for file changes (create, modify, delete) and triggers callbacks, enabling agents to automatically process new files.

Key Features:
  • File event detection (create, modify, delete)
  • Callback-based architecture
  • File extension filtering
  • Pattern-based ignore rules
  • Debouncing to prevent duplicate events
  • Integration with watchdog library

Requirements

Functional Requirements

  1. File Event Handling
    • Detect file creation
    • Detect file modification
    • Detect file deletion
    • Optional: Detect file moves
  2. Filtering
    • File extension filters
    • Filename pattern matching
    • Ignore patterns (e.g., temp files, hidden files)
  3. Debouncing
    • Prevent duplicate events
    • Configurable debounce time
    • Handle rapid successive changes
  4. Callback System
    • on_created(event) callback
    • on_modified(event) callback
    • on_deleted(event) callback
    • Pass file path and metadata
  5. Integration with watchdog
    • Extends FileSystemEventHandler
    • Works with Observer
    • Thread-safe

Non-Functional Requirements

  1. Performance
    • Low overhead monitoring
    • Efficient event filtering
    • Non-blocking callbacks
  2. Reliability
    • Handle edge cases (permission errors, symlinks)
    • Graceful degradation
    • Proper cleanup
  3. Usability
    • Simple API
    • Good defaults
    • Clear error messages
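
The "non-blocking callbacks" requirement is worth a closer look, because the handler methods specified in this document invoke callbacks synchronously. One possible approach, sketched here under the assumption that callbacks are independent and safe to run concurrently, is to wrap a callback so that each call is queued on a thread pool. The name `make_nonblocking` is hypothetical and not part of this spec.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

logger = logging.getLogger(__name__)


def make_nonblocking(
    callback: Callable[[Any], None],
    executor: ThreadPoolExecutor,
) -> Callable[[Any], Future]:
    """Wrap `callback` so each call is queued on `executor` and returns at once."""

    def _log_failure(future: Future) -> None:
        # Exceptions raised in a worker never reach the caller, so log them here.
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("Callback failed: %s", exc, exc_info=exc)

    def wrapper(event: Any) -> Future:
        future = executor.submit(callback, event)
        future.add_done_callback(_log_failure)
        return future

    return wrapper
```

A wrapped callback could then be passed as `on_created=make_nonblocking(process_new_file, executor)`. The trade-off is that callbacks may now overlap, so any state they share needs its own synchronization.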

API Specification

File Location

src/gaia/utils/file_watcher.py

Public Interface

from pathlib import Path
from typing import Callable, Dict, Optional, Set
from watchdog.events import FileSystemEventHandler, FileSystemEvent
import time
import logging

logger = logging.getLogger(__name__)

class FileChangeHandler(FileSystemEventHandler):
    """
    Generic file system watcher with callback support.

    Monitors directories for file changes and triggers callbacks.
    Includes debouncing to prevent duplicate event processing.

    Usage:
        def process_new_file(event):
            print(f"New file: {event.src_path}")

        handler = FileChangeHandler(
            on_created=process_new_file,
            extensions={'.pdf', '.png'},
            debounce_seconds=1.0
        )

        from watchdog.observers import Observer
        observer = Observer()
        observer.schedule(handler, "/path/to/watch", recursive=False)
        observer.start()

    Attributes:
        on_created: Callback for file creation events
        on_modified: Callback for file modification events
        on_deleted: Callback for file deletion events
        extensions: Set of file extensions to monitor (e.g., {'.pdf', '.txt'})
        ignore_patterns: Patterns to ignore (e.g., {'.*', '~*'})
        debounce_seconds: Minimum time between events for same file
    """

    SUPPORTED_EXTENSIONS: Set[str] = {'.pdf', '.png', '.jpg', '.jpeg', '.txt', '.md', '.docx'}
    DEFAULT_IGNORE_PATTERNS: Set[str] = {'.*', '~*', '*.tmp', '*.swp', '*.bak'}

    def __init__(
        self,
        on_created: Optional[Callable[[FileSystemEvent], None]] = None,
        on_modified: Optional[Callable[[FileSystemEvent], None]] = None,
        on_deleted: Optional[Callable[[FileSystemEvent], None]] = None,
        extensions: Optional[Set[str]] = None,
        ignore_patterns: Optional[Set[str]] = None,
        debounce_seconds: float = 1.0,
    ):
        """
        Initialize file change handler.

        Args:
            on_created: Callback when file is created
            on_modified: Callback when file is modified
            on_deleted: Callback when file is deleted
            extensions: File extensions to monitor (default: all common types)
            ignore_patterns: Patterns to ignore (default: temp/hidden files)
            debounce_seconds: Min time between events for same file (default: 1.0)
        """
        super().__init__()
        self._on_created = on_created
        self._on_modified = on_modified
        self._on_deleted = on_deleted
        self._extensions = extensions or self.SUPPORTED_EXTENSIONS
        self._ignore_patterns = ignore_patterns or self.DEFAULT_IGNORE_PATTERNS
        self._debounce_seconds = debounce_seconds
        self._last_events: Dict[str, float] = {}  # path → timestamp

    def on_created(self, event: FileSystemEvent) -> None:
        """Handle file creation event."""
        if not event.is_directory and self._should_process(event):
            if self._is_debounced(event.src_path):
                logger.debug(f"Debounced: {event.src_path}")
                return

            logger.info(f"File created: {event.src_path}")
            if self._on_created:
                try:
                    self._on_created(event)
                except Exception as e:
                    logger.error(f"Error in on_created callback: {e}", exc_info=True)

    def on_modified(self, event: FileSystemEvent) -> None:
        """Handle file modification event."""
        if not event.is_directory and self._should_process(event):
            if self._is_debounced(event.src_path):
                logger.debug(f"Debounced: {event.src_path}")
                return

            logger.info(f"File modified: {event.src_path}")
            if self._on_modified:
                try:
                    self._on_modified(event)
                except Exception as e:
                    logger.error(f"Error in on_modified callback: {e}", exc_info=True)

    def on_deleted(self, event: FileSystemEvent) -> None:
        """Handle file deletion event."""
        if not event.is_directory and self._should_process(event):
            logger.info(f"File deleted: {event.src_path}")
            if self._on_deleted:
                try:
                    self._on_deleted(event)
                except Exception as e:
                    logger.error(f"Error in on_deleted callback: {e}", exc_info=True)

            # Clean up debounce tracking
            self._last_events.pop(event.src_path, None)

    def _should_process(self, event: FileSystemEvent) -> bool:
        """Check if file should be processed based on filters."""
        path = Path(event.src_path)

        # Check extension
        if self._extensions and path.suffix not in self._extensions:
            return False

        # Check ignore patterns
        for pattern in self._ignore_patterns:
            if path.match(pattern):
                return False

        return True

    def _is_debounced(self, file_path: str) -> bool:
        """Check if event should be debounced."""
        current_time = time.time()
        last_time = self._last_events.get(file_path, 0)

        if current_time - last_time < self._debounce_seconds:
            return True

        self._last_events[file_path] = current_time
        return False

Testing Requirements

Unit Tests

File: tests/sdk/test_file_change_handler.py
import pytest
from pathlib import Path
import time
from gaia import FileChangeHandler
from watchdog.observers import Observer
from watchdog.events import FileCreatedEvent

def test_file_change_handler_can_be_imported():
    """Verify FileChangeHandler can be imported from gaia."""
    from gaia import FileChangeHandler
    assert FileChangeHandler is not None

def test_handler_creation():
    """Test handler can be created."""
    handler = FileChangeHandler()
    assert handler is not None

def test_handler_with_callbacks():
    """Test handler with callbacks."""
    created_files = []
    modified_files = []

    def on_create(event):
        created_files.append(event.src_path)

    def on_modify(event):
        modified_files.append(event.src_path)

    handler = FileChangeHandler(
        on_created=on_create,
        on_modified=on_modify
    )

    assert handler._on_created is not None
    assert handler._on_modified is not None

def test_extension_filtering():
    """Test file extension filtering."""
    processed_files = []

    def on_create(event):
        processed_files.append(event.src_path)

    handler = FileChangeHandler(
        on_created=on_create,
        extensions={'.pdf', '.txt'}
    )

    # Simulate events
    pdf_event = FileCreatedEvent("test.pdf")
    txt_event = FileCreatedEvent("test.txt")
    jpg_event = FileCreatedEvent("test.jpg")

    handler.on_created(pdf_event)
    handler.on_created(txt_event)
    handler.on_created(jpg_event)

    # Only .pdf and .txt should be processed
    assert len(processed_files) == 2
    assert "test.pdf" in processed_files[0]
    assert "test.txt" in processed_files[1]

def test_ignore_patterns():
    """Test ignore patterns."""
    processed_files = []

    def on_create(event):
        processed_files.append(event.src_path)

    handler = FileChangeHandler(
        on_created=on_create,
        ignore_patterns={'.*', '~*'}
    )

    # Simulate events
    normal_event = FileCreatedEvent("file.txt")
    hidden_event = FileCreatedEvent(".hidden")
    temp_event = FileCreatedEvent("~temp.txt")

    handler.on_created(normal_event)
    handler.on_created(hidden_event)
    handler.on_created(temp_event)

    # Only normal file should be processed
    assert len(processed_files) == 1
    assert "file.txt" in processed_files[0]

def test_debouncing():
    """Test event debouncing."""
    call_count = 0

    def on_create(event):
        nonlocal call_count
        call_count += 1

    handler = FileChangeHandler(
        on_created=on_create,
        debounce_seconds=0.5
    )

    # Rapid events on same file
    event = FileCreatedEvent("test.txt")
    handler.on_created(event)
    handler.on_created(event)  # Should be debounced
    handler.on_created(event)  # Should be debounced

    assert call_count == 1

    # Wait for debounce period
    time.sleep(0.6)
    handler.on_created(event)  # Should process

    assert call_count == 2

def test_real_file_watching(tmp_path):
    """Test with real file system changes."""
    processed_files = []

    def on_create(event):
        processed_files.append(Path(event.src_path).name)

    handler = FileChangeHandler(
        on_created=on_create,
        extensions={'.txt'}
    )

    observer = Observer()
    observer.schedule(handler, str(tmp_path), recursive=False)
    observer.start()

    try:
        # Create a file
        test_file = tmp_path / "test.txt"
        test_file.write_text("test content")

        # Wait for event
        time.sleep(0.5)

        # Verify callback was triggered
        assert "test.txt" in processed_files

    finally:
        observer.stop()
        observer.join(timeout=5)

def test_callback_error_handling():
    """Test that callback errors don't crash handler."""
    def bad_callback(event):
        raise Exception("Callback error")

    handler = FileChangeHandler(on_created=bad_callback)

    # Should not raise
    event = FileCreatedEvent("test.txt")
    handler.on_created(event)  # Logs error but doesn't crash
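
test_real_file_watching waits with a fixed `time.sleep(0.5)`, which can be flaky on slow or heavily loaded machines. A polling helper is one way to make that wait more robust. `wait_until` is a hypothetical helper, not part of this spec.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout: float = 5.0,
    interval: float = 0.05,
) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    # One last check so a result that arrived during the final sleep still counts.
    return predicate()
```

In the test, the sleep and assertion could then become `assert wait_until(lambda: "test.txt" in processed_files)`, which returns as soon as the event arrives and only fails after the full timeout.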

Usage Examples

Example 1: EMR Intake Agent (Auto-Process Forms)

from gaia import Agent, FileChangeHandler
from gaia.llm.vlm_client import VLMClient
from watchdog.observers import Observer
from pathlib import Path

class MedicalIntakeAgent(Agent):
    """Process medical intake forms automatically."""

    def __init__(self, watch_dir: str = "./intake_forms", **kwargs):
        super().__init__(**kwargs)
        self.watch_dir = Path(watch_dir)
        self.watch_dir.mkdir(parents=True, exist_ok=True)
        self.vlm = VLMClient()
        self._observer = None
        self._start_watching()

    def _start_watching(self):
        """Start watching for new intake forms."""
        def process_new_form(event):
            print(f"New form detected: {event.src_path}")
            # Extract data from form
            result = self._extract_form_data(event.src_path)
            print(f"Extracted: {result}")

        handler = FileChangeHandler(
            on_created=process_new_form,
            extensions={'.pdf', '.png', '.jpg'},
            debounce_seconds=2.0
        )

        self._observer = Observer()
        self._observer.schedule(handler, str(self.watch_dir), recursive=False)
        self._observer.start()
        print(f"Watching: {self.watch_dir}")

    def _extract_form_data(self, image_path: str) -> dict:
        """Extract data from intake form."""
        path = Path(image_path)
        image_bytes = path.read_bytes()
        extracted = self.vlm.extract_from_image(image_bytes, "Extract patient data")
        return {"file": str(path), "data": extracted}

    def __del__(self):
        """Stop watching on cleanup."""
        if self._observer:
            self._observer.stop()
            self._observer.join(timeout=5)

Example 2: Document Indexing Agent

from gaia import Agent, FileChangeHandler
from gaia.rag.sdk import RAGSDK, RAGConfig
from watchdog.observers import Observer
from pathlib import Path

class AutoIndexAgent(Agent):
    """Automatically index new documents."""

    def __init__(self, docs_dir: str = "./docs", **kwargs):
        super().__init__(**kwargs)
        self.rag = RAGSDK(RAGConfig())
        self._setup_watching(docs_dir)

    def _setup_watching(self, docs_dir: str):
        """Watch directory for new documents."""
        def index_new_doc(event):
            print(f"Indexing: {event.src_path}")
            self.rag.index_document(event.src_path)
            print(f"✅ Indexed: {Path(event.src_path).name}")

        handler = FileChangeHandler(
            on_created=index_new_doc,
            extensions={'.pdf', '.txt', '.md', '.docx'}
        )

        # Keep a reference so the observer can be stopped later
        self._observer = Observer()
        self._observer.schedule(handler, docs_dir, recursive=True)
        self._observer.start()
        print(f"Auto-indexing enabled for: {docs_dir}")
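
Both examples start an observer, and Example 2 never stops it, while Example 1 relies on `__del__`, which Python does not guarantee to run at a predictable time. One option, sketched here, is a small context manager that always calls `stop()` and `join()`. `running` is a hypothetical helper; it relies only on the `start()`, `stop()`, and `join()` methods that the examples in this document already call on `Observer`.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def running(observer: Any, join_timeout: float = 5.0) -> Iterator[Any]:
    """Start `observer`, yield it, and always stop and join it on exit."""
    observer.start()
    try:
        yield observer
    finally:
        # Runs on normal exit and when the body raises.
        observer.stop()
        observer.join(timeout=join_timeout)
```

A script could schedule its handler first and then run its main loop inside `with running(observer):`. For a long-lived agent, an explicit `stop()` method on the agent that performs the same two calls may be a better fit.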

Implementation Details

Extract from ChatAgent

Current location: src/gaia/agents/chat/agent.py (around line 70)
# Current (tightly coupled)
class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, agent):
        self.agent = agent  # ❌ Coupled to specific agent

    def on_created(self, event):
        self.agent.reindex()  # ❌ Calls agent method
Target (generic):
# Generic (callback-based)
class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, on_created=None, ...):
        self._on_created = on_created  # ✅ Generic callback

    def on_created(self, event):
        if self._on_created:
            self._on_created(event)  # ✅ Calls callback

Debouncing Implementation

def __init__(self, ..., debounce_seconds=1.0):
    self._debounce_seconds = debounce_seconds
    self._last_events = {}  # path → timestamp

def _is_debounced(self, file_path: str) -> bool:
    """Check if event should be debounced."""
    current_time = time.time()
    last_time = self._last_events.get(file_path, 0)

    if current_time - last_time < self._debounce_seconds:
        return True  # Skip this event

    self._last_events[file_path] = current_time
    return False  # Process this event
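
The requirements list thread safety, but `_last_events` is read and written without a lock, and `time.time()` can jump when the system clock is adjusted. A possible variant, written as a standalone helper class, guards the dictionary with a lock and uses a monotonic clock. The name `Debouncer`, the `forget` method, and the injectable `clock` parameter are illustrative assumptions, not part of this spec.

```python
import threading
import time
from typing import Callable, Dict, Optional


class Debouncer:
    """Thread-safe per-key debouncing based on a monotonic clock."""

    def __init__(
        self,
        seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._seconds = seconds
        self._clock = clock  # injectable so tests do not need to sleep
        self._last: Dict[str, float] = {}
        self._lock = threading.Lock()

    def is_debounced(self, key: str) -> bool:
        """Return True if `key` was accepted less than `seconds` ago."""
        now = self._clock()
        with self._lock:
            # None (rather than 0) marks "never seen": a monotonic clock
            # has an arbitrary origin and may itself be close to zero.
            last: Optional[float] = self._last.get(key)
            if last is not None and now - last < self._seconds:
                return True  # skip this event
            self._last[key] = now
            return False  # process this event

    def forget(self, key: str) -> None:
        """Drop tracking for `key`, for example after the file is deleted."""
        with self._lock:
            self._last.pop(key, None)
```

Injecting the clock also lets a debounce test advance time by assignment instead of calling `time.sleep`.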

Extension Filtering

def _should_process(self, event: FileSystemEvent) -> bool:
    """Check if file should be processed."""
    path = Path(event.src_path)

    # Check extension
    if self._extensions and path.suffix not in self._extensions:
        return False

    # Check ignore patterns
    for pattern in self._ignore_patterns:
        if path.match(pattern):
            return False

    return True
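
The filter compares `path.suffix` to the extension set verbatim, so a file named `SCAN.PDF` would be skipped when the set contains only `.pdf`. The standalone function below is a sketch of the same two checks with lowercased suffixes. `should_process` is an illustrative name, and case-insensitive matching is an assumption about the desired behavior, not something this spec states.

```python
from pathlib import Path
from typing import Iterable, Set


def should_process(
    file_path: str,
    extensions: Set[str],
    ignore_patterns: Iterable[str],
) -> bool:
    """Apply extension and ignore-pattern filters to a single path."""
    path = Path(file_path)

    # Compare suffixes case-insensitively (assumption, see the note above).
    allowed = {ext.lower() for ext in extensions}
    if allowed and path.suffix.lower() not in allowed:
        return False

    # Path.match with a relative pattern matches from the right,
    # so "~*" is tested against the final path component.
    return not any(path.match(pattern) for pattern in ignore_patterns)
```

As in the original method, an empty extension set disables the extension check, so only the ignore patterns apply.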

Dependencies

Required Packages

# pyproject.toml
[project]
dependencies = [
    "watchdog>=3.0.0",  # File system monitoring
]

Import Dependencies

from pathlib import Path
from typing import Callable, Optional, Set, Dict
from watchdog.events import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer
import time
import logging

Documentation Updates Required

SDK.md

Add new section after Tool Mixins:
## 10. File System Monitoring

### FileChangeHandler

**Import:** `from gaia import FileChangeHandler`

**Purpose:** Monitor directories for file changes and trigger automatic processing.

**When to use:**
- Auto-process files dropped in a folder
- Auto-index new documents
- Watch for configuration changes
- Trigger workflows on file events

[Full documentation with examples]

Update EMR Example

Replace manual file watching with FileChangeHandler in medical-intake-build-guide.md

Acceptance Criteria

  • FileChangeHandler implemented in src/gaia/utils/file_watcher.py
  • All methods implemented with docstrings
  • Debouncing works correctly
  • Extension filtering works
  • Ignore patterns work
  • Callbacks are called correctly
  • Error handling in callbacks
  • All unit tests pass (8+ tests)
  • Exported from gaia/__init__.py
  • Can import: from gaia import FileChangeHandler
  • Documented in SDK.md
  • EMR agent can use it
  • No coupling to specific agent types

Implementation Checklist

Step 1: Create File

  • Create src/gaia/utils/ directory
  • Create src/gaia/utils/__init__.py
  • Create src/gaia/utils/file_watcher.py
  • Add copyright header

Step 2: Implement Class

  • Extend FileSystemEventHandler
  • Implement __init__ with callbacks
  • Implement on_created()
  • Implement on_modified()
  • Implement on_deleted()
  • Implement _should_process()
  • Implement _is_debounced()

Step 3: Add Features

  • Extension filtering
  • Ignore patterns
  • Debouncing
  • Error handling in callbacks
  • Logging

Step 4: Write Tests

  • Create tests/sdk/test_file_change_handler.py
  • Test imports
  • Test callbacks
  • Test filtering
  • Test debouncing
  • Test with real Observer
  • Test error handling

Step 5: Export & Document

  • Add to src/gaia/__init__.py
  • Add to __all__ list
  • Update SDK.md
  • Add examples to SDK.md

Step 6: Validate

  • Can import: from gaia import FileChangeHandler
  • Example code works
  • All tests pass
  • EMR agent can use it

FileChangeHandler Technical Specification