167 lines
5.3 KiB
Python
167 lines
5.3 KiB
Python
"""Background task runner for long-running operations."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Any, Awaitable, Callable, Dict, Optional
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TaskStatus(str, Enum):
    """Lifecycle states of a background task.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    PENDING = "pending"      # registered, not yet started
    RUNNING = "running"      # currently executing
    COMPLETED = "completed"  # finished without raising
    FAILED = "failed"        # finished by raising an exception
|
|
|
|
|
|
@dataclass
class BackgroundTask:
    """Mutable state record for one background task.

    ``started_at`` and ``completed_at`` are compared against ``time.time()``
    by :attr:`elapsed`, so they are expected to be epoch timestamps in
    seconds.
    """

    task_id: str
    description: str
    started_at: float
    status: TaskStatus = TaskStatus.PENDING
    # Set once the task has finished, whether it succeeded or failed.
    completed_at: Optional[float] = None
    # Textual result of a successful run, if any.
    result: Optional[str] = None
    # Textual description of the failure, if any.
    error: Optional[str] = None
    # Chat to notify when the task finishes; None disables notification.
    notify_chat_id: Optional[str] = None
    user_id: Optional[str] = None

    @property
    def elapsed(self) -> float:
        """Return the task's duration in seconds.

        For a finished task this is ``completed_at - started_at``; for a task
        that has not finished it is the time elapsed since ``started_at``.
        """
        # Compare against None explicitly: a completion timestamp of 0.0 is
        # falsy but still means the task has finished.
        if self.completed_at is not None:
            return self.completed_at - self.started_at
        return time.time() - self.started_at
