Source Code:
src/gaia/agents/base/console.py
Component: OutputHandler, AgentConsole, SilentConsole, SSEOutputHandler
Module:
gaia.agents.base.console, gaia.api.sse_handler
Import: from gaia.agents.base.console import OutputHandler, AgentConsole, SilentConsole
Overview
The console output system provides a unified interface for handling agent output across different contexts (CLI, testing, API). It implements a modular design where agents report progress through an abstract OutputHandler interface, and different implementations handle the output appropriately for their context.
Key Features:
- Abstract interface for agent output
- Rich terminal formatting (when available)
- Silent mode for testing
- Server-Sent Events (SSE) for API streaming
- Progress indicators and file preview
- Consistent output format across all agents
Architecture
Copy
OutputHandler (ABC)
├── AgentConsole (Rich CLI output)
├── SilentConsole (Suppressed output for testing)
└── SSEOutputHandler (API streaming via SSE)
- Agents call output methods without knowing the implementation
- Output handlers decide HOW to display information
- Enables testing without modifying agent code
- Supports real-time streaming to API clients
API Specification
OutputHandler (Abstract Base Class)
Copy
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
class OutputHandler(ABC):
    """
    Abstract base class for handling agent output.

    Defines the minimal interface that agents use to report their progress.
    Each implementation handles the output differently:
    - AgentConsole: Rich console output for CLI
    - SilentConsole: Suppressed output for testing
    - SSEOutputHandler: Server-Sent Events for API streaming

    Methods decorated with ``@abstractmethod`` must be overridden by every
    concrete handler; the methods in the "Optional" section have a default
    no-op body and may be overridden when a handler supports them.
    """

    # === Core Progress/State Methods (Required) ===

    @abstractmethod
    def print_processing_start(self, query: str, max_steps: int):
        """Print processing start message."""
        ...

    @abstractmethod
    def print_step_header(self, step_num: int, step_limit: int):
        """Print step header."""
        ...

    @abstractmethod
    def print_state_info(self, state_message: str):
        """Print current execution state."""
        ...

    @abstractmethod
    def print_thought(self, thought: str):
        """Print agent's reasoning/thought."""
        ...

    @abstractmethod
    def print_goal(self, goal: str):
        """Print agent's current goal."""
        ...

    @abstractmethod
    def print_plan(self, plan: List[Any], current_step: Optional[int] = None):
        """Print agent's plan with optional current step highlight.

        Args:
            plan: The plan steps to display.
            current_step: Step to highlight, or None for no highlight.
        """
        ...

    # === Tool Execution Methods (Required) ===

    @abstractmethod
    def print_tool_usage(self, tool_name: str):
        """Print tool being called."""
        ...

    @abstractmethod
    def print_tool_complete(self):
        """Print tool completion."""
        ...

    @abstractmethod
    def pretty_print_json(self, data: Dict[str, Any], title: Optional[str] = None):
        """Print JSON data (tool args/results).

        Args:
            data: The JSON-like mapping to display.
            title: Optional heading for the output, or None for no title.
        """
        ...

    # === Status Messages (Required) ===

    @abstractmethod
    def print_error(self, error_message: str):
        """Print error message."""
        ...

    @abstractmethod
    def print_warning(self, warning_message: str):
        """Print warning message."""
        ...

    @abstractmethod
    def print_info(self, message: str):
        """Print informational message."""
        ...

    # === Progress Indicators (Required) ===

    @abstractmethod
    def start_progress(self, message: str):
        """Start progress indicator."""
        ...

    @abstractmethod
    def stop_progress(self):
        """Stop progress indicator."""
        ...

    # === Completion Methods (Required) ===

    @abstractmethod
    def print_final_answer(self, answer: str):
        """Print final answer/result."""
        ...

    @abstractmethod
    def print_repeated_tool_warning(self):
        """Print warning about repeated tool calls (loop detection)."""
        ...

    @abstractmethod
    def print_completion(self, steps_taken: int, steps_limit: int):
        """Print completion summary."""
        ...

    @abstractmethod
    def print_step_paused(self, description: str):
        """Print step paused message."""
        ...

    @abstractmethod
    def print_command_executing(self, command: str):
        """Print command executing message."""
        ...

    @abstractmethod
    def print_agent_selected(self, agent_name: str, language: str, project_type: str):
        """Print agent selected message."""
        ...

    # === Optional Methods (with default no-op implementations) ===

    def print_prompt(self, prompt: str, title: str = "Prompt"):
        """Print prompt (for debugging). Optional - default no-op."""
        ...

    def print_response(self, response: str, title: str = "Response"):
        """Print response (for debugging). Optional - default no-op."""
        ...

    def print_streaming_text(self, text_chunk: str, end_of_stream: bool = False):
        """Print streaming text. Optional - default no-op."""
        ...

    def display_stats(self, stats: Dict[str, Any]):
        """Display performance statistics. Optional - default no-op."""
        ...
