PhoneWork/agent/scheduler.py
Yuyao Huang (Sam) 09b63341cd refactor: 统一使用现代类型注解替代传统类型注解
- 将 Dict、List 等传统类型注解替换为 dict、list 等现代类型注解
- 更新类型注解以更精确地反映变量类型
- 修复部分类型注解与实际使用不匹配的问题
- 优化部分代码逻辑以提高类型安全性
2026-03-28 14:27:21 +08:00

275 lines
8.8 KiB
Python

"""Simple scheduler for reminders and recurring tasks."""
from __future__ import annotations

import asyncio
import json
import logging
import time
import uuid
from collections.abc import Coroutine
from dataclasses import asdict, dataclass, fields
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, Optional

# Module-wide logger, named after this module's import path.
logger = logging.getLogger(__name__)

# JSON file that mirrors the in-memory job table. It lives one directory above
# the directory that contains this module.
_MODULE_DIR = Path(__file__).parent
PERSISTENCE_FILE = _MODULE_DIR.parent / "scheduled_jobs.json"
class JobStatus(str, Enum):
    """Lifecycle state of a scheduled job.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value and can be rebuilt from it with ``JobStatus(value)``.
    """

    # Waiting for its (next) run.
    PENDING = "pending"
    # Set immediately before the notification for a run is sent.
    RUNNING = "running"
    # Every planned run has been delivered.
    COMPLETED = "completed"
    # Cancelled on request before finishing.
    CANCELLED = "cancelled"