|
|
|
|
|
|
class TaskRunner:
    """Singleton that manages background tasks with Feishu notifications.

    Submitted coroutines run as asyncio tasks and are tracked as
    ``BackgroundTask`` records keyed by a short id. When a record with a
    ``notify_chat_id`` finishes, the custom handler installed through
    :meth:`set_notification_handler` is awaited if present; otherwise a text
    message is sent through ``bot.feishu.send_text``.
    """

    def __init__(self) -> None:
        self._tasks: dict[str, BackgroundTask] = {}
        self._lock = asyncio.Lock()
        self._notification_handler: Optional[Callable[[BackgroundTask], Awaitable[None]]] = None
        # Strong references to in-flight asyncio tasks. The event loop keeps
        # only weak references, so a task nobody else references could be
        # garbage-collected before it finishes.
        self._running: set[asyncio.Task[None]] = set()

    def set_notification_handler(self, handler: Optional[Callable[[BackgroundTask], Awaitable[None]]]) -> None:
        """Set custom notification handler for M3 mode (host client -> router).

        Passing ``None`` restores the default Feishu notification.
        """
        self._notification_handler = handler

    async def submit(
        self,
        coro: Awaitable[Any],
        description: str,
        notify_chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
        on_complete: Optional[Callable[[BackgroundTask], Awaitable[None]]] = None,
    ) -> str:
        """Submit a coroutine as a background task.

        Args:
            coro: Awaitable to run in the background.
            description: Human-readable summary, used in logs and notifications.
            notify_chat_id: Chat to notify when the task finishes; no
                notification is sent when this is empty.
            user_id: Stored on the task record; not otherwise used here.
            on_complete: Optional async callback awaited with the task record
                after a successful run only.

        Returns:
            The short id under which the task is registered.
        """
        task_id = str(uuid.uuid4())[:8]
        task = BackgroundTask(
            task_id=task_id,
            description=description,
            started_at=time.time(),
            status=TaskStatus.PENDING,
            notify_chat_id=notify_chat_id,
            user_id=user_id,
        )

        async with self._lock:
            self._tasks[task_id] = task

        # Hold a reference until the task is done so it cannot be collected
        # mid-execution; the done-callback drops it again.
        runner = asyncio.create_task(self._run_task(task_id, coro, on_complete))
        self._running.add(runner)
        runner.add_done_callback(self._running.discard)

        logger.info("Submitted background task %s: %s", task_id, description)
        return task_id

    async def _run_task(self, task_id: str, coro: Awaitable[Any], on_complete: Optional[Callable[[BackgroundTask], Awaitable[None]]] = None) -> None:
        """Execute a task and send notification on completion.

        The record is marked RUNNING, then COMPLETED or FAILED depending on
        whether ``coro`` raises. Failures inside the notification step or the
        ``on_complete`` callback are logged and never propagate.

        Raises:
            asyncio.CancelledError: Re-raised after the record has been marked
                FAILED, so cancellation still propagates normally.
        """
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return
            task.status = TaskStatus.RUNNING

        try:
            result = await coro
            async with self._lock:
                task.status = TaskStatus.COMPLETED
                task.completed_at = time.time()
                # Keep falsy-but-meaningful results such as 0 or False; only
                # a missing result (None) is stored as None.
                task.result = str(result)[:2000] if result is not None else None

            logger.info("Task %s completed in %.1fs", task_id, task.elapsed)

        except asyncio.CancelledError:
            # Not an ``Exception`` subclass on Python 3.8+, so it needs its
            # own handler or the record would stay RUNNING forever. Updated
            # without awaiting the lock: there must be no suspension point
            # while unwinding from a cancellation.
            task.status = TaskStatus.FAILED
            task.completed_at = time.time()
            task.error = "cancelled"
            logger.warning("Task %s was cancelled", task_id)
            raise

        except Exception as exc:
            async with self._lock:
                task.status = TaskStatus.FAILED
                task.completed_at = time.time()
                task.error = str(exc)[:500]

            logger.exception("Task %s failed: %s", task_id, exc)

        if task.notify_chat_id:
            # Notification is best-effort: a failing handler must not prevent
            # the on_complete callback from running.
            try:
                if self._notification_handler:
                    await self._notification_handler(task)
                else:
                    await self._send_notification(task)
            except Exception:
                logger.exception("Notification failed for task %s", task_id)

        if on_complete and task.status == TaskStatus.COMPLETED:
            try:
                await on_complete(task)
            except Exception:
                logger.exception("on_complete callback failed for task %s", task_id)

    async def _send_notification(self, task: BackgroundTask) -> None:
        """Send Feishu notification about task completion.

        Does nothing when the task has no ``notify_chat_id``. Any failure,
        including a failing import of ``bot.feishu``, is logged and swallowed.
        """
        if not task.notify_chat_id:
            return

        if task.status == TaskStatus.COMPLETED:
            emoji = "✅"
            status_text = "done"
        else:
            emoji = "❌"
            status_text = "failed"

        elapsed = int(task.elapsed)
        msg = f"{emoji} Task #{task.task_id} {status_text} ({elapsed}s)\n{task.description}"

        if task.result:
            # Cap the result shown in the chat message at 800 characters.
            truncated = task.result[:800]
            if len(task.result) > 800:
                truncated += "..."
            msg += f"\n\n```\n{truncated}\n```"
        elif task.error:
            msg += f"\n\n**Error:** {task.error}"

        try:
            # Imported lazily inside the try so that an import failure is
            # handled like any other delivery failure.
            from bot.feishu import send_text

            await send_text(task.notify_chat_id, "chat_id", msg)
        except Exception:
            logger.exception("Failed to send notification for task %s", task.task_id)

    def get_task(self, task_id: str) -> Optional[BackgroundTask]:
        """Return the task registered under ``task_id``, or ``None``."""
        return self._tasks.get(task_id)

    def list_tasks(self, limit: int = 20) -> list[dict[str, Any]]:
        """Return summaries of the most recently started tasks, newest first.

        Args:
            limit: Maximum number of tasks to return; values below zero are
                treated as zero.

        Returns:
            One dict per task with the keys ``task_id``, ``description``,
            ``status``, ``elapsed`` (whole seconds) and ``started_at``.
        """
        newest_first = sorted(
            self._tasks.values(),
            key=lambda t: t.started_at,
            reverse=True,
        )
        # Clamp so a negative limit yields nothing instead of slicing from
        # the end of the list.
        return [
            {
                "task_id": t.task_id,
                "description": t.description,
                "status": t.status.value,
                "elapsed": int(t.elapsed),
                "started_at": t.started_at,
            }
            for t in newest_first[: max(limit, 0)]
        ]
|
|
|
|
|
|
# Module-level TaskRunner instance, created when this module is imported.
task_runner = TaskRunner()
|