Component: FileChangeHandler
Module: gaia.utils.file_watcher (planned location)
Import: from gaia import FileChangeHandler (after v1.0.0 restructure)
Overview
FileChangeHandler provides a reusable file system watcher for GAIA agents. It monitors directories for file changes (create, modify, delete) and triggers callbacks, enabling agents to automatically process new files.
Key Features:
- File event detection (create, modify, delete)
- Callback-based architecture
- File extension filtering
- Pattern-based ignore rules
- Debouncing to prevent duplicate events
- Integration with watchdog library
Requirements
Functional Requirements
-
File Event Handling
- Detect file creation
- Detect file modification
- Detect file deletion
- Optional: Detect file moves
-
Filtering
- File extension filters
- Filename pattern matching
- Ignore patterns (e.g., temp files, hidden files)
-
Debouncing
- Prevent duplicate events
- Configurable debounce time
- Handle rapid successive changes
-
Callback System
- `on_created(event)` callback
- `on_modified(event)` callback
- `on_deleted(event)` callback
- Pass file path and metadata
-
Integration with watchdog
- Extends `FileSystemEventHandler`
- Works with `Observer`
- Thread-safe
Non-Functional Requirements
-
Performance
- Low overhead monitoring
- Efficient event filtering
- Non-blocking callbacks
-
Reliability
- Handle edge cases (permission errors, symlinks)
- Graceful degradation
- Proper cleanup
-
Usability
- Simple API
- Good defaults
- Clear error messages
API Specification
File Location
src/gaia/utils/file_watcher.py
Public Interface
import logging
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set

