summaryrefslogtreecommitdiff
path: root/servers/taskwarrior/tests
diff options
context:
space:
mode:
Diffstat (limited to 'servers/taskwarrior/tests')
-rw-r--r--servers/taskwarrior/tests/conftest.py40
-rw-r--r--servers/taskwarrior/tests/test_server.py191
2 files changed, 231 insertions, 0 deletions
diff --git a/servers/taskwarrior/tests/conftest.py b/servers/taskwarrior/tests/conftest.py
new file mode 100644
index 0000000..5a36cc1
--- /dev/null
+++ b/servers/taskwarrior/tests/conftest.py
@@ -0,0 +1,40 @@
+import os
+import sys
+from typing import TYPE_CHECKING
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+# Add the src directory to the Python path for testing
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src')))
+
+# Mock the mcp.server.remote module
+mock_remote = MagicMock()
+mock_remote.serve_remote = AsyncMock()
+sys.modules['mcp.server.remote'] = mock_remote
+
+if TYPE_CHECKING:
+ from mcp_server_taskwarrior.server import TaskWarriorServer
+
+@pytest.fixture
+def taskwarrior_server() -> "TaskWarriorServer":
+ """Create a TaskWarriorServer instance for testing."""
+ from mcp_server_taskwarrior.server import TaskWarriorServer
+ return TaskWarriorServer()
+
+@pytest.fixture
+def mcp_server_mock() -> MagicMock:
+ """Create a mock MCP Server instance with all handlers."""
+ server = MagicMock()
+ server.list_resources = MagicMock(return_value=lambda: None)
+ server.read_resource = MagicMock(return_value=lambda: None)
+ server.list_prompts = MagicMock(return_value=lambda: None)
+ server.get_prompt = MagicMock(return_value=lambda: None)
+ server.list_tools = MagicMock(return_value=lambda: None)
+ server.call_tool = MagicMock(return_value=lambda: None)
+ server.run = AsyncMock()
+ server.get_capabilities = MagicMock(return_value={})
+ server.request_context = MagicMock()
+ server.request_context.session = MagicMock()
+ server.request_context.session.send_resource_updated = AsyncMock()
+ return server
diff --git a/servers/taskwarrior/tests/test_server.py b/servers/taskwarrior/tests/test_server.py
new file mode 100644
index 0000000..f96f817
--- /dev/null
+++ b/servers/taskwarrior/tests/test_server.py
@@ -0,0 +1,191 @@
+"""Tests for TaskWarrior MCP server."""
+
+import json
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+from mcp_server_taskwarrior.server import TaskWarriorServer, TaskWarriorError
+
+
+class TestTaskWarriorServer:
+ """Test the TaskWarriorServer class functionality."""
+
+ def test_init(self):
+ """Test that the server initializes correctly."""
+ server = TaskWarriorServer()
+ assert server is not None
+
+ @pytest.mark.asyncio
+ async def test_run_task_command_success(self, taskwarrior_server):
+ """Test successful task command execution."""
+ with patch('asyncio.create_subprocess_exec') as mock_subprocess:
+ # Mock successful subprocess
+ mock_process = AsyncMock()
+ mock_process.communicate = AsyncMock(return_value=(
+ b'{"test": "output"}',
+ b'',
+ ))
+ mock_process.returncode = 0
+ mock_subprocess.return_value = mock_process
+
+ stdout, stderr, return_code = await taskwarrior_server._run_task_command("export")
+
+ assert return_code == 0
+ assert stdout == '{"test": "output"}'
+ assert stderr == ''
+
+ @pytest.mark.asyncio
+ async def test_run_task_command_failure(self, taskwarrior_server):
+ """Test task command execution failure."""
+ with patch('asyncio.create_subprocess_exec') as mock_subprocess:
+ # Mock failed subprocess with a generic error (not "not found")
+ mock_process = AsyncMock()
+ mock_process.communicate = AsyncMock(return_value=(
+ b'',
+ b'Error: invalid filter expression',
+ ))
+ mock_process.returncode = 1
+ mock_subprocess.return_value = mock_process
+
+ with pytest.raises(TaskWarriorError) as exc_info:
+ await taskwarrior_server._run_task_command("invalid")
+
+ assert "TaskWarrior command failed" in str(exc_info.value)
+
+ @pytest.mark.asyncio
+ async def test_run_task_command_not_installed(self, taskwarrior_server):
+ """Test when TaskWarrior is not installed."""
+ with patch('asyncio.create_subprocess_exec', side_effect=FileNotFoundError()):
+ with pytest.raises(TaskWarriorError) as exc_info:
+ await taskwarrior_server._run_task_command("export")
+
+ assert "not installed" in str(exc_info.value).lower()
+
+ @pytest.mark.asyncio
+ async def test_list_tasks(self, taskwarrior_server):
+ """Test listing tasks."""
+ mock_tasks = [
+ {"uuid": "123", "description": "Test task 1", "status": "pending"},
+ {"uuid": "456", "description": "Test task 2", "status": "pending"},
+ ]
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(json.dumps(mock_tasks), '', 0)
+ ):
+ tasks = await taskwarrior_server.list_tasks()
+ assert len(tasks) == 2
+ assert tasks[0]["uuid"] == "123"
+
+ @pytest.mark.asyncio
+ async def test_list_tasks_with_filter(self, taskwarrior_server):
+ """Test listing tasks with filter."""
+ mock_tasks = [{"uuid": "123", "description": "Test", "status": "pending"}]
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(json.dumps(mock_tasks), '', 0)
+ ):
+ tasks = await taskwarrior_server.list_tasks("project:Home")
+ assert len(tasks) == 1
+
+ @pytest.mark.asyncio
+ async def test_add_task(self, taskwarrior_server):
+ """Test adding a task."""
+ mock_output = "Created task 1.\n\nUUID: test-uuid-123"
+ mock_tasks = [{"uuid": "test-uuid-123", "description": "New task", "status": "pending", "entry": "2024-01-01T00:00:00Z"}]
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(mock_output, '', 0)
+ ), patch.object(
+ taskwarrior_server, 'list_tasks',
+ new_callable=AsyncMock,
+ return_value=mock_tasks
+ ):
+ result = await taskwarrior_server.add_task("New task", project="Test")
+ assert result["description"] == "New task"
+ assert result["uuid"] == "test-uuid-123"
+ assert result["project"] == "Test"
+
+ @pytest.mark.asyncio
+ async def test_add_task_invalid_priority(self, taskwarrior_server):
+ """Test adding a task with invalid priority."""
+ with pytest.raises(ValueError) as exc_info:
+ await taskwarrior_server.add_task("Test", priority="X")
+
+ assert "Priority must be H, M, or L" in str(exc_info.value)
+
+ @pytest.mark.asyncio
+ async def test_done_task(self, taskwarrior_server):
+ """Test marking a task as done."""
+ mock_output = "Completed task test-uuid-123."
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(mock_output, '', 0)
+ ):
+ result = await taskwarrior_server.done_task("test-uuid-123")
+ assert result["uuid"] == "test-uuid-123"
+ assert result["status"] == "completed"
+
+ @pytest.mark.asyncio
+ async def test_delete_task(self, taskwarrior_server):
+ """Test deleting a task."""
+ mock_output = "Deleted task test-uuid-123."
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(mock_output, '', 0)
+ ):
+ result = await taskwarrior_server.delete_task("test-uuid-123")
+ assert result["uuid"] == "test-uuid-123"
+ assert result["status"] == "deleted"
+
+ @pytest.mark.asyncio
+ async def test_manage_context_set(self, taskwarrior_server):
+ """Test setting a context."""
+ mock_output = "Context 'work' set."
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(mock_output, '', 0)
+ ):
+ result = await taskwarrior_server.manage_context("set", "work")
+ assert result["action"] == "set"
+ assert result["context"] == "work"
+
+ @pytest.mark.asyncio
+ async def test_manage_context_list(self, taskwarrior_server):
+ """Test listing contexts."""
+ mock_output = "Name Type Definition\nwork read project:Work"
+
+ with patch.object(
+ taskwarrior_server, '_run_task_command',
+ new_callable=AsyncMock,
+ return_value=(mock_output, '', 0)
+ ):
+ result = await taskwarrior_server.manage_context("list")
+ assert result["action"] == "list"
+ assert "contexts" in result
+
+ @pytest.mark.asyncio
+ async def test_manage_context_invalid_action(self, taskwarrior_server):
+ """Test invalid context action."""
+ with pytest.raises(ValueError) as exc_info:
+ await taskwarrior_server.manage_context("invalid")
+
+ assert "Action must be one of" in str(exc_info.value)
+
+ @pytest.mark.asyncio
+ async def test_manage_context_set_without_name(self, taskwarrior_server):
+ """Test setting context without name."""
+ with pytest.raises(ValueError) as exc_info:
+ await taskwarrior_server.manage_context("set")
+
+ assert "Context name is required" in str(exc_info.value)