PhoneWork/bot/feishu.py
Yuyao Huang (Sam) 0eb29f2dcc feat: 初始化项目基础结构
添加项目基础文件和目录结构,包括:
- 初始化空包目录(bot/agent/orchestrator)
- 配置文件(config.py)和示例(keyring.example.yaml)
- 依赖文件(requirements.txt)
- 主程序入口(main.py)
- 调试脚本(debug_test.py)
- 文档说明(README.md)
- Git忽略文件(.gitignore)
- 核心功能模块(pty_process/manager/handler/feishu等)
2026-03-28 07:44:44 +08:00

80 lines
1.9 KiB
Python

"""Feishu API client: send messages back to users."""
from __future__ import annotations
import logging
import lark_oapi as lark
from lark_oapi.api.im.v1 import (
CreateMessageRequest,
CreateMessageRequestBody,
)
from config import FEISHU_APP_ID, FEISHU_APP_SECRET
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Maximum length, in characters, of the text placed in one outgoing message;
# longer text is shortened by _truncate() before it is sent.
MAX_TEXT_LEN = 4000
def _make_client() -> lark.Client:
    """Build a lark client from the configured Feishu app credentials.

    The client is configured with FEISHU_APP_ID / FEISHU_APP_SECRET from
    ``config`` and with the SDK log level set to WARNING.
    """
    builder = lark.Client.builder()
    builder = builder.app_id(FEISHU_APP_ID)
    builder = builder.app_secret(FEISHU_APP_SECRET)
    builder = builder.log_level(lark.LogLevel.WARNING)
    return builder.build()
# Client instance created once when this module is imported; send_text() uses it.
_client = _make_client()
def _truncate(text: str) -> str:
    """Return *text* limited to MAX_TEXT_LEN characters.

    Text that already fits is returned unchanged. Longer text is cut to its
    first ``MAX_TEXT_LEN - 20`` characters and a truncation marker is
    appended on a new line.
    """
    if len(text) > MAX_TEXT_LEN:
        kept = text[: MAX_TEXT_LEN - 20]
        return kept + "\n...[truncated]"
    return text
async def send_text(receive_id: str, receive_id_type: str, text: str) -> None:
    """Send a plain-text message to a Feishu chat or user.

    The text is passed through ``_truncate`` and wrapped in the JSON
    ``{"text": ...}`` content payload. The SDK's ``message.create`` call is
    run in the event loop's default executor and awaited. An unsuccessful
    response is logged at ERROR level rather than raised; exceptions raised
    by the SDK call itself are not caught here.

    Args:
        receive_id: chat_id or open_id depending on receive_id_type.
        receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id".
        text: message content.
    """
    # Function-scope imports: this module does not import these at file level.
    import asyncio
    import json

    # ensure_ascii=False keeps non-ASCII characters unescaped in the payload.
    content = json.dumps({"text": _truncate(text)}, ensure_ascii=False)

    body = (
        CreateMessageRequestBody.builder()
        .receive_id(receive_id)
        .msg_type("text")
        .content(content)
        .build()
    )
    request = (
        CreateMessageRequest.builder()
        .receive_id_type(receive_id_type)
        .request_body(body)
        .build()
    )

    # Inside a coroutine the running loop is the one to use;
    # get_running_loop() is the documented way to obtain it.
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor.
    response = await loop.run_in_executor(
        None,
        _client.im.v1.message.create,
        request,
    )

    if response.success():
        logger.debug("Sent message to %s (%s)", receive_id, receive_id_type)
        return

    logger.error(
        "Feishu send_text failed: code=%s msg=%s",
        response.code,
        response.msg,
    )