From bedbd86e8c70d8d8cfa964842e1eab314384271d Mon Sep 17 00:00:00 2001 From: Dawid Rycerz Date: Fri, 19 Dec 2025 17:12:10 +0100 Subject: feat: enhance task completion functionality in TaskWarrior server - Updated `done_task` method to accept a list of UUIDs for marking multiple tasks as completed. - Adjusted related tests to validate single and multiple task completion, including error handling for empty UUID lists. - Improved docstrings for clarity and consistency. --- .../src/mcp_server_taskwarrior/server.py | 172 +++++++++++---------- 1 file changed, 89 insertions(+), 83 deletions(-) (limited to 'servers/taskwarrior/src') diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py index b73ec0a..2f66dac 100644 --- a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py +++ b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py @@ -10,12 +10,12 @@ from typing import Any from mcp.server.fastmcp import FastMCP # reconfigure UnicodeEncodeError prone default (i.e. windows-1252) to utf-8 -if sys.platform == "win32" and os.environ.get('PYTHONIOENCODING') is None: +if sys.platform == "win32" and os.environ.get("PYTHONIOENCODING") is None: sys.stdin.reconfigure(encoding="utf-8") sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") -logger = logging.getLogger('mcp_taskwarrior_server') +logger = logging.getLogger("mcp_taskwarrior_server") class TaskWarriorError(Exception): @@ -40,14 +40,14 @@ class TaskWarriorServer: self, *args: str, input_text: str | None = None ) -> tuple[str, str, int]: """Execute a TaskWarrior command and return stdout, stderr, return code. - + Args: *args: Command arguments to pass to task command input_text: Optional input text to send to stdin - + Returns: Tuple of (stdout, stderr, return_code) - + Raises: TaskWarriorError: If taskwarrior is not installed or command fails """ @@ -55,35 +55,35 @@ class TaskWarriorServer: # Use rc.confirmation:off to avoid interactive prompts # Use rc.bulk:0 to avoid bulk operation confirmations cmd = ["task", "rc.confirmation:off", "rc.bulk:0"] + list(args) - + process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE if input_text else None, ) - + stdout_bytes, stderr_bytes = await process.communicate( input_text.encode() if input_text else None ) - - stdout = stdout_bytes.decode('utf-8', errors='replace') - stderr = stderr_bytes.decode('utf-8', errors='replace') + + stdout = stdout_bytes.decode("utf-8", errors="replace") + stderr = stderr_bytes.decode("utf-8", errors="replace") return_code = process.returncode - + if return_code != 0: error_msg = stderr.strip() or stdout.strip() or "Unknown error" # Check if taskwarrior is not installed error_lower = error_msg.lower() - if ("command not found" in error_lower or "not found" in error_lower): + if "command not found" in error_lower or "not found" in error_lower: raise TaskWarriorError( "TaskWarrior is not installed or not in PATH. " "Please install TaskWarrior to use this server." ) raise TaskWarriorError(f"TaskWarrior command failed: {error_msg}") - + return stdout, stderr, return_code - + except FileNotFoundError: raise TaskWarriorError( "TaskWarrior is not installed or not in PATH. " @@ -98,10 +98,10 @@ class TaskWarriorServer: async def list_tasks(self, filter_expr: str | None = None) -> list[dict[str, Any]]: """List tasks matching the optional filter. - + Args: filter_expr: Optional filter expression (e.g., "project:Home", "+work") - + Returns: List of task dictionaries from JSON export """ @@ -109,9 +109,9 @@ class TaskWarriorServer: if filter_expr: args.append(filter_expr) args.append("export") - + stdout, _, _ = await self._run_task_command(*args) - + # Parse JSON output try: # TaskWarrior export outputs JSON array, but may have trailing newlines @@ -138,53 +138,53 @@ class TaskWarriorServer: tags: list[str] | None = None, ) -> dict[str, Any]: """Add a new task to TaskWarrior. - + Args: description: Task description (required) project: Optional project name priority: Optional priority (H, M, or L) due: Optional due date (ISO format or TaskWarrior date format) tags: Optional list of tags - + Returns: Dictionary with task information including UUID """ args = ["add", description] - + if project: args.append(f"project:{project}") - + if priority: if priority.upper() not in ["H", "M", "L"]: raise ValueError(f"Priority must be H, M, or L, got: {priority}") args.append(f"priority:{priority.upper()}") - + if due: args.append(f"due:{due}") - + if tags: for tag in tags: args.append(f"+{tag}") - + stdout, stderr, _ = await self._run_task_command(*args) - + # Extract UUID from output # TaskWarrior output format: "Created task 123.\n\nUUID: " uuid = None - for line in stdout.split('\n'): - if line.startswith('UUID:'): - uuid = line.split(':', 1)[1].strip() + for line in stdout.split("\n"): + if line.startswith("UUID:"): + uuid = line.split(":", 1)[1].strip() break - + # If UUID not found in stdout, try to get it by listing the newest task if not uuid: # Get the newest task to find the UUID tasks = await self.list_tasks("status:pending") if tasks: # Sort by entry date (most recent first) - tasks.sort(key=lambda t: t.get('entry', ''), reverse=True) - uuid = tasks[0].get('uuid') - + tasks.sort(key=lambda t: t.get("entry", ""), reverse=True) + uuid = tasks[0].get("uuid") + return { "description": description, "uuid": uuid, @@ -195,34 +195,40 @@ class TaskWarriorServer: "output": stdout.strip(), } - async def done_task(self, uuid: str) -> dict[str, Any]: - """Mark a task as completed. - + async def done_task(self, uuids: list[str]) -> dict[str, Any]: + """Mark one or more tasks as completed. + Args: - uuid: Task UUID (stable identifier) - + uuids: List of task UUIDs (stable identifiers) to mark as done + Returns: - Dictionary with completion information + Dictionary with completion information including all UUIDs + + Raises: + ValueError: If the uuids list is empty """ - stdout, stderr, _ = await self._run_task_command(uuid, "done") - + if not uuids: + raise ValueError("At least one UUID is required") + + stdout, stderr, _ = await self._run_task_command(*uuids, "done") + return { - "uuid": uuid, + "uuids": uuids, "status": "completed", "output": stdout.strip(), } async def delete_task(self, uuid: str) -> dict[str, Any]: """Delete a task. - + Args: uuid: Task UUID (stable identifier) - + Returns: Dictionary with deletion information """ stdout, stderr, _ = await self._run_task_command(uuid, "delete") - + return { "uuid": uuid, "status": "deleted", @@ -233,11 +239,11 @@ class TaskWarriorServer: self, action: str, name: str | None = None ) -> dict[str, Any]: """Manage TaskWarrior contexts. - + Args: action: One of "set", "list", "show", "none" name: Context name (required for "set" action) - + Returns: Dictionary with context information """ @@ -245,31 +251,31 @@ class TaskWarriorServer: raise ValueError( f"Action must be one of: set, list, show, none. Got: {action}" ) - + if action == "set" and not name: raise ValueError("Context name is required for 'set' action") - + args = ["context"] - + if action == "set": args.append(name) elif action == "none": args.append("none") else: args.append(action) - + stdout, stderr, _ = await self._run_task_command(*args) - + result: dict[str, Any] = { "action": action, "output": stdout.strip(), } - + if action == "set" and name: result["context"] = name elif action == "show": # Parse the context show output to extract context name - lines = stdout.strip().split('\n') + lines = stdout.strip().split("\n") for line in lines: if line.startswith("Context '") and "'" in line: context_name = line.split("'")[1] @@ -278,7 +284,7 @@ class TaskWarriorServer: elif action == "list": # Parse context list output contexts = [] - for line in stdout.strip().split('\n'): + for line in stdout.strip().split("\n"): if line.strip() and not line.startswith("Name"): parts = line.split() if len(parts) >= 2: @@ -286,33 +292,33 @@ class TaskWarriorServer: if ctx_name not in [c.get("name") for c in contexts]: contexts.append({"name": ctx_name}) result["contexts"] = contexts - + return result def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: """Create and configure the FastMCP server. - + Args: host: Host to bind to for remote transport port: Port to bind to for remote transport - + Returns: Configured FastMCP server instance """ mcp = FastMCP("TaskWarrior", host=host, port=port) - + # Create a TaskWarriorServer instance taskwarrior = TaskWarriorServer() - + @mcp.tool() async def list_tasks(filter_expr: str | None = None) -> str: """List tasks with optional filter. - + Args: filter_expr: Optional filter expression (e.g., "project:Home", "+work", "status:pending") - + Returns: JSON string of tasks matching the filter """ @@ -322,7 +328,7 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: except Exception as e: logger.error(f"Error listing tasks: {e}") return json.dumps({"error": str(e)}) - + @mcp.tool() async def add_task( description: str, @@ -332,7 +338,7 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: tags: list[str] | None = None, ) -> str: """Add a new task to TaskWarrior. - + Args: description: Task description (required) project: Optional project name @@ -340,7 +346,7 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: due: Optional due date (ISO format or TaskWarrior date format like "tomorrow", "2024-12-25") tags: Optional list of tags - + Returns: JSON string with task information including UUID """ @@ -356,31 +362,31 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: except Exception as e: logger.error(f"Error adding task: {e}") return json.dumps({"error": str(e)}) - + @mcp.tool() - async def done_task(uuid: str) -> str: - """Mark a task as completed. - + async def done_task(uuids: list[str]) -> str: + """Mark one or more tasks as completed. + Args: - uuid: Task UUID (stable identifier, not the numeric ID) - + uuids: List of task UUIDs (stable identifiers, not numeric IDs) + Returns: - JSON string with completion information + JSON string with completion information for all tasks """ try: - result = await taskwarrior.done_task(uuid) + result = await taskwarrior.done_task(uuids) return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error marking task as done: {e}") return json.dumps({"error": str(e)}) - + @mcp.tool() async def delete_task(uuid: str) -> str: """Delete a task. - + Args: uuid: Task UUID (stable identifier, not the numeric ID) - + Returns: JSON string with deletion information """ @@ -390,15 +396,15 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: except Exception as e: logger.error(f"Error deleting task: {e}") return json.dumps({"error": str(e)}) - + @mcp.tool() async def context(action: str, name: str | None = None) -> str: """Manage TaskWarrior contexts. - + Args: action: One of "set", "list", "show", "none" name: Context name (required for "set" action, optional otherwise) - + Returns: JSON string with context information """ @@ -408,13 +414,13 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP: except Exception as e: logger.error(f"Error managing context: {e}") return json.dumps({"error": str(e)}) - + return mcp async def main(transport_type: str, host: str, port: int) -> None: """Start the server with the specified transport. - + Args: transport_type: Transport type ("stdio" or "remote") host: Host to bind to for remote transport @@ -422,10 +428,10 @@ async def main(transport_type: str, host: str, port: int) -> None: """ logger.info("Starting MCP TaskWarrior Server") logger.info(f"Starting TaskWarrior MCP Server with {transport_type} transport") - + # Create the server with host and port mcp = create_server(host=host, port=port) - + # Run the server with the appropriate transport if transport_type == "stdio": logger.info("Server running with stdio transport") -- cgit v1.2.3