AgentConsole
Copy
class AgentConsole(OutputHandler):
    """
    Output handler that renders agent progress in a terminal using Rich.

    Provides:
    - Syntax highlighting for code and JSON
    - Progress spinners
    - Live file preview during code generation
    - Colored output with emojis
    - Formatted tables for statistics
    """

    def __init__(self):
        """Set up Rich support (when available), the progress indicator and the streaming buffer."""
        self.rich_available = RICH_AVAILABLE
        # Only build a Rich Console when the library was importable.
        if self.rich_available:
            self.console = Console()
        else:
            self.console = None
        self.progress = ProgressIndicator()
        self.streaming_buffer = ""

    def print(self, *args, **kwargs):
        """
        Forward a print call to the Rich Console or to the standard print.

        Lets callers use console.print() directly on an AgentConsole instance.
        """
        ...

    # File Preview Methods (Code Agent specific)

    def start_file_preview(
        self,
        filename: str,
        max_lines: int = 15,
        title_prefix: str = "📄",
    ) -> None:
        """Open a live, streaming preview window for a file."""
        ...

    def update_file_preview(self, content_chunk: str) -> None:
        """Feed a new chunk of content into the live file preview."""
        ...

    def stop_file_preview(self) -> None:
        """Close the live file preview and show the final summary."""
        ...

    # Additional Display Methods

    def print_diff(self, diff: str, filename: str) -> None:
        """Show a code diff with syntax highlighting."""
        ...

    def print_checklist(self, items: List[Any], current_idx: int) -> None:
        """Show a checklist with the current item highlighted."""
        ...

    def get_streaming_buffer(self) -> str:
        """Return the accumulated streaming text and reset the buffer."""
        ...
SilentConsole
Copy
class SilentConsole(OutputHandler):
    """
    Output handler that suppresses all output, for JSON-only mode.

    Used for:
    - Unit testing (no visual noise)
    - API mode (only structured output)
    - Automated workflows

    All methods are no-ops except:
    - print_final_answer (unless silence_final_answer=True)
    - display_stats (if explicitly requested)
    """

    def __init__(self, silence_final_answer: bool = False):
        """
        Create the silent console.

        Args:
            silence_final_answer: If True, suppress even the final answer
        """
        self.silence_final_answer = silence_final_answer
        self.streaming_buffer = ""

    def print_final_answer(self, answer: str, streaming: bool = True) -> None:
        """
        Emit the final answer to stdout.

        Nothing is printed when silence_final_answer is True.
        """
        if not self.silence_final_answer:
            print(f"\n🧠 gaia: {answer}")

    def display_stats(self, stats: Dict[str, Any]) -> None:
        """
        Show stats even in silent mode, because they were explicitly requested.

        Uses the same Rich table format as AgentConsole.
        """
        ...

    # All other methods are no-ops
