PhoneWork/agent/scheduler.py
Yuyao Huang (Sam) 8ecc701d5e feat: 添加任务调度器、后台任务运行器及多种工具支持
实现后台任务调度器(scheduler.py)和任务运行器(task_runner.py),支持长时间运行任务的异步执行和状态跟踪
新增多种工具支持:Shell命令执行、文件操作(读写/搜索/发送)、网页搜索/问答、定时提醒等
扩展README和ROADMAP文档,描述新功能和未来多主机架构规划
在配置文件中添加METASO_API_KEY支持秘塔AI搜索功能
优化代理逻辑,自动识别通用问题直接回答而不创建会话
2026-03-28 13:45:20 +08:00

275 lines
8.8 KiB
Python

"""Simple scheduler for reminders and recurring tasks."""
from __future__ import annotations
import asyncio
import json
import logging
import time
import uuid
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, Optional
logger = logging.getLogger(__name__)
PERSISTENCE_FILE = Path(__file__).parent.parent / "scheduled_jobs.json"
class JobStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
CANCELLED = "cancelled"
@dataclass
class ScheduledJob:
job_id: str
description: str
scheduled_at: float
delay_seconds: float
status: JobStatus = JobStatus.PENDING
is_recurring: bool = False
interval_seconds: Optional[float] = None
max_runs: int = 1
runs_completed: int = 0
notify_chat_id: Optional[str] = None
def to_dict(self) -> dict:
data = asdict(self)
data["status"] = self.status.value
return data
@classmethod
def from_dict(cls, data: dict) -> "ScheduledJob":
data = data.copy()
data["status"] = JobStatus(data.get("status", "pending"))
return cls(**data)
class Scheduler:
"""Singleton that manages scheduled jobs with Feishu notifications."""
def __init__(self) -> None:
self._jobs: Dict[str, ScheduledJob] = {}
self._tasks: Dict[str, asyncio.Task] = {}
self._lock = asyncio.Lock()
self._started = False
async def start(self) -> None:
"""Load persisted jobs and reschedule pending ones."""
self._load()
self._started = True
now = time.time()
for job in list(self._jobs.values()):
if job.status == JobStatus.PENDING:
elapsed = now - job.scheduled_at
if job.is_recurring:
remaining = job.interval_seconds - (elapsed % job.interval_seconds) if job.interval_seconds else 0
task = asyncio.create_task(self._run_recurring(job.job_id, skip_initial=True, initial_delay=remaining))
self._tasks[job.job_id] = task
else:
remaining = max(0, job.delay_seconds - elapsed)
if remaining <= 0:
asyncio.create_task(self._run_once(job.job_id))
else:
task = asyncio.create_task(self._run_once(job.job_id, initial_delay=remaining))
self._tasks[job.job_id] = task
logger.info("Scheduler started with %d jobs", len(self._jobs))
def _load(self) -> None:
"""Load jobs from persistence file."""
if not PERSISTENCE_FILE.exists():
return
try:
with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for job_id, job_data in data.items():
job = ScheduledJob.from_dict(job_data)
if job.status not in (JobStatus.COMPLETED, JobStatus.CANCELLED):
self._jobs[job_id] = job
logger.info("Loaded %d jobs from %s", len(self._jobs), PERSISTENCE_FILE)
except Exception:
logger.exception("Failed to load scheduled jobs")
def _save(self) -> None:
"""Save jobs to persistence file."""
try:
data = {jid: job.to_dict() for jid, job in self._jobs.items()}
with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception:
logger.exception("Failed to save scheduled jobs")
async def schedule_once(
self,
delay_seconds: float,
message: str,
notify_chat_id: Optional[str] = None,
) -> str:
"""Schedule a one-time reminder."""
job_id = str(uuid.uuid4())[:8]
job = ScheduledJob(
job_id=job_id,
description=message,
scheduled_at=time.time(),
delay_seconds=delay_seconds,
notify_chat_id=notify_chat_id,
)
async with self._lock:
self._jobs[job_id] = job
self._save()
task = asyncio.create_task(self._run_once(job_id))
self._tasks[job_id] = task
logger.info("Scheduled job %s: %s (in %ds)", job_id, message[:50], delay_seconds)
return job_id
async def schedule_recurring(
self,
interval_seconds: float,
message: str,
max_runs: int = 10,
notify_chat_id: Optional[str] = None,
) -> str:
"""Schedule a recurring reminder."""
job_id = str(uuid.uuid4())[:8]
job = ScheduledJob(
job_id=job_id,
description=message,
scheduled_at=time.time(),
delay_seconds=interval_seconds,
is_recurring=True,
interval_seconds=interval_seconds,
max_runs=max_runs,
notify_chat_id=notify_chat_id,
)
async with self._lock:
self._jobs[job_id] = job
self._save()
task = asyncio.create_task(self._run_recurring(job_id))
self._tasks[job_id] = task
logger.info("Scheduled recurring job %s: %s (every %ds, %d runs)",
job_id, message[:50], interval_seconds, max_runs)
return job_id
async def _run_once(self, job_id: str, initial_delay: Optional[float] = None) -> None:
"""Execute a one-time job after delay."""
job = self._jobs.get(job_id)
if not job:
return
delay = initial_delay if initial_delay is not None else job.delay_seconds
await asyncio.sleep(delay)
async with self._lock:
job.status = JobStatus.RUNNING
self._save()
await self._send_notification(job, job.description)
async with self._lock:
job.status = JobStatus.COMPLETED
job.runs_completed = 1
self._save()
logger.info("Job %s completed", job_id)
async def _run_recurring(self, job_id: str, skip_initial: bool = False, initial_delay: Optional[float] = None) -> None:
"""Execute a recurring job."""
job = self._jobs.get(job_id)
if not job:
return
interval = job.interval_seconds or 60
if skip_initial and initial_delay is not None:
await asyncio.sleep(initial_delay)
for run in range(job.max_runs):
if not (skip_initial and run == 0):
await asyncio.sleep(interval)
async with self._lock:
if job.status == JobStatus.CANCELLED:
break
job.status = JobStatus.RUNNING
self._save()
await self._send_notification(job, f"[{job.runs_completed + 1}/{job.max_runs}] {job.description}")
async with self._lock:
job.runs_completed += 1
if job.runs_completed < job.max_runs:
job.status = JobStatus.PENDING
else:
job.status = JobStatus.COMPLETED
self._save()
logger.info("Recurring job %s finished (%d runs)", job_id, job.runs_completed)
async def _send_notification(self, job: ScheduledJob, message: str) -> None:
"""Send Feishu notification."""
if not job.notify_chat_id:
return
from bot.feishu import send_text
try:
await send_text(job.notify_chat_id, "chat_id", f"⏰ **Reminder**\n\n{message}")
except Exception:
logger.exception("Failed to send notification for job %s", job.job_id)
def cancel(self, job_id: str) -> bool:
"""Cancel a scheduled job."""
job = self._jobs.get(job_id)
if not job:
return False
job.status = JobStatus.CANCELLED
self._save()
task = self._tasks.get(job_id)
if task:
task.cancel()
logger.info("Cancelled job %s", job_id)
return True
async def stop(self) -> None:
"""Stop all tasks and clear state."""
for task in self._tasks.values():
task.cancel()
self._tasks.clear()
async with self._lock:
self._jobs.clear()
if PERSISTENCE_FILE.exists():
PERSISTENCE_FILE.unlink()
logger.info("Scheduler stopped")
def list_jobs(self, limit: int = 20) -> list[dict]:
jobs = sorted(
[j for j in self._jobs.values() if j.status != JobStatus.COMPLETED],
key=lambda j: j.scheduled_at,
)[:limit]
return [
{
"job_id": j.job_id,
"description": j.description[:50],
"status": j.status.value,
"is_recurring": j.is_recurring,
"runs_completed": j.runs_completed,
"max_runs": j.max_runs,
}
for j in jobs
]
scheduler = Scheduler()