from watchdog.events import FileSystemEventHandler, FileSystemEvent
logger = logging.getLogger(__name__)
class FileChangeHandler(FileSystemEventHandler):
    """
    Generic file system watcher with callback support.

    Monitors directories for file changes and triggers callbacks.
    Includes debouncing to prevent duplicate event processing.

    Creation and modification events share one debounce window per path,
    so a modify event that arrives within ``debounce_seconds`` of a
    processed create (or modify) for the same path is skipped. Deletion
    events are never debounced.

    Usage:
        def process_new_file(event):
            print(f"New file: {event.src_path}")

        handler = FileChangeHandler(
            on_created=process_new_file,
            extensions={'.pdf', '.png'},
            debounce_seconds=1.0
        )

        from watchdog.observers import Observer
        observer = Observer()
        observer.schedule(handler, "/path/to/watch", recursive=False)
        observer.start()

    Attributes:
        on_created: Callback for file creation events
        on_modified: Callback for file modification events
        on_deleted: Callback for file deletion events
        extensions: Set of file extensions to monitor (e.g., {'.pdf', '.txt'})
        ignore_patterns: Patterns to ignore (e.g., {'.*', '~*'})
        debounce_seconds: Minimum time between events for same file
    """

    SUPPORTED_EXTENSIONS: Set[str] = {'.pdf', '.png', '.jpg', '.jpeg', '.txt', '.md', '.docx'}
    DEFAULT_IGNORE_PATTERNS: Set[str] = {'.*', '~*', '*.tmp', '*.swp', '*.bak'}

    def __init__(
        self,
        on_created: Optional[Callable[[FileSystemEvent], None]] = None,
        on_modified: Optional[Callable[[FileSystemEvent], None]] = None,
        on_deleted: Optional[Callable[[FileSystemEvent], None]] = None,
        extensions: Optional[Set[str]] = None,
        ignore_patterns: Optional[Set[str]] = None,
        debounce_seconds: float = 1.0,
    ):
        """
        Initialize file change handler.

        Args:
            on_created: Callback when file is created
            on_modified: Callback when file is modified
            on_deleted: Callback when file is deleted
            extensions: File extensions to monitor, matched
                case-insensitively (default: SUPPORTED_EXTENSIONS).
                An explicit empty set disables extension filtering.
            ignore_patterns: Glob patterns to ignore (default:
                DEFAULT_IGNORE_PATTERNS). An explicit empty set disables
                ignore rules.
            debounce_seconds: Min time between events for same file (default: 1.0)
        """
        super().__init__()
        self._on_created = on_created
        self._on_modified = on_modified
        self._on_deleted = on_deleted

        # Test against None rather than truthiness so that an explicitly
        # empty collection is honoured instead of silently replaced by the
        # defaults.
        if extensions is None:
            extensions = self.SUPPORTED_EXTENSIONS
        if ignore_patterns is None:
            ignore_patterns = self.DEFAULT_IGNORE_PATTERNS

        # Private immutable copies: the class-level defaults are never
        # aliased, and extensions are lower-cased once so that "scan.PDF"
        # matches ".pdf".
        self._extensions = frozenset(ext.lower() for ext in extensions)
        self._ignore_patterns = frozenset(ignore_patterns)

        self._debounce_seconds = debounce_seconds
        self._last_events: Dict[str, float] = {}  # path → monotonic timestamp

    def on_created(self, event: FileSystemEvent) -> None:
        """Handle file creation event."""
        self._dispatch(event, "created", self._on_created, debounce=True)

    def on_modified(self, event: FileSystemEvent) -> None:
        """Handle file modification event."""
        self._dispatch(event, "modified", self._on_modified, debounce=True)

    def on_deleted(self, event: FileSystemEvent) -> None:
        """Handle file deletion event."""
        self._dispatch(event, "deleted", self._on_deleted, debounce=False)
        # Clean up debounce tracking
        self._last_events.pop(event.src_path, None)

    def _dispatch(
        self,
        event: FileSystemEvent,
        kind: str,
        callback: Optional[Callable[[FileSystemEvent], None]],
        *,
        debounce: bool,
    ) -> None:
        """Filter, optionally debounce, log and forward one event to its callback."""
        if event.is_directory or not self._should_process(event):
            return
        if debounce and self._is_debounced(event.src_path):
            logger.debug("Debounced: %s", event.src_path)
            return

        logger.info("File %s: %s", kind, event.src_path)
        if callback is None:
            return
        try:
            callback(event)
        except Exception as e:
            # Deliberately broad: a failing user callback must not propagate
            # out of the handler; it is logged with its traceback instead.
            logger.error("Error in on_%s callback: %s", kind, e, exc_info=True)

    def _should_process(self, event: FileSystemEvent) -> bool:
        """Check if file should be processed based on filters."""
        path = Path(event.src_path)

        # Check extension (an empty extension set means "accept any")
        if self._extensions and path.suffix.lower() not in self._extensions:
            return False

        # Check ignore patterns
        return not any(path.match(pattern) for pattern in self._ignore_patterns)

    def _is_debounced(self, file_path: str) -> bool:
        """
        Check if event should be debounced.

        Returns True when an event for ``file_path`` was already accepted
        less than ``debounce_seconds`` ago. Otherwise records the current
        time for that path and returns False.
        """
        # Monotonic clock: interval measurement must not be affected by
        # wall-clock adjustments.
        current_time = time.monotonic()
        last_time = self._last_events.get(file_path)
        # None (never seen) is kept distinct from a timestamp because a
        # monotonic reading has no defined epoch to compare a 0 default to.
        if last_time is not None and current_time - last_time < self._debounce_seconds:
            return True
        self._last_events[file_path] = current_time
        return False
Testing Requirements
Unit Tests
File: tests/sdk/test_file_change_handler.py
import pytest
from pathlib import Path
import time
import tempfile
from gaia import FileChangeHandler
from watchdog.observers import Observer
from watchdog.events import FileCreatedEvent, FileModifiedEvent
def test_file_change_handler_can_be_imported():
    """The top-level gaia package exports FileChangeHandler."""
    from gaia import FileChangeHandler as exported_handler

    assert exported_handler is not None