@dataclass
class ScheduledJob:
    """One reminder job together with its bookkeeping state.

    Instances are stored in memory by the scheduler and round-tripped through
    JSON via :meth:`to_dict` / :meth:`from_dict`.
    """

    job_id: str
    # Reminder text that is sent as the notification body.
    description: str
    # ``time.time()`` timestamp taken when the job was created.
    scheduled_at: float
    # Delay before a one-shot job fires; recurring jobs store their interval here.
    delay_seconds: float
    status: JobStatus = JobStatus.PENDING
    is_recurring: bool = False
    # Seconds between runs of a recurring job; ``None`` for one-shot jobs.
    interval_seconds: Optional[float] = None
    max_runs: int = 1
    runs_completed: int = 0
    # Chat to notify; when empty/``None`` no notification is sent.
    notify_chat_id: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serialisable dict of this job.

        The status is stored as its plain string value rather than the enum
        member.
        """
        data = asdict(self)
        data["status"] = self.status.value
        return data

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ScheduledJob":
        """Build a job from a dict such as the one produced by :meth:`to_dict`.

        Keys that are not fields of this dataclass are ignored, so a record
        written by a different version of the code (with extra keys) still
        loads instead of raising ``TypeError``. A missing ``status`` defaults
        to ``"pending"``. The input dict is not modified.

        Raises:
            TypeError: a required field (one without a default) is missing.
            ValueError: ``status`` is not a valid :class:`JobStatus` value.
        """
        known = {f.name for f in fields(cls)}
        kwargs = {key: value for key, value in data.items() if key in known}
        kwargs["status"] = JobStatus(kwargs.get("status", "pending"))
        return cls(**kwargs)
class Scheduler:
    """Singleton that manages scheduled jobs with Feishu notifications.

    Jobs are kept in ``_jobs`` and mirrored to ``PERSISTENCE_FILE`` as JSON on
    every state change. Each live job is driven by exactly one asyncio task,
    tracked in ``_tasks`` under the job id so it can be cancelled and so the
    event loop's weak reference to it is not the only one.
    """

    def __init__(self) -> None:
        self._jobs: dict[str, ScheduledJob] = {}
        self._tasks: dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()
        self._started = False

    async def start(self) -> None:
        """Load persisted jobs and reschedule the pending ones.

        A second call while the scheduler is already started is ignored;
        otherwise every pending job would get a duplicate task.
        """
        if self._started:
            logger.debug("Scheduler already started; ignoring start()")
            return
        self._load()
        self._started = True
        now = time.time()
        # NOTE(review): jobs persisted as RUNNING (process died mid-delivery)
        # are loaded but not rescheduled here; decide whether they should be
        # retried or dropped.
        for job in list(self._jobs.values()):
            if job.status != JobStatus.PENDING:
                continue
            elapsed = now - job.scheduled_at
            if job.is_recurring:
                interval = job.interval_seconds
                # Wait for the next tick of the original cadence.
                remaining = interval - (elapsed % interval) if interval else 0
                coro = self._run_recurring(
                    job.job_id, skip_initial=True, initial_delay=remaining
                )
            else:
                # An overdue job gets remaining == 0 and fires right away. The
                # delay must be passed explicitly, otherwise _run_once would
                # sleep the full original delay a second time.
                remaining = max(0, job.delay_seconds - elapsed)
                coro = self._run_once(job.job_id, initial_delay=remaining)
            self._track(job.job_id, coro)
        logger.info("Scheduler started with %d jobs", len(self._jobs))

    def _track(self, job_id: str, coro: Coroutine[Any, Any, None]) -> asyncio.Task:
        """Start *coro* as the task for *job_id* and drop it again once done."""
        task = asyncio.create_task(coro)
        self._tasks[job_id] = task

        def _on_done(done: asyncio.Task) -> None:
            # Only forget the entry if it still refers to this very task.
            if self._tasks.get(job_id) is done:
                del self._tasks[job_id]
            if not done.cancelled() and done.exception() is not None:
                logger.error(
                    "Task for job %s crashed", job_id, exc_info=done.exception()
                )

        task.add_done_callback(_on_done)
        return task

    def _new_job_id(self) -> str:
        """Return a short random id that is not used by any known job."""
        while True:
            job_id = uuid.uuid4().hex[:8]
            if job_id not in self._jobs:
                return job_id

    def _load(self) -> None:
        """Load unfinished jobs from the persistence file.

        Completed and cancelled records are skipped. A record that cannot be
        parsed is logged and skipped on its own, so one bad entry does not
        discard the remaining jobs.
        """
        try:
            with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return
        except Exception:
            logger.exception("Failed to load scheduled jobs")
            return
        if not isinstance(data, dict):
            logger.error("Failed to load scheduled jobs: top-level JSON is not an object")
            return
        for job_id, job_data in data.items():
            try:
                job = ScheduledJob.from_dict(job_data)
            except Exception:
                logger.exception("Skipping malformed scheduled job %s", job_id)
                continue
            if job.status not in (JobStatus.COMPLETED, JobStatus.CANCELLED):
                self._jobs[job_id] = job
        logger.info("Loaded %d jobs from %s", len(self._jobs), PERSISTENCE_FILE)

    def _save(self) -> None:
        """Write all jobs to the persistence file; failures are only logged.

        The data is written to a sibling temporary file that then replaces the
        real one, so an interrupted write cannot leave a truncated JSON file.
        """
        tmp_path = PERSISTENCE_FILE.with_name(PERSISTENCE_FILE.name + ".tmp")
        try:
            data = {jid: job.to_dict() for jid, job in self._jobs.items()}
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            tmp_path.replace(PERSISTENCE_FILE)
        except Exception:
            logger.exception("Failed to save scheduled jobs")

    async def schedule_once(
        self,
        delay_seconds: float,
        message: str,
        notify_chat_id: Optional[str] = None,
    ) -> str:
        """Schedule a one-time reminder.

        Args:
            delay_seconds: Seconds to wait before the reminder is sent.
            message: Reminder text.
            notify_chat_id: Chat to notify; without it nothing is sent.

        Returns:
            The id of the new job.
        """
        async with self._lock:
            job_id = self._new_job_id()
            self._jobs[job_id] = ScheduledJob(
                job_id=job_id,
                description=message,
                scheduled_at=time.time(),
                delay_seconds=delay_seconds,
                notify_chat_id=notify_chat_id,
            )
            self._save()
        self._track(job_id, self._run_once(job_id))
        logger.info("Scheduled job %s: %s (in %ds)", job_id, message[:50], delay_seconds)
        return job_id

    async def schedule_recurring(
        self,
        interval_seconds: float,
        message: str,
        max_runs: int = 10,
        notify_chat_id: Optional[str] = None,
    ) -> str:
        """Schedule a recurring reminder.

        Args:
            interval_seconds: Seconds between two runs (also the delay before
                the first run).
            message: Reminder text.
            max_runs: Total number of reminders to send.
            notify_chat_id: Chat to notify; without it nothing is sent.

        Returns:
            The id of the new job.
        """
        async with self._lock:
            job_id = self._new_job_id()
            self._jobs[job_id] = ScheduledJob(
                job_id=job_id,
                description=message,
                scheduled_at=time.time(),
                delay_seconds=interval_seconds,
                is_recurring=True,
                interval_seconds=interval_seconds,
                max_runs=max_runs,
                notify_chat_id=notify_chat_id,
            )
            self._save()
        self._track(job_id, self._run_recurring(job_id))
        logger.info("Scheduled recurring job %s: %s (every %ds, %d runs)",
                    job_id, message[:50], interval_seconds, max_runs)
        return job_id

    async def _run_once(self, job_id: str, initial_delay: Optional[float] = None) -> None:
        """Execute a one-time job after its delay.

        ``initial_delay`` overrides the job's own ``delay_seconds`` (used when
        resuming a persisted job whose delay has partly elapsed).
        """
        job = self._jobs.get(job_id)
        if not job:
            return
        delay = initial_delay if initial_delay is not None else job.delay_seconds
        await asyncio.sleep(delay)
        async with self._lock:
            # The job may have been cancelled while this task was waiting.
            if job.status == JobStatus.CANCELLED:
                return
            job.status = JobStatus.RUNNING
            self._save()
        await self._send_notification(job, job.description)
        async with self._lock:
            job.status = JobStatus.COMPLETED
            job.runs_completed = 1
            self._save()
        logger.info("Job %s completed", job_id)

    async def _run_recurring(
        self,
        job_id: str,
        skip_initial: bool = False,
        initial_delay: Optional[float] = None,
    ) -> None:
        """Execute a recurring job until ``max_runs`` runs have completed.

        Normally every run is preceded by one full interval. With
        ``skip_initial`` the first run is instead preceded by ``initial_delay``
        seconds (or by no wait when that is ``None``).

        The loop is bounded by ``runs_completed`` rather than by a fresh
        counter, so a job resumed after a restart only performs the runs that
        are still outstanding.
        """
        job = self._jobs.get(job_id)
        if not job:
            return
        interval = job.interval_seconds or 60
        first_pass = True
        while job.runs_completed < job.max_runs:
            if first_pass and skip_initial:
                if initial_delay is not None:
                    await asyncio.sleep(initial_delay)
            else:
                await asyncio.sleep(interval)
            first_pass = False
            async with self._lock:
                if job.status == JobStatus.CANCELLED:
                    break
                job.status = JobStatus.RUNNING
                self._save()
            await self._send_notification(job, f"[{job.runs_completed + 1}/{job.max_runs}] {job.description}")
            async with self._lock:
                job.runs_completed += 1
                if job.runs_completed < job.max_runs:
                    job.status = JobStatus.PENDING
                else:
                    job.status = JobStatus.COMPLETED
                self._save()
        if job.status == JobStatus.PENDING:
            # Only reachable when there was nothing to run (max_runs <= 0);
            # close the job so it is not rescheduled on every restart.
            async with self._lock:
                job.status = JobStatus.COMPLETED
                self._save()
        logger.info("Recurring job %s finished (%d runs)", job_id, job.runs_completed)

    async def _send_notification(self, job: "ScheduledJob", message: str) -> None:
        """Send a Feishu notification for *job*; failures are only logged.

        Does nothing when the job has no ``notify_chat_id``.
        """
        if not job.notify_chat_id:
            return
        try:
            # Imported lazily; kept inside the try so an import failure is
            # logged like any other delivery failure instead of killing the
            # job's task.
            from bot.feishu import send_text

            await send_text(job.notify_chat_id, "chat_id", f"⏰ **Reminder**\n\n{message}")
        except Exception:
            logger.exception("Failed to send notification for job %s", job.job_id)

    def cancel(self, job_id: str) -> bool:
        """Cancel a scheduled job.

        Returns:
            ``True`` if the job exists (it is marked cancelled and its task,
            if any, is cancelled), ``False`` if the id is unknown.
        """
        job = self._jobs.get(job_id)
        if not job:
            return False
        job.status = JobStatus.CANCELLED
        self._save()
        task = self._tasks.pop(job_id, None)
        if task:
            task.cancel()
        logger.info("Cancelled job %s", job_id)
        return True

    async def stop(self) -> None:
        """Stop all tasks and clear state.

        NOTE(review): this also deletes the persistence file, so jobs that are
        still pending do not survive a stop()/start() cycle - confirm that is
        intended.
        """
        for task in list(self._tasks.values()):
            task.cancel()
        self._tasks.clear()
        async with self._lock:
            self._jobs.clear()
            if PERSISTENCE_FILE.exists():
                PERSISTENCE_FILE.unlink()
        # Allow a later start() to run again.
        self._started = False
        logger.info("Scheduler stopped")

    def list_jobs(self, limit: int = 20) -> list[dict[str, Any]]:
        """Return summaries of up to *limit* jobs that are not completed.

        Jobs are ordered by creation time (oldest first); descriptions are
        truncated to 50 characters.
        """
        jobs = sorted(
            (j for j in self._jobs.values() if j.status != JobStatus.COMPLETED),
            key=lambda j: j.scheduled_at,
        )[:limit]
        return [
            {
                "job_id": j.job_id,
                "description": j.description[:50],
                "status": j.status.value,
                "is_recurring": j.is_recurring,
                "runs_completed": j.runs_completed,
                "max_runs": j.max_runs,
            }
            for j in jobs
        ]
# Shared module-level Scheduler instance. It is constructed at import time with
# no jobs loaded; persisted jobs are only read once start() is awaited.
scheduler = Scheduler()