summaryrefslogtreecommitdiff
path: root/servers/taskwarrior/src
diff options
context:
space:
mode:
authorDawid Rycerz <dawid@rycerz.xyz>2025-12-19 17:12:10 +0100
committerDawid Rycerz <dawid@rycerz.xyz>2025-12-19 17:12:10 +0100
commitbedbd86e8c70d8d8cfa964842e1eab314384271d (patch)
treed7aec04e6c64012774477c39bd0d91c1bcc3d4ed /servers/taskwarrior/src
parent195fb4507405648faf8a5729610457c24aa82430 (diff)
feat: enhance task completion functionality in TaskWarrior server
- Updated `done_task` method to accept a list of UUIDs for marking multiple tasks as completed. - Adjusted related tests to validate single and multiple task completion, including error handling for empty UUID lists. - Improved docstrings for clarity and consistency.
Diffstat (limited to 'servers/taskwarrior/src')
-rw-r--r--servers/taskwarrior/src/mcp_server_taskwarrior/server.py172
1 files changed, 89 insertions, 83 deletions
diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
index b73ec0a..2f66dac 100644
--- a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
+++ b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
@@ -10,12 +10,12 @@ from typing import Any
from mcp.server.fastmcp import FastMCP
# reconfigure UnicodeEncodeError prone default (i.e. windows-1252) to utf-8
-if sys.platform == "win32" and os.environ.get('PYTHONIOENCODING') is None:
+if sys.platform == "win32" and os.environ.get("PYTHONIOENCODING") is None:
sys.stdin.reconfigure(encoding="utf-8")
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
-logger = logging.getLogger('mcp_taskwarrior_server')
+logger = logging.getLogger("mcp_taskwarrior_server")
class TaskWarriorError(Exception):
@@ -40,14 +40,14 @@ class TaskWarriorServer:
self, *args: str, input_text: str | None = None
) -> tuple[str, str, int]:
"""Execute a TaskWarrior command and return stdout, stderr, return code.
-
+
Args:
*args: Command arguments to pass to task command
input_text: Optional input text to send to stdin
-
+
Returns:
Tuple of (stdout, stderr, return_code)
-
+
Raises:
TaskWarriorError: If taskwarrior is not installed or command fails
"""
@@ -55,35 +55,35 @@ class TaskWarriorServer:
# Use rc.confirmation:off to avoid interactive prompts
# Use rc.bulk:0 to avoid bulk operation confirmations
cmd = ["task", "rc.confirmation:off", "rc.bulk:0"] + list(args)
-
+
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE if input_text else None,
)
-
+
stdout_bytes, stderr_bytes = await process.communicate(
input_text.encode() if input_text else None
)
-
- stdout = stdout_bytes.decode('utf-8', errors='replace')
- stderr = stderr_bytes.decode('utf-8', errors='replace')
+
+ stdout = stdout_bytes.decode("utf-8", errors="replace")
+ stderr = stderr_bytes.decode("utf-8", errors="replace")
return_code = process.returncode
-
+
if return_code != 0:
error_msg = stderr.strip() or stdout.strip() or "Unknown error"
# Check if taskwarrior is not installed
error_lower = error_msg.lower()
- if ("command not found" in error_lower or "not found" in error_lower):
+ if "command not found" in error_lower or "not found" in error_lower:
raise TaskWarriorError(
"TaskWarrior is not installed or not in PATH. "
"Please install TaskWarrior to use this server."
)
raise TaskWarriorError(f"TaskWarrior command failed: {error_msg}")
-
+
return stdout, stderr, return_code
-
+
except FileNotFoundError:
raise TaskWarriorError(
"TaskWarrior is not installed or not in PATH. "
@@ -98,10 +98,10 @@ class TaskWarriorServer:
async def list_tasks(self, filter_expr: str | None = None) -> list[dict[str, Any]]:
"""List tasks matching the optional filter.
-
+
Args:
filter_expr: Optional filter expression (e.g., "project:Home", "+work")
-
+
Returns:
List of task dictionaries from JSON export
"""
@@ -109,9 +109,9 @@ class TaskWarriorServer:
if filter_expr:
args.append(filter_expr)
args.append("export")
-
+
stdout, _, _ = await self._run_task_command(*args)
-
+
# Parse JSON output
try:
# TaskWarrior export outputs JSON array, but may have trailing newlines
@@ -138,53 +138,53 @@ class TaskWarriorServer:
tags: list[str] | None = None,
) -> dict[str, Any]:
"""Add a new task to TaskWarrior.
-
+
Args:
description: Task description (required)
project: Optional project name
priority: Optional priority (H, M, or L)
due: Optional due date (ISO format or TaskWarrior date format)
tags: Optional list of tags
-
+
Returns:
Dictionary with task information including UUID
"""
args = ["add", description]
-
+
if project:
args.append(f"project:{project}")
-
+
if priority:
if priority.upper() not in ["H", "M", "L"]:
raise ValueError(f"Priority must be H, M, or L, got: {priority}")
args.append(f"priority:{priority.upper()}")
-
+
if due:
args.append(f"due:{due}")
-
+
if tags:
for tag in tags:
args.append(f"+{tag}")
-
+
stdout, stderr, _ = await self._run_task_command(*args)
-
+
# Extract UUID from output
# TaskWarrior output format: "Created task 123.\n\nUUID: <uuid>"
uuid = None
- for line in stdout.split('\n'):
- if line.startswith('UUID:'):
- uuid = line.split(':', 1)[1].strip()
+ for line in stdout.split("\n"):
+ if line.startswith("UUID:"):
+ uuid = line.split(":", 1)[1].strip()
break
-
+
# If UUID not found in stdout, try to get it by listing the newest task
if not uuid:
# Get the newest task to find the UUID
tasks = await self.list_tasks("status:pending")
if tasks:
# Sort by entry date (most recent first)
- tasks.sort(key=lambda t: t.get('entry', ''), reverse=True)
- uuid = tasks[0].get('uuid')
-
+ tasks.sort(key=lambda t: t.get("entry", ""), reverse=True)
+ uuid = tasks[0].get("uuid")
+
return {
"description": description,
"uuid": uuid,
@@ -195,34 +195,40 @@ class TaskWarriorServer:
"output": stdout.strip(),
}
- async def done_task(self, uuid: str) -> dict[str, Any]:
- """Mark a task as completed.
-
+ async def done_task(self, uuids: list[str]) -> dict[str, Any]:
+ """Mark one or more tasks as completed.
+
Args:
- uuid: Task UUID (stable identifier)
-
+ uuids: List of task UUIDs (stable identifiers) to mark as done
+
Returns:
- Dictionary with completion information
+ Dictionary with completion information including all UUIDs
+
+ Raises:
+ ValueError: If the uuids list is empty
"""
- stdout, stderr, _ = await self._run_task_command(uuid, "done")
-
+ if not uuids:
+ raise ValueError("At least one UUID is required")
+
+ stdout, stderr, _ = await self._run_task_command(*uuids, "done")
+
return {
- "uuid": uuid,
+ "uuids": uuids,
"status": "completed",
"output": stdout.strip(),
}
async def delete_task(self, uuid: str) -> dict[str, Any]:
"""Delete a task.
-
+
Args:
uuid: Task UUID (stable identifier)
-
+
Returns:
Dictionary with deletion information
"""
stdout, stderr, _ = await self._run_task_command(uuid, "delete")
-
+
return {
"uuid": uuid,
"status": "deleted",
@@ -233,11 +239,11 @@ class TaskWarriorServer:
self, action: str, name: str | None = None
) -> dict[str, Any]:
"""Manage TaskWarrior contexts.
-
+
Args:
action: One of "set", "list", "show", "none"
name: Context name (required for "set" action)
-
+
Returns:
Dictionary with context information
"""
@@ -245,31 +251,31 @@ class TaskWarriorServer:
raise ValueError(
f"Action must be one of: set, list, show, none. Got: {action}"
)
-
+
if action == "set" and not name:
raise ValueError("Context name is required for 'set' action")
-
+
args = ["context"]
-
+
if action == "set":
args.append(name)
elif action == "none":
args.append("none")
else:
args.append(action)
-
+
stdout, stderr, _ = await self._run_task_command(*args)
-
+
result: dict[str, Any] = {
"action": action,
"output": stdout.strip(),
}
-
+
if action == "set" and name:
result["context"] = name
elif action == "show":
# Parse the context show output to extract context name
- lines = stdout.strip().split('\n')
+ lines = stdout.strip().split("\n")
for line in lines:
if line.startswith("Context '") and "'" in line:
context_name = line.split("'")[1]
@@ -278,7 +284,7 @@ class TaskWarriorServer:
elif action == "list":
# Parse context list output
contexts = []
- for line in stdout.strip().split('\n'):
+ for line in stdout.strip().split("\n"):
if line.strip() and not line.startswith("Name"):
parts = line.split()
if len(parts) >= 2:
@@ -286,33 +292,33 @@ class TaskWarriorServer:
if ctx_name not in [c.get("name") for c in contexts]:
contexts.append({"name": ctx_name})
result["contexts"] = contexts
-
+
return result
def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
"""Create and configure the FastMCP server.
-
+
Args:
host: Host to bind to for remote transport
port: Port to bind to for remote transport
-
+
Returns:
Configured FastMCP server instance
"""
mcp = FastMCP("TaskWarrior", host=host, port=port)
-
+
# Create a TaskWarriorServer instance
taskwarrior = TaskWarriorServer()
-
+
@mcp.tool()
async def list_tasks(filter_expr: str | None = None) -> str:
"""List tasks with optional filter.
-
+
Args:
filter_expr: Optional filter expression
(e.g., "project:Home", "+work", "status:pending")
-
+
Returns:
JSON string of tasks matching the filter
"""
@@ -322,7 +328,7 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
except Exception as e:
logger.error(f"Error listing tasks: {e}")
return json.dumps({"error": str(e)})
-
+
@mcp.tool()
async def add_task(
description: str,
@@ -332,7 +338,7 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
tags: list[str] | None = None,
) -> str:
"""Add a new task to TaskWarrior.
-
+
Args:
description: Task description (required)
project: Optional project name
@@ -340,7 +346,7 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
due: Optional due date (ISO format or TaskWarrior date format
like "tomorrow", "2024-12-25")
tags: Optional list of tags
-
+
Returns:
JSON string with task information including UUID
"""
@@ -356,31 +362,31 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
except Exception as e:
logger.error(f"Error adding task: {e}")
return json.dumps({"error": str(e)})
-
+
@mcp.tool()
- async def done_task(uuid: str) -> str:
- """Mark a task as completed.
-
+ async def done_task(uuids: list[str]) -> str:
+ """Mark one or more tasks as completed.
+
Args:
- uuid: Task UUID (stable identifier, not the numeric ID)
-
+ uuids: List of task UUIDs (stable identifiers, not numeric IDs)
+
Returns:
- JSON string with completion information
+ JSON string with completion information for all tasks
"""
try:
- result = await taskwarrior.done_task(uuid)
+ result = await taskwarrior.done_task(uuids)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error marking task as done: {e}")
return json.dumps({"error": str(e)})
-
+
@mcp.tool()
async def delete_task(uuid: str) -> str:
"""Delete a task.
-
+
Args:
uuid: Task UUID (stable identifier, not the numeric ID)
-
+
Returns:
JSON string with deletion information
"""
@@ -390,15 +396,15 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
except Exception as e:
logger.error(f"Error deleting task: {e}")
return json.dumps({"error": str(e)})
-
+
@mcp.tool()
async def context(action: str, name: str | None = None) -> str:
"""Manage TaskWarrior contexts.
-
+
Args:
action: One of "set", "list", "show", "none"
name: Context name (required for "set" action, optional otherwise)
-
+
Returns:
JSON string with context information
"""
@@ -408,13 +414,13 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
except Exception as e:
logger.error(f"Error managing context: {e}")
return json.dumps({"error": str(e)})
-
+
return mcp
async def main(transport_type: str, host: str, port: int) -> None:
"""Start the server with the specified transport.
-
+
Args:
transport_type: Transport type ("stdio" or "remote")
host: Host to bind to for remote transport
@@ -422,10 +428,10 @@ async def main(transport_type: str, host: str, port: int) -> None:
"""
logger.info("Starting MCP TaskWarrior Server")
logger.info(f"Starting TaskWarrior MCP Server with {transport_type} transport")
-
+
# Create the server with host and port
mcp = create_server(host=host, port=port)
-
+
# Run the server with the appropriate transport
if transport_type == "stdio":
logger.info("Server running with stdio transport")