def test_handler_creation():
    """A handler can be built with no arguments at all."""
    assert FileChangeHandler() is not None
def test_handler_with_callbacks():
    """Test that supplied callbacks are stored and actually invoked."""
    created_files = []
    modified_files = []

    def on_create(event):
        created_files.append(event.src_path)

    def on_modify(event):
        modified_files.append(event.src_path)

    handler = FileChangeHandler(
        on_created=on_create,
        on_modified=on_modify
    )

    # The exact callables must be stored, not merely "something".
    assert handler._on_created is on_create
    assert handler._on_modified is on_modify

    # Distinct paths so the shared per-path debounce window cannot
    # suppress the second event.
    handler.on_created(FileCreatedEvent("created.txt"))
    handler.on_modified(FileModifiedEvent("modified.txt"))

    assert created_files == ["created.txt"]
    assert modified_files == ["modified.txt"]
def test_extension_filtering():
    """Only files whose extension is in the configured set reach the callback."""
    seen = []

    handler = FileChangeHandler(
        on_created=lambda event: seen.append(event.src_path),
        extensions={'.pdf', '.txt'}
    )

    # Feed one synthetic creation event per candidate file, in order.
    for name in ("test.pdf", "test.txt", "test.jpg"):
        handler.on_created(FileCreatedEvent(name))

    # The .jpg event is filtered out; the other two arrive in order.
    assert len(seen) == 2
    first, second = seen
    assert "test.pdf" in first
    assert "test.txt" in second
def test_ignore_patterns():
    """Test ignore patterns."""
    processed_files = []

    def on_create(event):
        processed_files.append(event.src_path)

    handler = FileChangeHandler(
        on_created=on_create,
        ignore_patterns={'.*', '~*'}
    )

    # Simulate events. Every name carries a monitored extension (.txt), so
    # a rejection here can only come from the ignore patterns and not from
    # the extension filter.
    normal_event = FileCreatedEvent("file.txt")
    hidden_event = FileCreatedEvent(".hidden.txt")
    temp_event = FileCreatedEvent("~temp.txt")

    handler.on_created(normal_event)
    handler.on_created(hidden_event)
    handler.on_created(temp_event)

    # Only normal file should be processed
    assert len(processed_files) == 1
    assert "file.txt" in processed_files[0]
def test_debouncing():
    """Repeated events for one path collapse until the debounce window passes."""
    calls = []

    handler = FileChangeHandler(
        on_created=lambda event: calls.append(event),
        debounce_seconds=0.5
    )
    event = FileCreatedEvent("test.txt")

    # Three back-to-back events: only the first gets through.
    for _ in range(3):
        handler.on_created(event)
    assert len(calls) == 1

    # Once the window has elapsed the same path is processed again.
    time.sleep(0.6)
    handler.on_created(event)
    assert len(calls) == 2
def test_real_file_watching(tmp_path):
    """Test with real file system changes."""
    processed_files = []

    def on_create(event):
        processed_files.append(Path(event.src_path).name)

    handler = FileChangeHandler(
        on_created=on_create,
        extensions={'.txt'}
    )

    observer = Observer()
    observer.schedule(handler, str(tmp_path), recursive=False)
    observer.start()
    try:
        # Create a file
        test_file = tmp_path / "test.txt"
        test_file.write_text("test content")

        # Poll with a deadline instead of a single fixed sleep: event
        # delivery latency varies by platform and machine load, so a fixed
        # wait is either flaky or needlessly slow.
        deadline = time.monotonic() + 5.0
        while "test.txt" not in processed_files and time.monotonic() < deadline:
            time.sleep(0.05)

        # Verify callback was triggered
        assert "test.txt" in processed_files
    finally:
        observer.stop()
        observer.join(timeout=5)
def test_callback_error_handling():
    """An exception raised inside a callback must not escape the handler."""
    def failing_callback(event):
        raise Exception("Callback error")

    handler = FileChangeHandler(on_created=failing_callback)

    # The handler is expected to log the failure and return normally, so
    # this call completing without raising is the whole assertion.
    handler.on_created(FileCreatedEvent("test.txt"))
