PhoneWork/agent/scheduler.py
Yuyao Huang (Sam) 52a9d085f7 feat: 重构数据存储路径并优化任务通知机制
将审计日志、会话数据和定时任务文件移动到统一的data目录下
为后台任务添加完成回调功能,优化CC任务完成后的通知流程
更新README和ROADMAP文档,标记已完成的功能项
2026-03-29 02:32:48 +08:00

276 lines
8.9 KiB
Python

"""Simple scheduler for reminders and recurring tasks."""
from __future__ import annotations
import asyncio
import json
import logging
import time
import uuid
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# JSON file that holds the persisted jobs: two directory levels above this
# file, under "data/scheduled_jobs.json".
PERSISTENCE_FILE = Path(__file__).parent.parent / "data" / "scheduled_jobs.json"
class JobStatus(str, Enum):
    """Lifecycle state of a scheduled job.

    Members are also plain strings; the string value is what gets written
    to the persistence file.
    """

    # Waiting for its (next) run.
    PENDING = "pending"
    # Set right before the notification is sent.
    RUNNING = "running"
    # All runs have been performed.
    COMPLETED = "completed"
    # Cancelled on request before finishing.
    CANCELLED = "cancelled"
@dataclass
class ScheduledJob:
    """One reminder job (one-shot or recurring) as tracked and persisted."""

    job_id: str
    # Text delivered in the reminder notification.
    description: str
    # time.time() timestamp taken when the job was created.
    scheduled_at: float
    # Delay before the run; for recurring jobs the scheduler stores the
    # interval here as well.
    delay_seconds: float
    status: JobStatus = JobStatus.PENDING
    is_recurring: bool = False
    # Gap between runs of a recurring job.
    interval_seconds: Optional[float] = None
    max_runs: int = 1
    runs_completed: int = 0
    # Chat that receives the notification; nothing is sent when unset.
    notify_chat_id: Optional[str] = None

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict; ``status`` becomes its string value."""
        data = asdict(self)
        data["status"] = self.status.value
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "ScheduledJob":
        """Build a job from a dict produced by :meth:`to_dict`.

        Keys that are not fields of this dataclass are ignored, so a
        persistence file containing extra keys (for example written by a
        different version of this module) still loads instead of raising
        ``TypeError``. A missing ``status`` defaults to ``"pending"``.

        Raises:
            TypeError: if a required field is missing.
            ValueError: if ``status`` is not a valid ``JobStatus`` value.
        """
        # Local import: the file-level import only brings in dataclass/asdict.
        from dataclasses import fields

        known = {f.name for f in fields(cls)}
        payload = {key: value for key, value in data.items() if key in known}
        payload["status"] = JobStatus(payload.get("status", "pending"))
        return cls(**payload)
class Scheduler:
    """Singleton that manages scheduled jobs with Feishu notifications.

    Jobs live in ``self._jobs`` and are mirrored to ``PERSISTENCE_FILE`` on
    every state change. Each unfinished job is driven by one asyncio task
    kept in ``self._tasks`` (keyed by job id) so it can be cancelled.
    """

    def __init__(self) -> None:
        self._jobs: dict[str, ScheduledJob] = {}
        self._tasks: dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()
        self._started = False

    def _spawn(self, job_id: str, coro: Any) -> asyncio.Task:
        """Start ``coro`` as the task driving ``job_id`` and track it.

        The task is stored in ``self._tasks`` (a strong reference, so it can
        be cancelled and is not garbage-collected mid-flight) and removed
        again once it finishes. Unexpected exceptions are logged.
        """
        task = asyncio.create_task(coro)
        self._tasks[job_id] = task

        def _on_done(done: asyncio.Task) -> None:
            # Only drop the entry if it still refers to this very task.
            if self._tasks.get(job_id) is done:
                del self._tasks[job_id]
            if not done.cancelled() and done.exception() is not None:
                logger.error("Job %s crashed", job_id, exc_info=done.exception())

        task.add_done_callback(_on_done)
        return task

    async def start(self) -> None:
        """Load persisted jobs and reschedule the unfinished ones.

        Calling ``start`` again while already started does nothing, so jobs
        are never scheduled twice.
        """
        if self._started:
            return
        self._load()
        self._started = True
        now = time.time()
        for job in list(self._jobs.values()):
            if job.status == JobStatus.RUNNING:
                # The job was persisted mid-run and never finished; requeue
                # it instead of leaving it stuck in RUNNING forever.
                job.status = JobStatus.PENDING
            if job.status != JobStatus.PENDING:
                continue
            elapsed = now - job.scheduled_at
            if job.is_recurring:
                # Time left until the next tick of the original cadence.
                remaining = job.interval_seconds - (elapsed % job.interval_seconds) if job.interval_seconds else 0
                self._spawn(
                    job.job_id,
                    self._run_recurring(job.job_id, skip_initial=True, initial_delay=remaining),
                )
            else:
                # Overdue jobs get remaining == 0 and fire immediately; the
                # explicit initial_delay stops _run_once from waiting the
                # full delay_seconds a second time.
                remaining = max(0, job.delay_seconds - elapsed)
                self._spawn(job.job_id, self._run_once(job.job_id, initial_delay=remaining))
        logger.info("Scheduler started with %d jobs", len(self._jobs))

    def _load(self) -> None:
        """Load unfinished jobs from the persistence file.

        Completed and cancelled jobs are skipped. A malformed record is
        logged and skipped without discarding the remaining records.
        """
        if not PERSISTENCE_FILE.exists():
            return
        try:
            with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception:
            logger.exception("Failed to load scheduled jobs")
            return
        if not isinstance(data, dict):
            logger.error("Failed to load scheduled jobs: unexpected content in %s", PERSISTENCE_FILE)
            return
        for job_id, job_data in data.items():
            try:
                job = ScheduledJob.from_dict(job_data)
            except Exception:
                logger.exception("Skipping malformed scheduled job %s", job_id)
                continue
            if job.status not in (JobStatus.COMPLETED, JobStatus.CANCELLED):
                self._jobs[job_id] = job
        logger.info("Loaded %d jobs from %s", len(self._jobs), PERSISTENCE_FILE)

    def _save(self) -> None:
        """Save all jobs to the persistence file (best effort, errors logged).

        The data is written to a sibling temporary file which then replaces
        the real file, so an interrupted write cannot leave a truncated
        ``PERSISTENCE_FILE`` behind.
        """
        try:
            data = {jid: job.to_dict() for jid, job in self._jobs.items()}
            PERSISTENCE_FILE.parent.mkdir(parents=True, exist_ok=True)
            tmp_path = PERSISTENCE_FILE.with_name(PERSISTENCE_FILE.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            tmp_path.replace(PERSISTENCE_FILE)
        except Exception:
            logger.exception("Failed to save scheduled jobs")

    async def schedule_once(
        self,
        delay_seconds: float,
        message: str,
        notify_chat_id: Optional[str] = None,
    ) -> str:
        """Schedule a one-time reminder.

        Args:
            delay_seconds: how long to wait before sending the reminder.
            message: reminder text.
            notify_chat_id: chat to notify; no notification is sent if None.

        Returns:
            The id of the new job (first 8 characters of a UUID4).
        """
        job_id = str(uuid.uuid4())[:8]
        job = ScheduledJob(
            job_id=job_id,
            description=message,
            scheduled_at=time.time(),
            delay_seconds=delay_seconds,
            notify_chat_id=notify_chat_id,
        )
        async with self._lock:
            self._jobs[job_id] = job
            self._save()
        self._spawn(job_id, self._run_once(job_id))
        logger.info("Scheduled job %s: %s (in %ds)", job_id, message[:50], delay_seconds)
        return job_id

    async def schedule_recurring(
        self,
        interval_seconds: float,
        message: str,
        max_runs: int = 10,
        notify_chat_id: Optional[str] = None,
    ) -> str:
        """Schedule a recurring reminder.

        Args:
            interval_seconds: gap between consecutive reminders.
            message: reminder text.
            max_runs: total number of reminders to send.
            notify_chat_id: chat to notify; no notification is sent if None.

        Returns:
            The id of the new job (first 8 characters of a UUID4).
        """
        job_id = str(uuid.uuid4())[:8]
        job = ScheduledJob(
            job_id=job_id,
            description=message,
            scheduled_at=time.time(),
            delay_seconds=interval_seconds,
            is_recurring=True,
            interval_seconds=interval_seconds,
            max_runs=max_runs,
            notify_chat_id=notify_chat_id,
        )
        async with self._lock:
            self._jobs[job_id] = job
            self._save()
        self._spawn(job_id, self._run_recurring(job_id))
        logger.info("Scheduled recurring job %s: %s (every %ds, %d runs)",
                    job_id, message[:50], interval_seconds, max_runs)
        return job_id

    async def _run_once(self, job_id: str, initial_delay: Optional[float] = None) -> None:
        """Execute a one-time job after its delay.

        Args:
            job_id: id of the job to run; unknown ids are ignored.
            initial_delay: seconds to wait instead of ``job.delay_seconds``
                (used when resuming a persisted job).
        """
        job = self._jobs.get(job_id)
        if not job:
            return
        delay = initial_delay if initial_delay is not None else job.delay_seconds
        await asyncio.sleep(delay)
        async with self._lock:
            # Never resurrect a job that was cancelled while we slept.
            if job.status == JobStatus.CANCELLED:
                return
            job.status = JobStatus.RUNNING
            self._save()
        await self._send_notification(job, job.description)
        async with self._lock:
            job.status = JobStatus.COMPLETED
            job.runs_completed = 1
            self._save()
        logger.info("Job %s completed", job_id)

    async def _run_recurring(self, job_id: str, skip_initial: bool = False, initial_delay: Optional[float] = None) -> None:
        """Execute a recurring job until its run budget is used up.

        Args:
            job_id: id of the job to run; unknown ids are ignored.
            skip_initial: when True, the first run is not preceded by a full
                interval sleep (used when resuming a persisted job).
            initial_delay: with ``skip_initial``, seconds to wait before the
                first run.
        """
        job = self._jobs.get(job_id)
        if not job:
            return
        interval = job.interval_seconds or 60
        if skip_initial and initial_delay is not None:
            await asyncio.sleep(initial_delay)
        first_pass = True
        # Count against runs_completed rather than a fresh range(max_runs) so
        # a job resumed after a restart only performs its remaining runs.
        while job.runs_completed < job.max_runs:
            if not (skip_initial and first_pass):
                await asyncio.sleep(interval)
            first_pass = False
            async with self._lock:
                if job.status == JobStatus.CANCELLED:
                    break
                job.status = JobStatus.RUNNING
                self._save()
            await self._send_notification(job, f"[{job.runs_completed + 1}/{job.max_runs}] {job.description}")
            async with self._lock:
                job.runs_completed += 1
                if job.runs_completed < job.max_runs:
                    job.status = JobStatus.PENDING
                else:
                    job.status = JobStatus.COMPLETED
                self._save()
        logger.info("Recurring job %s finished (%d runs)", job_id, job.runs_completed)

    async def _send_notification(self, job: ScheduledJob, message: str) -> None:
        """Send the Feishu notification for ``job``; failures are only logged."""
        if not job.notify_chat_id:
            return
        try:
            # Imported lazily; kept inside the try so an import failure is
            # logged like any other send failure instead of killing the task
            # and leaving the job stuck in RUNNING.
            from bot.feishu import send_text

            await send_text(job.notify_chat_id, "chat_id", f"⏰ **Reminder**\n\n{message}")
        except Exception:
            logger.exception("Failed to send notification for job %s", job.job_id)

    def cancel(self, job_id: str) -> bool:
        """Cancel a scheduled job.

        Returns:
            True if the job exists (it is marked cancelled and its task, if
            any, is cancelled); False if the id is unknown.
        """
        job = self._jobs.get(job_id)
        if not job:
            return False
        job.status = JobStatus.CANCELLED
        self._save()
        task = self._tasks.pop(job_id, None)
        if task:
            task.cancel()
        logger.info("Cancelled job %s", job_id)
        return True

    async def stop(self) -> None:
        """Stop all tasks and clear state.

        NOTE(review): this also deletes the persistence file, so jobs do not
        survive a stop()/start() cycle - confirm that is intended.
        """
        for task in list(self._tasks.values()):
            task.cancel()
        self._tasks.clear()
        async with self._lock:
            self._jobs.clear()
            if PERSISTENCE_FILE.exists():
                PERSISTENCE_FILE.unlink()
        # Allow a later start() to run again.
        self._started = False
        logger.info("Scheduler stopped")

    def list_jobs(self, limit: int = 20) -> list[dict]:
        """Return up to ``limit`` non-completed jobs, oldest first.

        Each entry is a summary dict; the description is cut to 50 characters.
        """
        jobs = sorted(
            [j for j in self._jobs.values() if j.status != JobStatus.COMPLETED],
            key=lambda j: j.scheduled_at,
        )[:limit]
        return [
            {
                "job_id": j.job_id,
                "description": j.description[:50],
                "status": j.status.value,
                "is_recurring": j.is_recurring,
                "runs_completed": j.runs_completed,
                "max_runs": j.max_runs,
            }
            for j in jobs
        ]
# Shared module-level Scheduler instance, created when this module is imported.
scheduler = Scheduler()