summaryrefslogtreecommitdiff
path: root/servers/taskwarrior/src
diff options
context:
space:
mode:
Diffstat (limited to 'servers/taskwarrior/src')
-rw-r--r--servers/taskwarrior/src/mcp_server_taskwarrior/__init__.py33
-rw-r--r--servers/taskwarrior/src/mcp_server_taskwarrior/__main__.py9
-rw-r--r--servers/taskwarrior/src/mcp_server_taskwarrior/cli.py107
-rw-r--r--servers/taskwarrior/src/mcp_server_taskwarrior/server.py435
4 files changed, 584 insertions, 0 deletions
diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/__init__.py b/servers/taskwarrior/src/mcp_server_taskwarrior/__init__.py
new file mode 100644
index 0000000..cc296ae
--- /dev/null
+++ b/servers/taskwarrior/src/mcp_server_taskwarrior/__init__.py
@@ -0,0 +1,33 @@
+import argparse
+import asyncio
+
+from . import server
+
+
+def main() -> None:
+ """Main entry point for the package."""
+ parser = argparse.ArgumentParser(description="TaskWarrior MCP Server")
+ parser.add_argument(
+ "--transport",
+ default="stdio",
+ choices=["stdio", "remote"],
+ help="Transport method (stdio or remote)",
+ )
+ parser.add_argument(
+ "--host",
+ default="127.0.0.1",
+ help="Host for remote transport (default: 127.0.0.1)",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=8080,
+ help="Port for remote transport (default: 8080)",
+ )
+
+ args = parser.parse_args()
+ asyncio.run(server.main(args.transport, args.host, args.port))
+
+
+# Expose important items at package level
+__all__ = ["server", "main"]
diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/__main__.py b/servers/taskwarrior/src/mcp_server_taskwarrior/__main__.py
new file mode 100644
index 0000000..eb8c2a9
--- /dev/null
+++ b/servers/taskwarrior/src/mcp_server_taskwarrior/__main__.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python3
+"""
+Command-line interface for the TaskWarrior MCP server.
+"""
+
+from .cli import run_server
+
+if __name__ == "__main__":
+ run_server()
diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/cli.py b/servers/taskwarrior/src/mcp_server_taskwarrior/cli.py
new file mode 100644
index 0000000..1c556ff
--- /dev/null
+++ b/servers/taskwarrior/src/mcp_server_taskwarrior/cli.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+"""
+Common CLI functionality for the TaskWarrior MCP server.
+"""
+
+import argparse
+import asyncio
+import logging
+import sys
+
+from .server import main
+
+logger = logging.getLogger("mcp_taskwarrior_server")
+
+
+def parse_args() -> argparse.Namespace:
+ """Parse command-line arguments."""
+ parser = argparse.ArgumentParser(description="TaskWarrior MCP Server")
+ parser.add_argument(
+ "--transport",
+ choices=["stdio", "remote"],
+ default="stdio",
+ help="Transport type (stdio or remote)",
+ )
+ parser.add_argument(
+ "--host",
+ default="127.0.0.1",
+ help="Host to bind to for remote transport (default: 127.0.0.1)",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=8080,
+ help="Port to bind to for remote transport (default: 8080)",
+ )
+ parser.add_argument(
+ "--log-level",
+ choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+ default="INFO",
+ help="Set the logging level (default: INFO)",
+ )
+ return parser.parse_args()
+
+
+def validate_args(args: argparse.Namespace) -> argparse.Namespace:
+ """Validate command-line arguments."""
+ if (
+ args.transport == "remote"
+ and args.port < 1024
+ and not sys.platform.startswith("win")
+ ):
+ logger.warning(
+ "Using a port below 1024 may require root privileges on Unix-like systems."
+ )
+ return args
+
+
+def setup_logging(level: str, transport: str) -> None:
+ """Set up logging configuration.
+
+ Args:
+ level: The base logging level from command line args
+ transport: The transport type being used (stdio or remote)
+ """
+ # Create logs directory if it doesn't exist
+ import os
+
+ os.makedirs("logs", exist_ok=True)
+
+ # Configure file handler for all logs
+ file_handler = logging.FileHandler("logs/mcp_server.log")
+ file_handler.setLevel(getattr(logging, level))
+ file_handler.setFormatter(
+ logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+ )
+
+ # Configure console handler with appropriate level
+ console_handler = logging.StreamHandler()
+ # Use WARNING level for stdio transport to reduce interference
+ if transport == "stdio":
+ console_handler.setLevel(logging.WARNING)
+ else:
+ console_handler.setLevel(getattr(logging, level))
+ console_handler.setFormatter(
+ logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+ )
+
+ # Configure root logger
+ root_logger = logging.getLogger()
+ root_logger.setLevel(getattr(logging, level)) # Base level for all handlers
+ root_logger.handlers = [] # Remove any existing handlers
+ root_logger.addHandler(file_handler)
+ root_logger.addHandler(console_handler)
+
+
+def run_server() -> None:
+ """Run the server with CLI arguments."""
+ args = validate_args(parse_args())
+ setup_logging(args.log_level, args.transport)
+
+ try:
+ asyncio.run(main(args.transport, args.host, args.port))
+ except KeyboardInterrupt:
+ logger.info("Server stopped by user")
+ except Exception as e:
+ logger.error(f"Error running server: {e}")
+ sys.exit(1)
diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
new file mode 100644
index 0000000..b73ec0a
--- /dev/null
+++ b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
@@ -0,0 +1,435 @@
+"""TaskWarrior MCP Server implementation."""
+
+import asyncio
+import json
+import logging
+import os
+import sys
+from typing import Any
+
+from mcp.server.fastmcp import FastMCP
+
+# reconfigure UnicodeEncodeError prone default (i.e. windows-1252) to utf-8
+if sys.platform == "win32" and os.environ.get('PYTHONIOENCODING') is None:
+ sys.stdin.reconfigure(encoding="utf-8")
+ sys.stdout.reconfigure(encoding="utf-8")
+ sys.stderr.reconfigure(encoding="utf-8")
+
+logger = logging.getLogger('mcp_taskwarrior_server')
+
+
+class TaskWarriorError(Exception):
+ """Base exception for TaskWarrior operations."""
+
+ pass
+
+
+class TaskWarriorServer:
+ """Server class for TaskWarrior MCP operations."""
+
+ def __init__(self) -> None:
+ """Initialize the TaskWarrior server."""
+ self._check_taskwarrior_installed()
+
+ def _check_taskwarrior_installed(self) -> None:
+ """Check if TaskWarrior is installed and accessible."""
+ # This will be checked on first command execution
+ pass
+
+ async def _run_task_command(
+ self, *args: str, input_text: str | None = None
+ ) -> tuple[str, str, int]:
+ """Execute a TaskWarrior command and return stdout, stderr, return code.
+
+ Args:
+ *args: Command arguments to pass to task command
+ input_text: Optional input text to send to stdin
+
+ Returns:
+ Tuple of (stdout, stderr, return_code)
+
+ Raises:
+ TaskWarriorError: If taskwarrior is not installed or command fails
+ """
+ try:
+ # Use rc.confirmation:off to avoid interactive prompts
+ # Use rc.bulk:0 to avoid bulk operation confirmations
+ cmd = ["task", "rc.confirmation:off", "rc.bulk:0"] + list(args)
+
+ process = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ stdin=asyncio.subprocess.PIPE if input_text else None,
+ )
+
+ stdout_bytes, stderr_bytes = await process.communicate(
+ input_text.encode() if input_text else None
+ )
+
+ stdout = stdout_bytes.decode('utf-8', errors='replace')
+ stderr = stderr_bytes.decode('utf-8', errors='replace')
+ return_code = process.returncode
+
+ if return_code != 0:
+ error_msg = stderr.strip() or stdout.strip() or "Unknown error"
+ # Check if taskwarrior is not installed
+ error_lower = error_msg.lower()
+ if ("command not found" in error_lower or "not found" in error_lower):
+ raise TaskWarriorError(
+ "TaskWarrior is not installed or not in PATH. "
+ "Please install TaskWarrior to use this server."
+ )
+ raise TaskWarriorError(f"TaskWarrior command failed: {error_msg}")
+
+ return stdout, stderr, return_code
+
+ except FileNotFoundError:
+ raise TaskWarriorError(
+ "TaskWarrior is not installed or not in PATH. "
+ "Please install TaskWarrior to use this server."
+ ) from None
+ except Exception as e:
+ if isinstance(e, TaskWarriorError):
+ raise
+ raise TaskWarriorError(
+ f"Error executing TaskWarrior command: {str(e)}"
+ ) from e
+
+ async def list_tasks(self, filter_expr: str | None = None) -> list[dict[str, Any]]:
+ """List tasks matching the optional filter.
+
+ Args:
+ filter_expr: Optional filter expression (e.g., "project:Home", "+work")
+
+ Returns:
+ List of task dictionaries from JSON export
+ """
+ args = []
+ if filter_expr:
+ args.append(filter_expr)
+ args.append("export")
+
+ stdout, _, _ = await self._run_task_command(*args)
+
+ # Parse JSON output
+ try:
+ # TaskWarrior export outputs JSON array, but may have trailing newlines
+ stdout = stdout.strip()
+ if not stdout:
+ return []
+ tasks = json.loads(stdout)
+ if not isinstance(tasks, list):
+ return [tasks] if tasks else []
+ return tasks
+ except json.JSONDecodeError as e:
+ logger.error(f"Failed to parse TaskWarrior JSON output: {e}")
+ logger.debug(f"Output was: {stdout}")
+ raise TaskWarriorError(
+ f"Failed to parse TaskWarrior output: {str(e)}"
+ ) from e
+
+ async def add_task(
+ self,
+ description: str,
+ project: str | None = None,
+ priority: str | None = None,
+ due: str | None = None,
+ tags: list[str] | None = None,
+ ) -> dict[str, Any]:
+ """Add a new task to TaskWarrior.
+
+ Args:
+ description: Task description (required)
+ project: Optional project name
+ priority: Optional priority (H, M, or L)
+ due: Optional due date (ISO format or TaskWarrior date format)
+ tags: Optional list of tags
+
+ Returns:
+ Dictionary with task information including UUID
+ """
+ args = ["add", description]
+
+ if project:
+ args.append(f"project:{project}")
+
+ if priority:
+ if priority.upper() not in ["H", "M", "L"]:
+ raise ValueError(f"Priority must be H, M, or L, got: {priority}")
+ args.append(f"priority:{priority.upper()}")
+
+ if due:
+ args.append(f"due:{due}")
+
+ if tags:
+ for tag in tags:
+ args.append(f"+{tag}")
+
+ stdout, stderr, _ = await self._run_task_command(*args)
+
+ # Extract UUID from output
+ # TaskWarrior output format: "Created task 123.\n\nUUID: <uuid>"
+ uuid = None
+ for line in stdout.split('\n'):
+ if line.startswith('UUID:'):
+ uuid = line.split(':', 1)[1].strip()
+ break
+
+ # If UUID not found in stdout, try to get it by listing the newest task
+ if not uuid:
+ # Get the newest task to find the UUID
+ tasks = await self.list_tasks("status:pending")
+ if tasks:
+ # Sort by entry date (most recent first)
+ tasks.sort(key=lambda t: t.get('entry', ''), reverse=True)
+ uuid = tasks[0].get('uuid')
+
+ return {
+ "description": description,
+ "uuid": uuid,
+ "project": project,
+ "priority": priority.upper() if priority else None,
+ "due": due,
+ "tags": tags or [],
+ "output": stdout.strip(),
+ }
+
+ async def done_task(self, uuid: str) -> dict[str, Any]:
+ """Mark a task as completed.
+
+ Args:
+ uuid: Task UUID (stable identifier)
+
+ Returns:
+ Dictionary with completion information
+ """
+ stdout, stderr, _ = await self._run_task_command(uuid, "done")
+
+ return {
+ "uuid": uuid,
+ "status": "completed",
+ "output": stdout.strip(),
+ }
+
+ async def delete_task(self, uuid: str) -> dict[str, Any]:
+ """Delete a task.
+
+ Args:
+ uuid: Task UUID (stable identifier)
+
+ Returns:
+ Dictionary with deletion information
+ """
+ stdout, stderr, _ = await self._run_task_command(uuid, "delete")
+
+ return {
+ "uuid": uuid,
+ "status": "deleted",
+ "output": stdout.strip(),
+ }
+
+ async def manage_context(
+ self, action: str, name: str | None = None
+ ) -> dict[str, Any]:
+ """Manage TaskWarrior contexts.
+
+ Args:
+ action: One of "set", "list", "show", "none"
+ name: Context name (required for "set" action)
+
+ Returns:
+ Dictionary with context information
+ """
+ if action not in ["set", "list", "show", "none"]:
+ raise ValueError(
+ f"Action must be one of: set, list, show, none. Got: {action}"
+ )
+
+ if action == "set" and not name:
+ raise ValueError("Context name is required for 'set' action")
+
+ args = ["context"]
+
+ if action == "set":
+ args.append(name)
+ elif action == "none":
+ args.append("none")
+ else:
+ args.append(action)
+
+ stdout, stderr, _ = await self._run_task_command(*args)
+
+ result: dict[str, Any] = {
+ "action": action,
+ "output": stdout.strip(),
+ }
+
+ if action == "set" and name:
+ result["context"] = name
+ elif action == "show":
+ # Parse the context show output to extract context name
+ lines = stdout.strip().split('\n')
+ for line in lines:
+ if line.startswith("Context '") and "'" in line:
+ context_name = line.split("'")[1]
+ result["context"] = context_name
+ break
+ elif action == "list":
+ # Parse context list output
+ contexts = []
+ for line in stdout.strip().split('\n'):
+ if line.strip() and not line.startswith("Name"):
+ parts = line.split()
+ if len(parts) >= 2:
+ ctx_name = parts[0]
+ if ctx_name not in [c.get("name") for c in contexts]:
+ contexts.append({"name": ctx_name})
+ result["contexts"] = contexts
+
+ return result
+
+
+def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
+ """Create and configure the FastMCP server.
+
+ Args:
+ host: Host to bind to for remote transport
+ port: Port to bind to for remote transport
+
+ Returns:
+ Configured FastMCP server instance
+ """
+ mcp = FastMCP("TaskWarrior", host=host, port=port)
+
+ # Create a TaskWarriorServer instance
+ taskwarrior = TaskWarriorServer()
+
+ @mcp.tool()
+ async def list_tasks(filter_expr: str | None = None) -> str:
+ """List tasks with optional filter.
+
+ Args:
+ filter_expr: Optional filter expression
+ (e.g., "project:Home", "+work", "status:pending")
+
+ Returns:
+ JSON string of tasks matching the filter
+ """
+ try:
+ tasks = await taskwarrior.list_tasks(filter_expr)
+ return json.dumps(tasks, indent=2)
+ except Exception as e:
+ logger.error(f"Error listing tasks: {e}")
+ return json.dumps({"error": str(e)})
+
+ @mcp.tool()
+ async def add_task(
+ description: str,
+ project: str | None = None,
+ priority: str | None = None,
+ due: str | None = None,
+ tags: list[str] | None = None,
+ ) -> str:
+ """Add a new task to TaskWarrior.
+
+ Args:
+ description: Task description (required)
+ project: Optional project name
+ priority: Optional priority (H, M, or L)
+ due: Optional due date (ISO format or TaskWarrior date format
+ like "tomorrow", "2024-12-25")
+ tags: Optional list of tags
+
+ Returns:
+ JSON string with task information including UUID
+ """
+ try:
+ result = await taskwarrior.add_task(
+ description=description,
+ project=project,
+ priority=priority,
+ due=due,
+ tags=tags,
+ )
+ return json.dumps(result, indent=2)
+ except Exception as e:
+ logger.error(f"Error adding task: {e}")
+ return json.dumps({"error": str(e)})
+
+ @mcp.tool()
+ async def done_task(uuid: str) -> str:
+ """Mark a task as completed.
+
+ Args:
+ uuid: Task UUID (stable identifier, not the numeric ID)
+
+ Returns:
+ JSON string with completion information
+ """
+ try:
+ result = await taskwarrior.done_task(uuid)
+ return json.dumps(result, indent=2)
+ except Exception as e:
+ logger.error(f"Error marking task as done: {e}")
+ return json.dumps({"error": str(e)})
+
+ @mcp.tool()
+ async def delete_task(uuid: str) -> str:
+ """Delete a task.
+
+ Args:
+ uuid: Task UUID (stable identifier, not the numeric ID)
+
+ Returns:
+ JSON string with deletion information
+ """
+ try:
+ result = await taskwarrior.delete_task(uuid)
+ return json.dumps(result, indent=2)
+ except Exception as e:
+ logger.error(f"Error deleting task: {e}")
+ return json.dumps({"error": str(e)})
+
+ @mcp.tool()
+ async def context(action: str, name: str | None = None) -> str:
+ """Manage TaskWarrior contexts.
+
+ Args:
+ action: One of "set", "list", "show", "none"
+ name: Context name (required for "set" action, optional otherwise)
+
+ Returns:
+ JSON string with context information
+ """
+ try:
+ result = await taskwarrior.manage_context(action, name)
+ return json.dumps(result, indent=2)
+ except Exception as e:
+ logger.error(f"Error managing context: {e}")
+ return json.dumps({"error": str(e)})
+
+ return mcp
+
+
+async def main(transport_type: str, host: str, port: int) -> None:
+ """Start the server with the specified transport.
+
+ Args:
+ transport_type: Transport type ("stdio" or "remote")
+ host: Host to bind to for remote transport
+ port: Port to bind to for remote transport
+ """
+ logger.info("Starting MCP TaskWarrior Server")
+ logger.info(f"Starting TaskWarrior MCP Server with {transport_type} transport")
+
+ # Create the server with host and port
+ mcp = create_server(host=host, port=port)
+
+ # Run the server with the appropriate transport
+ if transport_type == "stdio":
+ logger.info("Server running with stdio transport")
+ await mcp.run_stdio_async()
+ else: # remote transport
+ logger.info(f"Server running with remote transport on {host}:{port}")
+ await mcp.run_sse_async()