vikunjamcp/main.py

718 lines
28 KiB
Python

import os
import requests
import logging
from fastmcp import FastMCP
# --- Configuration ---
VIKUNJA_URL = os.getenv("VIKUNJA_URL")
VIKUNJA_USERNAME = os.getenv("VIKUNJA_USERNAME")
VIKUNJA_PASSWORD = os.getenv("VIKUNJA_PASSWORD")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
DEBUG_TASK_MATCHES = os.getenv("DEBUG_TASK_MATCHES", "false").lower() in ("1", "true", "yes")
# --- MCP Application Setup ---
mcp = FastMCP()
session = requests.Session()
# Configure basic logging. Level can be controlled with LOG_LEVEL env var.
level = getattr(logging, LOG_LEVEL.upper(), logging.INFO)
logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
if DEBUG_TASK_MATCHES:
logger.info("DEBUG_TASK_MATCHES is enabled: per-task match attempts will be logged at INFO level")
# --- Input Validation ---
if not all([VIKUNJA_URL, VIKUNJA_USERNAME, VIKUNJA_PASSWORD]):
print("Error: Please set the VIKUNJA_URL, VIKUNJA_USERNAME, and VIKUNJA_PASSWORD environment variables.")
exit(1)
@mcp.tool()
def login():
"""
Authenticates with the Vikunja API to get a session token.
"""
global session
try:
response = session.post(
f"{VIKUNJA_URL}/api/v1/login",
json={"username": VIKUNJA_USERNAME, "password": VIKUNJA_PASSWORD}
)
response.raise_for_status()
token = response.json().get("token")
if not token:
return "Login failed: Token not found in response."
session.headers.update({"Authorization": f"Bearer {token}"})
return "Login successful. Token stored for session."
except requests.exceptions.RequestException as e:
return f"Login failed: {e}"
@mcp.tool()
def search_tasks(query: str):
"""
Searches for tasks in Vikunja.
:param query: The search string to use for finding tasks.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
try:
# Vikunja does not expose a /tasks/search endpoint in the public API.
# Fetch all tasks and filter client-side by title/description.
logger.info("search_tasks: fetching all tasks from %s", f"{VIKUNJA_URL}/api/v1/tasks/all")
all_tasks = []
page = 1
per_page = 50
while True:
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/all?page={page}&limit={per_page}")
response.raise_for_status()
data = response.json()
tasks = data if isinstance(data, list) else data.get("tasks", [])
if not tasks:
break
all_tasks.extend(tasks)
page += 1
tasks = all_tasks
q = (query or "").strip()
logger.info("search_tasks: raw query='%s'", q)
if not q:
logger.info("search_tasks: empty query, returning %d tasks", len(tasks))
return tasks
terms = [t.lower() for t in q.split() if t]
logger.info("search_tasks: parsed terms=%s", terms)
def matches(task: dict) -> bool:
title = (task.get("title") or "")
description = (task.get("description") or "")
title_l = title.lower()
desc_l = description.lower()
for term in terms:
if term in title_l or term in desc_l:
logger.debug("search_tasks: task id=%s matched term='%s' (title=%r, description=%r)", task.get("id"), term, title, description)
return True
else:
logger.debug("search_tasks: task id=%s did not match term='%s' (title=%r)", task.get("id"), term, title)
return False
filtered = [t for t in tasks if matches(t)]
logger.info("search_tasks: fetched=%d filtered=%d", len(tasks), len(filtered))
return filtered
except requests.exceptions.RequestException as e:
logger.exception("search_tasks: request failed")
return f"Error searching tasks: {e}"
@mcp.tool()
def active_tasks(project_id: int = None, is_favorite: bool = None):
"""
Returns a list of active tasks (where 'done' is False) from Vikunja.
:param project_id: Optional project ID to filter tasks by project.
:param is_favorite: Optional filter to return only tasks matching favorite status.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
try:
logger.info("active_tasks: fetching all tasks from %s", f"{VIKUNJA_URL}/api/v1/tasks/all")
all_tasks = []
page = 1
per_page = 50
while True:
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/all?page={page}&limit={per_page}")
response.raise_for_status()
data = response.json()
tasks = data if isinstance(data, list) else data.get("tasks", [])
if not tasks:
break
all_tasks.extend(tasks)
page += 1
# Filter active tasks (not done)
active = [t for t in all_tasks if not t.get("done", False)]
# If project_id provided, further filter by task's project_id
if project_id is not None:
before = len(active)
active = [t for t in active if t.get("project_id") == project_id or t.get("projectID") == project_id]
logger.info("active_tasks: filtered by project_id=%s before=%d after=%d", project_id, before, len(active))
# If is_favorite provided, filter by is_favorite / is_favourite field
if is_favorite is not None:
before = len(active)
def fav_val(task):
return bool(task.get("is_favorite", task.get("is_favourite", False)))
active = [t for t in active if fav_val(t) == bool(is_favorite)]
logger.info("active_tasks: filtered by is_favorite=%s before=%d after=%d", is_favorite, before, len(active))
logger.info("active_tasks: fetched=%d active=%d", len(all_tasks), len(active))
return active
except requests.exceptions.RequestException as e:
logger.exception("active_tasks: request failed")
return f"Error retrieving active tasks: {e}"
@mcp.tool()
def get_task_details(task_id: int):
"""
Gets task details in Vikunja.
:param task_id: The ID of the task of which to fetch comments.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
try:
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}")
response.raise_for_status()
details = response.json()
if not details:
return "No details found for this task."
# Helpers to produce compact readable strings for nested structures
def join_names(items):
if not items:
return "[]"
out = []
for it in items:
if not isinstance(it, dict):
out.append(str(it))
continue
out.append(str(it.get("name") or it.get("username") or it.get("id") or it))
return ", ".join(out)
def labels_list(labels):
if not labels:
return "[]"
out = []
for l in labels:
if isinstance(l, dict):
out.append(str(l.get("title") or l.get("name") or l.get("id") or l))
else:
out.append(str(l))
return ", ".join(out)
fetched_id = details.get("id", "N/A")
title = details.get("title", "N/A")
identifier = details.get("identifier", "N/A")
created = details.get("created", "N/A")
updated = details.get("updated", "N/A")
created_by = details.get("created_by") or {}
created_by_str = f"{created_by.get('name','N/A')} (id={created_by.get('id','N/A')}, email={created_by.get('email','N/A')}, username={created_by.get('username','N/A')})"
description = details.get("description", "N/A")
done = details.get("done", "N/A")
done_at = details.get("done_at", "N/A")
percent_done = details.get("percent_done", "N/A")
due_date = details.get("due_date", "N/A")
start_date = details.get("start_date", "N/A")
end_date = details.get("end_date", "N/A")
repeat_mode = details.get("repeat_mode", "N/A")
repeat_after = details.get("repeat_after", "N/A")
project_id = details.get("project_id", "N/A")
bucket_id = details.get("bucket_id", "N/A")
# Coerce potentially-None fields to safe defaults so len() and iteration are safe.
# Vikunja may return null for some nested fields; normalize to empty lists/dicts
# to avoid TypeError when calling len() or iterating.
buckets = details.get("buckets") or []
index = details.get("index", "N/A")
position = details.get("position", "N/A")
priority = details.get("priority", "N/A")
is_favorite = details.get("is_favorite", details.get("is_favourite", "N/A"))
hex_color = details.get("hex_color", "N/A")
assignees = details.get("assignees") or []
attachments = details.get("attachments") or []
cover_image_attachment_id = details.get("cover_image_attachment_id", "N/A")
comments = details.get("comments") or []
labels = details.get("labels") or []
reminders = details.get("reminders") or []
reactions = details.get("reactions") or {}
related_tasks = details.get("related_tasks") or {}
subscription = details.get("subscription") or {}
result = []
result.append(f"ID: {fetched_id}, Identifier: {identifier}, Title: {title}")
result.append(f"Created: {created} by {created_by_str}, Updated: {updated}")
result.append(f"Done: {done} (done_at={done_at}), Percent Done: {percent_done}, Priority: {priority}, Favorite: {is_favorite}")
result.append(f"Due Date: {due_date}, Start Date: {start_date}, End Date: {end_date}, Repeat Mode: {repeat_mode}, Repeat After: {repeat_after}")
result.append(f"Project ID: {project_id}, Bucket ID: {bucket_id}, Buckets: {len(buckets)} items, Index: {index}, Position: {position}")
result.append(f"Labels: {labels_list(labels)}, Hex Color: {hex_color}")
result.append(f"Assignees: {join_names(assignees)}, Attachments Count: {len(attachments)}, Cover Image Attachment ID: {cover_image_attachment_id}")
result.append(f"Comments Count: {len(comments)}, Reminders Count: {len(reminders)}")
result.append(f"Reactions: {reactions}, Related Tasks: {related_tasks}, Subscription: {subscription}")
result.append(f"Description: {description}")
return "\n".join(result)
except requests.exceptions.RequestException as e:
logger.exception("get_task_details: request failed for task_id=%s", task_id)
return f"Error looking up details for task: {e}"
@mcp.tool()
def add_task(project_id: int, title: str, description: str = ""):
"""
Adds a new task to a Vikunja project.
:param project_id: The ID of the project to add the task to.
:param title: The title of the new task.
:param description: An optional description for the task.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
task_payload = {
"project_id": project_id,
"title": title,
"description": description
}
try:
response = session.put(
f"{VIKUNJA_URL}/api/v1/projects/{project_id}/tasks",
json=task_payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return f"Error adding task: {e}"
@mcp.tool()
def update_task_title(task_id: int, title: str):
"""
Updates an existing title on a task.
:param task_id: The ID of the task.
:param title: The updated title text.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
if not (title or "").strip():
return "Title cannot be empty."
task_payload = {
"title": title
}
try:
response = session.post(
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}",
json=task_payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.exception("update_task_title: request failed for task_id=%s", task_id)
return f"Error updating task title: {e}"
@mcp.tool()
def update_task_description(task_id: int, description: str):
"""
Updates an existing description on a task.
:param task_id: The ID of the task.
:param description: The updated description text.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
if not (description or "").strip():
return "Description cannot be empty."
task_payload = {
"description": description
}
try:
response = session.post(
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}",
json=task_payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.exception("update_task_description: request failed for task_id=%s", task_id)
return f"Error updating task description: {e}"
@mcp.tool()
def delete_task(task_id: int):
"""
Deletes an existing task.
:param task_id: The ID of the task.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
try:
response = session.delete(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.exception("delete_task: request failed for task_id=%s", task_id)
return f"Error deleting task: {e}"
@mcp.tool()
def close_task(task_id: int):
"""
Closes (marks as done) a task in Vikunja.
:param task_id: The ID of the task to close.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
task_payload = {
"done": True
}
try:
response = session.post(
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}",
json=task_payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return f"Error closing task: {e}"
@mcp.tool()
def comment_task(task_id: int, description: str):
"""
Adds a comment to a task in Vikunja.
:param task_id: The ID of the task to comment on.
:param description: The text of the comment to add.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
if not (description or "").strip():
return "Comment description cannot be empty."
payload = {"comment": description}
try:
response = session.put(
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments",
json=payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.exception("comment_task: request failed for task_id=%s", task_id)
return f"Error adding comment to task: {e}"
@mcp.tool()
def lookup_comment_task(task_id: int):
"""
Lists comments in a task in Vikunja.
:param task_id: The ID of the task of which to fetch comments.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
try:
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments")
response.raise_for_status()
comments = response.json()
if not comments:
return "No comments found for this task."
result = []
for comment in comments:
comment_id = comment.get("id", "N/A")
comment_text = comment.get("comment", "N/A")
comment_created = comment.get("created", "N/A")
comment_author = comment.get("author", "N/A")
result.append(f"ID: {comment_id}, Created: {comment_created}, Author: {comment_author}, Comment: {comment_text}")
return "\n".join(result)
except requests.exceptions.RequestException as e:
logger.exception("lookup_comment_task: request failed for task_id=%s", task_id)
return f"Error looking up comments to task: {e}"
@mcp.tool()
def update_comment(task_id: int, comment_id: int, comment: str):
"""
Updates an existing comment on a task.
:param task_id: The ID of the task.
:param comment_id: The ID of the comment to update.
:param comment: The updated comment text.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
if not (comment or "").strip():
return "Comment cannot be empty."
payload = {"comment": comment}
try:
response = session.post(
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments/{comment_id}",
json=payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.exception("update_comment: request failed for task_id=%s, comment_id=%s", task_id, comment_id)
return f"Error updating comment for task: {e}"
@mcp.tool()
def lookup_project():
"""
Retrieves all projects the user has access to and lists them by name and ID.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
try:
response = session.get(f"{VIKUNJA_URL}/api/v1/projects")
response.raise_for_status()
projects = response.json()
if not projects:
return "No projects found."
result = []
for project in projects:
project_id = project.get("id", "N/A")
project_title = project.get("title", "Untitled")
result.append(f"ID: {project_id}, Name: {project_title}")
return "\n".join(result)
except requests.exceptions.RequestException as e:
return f"Error retrieving projects: {e}"
@mcp.tool()
def create_project(title: str, description: str = "", is_favorite: bool = False):
"""
Creates a new project in Vikunja.
:param title: The title of the project.
:param description: Optional description for the project.
:param is_favorite: Whether the project should be marked as favorite.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
if not (title or "").strip():
return "Project title cannot be empty."
payload = {
"title": title,
"description": description,
"is_favourite": bool(is_favorite)
}
try:
response = session.put(
f"{VIKUNJA_URL}/api/v1/projects",
json=payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.exception("create_project: request failed for title=%s", title)
return f"Error creating project: {e}"
@mcp.tool()
def update_task_details(
task_id: int,
title: str = None,
description: str = None,
done: bool = None,
percent_done: int = None,
due_date: str = None,
start_date: str = None,
end_date: str = None,
repeat_mode: int = None,
repeat_after: int = None,
project_id: int = None,
bucket_id: int = None,
priority: int = None,
position: int = None,
is_favorite: bool = None,
hex_color: str = None,
labels: str = None,
):
"""
Updates any primary fields on a task. Only fields provided (not None) will be sent.
If `labels` is provided as a comma-separated string, it will replace existing labels.
"""
if "Authorization" not in session.headers:
return "Please run the 'login' command first."
try:
# First, fetch the existing task to get all its properties
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}")
response.raise_for_status()
payload = response.json()
except requests.exceptions.RequestException as e:
logger.exception("update_task_details: failed to fetch existing task id=%s", task_id)
return f"Error fetching task details before update: {e}"
# Simple helper to set only when value explicitly passed (not None)
def set_if(provided, key, transform=lambda x: x):
if provided is not None:
payload[key] = transform(provided)
set_if(title, "title")
set_if(description, "description")
set_if(done, "done")
set_if(percent_done, "percent_done")
set_if(due_date, "due_date")
set_if(start_date, "start_date")
set_if(end_date, "end_date")
set_if(repeat_mode, "repeat_mode")
set_if(repeat_after, "repeat_after")
set_if(project_id, "project_id")
set_if(bucket_id, "bucket_id")
set_if(priority, "priority")
set_if(position, "position")
set_if(is_favorite, "is_favorite")
set_if(hex_color, "hex_color")
# Prepare labels payload separately because Vikunja's task update endpoint
# does not accept label updates; labels are managed via the labels bulk API.
labels_payload = None
if labels is not None:
# Vikunja expects an array of Label objects, not raw strings.
# Normalize various inputs into a list of label objects like {"title": "..."} or {"id": 123}.
def make_label_obj(item):
# If it's already a dict, try to normalize known keys.
if isinstance(item, dict):
# If key 'name' used, convert to 'title' which Vikunja also accepts.
if 'title' in item or 'id' in item:
return item
if 'name' in item:
new = dict(item)
new['title'] = new.pop('name')
return new
# Unknown dict shape - keep as-is but coerce to string title fallback.
return {k: v for k, v in item.items()}
# Integers are likely label IDs
if isinstance(item, int):
return {'id': item}
# Otherwise treat as a title string
return {'title': str(item)}
if isinstance(labels, str):
parsed = [s.strip() for s in labels.split(',') if s.strip()]
labels_payload = {"labels": [make_label_obj(s) for s in parsed]}
elif isinstance(labels, list):
labels_payload = {"labels": [make_label_obj(i) for i in labels]}
elif isinstance(labels, dict):
labels_payload = {"labels": [make_label_obj(labels)]}
else:
labels_payload = {"labels": [make_label_obj(labels)]}
try:
logger.info("update_task_details: updating task_id=%s with payload keys=%s", task_id, list(payload.keys()))
response = session.post(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}", json=payload)
response.raise_for_status()
task_update_resp = response.json()
# If labels were provided, resolve existing labels and send them to the labels bulk endpoint
if labels_payload is not None:
try:
logger.info("update_task_details: resolving existing labels from %s/labels", VIKUNJA_URL)
# Fetch all existing labels for this user to try to reuse existing label IDs
existing_resp = session.get(f"{VIKUNJA_URL}/api/v1/labels")
existing_resp.raise_for_status()
existing_labels = existing_resp.json() or []
# Build a map of title(lowercase) -> id for quick lookup
title_to_id = {}
for lab in existing_labels:
if not isinstance(lab, dict):
continue
title = lab.get('title') or lab.get('name')
if title:
title_to_id[title.lower()] = lab.get('id')
final_labels = []
for item in labels_payload.get('labels', []):
# If already an id, pass through
if isinstance(item, dict) and 'id' in item:
final_labels.append({'id': item['id']})
continue
# Determine title string
if isinstance(item, dict):
title = item.get('title') or item.get('name') or str(item)
else:
title = str(item)
existing_id = title_to_id.get(title.lower())
if existing_id:
final_labels.append({'id': existing_id})
else:
# Try to create the label first via the /labels endpoint so we have a real id
try:
logger.info("update_task_details: creating missing label '%s' via %s/labels", title, VIKUNJA_URL)
create_resp = session.put(f"{VIKUNJA_URL}/api/v1/labels", json={"title": title, "description": title})
# If creation failed with 4xx/5xx this will raise and be caught below
create_resp.raise_for_status()
created = create_resp.json() or {}
created_id = created.get('id')
if created_id:
title_to_id[title.lower()] = created_id
final_labels.append({'id': created_id})
logger.info("update_task_details: created label '%s' id=%s", title, created_id)
else:
# Fallback: include title/description in payload if no id returned
logger.warning("update_task_details: label created but no id returned for title='%s' - including title in bulk payload", title)
final_labels.append({'title': title, 'description': title})
except requests.exceptions.RequestException as ce:
# Creation failed; log more details and include title in payload as a fallback
resp = getattr(ce, 'response', None)
if resp is not None:
try:
logger.error("update_task_details: label creation returned status=%s body=%s for title=%s", resp.status_code, resp.text, title)
except Exception:
logger.exception("update_task_details: failed to read create label response body for title=%s", title)
else:
logger.exception("update_task_details: label creation request failed for title=%s", title)
# As a last resort try to send title in bulk payload (some servers may accept this)
final_labels.append({'title': title, 'description': title})
final_payload = {'labels': final_labels}
logger.info("update_task_details: updating labels for task_id=%s labels=%s", task_id, final_payload)
lab_resp = session.post(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/labels/bulk", json=final_payload)
lab_resp.raise_for_status()
logger.info("update_task_details: labels updated successfully for task_id=%s", task_id)
except requests.exceptions.RequestException as le:
resp = getattr(le, 'response', None)
if resp is not None:
try:
logger.error("update_task_details: labels update returned status=%s body=%s", resp.status_code, resp.text)
except Exception:
logger.exception("update_task_details: failed to read labels response body for task_id=%s", task_id)
logger.exception("update_task_details: labels update request failed for task_id=%s", task_id)
return task_update_resp
except requests.exceptions.RequestException as e:
logger.exception("update_task_details: request failed for task_id=%s", task_id)
return f"Error updating task details: {e}"
if __name__ == "__main__":
print("--- Vikunja MCP Client ---")
print("Available commands: login, search_tasks, add_task, close_task, lookup_project, help, exit")
mcp.run()