diff options
| author | Dawid Rycerz <dawid@rycerz.xyz> | 2025-12-19 16:59:07 +0100 |
|---|---|---|
| committer | Dawid Rycerz <dawid@rycerz.xyz> | 2025-12-19 16:59:43 +0100 |
| commit | 195fb4507405648faf8a5729610457c24aa82430 (patch) | |
| tree | f75ad25897cd5cfcd783f8a6d8b3b99a717be22c /servers/taskwarrior/tests | |
| parent | f7077b5c2f64f4d4a5d870f70e2f63caec220958 (diff) | |
feat: create task warrior mcp server
Diffstat (limited to 'servers/taskwarrior/tests')
| -rw-r--r-- | servers/taskwarrior/tests/conftest.py | 40 | ||||
| -rw-r--r-- | servers/taskwarrior/tests/test_server.py | 191 |
2 files changed, 231 insertions, 0 deletions
diff --git a/servers/taskwarrior/tests/conftest.py b/servers/taskwarrior/tests/conftest.py new file mode 100644 index 0000000..5a36cc1 --- /dev/null +++ b/servers/taskwarrior/tests/conftest.py @@ -0,0 +1,40 @@ +import os +import sys +from typing import TYPE_CHECKING +from unittest.mock import AsyncMock, MagicMock + +import pytest + +# Add the src directory to the Python path for testing +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src'))) + +# Mock the mcp.server.remote module +mock_remote = MagicMock() +mock_remote.serve_remote = AsyncMock() +sys.modules['mcp.server.remote'] = mock_remote + +if TYPE_CHECKING: + from mcp_server_taskwarrior.server import TaskWarriorServer + +@pytest.fixture +def taskwarrior_server() -> "TaskWarriorServer": + """Create a TaskWarriorServer instance for testing.""" + from mcp_server_taskwarrior.server import TaskWarriorServer + return TaskWarriorServer() + +@pytest.fixture +def mcp_server_mock() -> MagicMock: + """Create a mock MCP Server instance with all handlers.""" + server = MagicMock() + server.list_resources = MagicMock(return_value=lambda: None) + server.read_resource = MagicMock(return_value=lambda: None) + server.list_prompts = MagicMock(return_value=lambda: None) + server.get_prompt = MagicMock(return_value=lambda: None) + server.list_tools = MagicMock(return_value=lambda: None) + server.call_tool = MagicMock(return_value=lambda: None) + server.run = AsyncMock() + server.get_capabilities = MagicMock(return_value={}) + server.request_context = MagicMock() + server.request_context.session = MagicMock() + server.request_context.session.send_resource_updated = AsyncMock() + return server diff --git a/servers/taskwarrior/tests/test_server.py b/servers/taskwarrior/tests/test_server.py new file mode 100644 index 0000000..f96f817 --- /dev/null +++ b/servers/taskwarrior/tests/test_server.py @@ -0,0 +1,191 @@ +"""Tests for TaskWarrior MCP server.""" + +import json +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from mcp_server_taskwarrior.server import TaskWarriorServer, TaskWarriorError + + +class TestTaskWarriorServer: + """Test the TaskWarriorServer class functionality.""" + + def test_init(self): + """Test that the server initializes correctly.""" + server = TaskWarriorServer() + assert server is not None + + @pytest.mark.asyncio + async def test_run_task_command_success(self, taskwarrior_server): + """Test successful task command execution.""" + with patch('asyncio.create_subprocess_exec') as mock_subprocess: + # Mock successful subprocess + mock_process = AsyncMock() + mock_process.communicate = AsyncMock(return_value=( + b'{"test": "output"}', + b'', + )) + mock_process.returncode = 0 + mock_subprocess.return_value = mock_process + + stdout, stderr, return_code = await taskwarrior_server._run_task_command("export") + + assert return_code == 0 + assert stdout == '{"test": "output"}' + assert stderr == '' + + @pytest.mark.asyncio + async def test_run_task_command_failure(self, taskwarrior_server): + """Test task command execution failure.""" + with patch('asyncio.create_subprocess_exec') as mock_subprocess: + # Mock failed subprocess with a generic error (not "not found") + mock_process = AsyncMock() + mock_process.communicate = AsyncMock(return_value=( + b'', + b'Error: invalid filter expression', + )) + mock_process.returncode = 1 + mock_subprocess.return_value = mock_process + + with pytest.raises(TaskWarriorError) as exc_info: + await taskwarrior_server._run_task_command("invalid") + + assert "TaskWarrior command failed" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_run_task_command_not_installed(self, taskwarrior_server): + """Test when TaskWarrior is not installed.""" + with patch('asyncio.create_subprocess_exec', side_effect=FileNotFoundError()): + with pytest.raises(TaskWarriorError) as exc_info: + await taskwarrior_server._run_task_command("export") + + assert "not installed" in str(exc_info.value).lower() + + @pytest.mark.asyncio + async def test_list_tasks(self, taskwarrior_server): + """Test listing tasks.""" + mock_tasks = [ + {"uuid": "123", "description": "Test task 1", "status": "pending"}, + {"uuid": "456", "description": "Test task 2", "status": "pending"}, + ] + + with patch.object( + taskwarrior_server, '_run_task_command', + new_callable=AsyncMock, + return_value=(json.dumps(mock_tasks), '', 0) + ): + tasks = await taskwarrior_server.list_tasks() + assert len(tasks) == 2 + assert tasks[0]["uuid"] == "123" + + @pytest.mark.asyncio + async def test_list_tasks_with_filter(self, taskwarrior_server): + """Test listing tasks with filter.""" + mock_tasks = [{"uuid": "123", "description": "Test", "status": "pending"}] + + with patch.object( + taskwarrior_server, '_run_task_command', + new_callable=AsyncMock, + return_value=(json.dumps(mock_tasks), '', 0) + ): + tasks = await taskwarrior_server.list_tasks("project:Home") + assert len(tasks) == 1 + + @pytest.mark.asyncio + async def test_add_task(self, taskwarrior_server): + """Test adding a task.""" + mock_output = "Created task 1.\n\nUUID: test-uuid-123" + mock_tasks = [{"uuid": "test-uuid-123", "description": "New task", "status": "pending", "entry": "2024-01-01T00:00:00Z"}] + + with patch.object( + taskwarrior_server, '_run_task_command', + new_callable=AsyncMock, + return_value=(mock_output, '', 0) + ), patch.object( + taskwarrior_server, 'list_tasks', + new_callable=AsyncMock, + return_value=mock_tasks + ): + result = await taskwarrior_server.add_task("New task", project="Test") + assert result["description"] == "New task" + assert result["uuid"] == "test-uuid-123" + assert result["project"] == "Test" + + @pytest.mark.asyncio + async def test_add_task_invalid_priority(self, taskwarrior_server): + """Test adding a task with invalid priority.""" + with pytest.raises(ValueError) as exc_info: + await taskwarrior_server.add_task("Test", priority="X") + + assert "Priority must be H, M, or L" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_done_task(self, taskwarrior_server): + """Test marking a task as done.""" + mock_output = "Completed task test-uuid-123." + + with patch.object( + taskwarrior_server, '_run_task_command', + new_callable=AsyncMock, + return_value=(mock_output, '', 0) + ): + result = await taskwarrior_server.done_task("test-uuid-123") + assert result["uuid"] == "test-uuid-123" + assert result["status"] == "completed" + + @pytest.mark.asyncio + async def test_delete_task(self, taskwarrior_server): + """Test deleting a task.""" + mock_output = "Deleted task test-uuid-123." + + with patch.object( + taskwarrior_server, '_run_task_command', + new_callable=AsyncMock, + return_value=(mock_output, '', 0) + ): + result = await taskwarrior_server.delete_task("test-uuid-123") + assert result["uuid"] == "test-uuid-123" + assert result["status"] == "deleted" + + @pytest.mark.asyncio + async def test_manage_context_set(self, taskwarrior_server): + """Test setting a context.""" + mock_output = "Context 'work' set." + + with patch.object( + taskwarrior_server, '_run_task_command', + new_callable=AsyncMock, + return_value=(mock_output, '', 0) + ): + result = await taskwarrior_server.manage_context("set", "work") + assert result["action"] == "set" + assert result["context"] == "work" + + @pytest.mark.asyncio + async def test_manage_context_list(self, taskwarrior_server): + """Test listing contexts.""" + mock_output = "Name Type Definition\nwork read project:Work" + + with patch.object( + taskwarrior_server, '_run_task_command', + new_callable=AsyncMock, + return_value=(mock_output, '', 0) + ): + result = await taskwarrior_server.manage_context("list") + assert result["action"] == "list" + assert "contexts" in result + + @pytest.mark.asyncio + async def test_manage_context_invalid_action(self, taskwarrior_server): + """Test invalid context action.""" + with pytest.raises(ValueError) as exc_info: + await taskwarrior_server.manage_context("invalid") + + assert "Action must be one of" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_manage_context_set_without_name(self, taskwarrior_server): + """Test setting context without name.""" + with pytest.raises(ValueError) as exc_info: + await taskwarrior_server.manage_context("set") + + assert "Context name is required" in str(exc_info.value) |