SSEOutputHandler
Copy
from collections import deque
class SSEOutputHandler(OutputHandler):
    """
    Output handler for Server-Sent Events (SSE) streaming to API clients.

    Formats agent outputs as SSE-compatible JSON chunks that can be
    streamed to API clients (e.g., VSCode extension).
    Each output is converted to a dictionary and added to a queue
    that can be consumed by the API server.

    The usage examples run the agent on a background thread while the API
    server drains the queue, so producers only ever ``append`` and the
    consumer only ever ``popleft`` (both are thread-safe on ``deque``).
    """

    def __init__(self, debug_mode: bool = False):
        """
        Initialize the SSE output handler.

        Args:
            debug_mode: Enable verbose event streaming for debugging
        """
        self.queue = deque()
        self.streaming_buffer = ""
        self.debug_mode = debug_mode
        self.current_step = 0
        self.total_steps = 0

    def _add_event(self, event_type: str, data: Dict[str, Any]):
        """Add an event to the output queue.

        Args:
            event_type: Value stored under the event's "type" key.
            data: Payload stored under the event's "data" key.
        """
        self.queue.append({
            "type": event_type,
            "data": data,
            "timestamp": time.time(),
        })

    def should_stream_as_content(self, event_type: str) -> bool:
        """
        Determine if an event should be streamed as content to the client.

        In normal mode: Only stream key status updates and final answers
        In debug mode: Stream all events
        """
        ...

    def get_events(self) -> List[Dict[str, Any]]:
        """Get all queued events and clear the queue.

        Returns:
            The queued events in insertion order (possibly empty).
        """
        # Drain one item at a time instead of snapshot-then-clear: an event
        # appended by the agent thread between list(queue) and queue.clear()
        # would be discarded without ever being returned.
        events: List[Dict[str, Any]] = []
        while True:
            try:
                events.append(self.queue.popleft())
            except IndexError:
                # Queue is empty; anything appended later is left for the next call.
                return events

    def has_events(self) -> bool:
        """Check if there are any queued events."""
        return bool(self.queue)

    def format_event_as_content(self, event: Dict[str, Any]) -> str:
        """
        Format an event as clean content text for streaming.

        Sends clean, minimal text that is OpenAI-compatible.
        The VSCode extension will add formatting (emojis, separators) for display.
        """
        ...

    def print(self, *args, **_kwargs):
        """
        Handle generic print() calls - queue as message event.

        Captures print() calls from agent code and queues them
        as SSE events so they can be streamed to the client.
        Keyword arguments are accepted for print() compatibility and ignored;
        whitespace-only messages are dropped.
        """
        message = " ".join(str(arg) for arg in args)
        if message.strip():
            self._add_event("message", {"text": message})
Usage Examples
Example 1: CLI Agent with Rich Output
Copy
from gaia.agents.base.console import AgentConsole
from gaia.agents.base.agent import Agent
class MyAgent(Agent):
    """Example agent that reports every stage of its work through its console."""

    def _create_console(self):
        """Use the Rich-based console for CLI output."""
        return AgentConsole()

    def process_query(self, query: str):
        """Walk through a sample run, reporting each stage to the console."""
        console = self.console

        # Announce the run, the first step and the agent's reasoning
        console.print_processing_start(query, max_steps=10)
        console.print_step_header(1, 10)
        console.print_thought("Analyzing the request...")

        # Report a tool call together with its result
        console.print_tool_usage("search_files")
        search_results = {"files": ["main.py", "test.py"]}
        console.pretty_print_json(search_results, title="Search Results")
        console.print_tool_complete()

        # Finish with the answer and the completion summary
        console.print_final_answer("Task completed successfully!")
        console.print_completion(steps_taken=1, steps_limit=10)
Example 2: Testing with Silent Console
Copy
from gaia.agents.base.console import SilentConsole
def test_agent_execution():
    """Run the agent in silent mode and check only the structured result."""
    # silent_mode=True suppresses all agent output
    silent_agent = MyAgent(silent_mode=True)
    outcome = silent_agent.process_query("test query")

    # The structured result is the only thing inspected
    assert outcome.status == "success"
Example 3: API Streaming with SSE
Copy
from gaia.api.sse_handler import SSEOutputHandler
def stream_agent_execution(query: str):
    """Stream agent execution to API client.

    Runs the agent on a background thread and yields each streamable event
    as an SSE ``data:`` frame while the agent is running or events remain.

    Args:
        query: The query passed to ``agent.process_query``.

    Yields:
        SSE-formatted strings of the form ``"data: <content>\\n\\n"``.
    """
    # Local imports keep the example self-contained; `time` provides the
    # polling delay at the bottom of the loop.
    import threading
    import time

    sse_handler = SSEOutputHandler(debug_mode=False)
    agent = MyAgent(output_handler=sse_handler)

    # Execute agent in background
    thread = threading.Thread(target=agent.process_query, args=(query,))
    thread.start()

    # Stream events as they arrive; keep looping after the thread exits
    # until the queue has been fully drained.
    while thread.is_alive() or sse_handler.has_events():
        for event in sse_handler.get_events():
            # Format for SSE streaming
            if sse_handler.should_stream_as_content(event["type"]):
                content = sse_handler.format_event_as_content(event)
                yield f"data: {content}\n\n"
        time.sleep(0.1)
Testing Requirements
Unit Tests
File: tests/agents/test_console.py
Copy
import pytest
from gaia.agents.base.console import AgentConsole, SilentConsole, OutputHandler
def test_output_handler_is_abstract():
    """OutputHandler is abstract, so instantiating it directly must raise TypeError."""
    with pytest.raises(TypeError):
        OutputHandler()
def test_agent_console_creation():
    """A freshly built AgentConsole exists and exposes `console` and `progress`."""
    agent_console = AgentConsole()
    assert agent_console is not None
    for attribute in ("console", "progress"):
        assert hasattr(agent_console, attribute)