Usage Examples
Example 1: Medical Intake Agent
from gaia import Agent, FileChangeHandler
from gaia.llm.vlm_client import VLMClient
from watchdog.observers import Observer
from pathlib import Path
class MedicalIntakeAgent(Agent):
    """Process medical intake forms automatically."""

    def __init__(self, watch_dir: str = "./intake_forms", **kwargs):
        """
        Create the agent and immediately start watching ``watch_dir``.

        Args:
            watch_dir: Directory to monitor for new forms; created
                (including missing parent directories) if absent.
            **kwargs: Forwarded unchanged to ``Agent.__init__``.
        """
        super().__init__(**kwargs)
        self.watch_dir = Path(watch_dir)
        # parents=True so a nested watch_dir does not fail when its parent
        # directories are missing.
        self.watch_dir.mkdir(parents=True, exist_ok=True)
        self.vlm = VLMClient()
        self._observer = None
        self._start_watching()

    def _start_watching(self):
        """Start watching for new intake forms."""
        def process_new_form(event):
            print(f"New form detected: {event.src_path}")
            # Extract data from form
            result = self._extract_form_data(event.src_path)
            print(f"Extracted: {result}")

        handler = FileChangeHandler(
            on_created=process_new_form,
            extensions={'.pdf', '.png', '.jpg'},
            debounce_seconds=2.0
        )
        self._observer = Observer()
        self._observer.schedule(handler, str(self.watch_dir), recursive=False)
        self._observer.start()
        print(f"Watching: {self.watch_dir}")

    def _extract_form_data(self, image_path: str) -> dict:
        """Extract data from intake form."""
        path = Path(image_path)
        image_bytes = path.read_bytes()
        extracted = self.vlm.extract_from_image(image_bytes, "Extract patient data")
        return {"file": str(path), "data": extracted}

    def stop_watching(self):
        """Stop the observer, if one is running. Safe to call repeatedly."""
        # getattr: the attribute does not exist yet if __init__ raised
        # before assigning it.
        observer = getattr(self, "_observer", None)
        if observer is None:
            return
        self._observer = None
        observer.stop()
        # Joining a thread that was never started raises RuntimeError.
        if observer.is_alive():
            observer.join(timeout=5)

    def __del__(self):
        """Stop watching on cleanup."""
        # Finalizers are not guaranteed to run promptly; prefer calling
        # stop_watching() explicitly and treat this as a safety net.
        self.stop_watching()
Example 2: Document Indexing Agent
from pathlib import Path

from gaia import Agent, FileChangeHandler
from gaia.rag.sdk import RAGSDK, RAGConfig
from watchdog.observers import Observer


class AutoIndexAgent(Agent):
    """Automatically index new documents."""

    def __init__(self, docs_dir: str = "./docs", **kwargs):
        """
        Create the agent and start auto-indexing ``docs_dir``.

        Args:
            docs_dir: Directory tree to watch for new documents.
            **kwargs: Forwarded unchanged to ``Agent.__init__``.
        """
        super().__init__(**kwargs)
        self.rag = RAGSDK(RAGConfig())
        self._observer = None
        self._setup_watching(docs_dir)

    def _setup_watching(self, docs_dir: str):
        """Watch directory for new documents."""
        def index_new_doc(event):
            print(f"Indexing: {event.src_path}")
            self.rag.index_document(event.src_path)
            print(f"✅ Indexed: {Path(event.src_path).name}")

        handler = FileChangeHandler(
            on_created=index_new_doc,
            extensions={'.pdf', '.txt', '.md', '.docx'}
        )
        # Keep a reference on the instance so the observer can be stopped
        # later; a local variable would leave no handle to it.
        self._observer = Observer()
        self._observer.schedule(handler, docs_dir, recursive=True)
        self._observer.start()
        print(f"Auto-indexing enabled for: {docs_dir}")

    def stop_watching(self):
        """Stop the observer, if one is running. Safe to call repeatedly."""
        observer = getattr(self, "_observer", None)
        if observer is None:
            return
        self._observer = None
        observer.stop()
        # Joining a thread that was never started raises RuntimeError.
        if observer.is_alive():
            observer.join(timeout=5)
