summaryrefslogtreecommitdiff
path: root/servers/taskwarrior
diff options
context:
space:
mode:
Diffstat (limited to 'servers/taskwarrior')
-rw-r--r--servers/taskwarrior/src/mcp_server_taskwarrior/server.py81
-rw-r--r--servers/taskwarrior/tests/test_server.py94
2 files changed, 160 insertions, 15 deletions
diff --git a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
index c561d2a..324311f 100644
--- a/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
+++ b/servers/taskwarrior/src/mcp_server_taskwarrior/server.py
@@ -195,6 +195,35 @@ class TaskWarriorServer:
"output": stdout.strip(),
}
+ async def add_tasks(
+ self,
+ tasks: list[dict[str, Any]],
+ ) -> list[dict[str, Any]]:
+ """Add multiple tasks to TaskWarrior.
+
+ Args:
+ tasks: List of task definitions, each with:
+ - description (required): Task description
+ - project (optional): Project name
+ - priority (optional): H, M, or L
+ - due (optional): Due date
+ - tags (optional): List of tags
+
+ Returns:
+ List of dictionaries with task information including UUIDs
+ """
+ results = []
+ for task in tasks:
+ result = await self.add_task(
+ description=task["description"],
+ project=task.get("project"),
+ priority=task.get("priority"),
+ due=task.get("due"),
+ tags=task.get("tags"),
+ )
+ results.append(result)
+ return results
+
async def done_task(self, uuids: list[str]) -> dict[str, Any]:
"""Mark one or more tasks as completed.
@@ -218,19 +247,25 @@ class TaskWarriorServer:
"output": stdout.strip(),
}
- async def delete_task(self, uuid: str) -> dict[str, Any]:
- """Delete a task.
+ async def delete_task(self, uuids: list[str]) -> dict[str, Any]:
+ """Delete one or more tasks.
Args:
- uuid: Task UUID (stable identifier)
+ uuids: List of task UUIDs (stable identifiers) to delete
Returns:
- Dictionary with deletion information
+ Dictionary with deletion information including all UUIDs
+
+ Raises:
+ ValueError: If the uuids list is empty
"""
- stdout, stderr, _ = await self._run_task_command(uuid, "delete")
+ if not uuids:
+ raise ValueError("At least one UUID is required")
+
+ stdout, stderr, _ = await self._run_task_command(*uuids, "delete")
return {
- "uuid": uuid,
+ "uuids": uuids,
"status": "deleted",
"output": stdout.strip(),
}
@@ -460,6 +495,30 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
return json.dumps({"error": str(e)})
@mcp.tool()
+ async def add_tasks(
+ tasks: list[dict[str, Any]],
+ ) -> str:
+ """Add multiple tasks to TaskWarrior in one call.
+
+ Args:
+ tasks: List of task definitions. Each task should have:
+ - description (required): Task description
+ - project (optional): Project name
+ - priority (optional): Priority (H, M, or L)
+ - due (optional): Due date
+ - tags (optional): List of tags
+
+ Returns:
+ JSON string with list of created task information
+ """
+ try:
+ results = await taskwarrior.add_tasks(tasks)
+ return json.dumps(results, indent=2)
+ except Exception as e:
+ logger.error(f"Error adding tasks: {e}")
+ return json.dumps({"error": str(e)})
+
+ @mcp.tool()
async def done_task(uuids: list[str]) -> str:
"""Mark one or more tasks as completed.
@@ -477,17 +536,17 @@ def create_server(host: str = "127.0.0.1", port: int = 8080) -> FastMCP:
return json.dumps({"error": str(e)})
@mcp.tool()
- async def delete_task(uuid: str) -> str:
- """Delete a task.
+ async def delete_task(uuids: list[str]) -> str:
+ """Delete one or more tasks.
Args:
- uuid: Task UUID (stable identifier, not the numeric ID)
+ uuids: List of task UUIDs (stable identifiers, not numeric IDs)
Returns:
- JSON string with deletion information
+ JSON string with deletion information for all tasks
"""
try:
- result = await taskwarrior.delete_task(uuid)
+ result = await taskwarrior.delete_task(uuids)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error deleting task: {e}")
diff --git a/servers/taskwarrior/tests/test_server.py b/servers/taskwarrior/tests/test_server.py
index 601a535..5781c23 100644
--- a/servers/taskwarrior/tests/test_server.py
+++ b/servers/taskwarrior/tests/test_server.py
@@ -119,6 +119,69 @@ class TestTaskWarriorServer:
assert "Priority must be H, M, or L" in str(exc_info.value)
@pytest.mark.asyncio
+ async def test_add_tasks(self, taskwarrior_server):
+ """Test adding multiple tasks."""
+ mock_output1 = "Created task 1.\n\nUUID: test-uuid-123"
+ mock_output2 = "Created task 2.\n\nUUID: test-uuid-456"
+ mock_tasks = [
+ {"uuid": "test-uuid-123", "description": "Task 1", "status": "pending", "entry": "2024-01-01T00:00:00Z"},
+ {"uuid": "test-uuid-456", "description": "Task 2", "status": "pending", "entry": "2024-01-01T00:00:01Z"},
+ ]
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ side_effect=[
+ (mock_output1, '', 0),
+ (mock_output2, '', 0),
+ ]
+ ), patch.object(
+ taskwarrior_server, 'list_tasks',
+ new_callable=AsyncMock,
+ side_effect=[
+ [mock_tasks[0]],
+ [mock_tasks[1]],
+ ]
+ ):
+ tasks_to_add = [
+ {"description": "Task 1", "project": "Test"},
+ {"description": "Task 2", "priority": "H"},
+ ]
+ results = await taskwarrior_server.add_tasks(tasks_to_add)
+ assert len(results) == 2
+ assert results[0]["description"] == "Task 1"
+ assert results[0]["uuid"] == "test-uuid-123"
+ assert results[1]["description"] == "Task 2"
+ assert results[1]["uuid"] == "test-uuid-456"
+
+ @pytest.mark.asyncio
+ async def test_add_tasks_empty_list(self, taskwarrior_server):
+ """Test adding tasks with empty list returns empty results."""
+ results = await taskwarrior_server.add_tasks([])
+ assert results == []
+
+ @pytest.mark.asyncio
+ async def test_add_tasks_with_one_task(self, taskwarrior_server):
+ """Test adding a single task in list works."""
+ mock_output = "Created task 1.\n\nUUID: test-uuid-123"
+ mock_tasks = [{"uuid": "test-uuid-123", "description": "Single task", "status": "pending", "entry": "2024-01-01T00:00:00Z"}]
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(mock_output, '', 0)
+ ), patch.object(
+ taskwarrior_server, 'list_tasks',
+ new_callable=AsyncMock,
+ return_value=mock_tasks
+ ):
+ tasks_to_add = [{"description": "Single task"}]
+ results = await taskwarrior_server.add_tasks(tasks_to_add)
+ assert len(results) == 1
+ assert results[0]["description"] == "Single task"
+ assert results[0]["uuid"] == "test-uuid-123"
+
+ @pytest.mark.asyncio
async def test_done_task(self, taskwarrior_server):
"""Test marking a single task as done."""
mock_output = "Completed task test-uuid-123."
@@ -156,8 +219,8 @@ class TestTaskWarriorServer:
assert "At least one UUID is required" in str(exc_info.value)
@pytest.mark.asyncio
- async def test_delete_task(self, taskwarrior_server):
- """Test deleting a task."""
+ async def test_delete_task_single(self, taskwarrior_server):
+ """Test deleting a single task."""
mock_output = "Deleted task test-uuid-123."
with patch.object(
@@ -165,9 +228,32 @@ class TestTaskWarriorServer:
new_callable=AsyncMock,
return_value=(mock_output, '', 0)
):
- result = await taskwarrior_server.delete_task("test-uuid-123")
- assert result["uuid"] == "test-uuid-123"
+ result = await taskwarrior_server.delete_task(["test-uuid-123"])
+ assert result["uuids"] == ["test-uuid-123"]
+ assert result["status"] == "deleted"
+
+ @pytest.mark.asyncio
+ async def test_delete_task_multiple(self, taskwarrior_server):
+ """Test deleting multiple tasks."""
+ mock_output = "Deleted task test-uuid-123.\nDeleted task test-uuid-456."
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(mock_output, '', 0)
+ ):
+ result = await taskwarrior_server.delete_task(["test-uuid-123", "test-uuid-456"])
+ assert result["uuids"] == ["test-uuid-123", "test-uuid-456"]
assert result["status"] == "deleted"
+ assert len(result["uuids"]) == 2
+
+ @pytest.mark.asyncio
+ async def test_delete_task_empty_list(self, taskwarrior_server):
+ """Test that deleting tasks with empty list raises ValueError."""
+ with pytest.raises(ValueError) as exc_info:
+ await taskwarrior_server.delete_task([])
+
+ assert "At least one UUID is required" in str(exc_info.value)
@pytest.mark.asyncio
async def test_manage_context_set(self, taskwarrior_server):