def test_silent_console_suppresses_output(capsys):
    """With silence_final_answer=True, SilentConsole writes nothing to stdout."""
    silent = SilentConsole(silence_final_answer=True)

    # Every call below is expected to be a no-op
    silent.print_processing_start("query", 10)
    silent.print_thought("thinking...")
    silent.print_tool_usage("search")
    silent.print_final_answer("answer")

    # stdout must still be empty
    assert capsys.readouterr().out == ""
def test_silent_console_shows_final_answer(capsys):
    """Without silencing, SilentConsole still prints the final answer."""
    silent = SilentConsole(silence_final_answer=False)
    silent.print_final_answer("The answer is 42")
    stdout_text = capsys.readouterr().out
    assert "42" in stdout_text
def test_sse_handler_queues_events():
    """Each reporting call on SSEOutputHandler queues one event, in call order."""
    from gaia.api.sse_handler import SSEOutputHandler

    sse = SSEOutputHandler()
    sse.print_processing_start("test query", 5)
    sse.print_tool_usage("search")
    sse.print_final_answer("done")

    queued = sse.get_events()
    assert len(queued) == 3
    expected_types = ("processing_start", "tool_usage", "final_answer")
    for position, expected_type in enumerate(expected_types):
        assert queued[position]["type"] == expected_type
def test_sse_handler_formats_content():
    """A final_answer event is formatted as newline-terminated text containing the answer."""
    from gaia.api.sse_handler import SSEOutputHandler

    final_answer_event = {
        "type": "final_answer",
        "data": {"answer": "Task completed"},
        "timestamp": 1234567890,
    }
    sse = SSEOutputHandler(debug_mode=False)
    formatted = sse.format_event_as_content(final_answer_event)

    assert "Task completed" in formatted
    assert formatted.endswith("\n")
Dependencies
Required Packages
Copy
# pyproject.toml
[project]
dependencies = [
"rich>=13.0", # For AgentConsole formatting
]
Import Dependencies
Copy
import json
import threading
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from collections import deque
# Rich library (optional - graceful fallback)
try:
from rich import print as rprint
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.spinner import Spinner
from rich.syntax import Syntax
from rich.table import Table
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
Integration Points
Agent Base Class
Copy
class Agent:
    """Agent base class excerpt showing how the output handler is wired in."""

    def __init__(self, output_handler: Optional[OutputHandler] = None, **kwargs):
        """Initialize agent with output handler.

        Args:
            output_handler: Handler to report progress through. When None,
                one is created by ``_create_console()``.
            **kwargs: Additional options; ``silent_mode`` (default False)
                is read here and selects the console created by default.
        """
        # Must be set before _create_console() runs, since that method reads it.
        self.silent_mode = bool(kwargs.get("silent_mode", False))

        # Compare against None rather than truthiness so that a handler
        # object that happens to evaluate as falsy is still honoured.
        if output_handler is not None:
            self.console = output_handler
        else:
            self.console = self._create_console()

    def _create_console(self) -> OutputHandler:
        """Create appropriate console based on mode.

        Returns:
            A SilentConsole when ``self.silent_mode`` is true, otherwise an
            AgentConsole.
        """
        if self.silent_mode:
            return SilentConsole()
        return AgentConsole()
API Server Integration
Copy
# In API server endpoint
@app.post("/api/v1/agent/execute")
async def execute_agent(request: AgentRequest):
    """Run an agent for the request and stream its queued events back as SSE."""
    sse_handler = SSEOutputHandler()
    agent = Agent(output_handler=sse_handler)

    async def event_generator():
        """Yield formatted events while the agent runs or events remain queued."""
        # The agent works on a background thread
        worker = threading.Thread(target=agent.process_query, args=(request.query,))
        worker.start()

        # Poll the handler until the worker is done and the queue is empty
        while worker.is_alive() or sse_handler.has_events():
            pending = sse_handler.get_events()
            for event in pending:
                yield format_sse(event)
            await asyncio.sleep(0.1)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
Acceptance Criteria
- OutputHandler abstract base class defined
- All 3 implementations (AgentConsole, SilentConsole, SSEOutputHandler) working
- Rich formatting works in AgentConsole (when library available)
- Graceful fallback when Rich not installed
- SilentConsole suppresses all output correctly
- SSEOutputHandler queues and formats events
- File preview works in AgentConsole
- All unit tests pass (8+ tests)
- Integration with Agent base class works
- API streaming integration works
- Documentation complete
Console Output Handlers Technical Specification