718 lines
28 KiB
Python
718 lines
28 KiB
Python
import os
|
|
import requests
|
|
import logging
|
|
from fastmcp import FastMCP
|
|
|
|
# --- Configuration ---
|
|
VIKUNJA_URL = os.getenv("VIKUNJA_URL")
|
|
VIKUNJA_USERNAME = os.getenv("VIKUNJA_USERNAME")
|
|
VIKUNJA_PASSWORD = os.getenv("VIKUNJA_PASSWORD")
|
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
|
|
DEBUG_TASK_MATCHES = os.getenv("DEBUG_TASK_MATCHES", "false").lower() in ("1", "true", "yes")
|
|
|
|
# --- MCP Application Setup ---
|
|
mcp = FastMCP()
|
|
session = requests.Session()
|
|
|
|
# Configure basic logging. Level can be controlled with LOG_LEVEL env var.
|
|
level = getattr(logging, LOG_LEVEL.upper(), logging.INFO)
|
|
logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
if DEBUG_TASK_MATCHES:
|
|
logger.info("DEBUG_TASK_MATCHES is enabled: per-task match attempts will be logged at INFO level")
|
|
|
|
# --- Input Validation ---
|
|
if not all([VIKUNJA_URL, VIKUNJA_USERNAME, VIKUNJA_PASSWORD]):
|
|
print("Error: Please set the VIKUNJA_URL, VIKUNJA_USERNAME, and VIKUNJA_PASSWORD environment variables.")
|
|
exit(1)
|
|
|
|
@mcp.tool()
|
|
def login():
|
|
"""
|
|
Authenticates with the Vikunja API to get a session token.
|
|
"""
|
|
global session
|
|
try:
|
|
response = session.post(
|
|
f"{VIKUNJA_URL}/api/v1/login",
|
|
json={"username": VIKUNJA_USERNAME, "password": VIKUNJA_PASSWORD}
|
|
)
|
|
response.raise_for_status()
|
|
token = response.json().get("token")
|
|
if not token:
|
|
return "Login failed: Token not found in response."
|
|
|
|
session.headers.update({"Authorization": f"Bearer {token}"})
|
|
return "Login successful. Token stored for session."
|
|
except requests.exceptions.RequestException as e:
|
|
return f"Login failed: {e}"
|
|
|
|
@mcp.tool()
|
|
def search_tasks(query: str):
|
|
"""
|
|
Searches for tasks in Vikunja.
|
|
|
|
:param query: The search string to use for finding tasks.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
try:
|
|
# Vikunja does not expose a /tasks/search endpoint in the public API.
|
|
# Fetch all tasks and filter client-side by title/description.
|
|
logger.info("search_tasks: fetching all tasks from %s", f"{VIKUNJA_URL}/api/v1/tasks/all")
|
|
|
|
all_tasks = []
|
|
page = 1
|
|
per_page = 50
|
|
|
|
while True:
|
|
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/all?page={page}&limit={per_page}")
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
tasks = data if isinstance(data, list) else data.get("tasks", [])
|
|
if not tasks:
|
|
break
|
|
all_tasks.extend(tasks)
|
|
page += 1
|
|
|
|
tasks = all_tasks
|
|
|
|
q = (query or "").strip()
|
|
logger.info("search_tasks: raw query='%s'", q)
|
|
if not q:
|
|
logger.info("search_tasks: empty query, returning %d tasks", len(tasks))
|
|
return tasks
|
|
|
|
terms = [t.lower() for t in q.split() if t]
|
|
logger.info("search_tasks: parsed terms=%s", terms)
|
|
|
|
def matches(task: dict) -> bool:
|
|
title = (task.get("title") or "")
|
|
description = (task.get("description") or "")
|
|
title_l = title.lower()
|
|
desc_l = description.lower()
|
|
for term in terms:
|
|
if term in title_l or term in desc_l:
|
|
logger.debug("search_tasks: task id=%s matched term='%s' (title=%r, description=%r)", task.get("id"), term, title, description)
|
|
return True
|
|
else:
|
|
logger.debug("search_tasks: task id=%s did not match term='%s' (title=%r)", task.get("id"), term, title)
|
|
return False
|
|
|
|
filtered = [t for t in tasks if matches(t)]
|
|
logger.info("search_tasks: fetched=%d filtered=%d", len(tasks), len(filtered))
|
|
return filtered
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("search_tasks: request failed")
|
|
return f"Error searching tasks: {e}"
|
|
|
|
@mcp.tool()
|
|
def active_tasks(project_id: int = None, is_favorite: bool = None):
|
|
"""
|
|
Returns a list of active tasks (where 'done' is False) from Vikunja.
|
|
|
|
:param project_id: Optional project ID to filter tasks by project.
|
|
:param is_favorite: Optional filter to return only tasks matching favorite status.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
try:
|
|
logger.info("active_tasks: fetching all tasks from %s", f"{VIKUNJA_URL}/api/v1/tasks/all")
|
|
|
|
all_tasks = []
|
|
page = 1
|
|
per_page = 50
|
|
|
|
while True:
|
|
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/all?page={page}&limit={per_page}")
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
tasks = data if isinstance(data, list) else data.get("tasks", [])
|
|
if not tasks:
|
|
break
|
|
all_tasks.extend(tasks)
|
|
page += 1
|
|
|
|
# Filter active tasks (not done)
|
|
active = [t for t in all_tasks if not t.get("done", False)]
|
|
|
|
# If project_id provided, further filter by task's project_id
|
|
if project_id is not None:
|
|
before = len(active)
|
|
active = [t for t in active if t.get("project_id") == project_id or t.get("projectID") == project_id]
|
|
logger.info("active_tasks: filtered by project_id=%s before=%d after=%d", project_id, before, len(active))
|
|
|
|
# If is_favorite provided, filter by is_favorite / is_favourite field
|
|
if is_favorite is not None:
|
|
before = len(active)
|
|
def fav_val(task):
|
|
return bool(task.get("is_favorite", task.get("is_favourite", False)))
|
|
active = [t for t in active if fav_val(t) == bool(is_favorite)]
|
|
logger.info("active_tasks: filtered by is_favorite=%s before=%d after=%d", is_favorite, before, len(active))
|
|
|
|
logger.info("active_tasks: fetched=%d active=%d", len(all_tasks), len(active))
|
|
return active
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("active_tasks: request failed")
|
|
return f"Error retrieving active tasks: {e}"
|
|
|
|
@mcp.tool()
|
|
def get_task_details(task_id: int):
|
|
"""
|
|
Gets task details in Vikunja.
|
|
|
|
:param task_id: The ID of the task of which to fetch comments.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
try:
|
|
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}")
|
|
response.raise_for_status()
|
|
details = response.json()
|
|
if not details:
|
|
return "No details found for this task."
|
|
|
|
# Helpers to produce compact readable strings for nested structures
|
|
def join_names(items):
|
|
if not items:
|
|
return "[]"
|
|
out = []
|
|
for it in items:
|
|
if not isinstance(it, dict):
|
|
out.append(str(it))
|
|
continue
|
|
out.append(str(it.get("name") or it.get("username") or it.get("id") or it))
|
|
return ", ".join(out)
|
|
|
|
def labels_list(labels):
|
|
if not labels:
|
|
return "[]"
|
|
out = []
|
|
for l in labels:
|
|
if isinstance(l, dict):
|
|
out.append(str(l.get("title") or l.get("name") or l.get("id") or l))
|
|
else:
|
|
out.append(str(l))
|
|
return ", ".join(out)
|
|
|
|
fetched_id = details.get("id", "N/A")
|
|
title = details.get("title", "N/A")
|
|
identifier = details.get("identifier", "N/A")
|
|
created = details.get("created", "N/A")
|
|
updated = details.get("updated", "N/A")
|
|
created_by = details.get("created_by") or {}
|
|
created_by_str = f"{created_by.get('name','N/A')} (id={created_by.get('id','N/A')}, email={created_by.get('email','N/A')}, username={created_by.get('username','N/A')})"
|
|
|
|
description = details.get("description", "N/A")
|
|
done = details.get("done", "N/A")
|
|
done_at = details.get("done_at", "N/A")
|
|
percent_done = details.get("percent_done", "N/A")
|
|
due_date = details.get("due_date", "N/A")
|
|
start_date = details.get("start_date", "N/A")
|
|
end_date = details.get("end_date", "N/A")
|
|
repeat_mode = details.get("repeat_mode", "N/A")
|
|
repeat_after = details.get("repeat_after", "N/A")
|
|
|
|
project_id = details.get("project_id", "N/A")
|
|
bucket_id = details.get("bucket_id", "N/A")
|
|
# Coerce potentially-None fields to safe defaults so len() and iteration are safe.
|
|
# Vikunja may return null for some nested fields; normalize to empty lists/dicts
|
|
# to avoid TypeError when calling len() or iterating.
|
|
buckets = details.get("buckets") or []
|
|
index = details.get("index", "N/A")
|
|
position = details.get("position", "N/A")
|
|
|
|
priority = details.get("priority", "N/A")
|
|
is_favorite = details.get("is_favorite", details.get("is_favourite", "N/A"))
|
|
hex_color = details.get("hex_color", "N/A")
|
|
|
|
assignees = details.get("assignees") or []
|
|
attachments = details.get("attachments") or []
|
|
cover_image_attachment_id = details.get("cover_image_attachment_id", "N/A")
|
|
comments = details.get("comments") or []
|
|
labels = details.get("labels") or []
|
|
reminders = details.get("reminders") or []
|
|
reactions = details.get("reactions") or {}
|
|
related_tasks = details.get("related_tasks") or {}
|
|
subscription = details.get("subscription") or {}
|
|
|
|
result = []
|
|
result.append(f"ID: {fetched_id}, Identifier: {identifier}, Title: {title}")
|
|
result.append(f"Created: {created} by {created_by_str}, Updated: {updated}")
|
|
result.append(f"Done: {done} (done_at={done_at}), Percent Done: {percent_done}, Priority: {priority}, Favorite: {is_favorite}")
|
|
result.append(f"Due Date: {due_date}, Start Date: {start_date}, End Date: {end_date}, Repeat Mode: {repeat_mode}, Repeat After: {repeat_after}")
|
|
result.append(f"Project ID: {project_id}, Bucket ID: {bucket_id}, Buckets: {len(buckets)} items, Index: {index}, Position: {position}")
|
|
result.append(f"Labels: {labels_list(labels)}, Hex Color: {hex_color}")
|
|
result.append(f"Assignees: {join_names(assignees)}, Attachments Count: {len(attachments)}, Cover Image Attachment ID: {cover_image_attachment_id}")
|
|
result.append(f"Comments Count: {len(comments)}, Reminders Count: {len(reminders)}")
|
|
result.append(f"Reactions: {reactions}, Related Tasks: {related_tasks}, Subscription: {subscription}")
|
|
result.append(f"Description: {description}")
|
|
|
|
return "\n".join(result)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("get_task_details: request failed for task_id=%s", task_id)
|
|
return f"Error looking up details for task: {e}"
|
|
|
|
@mcp.tool()
|
|
def add_task(project_id: int, title: str, description: str = ""):
|
|
"""
|
|
Adds a new task to a Vikunja project.
|
|
|
|
:param project_id: The ID of the project to add the task to.
|
|
:param title: The title of the new task.
|
|
:param description: An optional description for the task.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
task_payload = {
|
|
"project_id": project_id,
|
|
"title": title,
|
|
"description": description
|
|
}
|
|
|
|
try:
|
|
response = session.put(
|
|
f"{VIKUNJA_URL}/api/v1/projects/{project_id}/tasks",
|
|
json=task_payload
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
return f"Error adding task: {e}"
|
|
|
|
@mcp.tool()
|
|
def update_task_title(task_id: int, title: str):
|
|
"""
|
|
Updates an existing title on a task.
|
|
|
|
:param task_id: The ID of the task.
|
|
:param title: The updated title text.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
if not (title or "").strip():
|
|
return "Title cannot be empty."
|
|
|
|
task_payload = {
|
|
"title": title
|
|
}
|
|
|
|
try:
|
|
response = session.post(
|
|
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}",
|
|
json=task_payload
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("update_task_title: request failed for task_id=%s", task_id)
|
|
return f"Error updating task title: {e}"
|
|
|
|
@mcp.tool()
|
|
def update_task_description(task_id: int, description: str):
|
|
"""
|
|
Updates an existing description on a task.
|
|
|
|
:param task_id: The ID of the task.
|
|
:param description: The updated description text.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
if not (description or "").strip():
|
|
return "Description cannot be empty."
|
|
|
|
task_payload = {
|
|
"description": description
|
|
}
|
|
|
|
try:
|
|
response = session.post(
|
|
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}",
|
|
json=task_payload
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("update_task_description: request failed for task_id=%s", task_id)
|
|
return f"Error updating task description: {e}"
|
|
|
|
@mcp.tool()
|
|
def delete_task(task_id: int):
|
|
"""
|
|
Deletes an existing task.
|
|
|
|
:param task_id: The ID of the task.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
try:
|
|
response = session.delete(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("delete_task: request failed for task_id=%s", task_id)
|
|
return f"Error deleting task: {e}"
|
|
|
|
@mcp.tool()
|
|
def close_task(task_id: int):
|
|
"""
|
|
Closes (marks as done) a task in Vikunja.
|
|
|
|
:param task_id: The ID of the task to close.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
task_payload = {
|
|
"done": True
|
|
}
|
|
|
|
try:
|
|
response = session.post(
|
|
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}",
|
|
json=task_payload
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
return f"Error closing task: {e}"
|
|
|
|
@mcp.tool()
|
|
def comment_task(task_id: int, description: str):
|
|
"""
|
|
Adds a comment to a task in Vikunja.
|
|
|
|
:param task_id: The ID of the task to comment on.
|
|
:param description: The text of the comment to add.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
if not (description or "").strip():
|
|
return "Comment description cannot be empty."
|
|
|
|
payload = {"comment": description}
|
|
|
|
try:
|
|
response = session.put(
|
|
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments",
|
|
json=payload
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("comment_task: request failed for task_id=%s", task_id)
|
|
return f"Error adding comment to task: {e}"
|
|
|
|
@mcp.tool()
|
|
def lookup_comment_task(task_id: int):
|
|
"""
|
|
Lists comments in a task in Vikunja.
|
|
|
|
:param task_id: The ID of the task of which to fetch comments.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
try:
|
|
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments")
|
|
response.raise_for_status()
|
|
comments = response.json()
|
|
if not comments:
|
|
return "No comments found for this task."
|
|
|
|
result = []
|
|
for comment in comments:
|
|
comment_id = comment.get("id", "N/A")
|
|
comment_text = comment.get("comment", "N/A")
|
|
comment_created = comment.get("created", "N/A")
|
|
comment_author = comment.get("author", "N/A")
|
|
result.append(f"ID: {comment_id}, Created: {comment_created}, Author: {comment_author}, Comment: {comment_text}")
|
|
return "\n".join(result)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("lookup_comment_task: request failed for task_id=%s", task_id)
|
|
return f"Error looking up comments to task: {e}"
|
|
|
|
@mcp.tool()
|
|
def update_comment(task_id: int, comment_id: int, comment: str):
|
|
"""
|
|
Updates an existing comment on a task.
|
|
|
|
:param task_id: The ID of the task.
|
|
:param comment_id: The ID of the comment to update.
|
|
:param comment: The updated comment text.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
if not (comment or "").strip():
|
|
return "Comment cannot be empty."
|
|
|
|
payload = {"comment": comment}
|
|
|
|
try:
|
|
response = session.post(
|
|
f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments/{comment_id}",
|
|
json=payload
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("update_comment: request failed for task_id=%s, comment_id=%s", task_id, comment_id)
|
|
return f"Error updating comment for task: {e}"
|
|
|
|
@mcp.tool()
|
|
def lookup_project():
|
|
"""
|
|
Retrieves all projects the user has access to and lists them by name and ID.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
try:
|
|
response = session.get(f"{VIKUNJA_URL}/api/v1/projects")
|
|
response.raise_for_status()
|
|
projects = response.json()
|
|
|
|
if not projects:
|
|
return "No projects found."
|
|
|
|
result = []
|
|
for project in projects:
|
|
project_id = project.get("id", "N/A")
|
|
project_title = project.get("title", "Untitled")
|
|
result.append(f"ID: {project_id}, Name: {project_title}")
|
|
|
|
return "\n".join(result)
|
|
except requests.exceptions.RequestException as e:
|
|
return f"Error retrieving projects: {e}"
|
|
|
|
|
|
@mcp.tool()
|
|
def create_project(title: str, description: str = "", is_favorite: bool = False):
|
|
"""
|
|
Creates a new project in Vikunja.
|
|
|
|
:param title: The title of the project.
|
|
:param description: Optional description for the project.
|
|
:param is_favorite: Whether the project should be marked as favorite.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
if not (title or "").strip():
|
|
return "Project title cannot be empty."
|
|
|
|
payload = {
|
|
"title": title,
|
|
"description": description,
|
|
"is_favourite": bool(is_favorite)
|
|
}
|
|
|
|
try:
|
|
response = session.put(
|
|
f"{VIKUNJA_URL}/api/v1/projects",
|
|
json=payload
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("create_project: request failed for title=%s", title)
|
|
return f"Error creating project: {e}"
|
|
|
|
@mcp.tool()
|
|
def update_task_details(
|
|
task_id: int,
|
|
title: str = None,
|
|
description: str = None,
|
|
done: bool = None,
|
|
percent_done: int = None,
|
|
due_date: str = None,
|
|
start_date: str = None,
|
|
end_date: str = None,
|
|
repeat_mode: int = None,
|
|
repeat_after: int = None,
|
|
project_id: int = None,
|
|
bucket_id: int = None,
|
|
priority: int = None,
|
|
position: int = None,
|
|
is_favorite: bool = None,
|
|
hex_color: str = None,
|
|
labels: str = None,
|
|
):
|
|
"""
|
|
Updates any primary fields on a task. Only fields provided (not None) will be sent.
|
|
If `labels` is provided as a comma-separated string, it will replace existing labels.
|
|
"""
|
|
if "Authorization" not in session.headers:
|
|
return "Please run the 'login' command first."
|
|
|
|
try:
|
|
# First, fetch the existing task to get all its properties
|
|
response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}")
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("update_task_details: failed to fetch existing task id=%s", task_id)
|
|
return f"Error fetching task details before update: {e}"
|
|
|
|
|
|
# Simple helper to set only when value explicitly passed (not None)
|
|
def set_if(provided, key, transform=lambda x: x):
|
|
if provided is not None:
|
|
payload[key] = transform(provided)
|
|
|
|
set_if(title, "title")
|
|
set_if(description, "description")
|
|
set_if(done, "done")
|
|
set_if(percent_done, "percent_done")
|
|
set_if(due_date, "due_date")
|
|
set_if(start_date, "start_date")
|
|
set_if(end_date, "end_date")
|
|
set_if(repeat_mode, "repeat_mode")
|
|
set_if(repeat_after, "repeat_after")
|
|
set_if(project_id, "project_id")
|
|
set_if(bucket_id, "bucket_id")
|
|
set_if(priority, "priority")
|
|
set_if(position, "position")
|
|
set_if(is_favorite, "is_favorite")
|
|
set_if(hex_color, "hex_color")
|
|
|
|
# Prepare labels payload separately because Vikunja's task update endpoint
|
|
# does not accept label updates; labels are managed via the labels bulk API.
|
|
labels_payload = None
|
|
if labels is not None:
|
|
# Vikunja expects an array of Label objects, not raw strings.
|
|
# Normalize various inputs into a list of label objects like {"title": "..."} or {"id": 123}.
|
|
def make_label_obj(item):
|
|
# If it's already a dict, try to normalize known keys.
|
|
if isinstance(item, dict):
|
|
# If key 'name' used, convert to 'title' which Vikunja also accepts.
|
|
if 'title' in item or 'id' in item:
|
|
return item
|
|
if 'name' in item:
|
|
new = dict(item)
|
|
new['title'] = new.pop('name')
|
|
return new
|
|
# Unknown dict shape - keep as-is but coerce to string title fallback.
|
|
return {k: v for k, v in item.items()}
|
|
# Integers are likely label IDs
|
|
if isinstance(item, int):
|
|
return {'id': item}
|
|
# Otherwise treat as a title string
|
|
return {'title': str(item)}
|
|
|
|
if isinstance(labels, str):
|
|
parsed = [s.strip() for s in labels.split(',') if s.strip()]
|
|
labels_payload = {"labels": [make_label_obj(s) for s in parsed]}
|
|
elif isinstance(labels, list):
|
|
labels_payload = {"labels": [make_label_obj(i) for i in labels]}
|
|
elif isinstance(labels, dict):
|
|
labels_payload = {"labels": [make_label_obj(labels)]}
|
|
else:
|
|
labels_payload = {"labels": [make_label_obj(labels)]}
|
|
|
|
try:
|
|
logger.info("update_task_details: updating task_id=%s with payload keys=%s", task_id, list(payload.keys()))
|
|
response = session.post(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}", json=payload)
|
|
response.raise_for_status()
|
|
task_update_resp = response.json()
|
|
|
|
# If labels were provided, resolve existing labels and send them to the labels bulk endpoint
|
|
if labels_payload is not None:
|
|
try:
|
|
logger.info("update_task_details: resolving existing labels from %s/labels", VIKUNJA_URL)
|
|
# Fetch all existing labels for this user to try to reuse existing label IDs
|
|
existing_resp = session.get(f"{VIKUNJA_URL}/api/v1/labels")
|
|
existing_resp.raise_for_status()
|
|
existing_labels = existing_resp.json() or []
|
|
|
|
# Build a map of title(lowercase) -> id for quick lookup
|
|
title_to_id = {}
|
|
for lab in existing_labels:
|
|
if not isinstance(lab, dict):
|
|
continue
|
|
title = lab.get('title') or lab.get('name')
|
|
if title:
|
|
title_to_id[title.lower()] = lab.get('id')
|
|
|
|
final_labels = []
|
|
for item in labels_payload.get('labels', []):
|
|
# If already an id, pass through
|
|
if isinstance(item, dict) and 'id' in item:
|
|
final_labels.append({'id': item['id']})
|
|
continue
|
|
|
|
# Determine title string
|
|
if isinstance(item, dict):
|
|
title = item.get('title') or item.get('name') or str(item)
|
|
else:
|
|
title = str(item)
|
|
|
|
existing_id = title_to_id.get(title.lower())
|
|
if existing_id:
|
|
final_labels.append({'id': existing_id})
|
|
else:
|
|
# Try to create the label first via the /labels endpoint so we have a real id
|
|
try:
|
|
logger.info("update_task_details: creating missing label '%s' via %s/labels", title, VIKUNJA_URL)
|
|
create_resp = session.put(f"{VIKUNJA_URL}/api/v1/labels", json={"title": title, "description": title})
|
|
# If creation failed with 4xx/5xx this will raise and be caught below
|
|
create_resp.raise_for_status()
|
|
created = create_resp.json() or {}
|
|
created_id = created.get('id')
|
|
if created_id:
|
|
title_to_id[title.lower()] = created_id
|
|
final_labels.append({'id': created_id})
|
|
logger.info("update_task_details: created label '%s' id=%s", title, created_id)
|
|
else:
|
|
# Fallback: include title/description in payload if no id returned
|
|
logger.warning("update_task_details: label created but no id returned for title='%s' - including title in bulk payload", title)
|
|
final_labels.append({'title': title, 'description': title})
|
|
except requests.exceptions.RequestException as ce:
|
|
# Creation failed; log more details and include title in payload as a fallback
|
|
resp = getattr(ce, 'response', None)
|
|
if resp is not None:
|
|
try:
|
|
logger.error("update_task_details: label creation returned status=%s body=%s for title=%s", resp.status_code, resp.text, title)
|
|
except Exception:
|
|
logger.exception("update_task_details: failed to read create label response body for title=%s", title)
|
|
else:
|
|
logger.exception("update_task_details: label creation request failed for title=%s", title)
|
|
# As a last resort try to send title in bulk payload (some servers may accept this)
|
|
final_labels.append({'title': title, 'description': title})
|
|
|
|
final_payload = {'labels': final_labels}
|
|
|
|
logger.info("update_task_details: updating labels for task_id=%s labels=%s", task_id, final_payload)
|
|
lab_resp = session.post(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/labels/bulk", json=final_payload)
|
|
lab_resp.raise_for_status()
|
|
logger.info("update_task_details: labels updated successfully for task_id=%s", task_id)
|
|
except requests.exceptions.RequestException as le:
|
|
resp = getattr(le, 'response', None)
|
|
if resp is not None:
|
|
try:
|
|
logger.error("update_task_details: labels update returned status=%s body=%s", resp.status_code, resp.text)
|
|
except Exception:
|
|
logger.exception("update_task_details: failed to read labels response body for task_id=%s", task_id)
|
|
logger.exception("update_task_details: labels update request failed for task_id=%s", task_id)
|
|
|
|
return task_update_resp
|
|
except requests.exceptions.RequestException as e:
|
|
logger.exception("update_task_details: request failed for task_id=%s", task_id)
|
|
return f"Error updating task details: {e}"
|
|
|
|
if __name__ == "__main__":
|
|
print("--- Vikunja MCP Client ---")
|
|
print("Available commands: login, search_tasks, add_task, close_task, lookup_project, help, exit")
|
|
mcp.run()
|