"""TaskWarrior MCP Server implementation.""" import asyncio import json import logging import os import sys from typing import Any from mcp.server.fastmcp import FastMCP # reconfigure UnicodeEncodeError prone default (i.e. windows-1252) to utf-8 if sys.platform == "win32" and os.environ.get('PYTHONIOENCODING') is None: sys.stdin.reconfigure(encoding="utf-8") sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") logger = logging.getLogger('mcp_taskwarrior_server') class TaskWarriorError(Exception): """Base exception for TaskWarrior operations.""" pass class TaskWarriorServer: """Server class for TaskWarrior MCP operations.""" def __init__(self) -> None: """Initialize the TaskWarrior server.""" self._check_taskwarrior_installed() def _check_taskwarrior_installed(self) -> None: """Check if TaskWarrior is installed and accessible.""" # This will be checked on first command execution pass async def _run_task_command( self, *args: str, input_text: str | None = None ) -> tuple[str, str, int]: """Execute a TaskWarrior command and return stdout, stderr, return code. Args: *args: Command arguments to pass to task command input_text: Optional input text to send to stdin Returns: Tuple of (stdout, stderr, return_code) Raises: TaskWarriorError: If taskwarrior is not installed or command fails """ try: # Use rc.confirmation:off to avoid interactive prompts # Use rc.bulk:0 to avoid bulk operation confirmations cmd = ["task", "rc.confirmation:off", "rc.bulk:0"] + list(args) process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE if input_text else None, ) stdout_bytes, stderr_bytes = await process.communicate( input_text.encode() if input_text else None ) stdout = stdout_bytes.decode('utf-8', errors='replace') stderr = stderr_bytes.decode('utf-8', errors='replace') return_code = process.returncode if return_code != 0: error_msg = stderr.strip() or stdout.strip() or "Unknown error" # Check if taskwarrior is not installed error_lower = error_msg.lower() if ("command not found" in error_lower or "not found" in error_lower): raise TaskWarriorError( "TaskWarrior is not installed or not in PATH. " "Please install TaskWarrior to use this server." ) raise TaskWarriorError(f"TaskWarrior command failed: {error_msg}") return stdout, stderr, return_code except FileNotFoundError: raise TaskWarriorError( "TaskWarrior is not installed or not in PATH. " "Please install TaskWarrior to use this server." ) from None except Exception as e: if isinstance(e, TaskWarriorError): raise raise TaskWarriorError( f"Error executing TaskWarrior command: {str(e)}" ) from e async def list_tasks(self, filter_expr: str | None = None) -> list[dict[str, Any]]: """List tasks matching the optional filter. Args: filter_expr: Optional filter expression (e.g., "project:Home", "+work") Returns: List of task dictionaries from JSON export """ args = [] if filter_expr: args.append(filter_expr) args.append("export") stdout, _, _ = await self._run_task_command(*args) # Parse JSON output try: # TaskWarrior export outputs JSON array, but may have trailing newlines stdout = stdout.strip() if not stdout: return [] tasks = json.loads(stdout) if not isinstance(tasks, list): return [tasks] if tasks else [] return tasks except json.JSONDecodeError as e: logger.error(f"Failed to parse TaskWarrior JSON output: {e}") logger.debug(f"Output was: {stdout}") raise TaskWarriorError( f"Failed to parse TaskWarrior output: {str(e)}" ) from e async def add_task( self, description: str, project: str | None = None, priority: str | None = None, due: str | None = None, tags: list[str] | None = None, ) -> dict[str, Any]: """Add a new task to TaskWarrior. Args: description: Task description (required) project: Optional project name priority: Optional priority (H, M, or L) due: Optional due date (ISO format or TaskWarrior date format) tags: Optional list of tags Returns: Dictionary with task information including UUID """ args = ["add", description] if project: args.append(f"project:{project}") if priority: if priority.upper() not in ["H", "M", "L"]: raise ValueError(f"Priority must be H, M, or L, got: {priority}") args.append(f"priority:{priority.upper()}") if due: args.append(f"due:{due}") if tags: for tag in tags: args.append(f"+{tag}") stdout, stderr, _ = await self._run_task_command(*args) # Extract UUID from output # TaskWarrior output format: "Created task 123.\n\nUUID: " uuid = None for line in stdout.split('\n'): if line.startswith('UUID:'): uuid = line.split(':', 1)[1].strip() break # If UUID not found in stdout, try to get it by listing the newest task if not uuid: # Get the newest task to find the UUID tasks = await self.list_tasks("status:pending") if tasks: # Sort by entry date (most recent first) tasks.sort(key=lambda t: t.get('entry', ''), reverse=True) uuid = tasks[0].get('uuid') return { "description": description, "uuid": uuid, "project": project, "priority": priority.upper() if priority else None, "due": due, "tags": tags or [], "output": stdout.strip(), } async def done_task(self, uuid: str) -> dict[str, Any]: """Mark a task as completed. Args: uuid: Task UUID (stable identifier) Returns: Dictionary with completion information """ stdout, stderr, _ = await self._run_task_command(uuid, "done") return { "uuid": uuid, "status": "completed", "output": stdout.strip(), } async def delete_task(self, uuid: str) -> dict[str, Any]: """Delete a task. Args: uuid: Task UUID (stable identifier) Returns: Dictionary with deletion information """ stdout, stderr, _ = await self._run_task_command(uuid, "delete") return { "uuid": uuid, "status": "deleted", "output": stdout.strip(), } async def manage_context( self, action: str, name: str | None = None ) -> dict[str, Any]: """Manage TaskWarrior contexts. Args: action: One of "set", "list", "show", "none" name: Context name (required for "set" action) Returns: Dictionary with context information """ if action not in ["set", "list", "show", "none"]: raise ValueError( f"Action must be one of: set, list, show, none. Got: {action}" ) if action == "set" and not name: raise ValueError("Context name is required for 'set' action") args = ["context"] if action == "set": args.append(name) elif action == "none": args.append("none") else: args.append(action) stdout, stderr, _ = await self._run_task_command(*args) result: dict[str, Any] = { "action": action, "output": stdout.strip(), } if action == "set" and name: result["context"] = name elif action == "show": # Parse the context show output to extract context name lines = stdout.strip().split('\n') for line in lines: if line.startswith("Context '") and "'" in line: context_name = line.split("'")[1] result["context"] = context_name break elif action == "list": # Parse context list output contexts = [] for line in stdout.strip().split('\n'): if line.strip() and not line.startswith("Name"): parts = line.split() if len(parts) >= 2: ctx_name = parts[0] if ctx_name not in [c.get("name") for c in contexts]: contexts.append({"name": ctx_name}) result["contexts"] = contexts return result def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: """Create and configure the FastMCP server. Args: host: Host to bind to for remote transport port: Port to bind to for remote transport Returns: Configured FastMCP server instance """ mcp = FastMCP("TaskWarrior", host=host, port=port) # Create a TaskWarriorServer instance taskwarrior = TaskWarriorServer() @mcp.tool() async def list_tasks(filter_expr: str | None = None) -> str: """List tasks with optional filter. Args: filter_expr: Optional filter expression (e.g., "project:Home", "+work", "status:pending") Returns: JSON string of tasks matching the filter """ try: tasks = await taskwarrior.list_tasks(filter_expr) return json.dumps(tasks, indent=2) except Exception as e: logger.error(f"Error listing tasks: {e}") return json.dumps({"error": str(e)}) @mcp.tool() async def add_task( description: str, project: str | None = None, priority: str | None = None, due: str | None = None, tags: list[str] | None = None, ) -> str: """Add a new task to TaskWarrior. Args: description: Task description (required) project: Optional project name priority: Optional priority (H, M, or L) due: Optional due date (ISO format or TaskWarrior date format like "tomorrow", "2024-12-25") tags: Optional list of tags Returns: JSON string with task information including UUID """ try: result = await taskwarrior.add_task( description=description, project=project, priority=priority, due=due, tags=tags, ) return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error adding task: {e}") return json.dumps({"error": str(e)}) @mcp.tool() async def done_task(uuid: str) -> str: """Mark a task as completed. Args: uuid: Task UUID (stable identifier, not the numeric ID) Returns: JSON string with completion information """ try: result = await taskwarrior.done_task(uuid) return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error marking task as done: {e}") return json.dumps({"error": str(e)}) @mcp.tool() async def delete_task(uuid: str) -> str: """Delete a task. Args: uuid: Task UUID (stable identifier, not the numeric ID) Returns: JSON string with deletion information """ try: result = await taskwarrior.delete_task(uuid) return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error deleting task: {e}") return json.dumps({"error": str(e)}) @mcp.tool() async def context(action: str, name: str | None = None) -> str: """Manage TaskWarrior contexts. Args: action: One of "set", "list", "show", "none" name: Context name (required for "set" action, optional otherwise) Returns: JSON string with context information """ try: result = await taskwarrior.manage_context(action, name) return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error managing context: {e}") return json.dumps({"error": str(e)}) return mcp async def main(transport_type: str, host: str, port: int) -> None: """Start the server with the specified transport. Args: transport_type: Transport type ("stdio" or "remote") host: Host to bind to for remote transport port: Port to bind to for remote transport """ logger.info("Starting MCP TaskWarrior Server") logger.info(f"Starting TaskWarrior MCP Server with {transport_type} transport") # Create the server with host and port mcp = create_server(host=host, port=port) # Run the server with the appropriate transport if transport_type == "stdio": logger.info("Server running with stdio transport") await mcp.run_stdio_async() else: # remote transport logger.info(f"Server running with remote transport on {host}:{port}") await mcp.run_sse_async()