diff options
Diffstat (limited to 'servers/taskwarrior/src/mcp_server_taskwarrior')
4 files changed, 584 insertions, 0 deletions
diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/__init__.py b/servers/taskwarrior/src/mcp_server_taskwarrior/__init__.py new file mode 100644 index 0000000..cc296ae --- /dev/null +++ b/servers/taskwarrior/src/mcp_server_taskwarrior/__init__.py @@ -0,0 +1,33 @@ +import argparse +import asyncio + +from . import server + + +def main() -> None: + """Main entry point for the package.""" + parser = argparse.ArgumentParser(description="TaskWarrior MCP Server") + parser.add_argument( + "--transport", + default="stdio", + choices=["stdio", "remote"], + help="Transport method (stdio or remote)", + ) + parser.add_argument( + "--host", + default="127.0.0.1", + help="Host for remote transport (default: 127.0.0.1)", + ) + parser.add_argument( + "--port", + type=int, + default=8080, + help="Port for remote transport (default: 8080)", + ) + + args = parser.parse_args() + asyncio.run(server.main(args.transport, args.host, args.port)) + + +# Expose important items at package level +__all__ = ["server", "main"] diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/__main__.py b/servers/taskwarrior/src/mcp_server_taskwarrior/__main__.py new file mode 100644 index 0000000..eb8c2a9 --- /dev/null +++ b/servers/taskwarrior/src/mcp_server_taskwarrior/__main__.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python3 +""" +Command-line interface for the TaskWarrior MCP server. +""" + +from .cli import run_server + +if __name__ == "__main__": + run_server() diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/cli.py b/servers/taskwarrior/src/mcp_server_taskwarrior/cli.py new file mode 100644 index 0000000..1c556ff --- /dev/null +++ b/servers/taskwarrior/src/mcp_server_taskwarrior/cli.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +""" +Common CLI functionality for the TaskWarrior MCP server. +""" + +import argparse +import asyncio +import logging +import sys + +from .server import main + +logger = logging.getLogger("mcp_taskwarrior_server") + + +def parse_args() -> argparse.Namespace: + """Parse command-line arguments.""" + parser = argparse.ArgumentParser(description="TaskWarrior MCP Server") + parser.add_argument( + "--transport", + choices=["stdio", "remote"], + default="stdio", + help="Transport type (stdio or remote)", + ) + parser.add_argument( + "--host", + default="127.0.0.1", + help="Host to bind to for remote transport (default: 127.0.0.1)", + ) + parser.add_argument( + "--port", + type=int, + default=8080, + help="Port to bind to for remote transport (default: 8080)", + ) + parser.add_argument( + "--log-level", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="INFO", + help="Set the logging level (default: INFO)", + ) + return parser.parse_args() + + +def validate_args(args: argparse.Namespace) -> argparse.Namespace: + """Validate command-line arguments.""" + if ( + args.transport == "remote" + and args.port < 1024 + and not sys.platform.startswith("win") + ): + logger.warning( + "Using a port below 1024 may require root privileges on Unix-like systems." + ) + return args + + +def setup_logging(level: str, transport: str) -> None: + """Set up logging configuration. + + Args: + level: The base logging level from command line args + transport: The transport type being used (stdio or remote) + """ + # Create logs directory if it doesn't exist + import os + + os.makedirs("logs", exist_ok=True) + + # Configure file handler for all logs + file_handler = logging.FileHandler("logs/mcp_server.log") + file_handler.setLevel(getattr(logging, level)) + file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + ) + + # Configure console handler with appropriate level + console_handler = logging.StreamHandler() + # Use WARNING level for stdio transport to reduce interference + if transport == "stdio": + console_handler.setLevel(logging.WARNING) + else: + console_handler.setLevel(getattr(logging, level)) + console_handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + ) + + # Configure root logger + root_logger = logging.getLogger() + root_logger.setLevel(getattr(logging, level)) # Base level for all handlers + root_logger.handlers = [] # Remove any existing handlers + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + +def run_server() -> None: + """Run the server with CLI arguments.""" + args = validate_args(parse_args()) + setup_logging(args.log_level, args.transport) + + try: + asyncio.run(main(args.transport, args.host, args.port)) + except KeyboardInterrupt: + logger.info("Server stopped by user") + except Exception as e: + logger.error(f"Error running server: {e}") + sys.exit(1) diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py new file mode 100644 index 0000000..b73ec0a --- /dev/null +++ b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py @@ -0,0 +1,435 @@ +"""TaskWarrior MCP Server implementation.""" + +import asyncio +import json +import logging +import os +import sys +from typing import Any + +from mcp.server.fastmcp import FastMCP + +# reconfigure UnicodeEncodeError prone default (i.e. windows-1252) to utf-8 +if sys.platform == "win32" and os.environ.get('PYTHONIOENCODING') is None: + sys.stdin.reconfigure(encoding="utf-8") + sys.stdout.reconfigure(encoding="utf-8") + sys.stderr.reconfigure(encoding="utf-8") + +logger = logging.getLogger('mcp_taskwarrior_server') + + +class TaskWarriorError(Exception): + """Base exception for TaskWarrior operations.""" + + pass + + +class TaskWarriorServer: + """Server class for TaskWarrior MCP operations.""" + + def __init__(self) -> None: + """Initialize the TaskWarrior server.""" + self._check_taskwarrior_installed() + + def _check_taskwarrior_installed(self) -> None: + """Check if TaskWarrior is installed and accessible.""" + # This will be checked on first command execution + pass + + async def _run_task_command( + self, *args: str, input_text: str | None = None + ) -> tuple[str, str, int]: + """Execute a TaskWarrior command and return stdout, stderr, return code. + + Args: + *args: Command arguments to pass to task command + input_text: Optional input text to send to stdin + + Returns: + Tuple of (stdout, stderr, return_code) + + Raises: + TaskWarriorError: If taskwarrior is not installed or command fails + """ + try: + # Use rc.confirmation:off to avoid interactive prompts + # Use rc.bulk:0 to avoid bulk operation confirmations + cmd = ["task", "rc.confirmation:off", "rc.bulk:0"] + list(args) + + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE if input_text else None, + ) + + stdout_bytes, stderr_bytes = await process.communicate( + input_text.encode() if input_text else None + ) + + stdout = stdout_bytes.decode('utf-8', errors='replace') + stderr = stderr_bytes.decode('utf-8', errors='replace') + return_code = process.returncode + + if return_code != 0: + error_msg = stderr.strip() or stdout.strip() or "Unknown error" + # Check if taskwarrior is not installed + error_lower = error_msg.lower() + if ("command not found" in error_lower or "not found" in error_lower): + raise TaskWarriorError( + "TaskWarrior is not installed or not in PATH. " + "Please install TaskWarrior to use this server." + ) + raise TaskWarriorError(f"TaskWarrior command failed: {error_msg}") + + return stdout, stderr, return_code + + except FileNotFoundError: + raise TaskWarriorError( + "TaskWarrior is not installed or not in PATH. " + "Please install TaskWarrior to use this server." + ) from None + except Exception as e: + if isinstance(e, TaskWarriorError): + raise + raise TaskWarriorError( + f"Error executing TaskWarrior command: {str(e)}" + ) from e + + async def list_tasks(self, filter_expr: str | None = None) -> list[dict[str, Any]]: + """List tasks matching the optional filter. + + Args: + filter_expr: Optional filter expression (e.g., "project:Home", "+work") + + Returns: + List of task dictionaries from JSON export + """ + args = [] + if filter_expr: + args.append(filter_expr) + args.append("export") + + stdout, _, _ = await self._run_task_command(*args) + + # Parse JSON output + try: + # TaskWarrior export outputs JSON array, but may have trailing newlines + stdout = stdout.strip() + if not stdout: + return [] + tasks = json.loads(stdout) + if not isinstance(tasks, list): + return [tasks] if tasks else [] + return tasks + except json.JSONDecodeError as e: + logger.error(f"Failed to parse TaskWarrior JSON output: {e}") + logger.debug(f"Output was: {stdout}") + raise TaskWarriorError( + f"Failed to parse TaskWarrior output: {str(e)}" + ) from e + + async def add_task( + self, + description: str, + project: str | None = None, + priority: str | None = None, + due: str | None = None, + tags: list[str] | None = None, + ) -> dict[str, Any]: + """Add a new task to TaskWarrior. + + Args: + description: Task description (required) + project: Optional project name + priority: Optional priority (H, M, or L) + due: Optional due date (ISO format or TaskWarrior date format) + tags: Optional list of tags + + Returns: + Dictionary with task information including UUID + """ + args = ["add", description] + + if project: + args.append(f"project:{project}") + + if priority: + if priority.upper() not in ["H", "M", "L"]: + raise ValueError(f"Priority must be H, M, or L, got: {priority}") + args.append(f"priority:{priority.upper()}") + + if due: + args.append(f"due:{due}") + + if tags: + for tag in tags: + args.append(f"+{tag}") + + stdout, stderr, _ = await self._run_task_command(*args) + + # Extract UUID from output + # TaskWarrior output format: "Created task 123.\n\nUUID: <uuid>" + uuid = None + for line in stdout.split('\n'): + if line.startswith('UUID:'): + uuid = line.split(':', 1)[1].strip() + break + + # If UUID not found in stdout, try to get it by listing the newest task + if not uuid: + # Get the newest task to find the UUID + tasks = await self.list_tasks("status:pending") + if tasks: + # Sort by entry date (most recent first) + tasks.sort(key=lambda t: t.get('entry', ''), reverse=True) + uuid = tasks[0].get('uuid') + + return { + "description": description, + "uuid": uuid, + "project": project, + "priority": priority.upper() if priority else None, + "due": due, + "tags": tags or [], + "output": stdout.strip(), + } + + async def done_task(self, uuid: str) -> dict[str, Any]: + """Mark a task as completed. + + Args: + uuid: Task UUID (stable identifier) + + Returns: + Dictionary with completion information + """ + stdout, stderr, _ = await self._run_task_command(uuid, "done") + + return { + "uuid": uuid, + "status": "completed", + "output": stdout.strip(), + } + + async def delete_task(self, uuid: str) -> dict[str, Any]: + """Delete a task. + + Args: + uuid: Task UUID (stable identifier) + + Returns: + Dictionary with deletion information + """ + stdout, stderr, _ = await self._run_task_command(uuid, "delete") + + return { + "uuid": uuid, + "status": "deleted", + "output": stdout.strip(), + } + + async def manage_context( + self, action: str, name: str | None = None + ) -> dict[str, Any]: + """Manage TaskWarrior contexts. + + Args: + action: One of "set", "list", "show", "none" + name: Context name (required for "set" action) + + Returns: + Dictionary with context information + """ + if action not in ["set", "list", "show", "none"]: + raise ValueError( + f"Action must be one of: set, list, show, none. Got: {action}" + ) + + if action == "set" and not name: + raise ValueError("Context name is required for 'set' action") + + args = ["context"] + + if action == "set": + args.append(name) + elif action == "none": + args.append("none") + else: + args.append(action) + + stdout, stderr, _ = await self._run_task_command(*args) + + result: dict[str, Any] = { + "action": action, + "output": stdout.strip(), + } + + if action == "set" and name: + result["context"] = name + elif action == "show": + # Parse the context show output to extract context name + lines = stdout.strip().split('\n') + for line in lines: + if line.startswith("Context '") and "'" in line: + context_name = line.split("'")[1] + result["context"] = context_name + break + elif action == "list": + # Parse context list output + contexts = [] + for line in stdout.strip().split('\n'): + if line.strip() and not line.startswith("Name"): + parts = line.split() + if len(parts) >= 2: + ctx_name = parts[0] + if ctx_name not in [c.get("name") for c in contexts]: + contexts.append({"name": ctx_name}) + result["contexts"] = contexts + + return result + + +def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: + """Create and configure the FastMCP server. + + Args: + host: Host to bind to for remote transport + port: Port to bind to for remote transport + + Returns: + Configured FastMCP server instance + """ + mcp = FastMCP("TaskWarrior", host=host, port=port) + + # Create a TaskWarriorServer instance + taskwarrior = TaskWarriorServer() + + @mcp.tool() + async def list_tasks(filter_expr: str | None = None) -> str: + """List tasks with optional filter. + + Args: + filter_expr: Optional filter expression + (e.g., "project:Home", "+work", "status:pending") + + Returns: + JSON string of tasks matching the filter + """ + try: + tasks = await taskwarrior.list_tasks(filter_expr) + return json.dumps(tasks, indent=2) + except Exception as e: + logger.error(f"Error listing tasks: {e}") + return json.dumps({"error": str(e)}) + + @mcp.tool() + async def add_task( + description: str, + project: str | None = None, + priority: str | None = None, + due: str | None = None, + tags: list[str] | None = None, + ) -> str: + """Add a new task to TaskWarrior. + + Args: + description: Task description (required) + project: Optional project name + priority: Optional priority (H, M, or L) + due: Optional due date (ISO format or TaskWarrior date format + like "tomorrow", "2024-12-25") + tags: Optional list of tags + + Returns: + JSON string with task information including UUID + """ + try: + result = await taskwarrior.add_task( + description=description, + project=project, + priority=priority, + due=due, + tags=tags, + ) + return json.dumps(result, indent=2) + except Exception as e: + logger.error(f"Error adding task: {e}") + return json.dumps({"error": str(e)}) + + @mcp.tool() + async def done_task(uuid: str) -> str: + """Mark a task as completed. + + Args: + uuid: Task UUID (stable identifier, not the numeric ID) + + Returns: + JSON string with completion information + """ + try: + result = await taskwarrior.done_task(uuid) + return json.dumps(result, indent=2) + except Exception as e: + logger.error(f"Error marking task as done: {e}") + return json.dumps({"error": str(e)}) + + @mcp.tool() + async def delete_task(uuid: str) -> str: + """Delete a task. + + Args: + uuid: Task UUID (stable identifier, not the numeric ID) + + Returns: + JSON string with deletion information + """ + try: + result = await taskwarrior.delete_task(uuid) + return json.dumps(result, indent=2) + except Exception as e: + logger.error(f"Error deleting task: {e}") + return json.dumps({"error": str(e)}) + + @mcp.tool() + async def context(action: str, name: str | None = None) -> str: + """Manage TaskWarrior contexts. + + Args: + action: One of "set", "list", "show", "none" + name: Context name (required for "set" action, optional otherwise) + + Returns: + JSON string with context information + """ + try: + result = await taskwarrior.manage_context(action, name) + return json.dumps(result, indent=2) + except Exception as e: + logger.error(f"Error managing context: {e}") + return json.dumps({"error": str(e)}) + + return mcp + + +async def main(transport_type: str, host: str, port: int) -> None: + """Start the server with the specified transport. + + Args: + transport_type: Transport type ("stdio" or "remote") + host: Host to bind to for remote transport + port: Port to bind to for remote transport + """ + logger.info("Starting MCP TaskWarrior Server") + logger.info(f"Starting TaskWarrior MCP Server with {transport_type} transport") + + # Create the server with host and port + mcp = create_server(host=host, port=port) + + # Run the server with the appropriate transport + if transport_type == "stdio": + logger.info("Server running with stdio transport") + await mcp.run_stdio_async() + else: # remote transport + logger.info(f"Server running with remote transport on {host}:{port}") + await mcp.run_sse_async() |