Implementation Details
Current location: src/gaia/agents/chat/agent.py (around line 70)
# Current (tightly coupled)
class FileChangeHandler(FileSystemEventHandler):
    # Legacy form: the handler stores the agent it was built for and
    # calls a method on it directly.
    def __init__(self, agent):
        self.agent = agent  # ❌ Coupled to specific agent

    def on_created(self, event):
        # Each creation event calls reindex() on the stored agent; the
        # event itself is not passed along.
        self.agent.reindex()  # ❌ Calls agent method
Target (generic):
# Generic (callback-based)
class FileChangeHandler(FileSystemEventHandler):
    # Sketch only: the "..." stands for the remaining constructor
    # parameters and is not valid Python as written.
    def __init__(self, on_created=None, ...):
        self._on_created = on_created  # ✅ Generic callback

    def on_created(self, event):
        # The callback is optional; without one the event is dropped here.
        if self._on_created:
            self._on_created(event)  # ✅ Calls callback
Debouncing Implementation
# Sketch only: the "..." stands for the other constructor parameters.
def __init__(self, ..., debounce_seconds=1.0):
    # Window length compared against the elapsed time in _is_debounced.
    self._debounce_seconds = debounce_seconds
    self._last_events = {}  # path → timestamp
def _is_debounced(self, file_path: str) -> bool:
    """
    Check if event should be debounced.

    Returns True when an event for ``file_path`` was accepted less than
    ``self._debounce_seconds`` ago. Otherwise records the current time for
    that path and returns False.
    """
    # Monotonic clock: an interval measurement must not be affected by
    # wall-clock adjustments.
    current_time = time.monotonic()
    last_time = self._last_events.get(file_path)
    # None (never seen) is kept distinct from a timestamp because a
    # monotonic reading has no defined epoch to compare a 0 default to.
    if last_time is not None and current_time - last_time < self._debounce_seconds:
        return True  # Skip this event
    self._last_events[file_path] = current_time
    return False  # Process this event
Extension Filtering
def _should_process(self, event: FileSystemEvent) -> bool:
    """Decide whether the file behind ``event`` passes the configured filters."""
    candidate = Path(event.src_path)
    allowed = self._extensions

    # An empty extension collection switches the extension filter off.
    if allowed and candidate.suffix not in allowed:
        return False

    # Reject the file as soon as any ignore pattern matches its path.
    return not any(candidate.match(glob) for glob in self._ignore_patterns)
Dependencies
Required Packages
# pyproject.toml
[project]
dependencies = [
"watchdog>=3.0.0", # File system monitoring
]
Import Dependencies
from pathlib import Path
from typing import Callable, Optional, Set, Dict
from watchdog.events import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer
import time
import logging
Documentation Updates Required
SDK.md
Add new section after Tool Mixins:
## 10. File System Monitoring
### FileChangeHandler
**Import:** `from gaia import FileChangeHandler`
**Purpose:** Monitor directories for file changes and trigger automatic processing.
**When to use:**
- Auto-process files dropped in a folder
- Auto-index new documents
- Watch for configuration changes
- Trigger workflows on file events
[Full documentation with examples]
Update EMR Example
Replace manual file watching with FileChangeHandler in medical-intake-build-guide.md
Acceptance Criteria
Implementation Checklist
Step 1: Create File
Step 2: Implement Class
Step 3: Add Features
Step 4: Write Tests
Step 5: Export & Document
Step 6: Validate
FileChangeHandler Technical Specification