技术设计文档 · 2026-06-26 · v2 (Codex Review 后修订)
当前看板任务的生命周期如下:
blade kanban create 创建任务_maybe_auto_run_task 自动创建无头会话 B 执行任务submit_result,任务状态更新为 review缺失环节:步骤 4 完成后,前台智能体(会话 A)完全不知道任务已完成。agent-board:board-task:changed 事件只广播到浏览器的 project room,不会进入会话 A 的对话上下文。用户必须手动告知前台智能体。
<system-notification> 消息注入模式和 agent-board:board-task:changed 事件,不增加 SDK 适配负担在 _run_task_headless 的 finally 块中,无头任务完成后,找到该任务的来源会话(origin_session_id),向其注入一条 <system-notification> 消息。通知以 asyncio.create_task() 异步调度,不阻塞 finally 块的收尾流程。
无头会话 B 完成
│
▼
_run_task_headless finally 块
│
├── 1. sync_task_runtime_events() ← 已有
├── 2. _settle_session_from_submit_result() ← 已有
├── 3. 更新任务状态 → review ← 已有
├── 4. emit_board_task_changed() ← 已有(通知浏览器)
│
└── 5. asyncio.create_task(_notify_origin_session(...)) ← 新增
│ fire-and-forget
▼
_notify_origin_session()
│
├── 查找 origin_session_id
│
├── 会话正在对话中?
│ └── YES → append_pending_user_message()
│
└── 会话空闲?
└── YES → engine.chat(notification_msg, event_handler=bridge)
│
└── 捕获 RuntimeError("already running")?
└── 降级 → append_pending_user_message()
复用已有的 <system-notification> 格式(与后台 Bash 任务完成通知一致):
<system-notification>
看板任务「{task.title}」(#{task.id}) 已完成执行,状态已更新为「待审核」。
结果摘要:{summary}
PR 链接:{pr_url} ← 仅当存在时
错误信息:{err_msg} ← 仅当失败时
请根据结果进行后续处理。
</system-notification>
Codex Review 改进:原方案遍历项目所有会话,存在历史会话膨胀和多任务通知丢失风险。改为在 task 上记录 origin_session_id,点对点通知创建者。
具体做法:
_ensure_session_board_project() 或 _maybe_auto_run_task() 创建任务时,记录发起任务的前台 session_id 到 task 的 origin_session_id 字段origin_session_id 指向的会话origin_session_id 为空(历史任务),回退到项目会话遍历方式关于 origin_session_id 的获取:看板任务通常由 CLI blade kanban create 创建(走 REST API),REST 请求本身不包含"发起方 session_id"。两种获取方式:
blade kanban create 命令的执行环境中有 BLADE_SESSION_ID 环境变量(沙盒注入),在请求 body 中附带 origin_session_idX-Session-Id header 传入(更通用)推荐方案 1(CLI 传入),因为 CLI 是智能体创建任务的唯一路径,改动最小。
| 文件 | 改动 | 说明 |
|---|---|---|
host/agent_board/models.py |
修改 | AgentBoardTask 新增 origin_session_id: str | None 字段 |
host/agent_board/schema.py |
修改 | agent_board_task 表新增 origin_session_id TEXT 列 |
host/agent_board/store.py |
修改 | create_task() 接受 origin_session_id 参数 |
server/routes/agent_board.py |
修改 | ① CreateBoardTaskRequest 新增 origin_session_id② _run_task_headless finally 块调用通知 |
server/routes/_agent_board_notify.py |
新增 | notify_origin_session() 和 build_task_notification_message() |
cli/internal/cmd/kanban.go |
修改 | kanban create 自动读取 BLADE_SESSION_ID 并传入 origin_session_id |
def build_task_notification_message(task, runtime_summary) -> str: """构造 <system-notification> 消息。""" summary = runtime_summary.get("runtime_description", "") err_msg = runtime_summary.get("runtime_err_msg", "") pr_url = runtime_summary.get("runtime_pr_url", "") status_label = "失败" if err_msg else "待审核" parts = [ f"看板任务「{task.title}」(#{task.id}) 已完成执行," f"状态已更新为「{status_label}」。", ] if summary: parts.append(f"结果摘要:{summary}") if pr_url: parts.append(f"PR 链接:{pr_url}") if err_msg: parts.append(f"错误信息:{err_msg}") parts.append("请根据结果进行后续处理。") body = "\n".join(parts) return f"<system-notification>\n{body}\n</system-notification>" async def notify_origin_session( task: AgentBoardTask, exclude_session_id: str | None, ) -> None: """无头任务完成后,向来源会话注入通知。异步调度,不阻塞 finally。""" # 点对点通知:优先使用 origin_session_id target_sid = task.origin_session_id if not target_sid or target_sid == exclude_session_id: # 回退:从项目会话中找 conversation 类型 target_sid = _find_fallback_conversation_session( task.project_id, exclude_session_id ) if not target_sid: return engine = get_engine() session = engine.session_manager.index.get(target_sid) if session is None: return runtime_summary = task_runtime_summary(task) message = build_task_notification_message(task, runtime_summary) # 会话正在对话中 → 追加待处理消息 if engine._has_running_chat(target_sid): append_pending_user_message(target_sid, message) return # 会话空闲 → 启动新对话轮次 sio = get_socketio_server() bridge = create_socket_bridge(sio, engine, target_sid) try: await engine.chat(target_sid, message, event_handler=bridge) except RuntimeError as exc: if "already running" in str(exc): # TOCTOU 竞态:判断时空闲,调用时已有 chat → 降级为消息注入 append_pending_user_message(target_sid, message) else: logger.exception("notify origin session %s failed", target_sid) except Exception: logger.exception("notify origin session %s failed", target_sid)
async def _run_auto_task() -> None: try: await engine.chat(...) except Exception: ... finally: get_background_sessions().pop(session_id, None) store = get_agent_board_store() updated_task = store.get_task(task.id, project_id) if updated_task is not None: sync_task_runtime_events(updated_task) _settle_session_from_submit_result(updated_task) if updated_task.status == AgentBoardTaskStatus.RUNNING: updated_task = store.update_task( updated_task.id, project_id, status=AgentBoardTaskStatus.REVIEW, ) if updated_task is not None: await emit_board_task_changed(project_id, updated_task, "updated") # ── 新增:异步通知前台会话(fire-and-forget)── asyncio.create_task( notify_origin_session(updated_task, exclude_session_id=session_id), name=f"board-notify:{updated_task.id}", )
// kanban create 命令中,自动读取 BLADE_SESSION_ID originSessionID := os.Getenv("BLADE_SESSION_ID") body := map[string]interface{}{ "title": title, "description": description, "assignee_type": assigneeType, "assignee_name": assigneeName, } if originSessionID != "" { body["origin_session_id"] = originSessionID }
| 能力 | 已有机制 | 来源 |
|---|---|---|
| 通知消息格式 | <system-notification> |
_engine_runtime.py 后台 Bash 任务通知 |
| 空闲会话唤醒 | engine.chat() + SocketBridge |
_on_background_task_completed 模式 |
| 对话中会话注入 | append_pending_user_message() |
chat:append Socket.IO 事件链路 |
| 浏览器通知 | agent-board:board-task:changed |
已有 Socket.IO 事件 |
| 前端渲染 | turn:start/patch/end |
SocketBridge 投影事件 |
| 异步调度模式 | asyncio.create_task() |
_on_background_task_completed 中的 _background_wakeup_tasks |
不引入任何新 Socket.IO 事件或协议变更,SDK 调用方无需适配。
任务卡片状态从 "执行中" 自动变为 "待审核"(已有 agent-board:board-task:changed)
前台智能体的对话框中出现新的对话轮次:智能体收到系统通知,自动审阅结果并向用户汇报
用户看到整个流程是自动衔接的 — 任务分配、执行、结果回传、审阅,全部由系统驱动
通知消息写入前台会话的历史记录,智能体的响应也持久化
用户重新打开浏览器,加载会话历史时能看到完整的通知和智能体响应
| 问题 | Codex 反馈 | 处理方式 |
|---|---|---|
| finally 阻塞 | 直接 await engine.chat() 会阻塞 finally 块,每个符合条件的 session 都要跑完整个 LLM turn |
改用 asyncio.create_task() fire-and-forget 调度通知,finally 块立即返回 |
| TOCTOU 竞态 | 检查 _has_running_chat() 时空闲,但 engine.chat() 调用时已有其他 chat 启动,通知被丢弃 |
捕获 RuntimeError("already running") 后降级为 append_pending_user_message(),确保通知不丢失 |
| 历史会话膨胀 | 遍历所有 project sessions 会触发大量无效 LLM turn,成本和延迟线性放大 | 改为点对点通知:task 记录 origin_session_id,只通知创建者;回退时只取最近的一个 conversation session |
| 多任务同时完成 | 两个任务同时完成并尝试唤醒同一个 session,第二个被静默丢弃 | TOCTOU 降级机制覆盖了此场景:第二个通知走 append_pending_user_message,在下一次工具调用后被消费 |
| 模式一致性 | _on_background_task_completed 有去重和 recheck 机制,建议在 host 层封装 |
暂不在 host 层封装:看板通知是 server 层的 agent_board 功能,不属于 engine 核心职责。如果后续有更多跨会话通知需求,再统一抽象 |
| pending message 时机 | 如果注入时 loop 已进入收尾阶段,通知会滞留到下一次 chat | 可接受的延迟:通知会在下一轮用户消息或后台唤醒时被消费。比丢失好得多 |
| 场景 | 处理方式 |
|---|---|
origin_session_id 为空(历史任务或 UI 创建) |
回退到项目 conversation session 遍历,取最近一个 |
| 目标会话正在对话中 | append_pending_user_message() 注入消息队列 |
| 目标会话空闲 | asyncio.create_task 调度 engine.chat() |
| 目标会话已过期/不存在 | session_manager.index.get() 返回 None,跳过 |
| TOCTOU 竞态(判断空闲后被抢占) | 捕获 RuntimeError("already running"),降级为 append_pending_user_message |
| 多任务同时完成通知同一会话 | 第一个启动 chat,后续的降级为 pending message 排队 |
| 无头任务执行失败 | 通知消息中包含 err_msg,前台智能体据此决定下一步 |
| 通知本身失败 | catch 异常并 log,不影响任务状态更新和看板广播 |
board-task:completed 等新事件agent_loop 或事件系统AgentBoardTask model + schema 新增 origin_session_id 字段blade kanban create 自动传入 BLADE_SESSION_IDcreate_board_task 接受并存储 origin_session_idserver/routes/_agent_board_notify.py_run_task_headless finally 块,asyncio.create_task 调度通知