看板任务完成通知机制

技术设计文档 · 2026-06-26 · v2 (Codex Review 后修订)

问题

当前看板任务的生命周期如下:

  1. 前台智能体(会话 A)通过 blade kanban create 创建任务
  2. 服务端 _maybe_auto_run_task 自动创建无头会话 B 执行任务
  3. 会话 A 完成当前对话轮次后进入空闲
  4. 无头会话 B 执行完毕,调用 submit_result,任务状态更新为 review

缺失环节:步骤 4 完成后,前台智能体(会话 A)完全不知道任务已完成。agent-board:board-task:changed 事件只广播到浏览器的 project room,不会进入会话 A 的对话上下文。用户必须手动告知前台智能体。

设计目标

核心方案

概述

_run_task_headlessfinally 块中,无头任务完成后,找到该任务的来源会话(origin_session_id),向其注入一条 <system-notification> 消息。通知以 asyncio.create_task() 异步调度,不阻塞 finally 块的收尾流程。

通知时序

无头会话 B 完成
    │
    ▼
_run_task_headless finally 块
    │
    ├── 1. sync_task_runtime_events()            ← 已有
    ├── 2. _settle_session_from_submit_result()   ← 已有
    ├── 3. 更新任务状态 → review                 ← 已有
    ├── 4. emit_board_task_changed()             ← 已有(通知浏览器)
    │
    └── 5. asyncio.create_task(_notify_origin_session(...))  ← 新增
            │                                          fire-and-forget
            ▼
        _notify_origin_session()
            │
            ├── 查找 origin_session_id
            │
            ├── 会话正在对话中?
            │   └── YES → append_pending_user_message()
            │
            └── 会话空闲?
                └── YES → engine.chat(notification_msg, event_handler=bridge)
                    │
                    └── 捕获 RuntimeError("already running")?
                        └── 降级 → append_pending_user_message()

通知消息格式

复用已有的 <system-notification> 格式(与后台 Bash 任务完成通知一致):

<system-notification>
看板任务「{task.title}」(#{task.id}) 已完成执行,状态已更新为「待审核」。
结果摘要:{summary}
PR 链接:{pr_url}       ← 仅当存在时
错误信息:{err_msg}      ← 仅当失败时
请根据结果进行后续处理。
</system-notification>

前台会话查找策略:点对点通知

Codex Review 改进:原方案遍历项目所有会话,存在历史会话膨胀和多任务通知丢失风险。改为在 task 上记录 origin_session_id,点对点通知创建者。

具体做法:

  1. _ensure_session_board_project()_maybe_auto_run_task() 创建任务时,记录发起任务的前台 session_id 到 task 的 origin_session_id 字段
  2. 任务完成时,直接通知 origin_session_id 指向的会话
  3. 如果 origin_session_id 为空(历史任务),回退到项目会话遍历方式

关于 origin_session_id 的获取:看板任务通常由 CLI blade kanban create 创建(走 REST API),REST 请求本身不包含"发起方 session_id"。两种获取方式:

  1. CLI 传入blade kanban create 命令的执行环境中有 BLADE_SESSION_ID 环境变量(沙盒注入),在请求 body 中附带 origin_session_id
  2. REST header:通过 X-Session-Id header 传入(更通用)

推荐方案 1(CLI 传入),因为 CLI 是智能体创建任务的唯一路径,改动最小。

代码变更

变更文件清单

文件 改动 说明
host/agent_board/models.py 修改 AgentBoardTask 新增 origin_session_id: str | None 字段
host/agent_board/schema.py 修改 agent_board_task 表新增 origin_session_id TEXT
host/agent_board/store.py 修改 create_task() 接受 origin_session_id 参数
server/routes/agent_board.py 修改 CreateBoardTaskRequest 新增 origin_session_id
_run_task_headless finally 块调用通知
server/routes/_agent_board_notify.py 新增 notify_origin_session()build_task_notification_message()
cli/internal/cmd/kanban.go 修改 kanban create 自动读取 BLADE_SESSION_ID 并传入 origin_session_id

核心函数伪代码

_agent_board_notify.py

def build_task_notification_message(task, runtime_summary) -> str:
    """构造 <system-notification> 消息。"""
    summary = runtime_summary.get("runtime_description", "")
    err_msg = runtime_summary.get("runtime_err_msg", "")
    pr_url  = runtime_summary.get("runtime_pr_url", "")
    status_label = "失败" if err_msg else "待审核"

    parts = [
        f"看板任务「{task.title}」(#{task.id}) 已完成执行,"
        f"状态已更新为「{status_label}」。",
    ]
    if summary:
        parts.append(f"结果摘要:{summary}")
    if pr_url:
        parts.append(f"PR 链接:{pr_url}")
    if err_msg:
        parts.append(f"错误信息:{err_msg}")
    parts.append("请根据结果进行后续处理。")
    body = "\n".join(parts)
    return f"<system-notification>\n{body}\n</system-notification>"


async def notify_origin_session(
    task: AgentBoardTask,
    exclude_session_id: str | None,
) -> None:
    """无头任务完成后,向来源会话注入通知。异步调度,不阻塞 finally。"""
    # 点对点通知:优先使用 origin_session_id
    target_sid = task.origin_session_id
    if not target_sid or target_sid == exclude_session_id:
        # 回退:从项目会话中找 conversation 类型
        target_sid = _find_fallback_conversation_session(
            task.project_id, exclude_session_id
        )
    if not target_sid:
        return

    engine = get_engine()
    session = engine.session_manager.index.get(target_sid)
    if session is None:
        return

    runtime_summary = task_runtime_summary(task)
    message = build_task_notification_message(task, runtime_summary)

    # 会话正在对话中 → 追加待处理消息
    if engine._has_running_chat(target_sid):
        append_pending_user_message(target_sid, message)
        return

    # 会话空闲 → 启动新对话轮次
    sio = get_socketio_server()
    bridge = create_socket_bridge(sio, engine, target_sid)
    try:
        await engine.chat(target_sid, message, event_handler=bridge)
    except RuntimeError as exc:
        if "already running" in str(exc):
            # TOCTOU 竞态:判断时空闲,调用时已有 chat → 降级为消息注入
            append_pending_user_message(target_sid, message)
        else:
            logger.exception("notify origin session %s failed", target_sid)
    except Exception:
        logger.exception("notify origin session %s failed", target_sid)

agent_board.py — _run_task_headless 修改

async def _run_auto_task() -> None:
    try:
        await engine.chat(...)
    except Exception:
        ...
    finally:
        get_background_sessions().pop(session_id, None)
        store = get_agent_board_store()
        updated_task = store.get_task(task.id, project_id)
        if updated_task is not None:
            sync_task_runtime_events(updated_task)
            _settle_session_from_submit_result(updated_task)
            if updated_task.status == AgentBoardTaskStatus.RUNNING:
                updated_task = store.update_task(
                    updated_task.id, project_id,
                    status=AgentBoardTaskStatus.REVIEW,
                )
            if updated_task is not None:
                await emit_board_task_changed(project_id, updated_task, "updated")
                # ── 新增:异步通知前台会话(fire-and-forget)──
                asyncio.create_task(
                    notify_origin_session(updated_task, exclude_session_id=session_id),
                    name=f"board-notify:{updated_task.id}",
                )

cli/internal/cmd/kanban.go — create 命令修改

// kanban create 命令中,自动读取 BLADE_SESSION_ID
originSessionID := os.Getenv("BLADE_SESSION_ID")
body := map[string]interface{}{
    "title":       title,
    "description": description,
    "assignee_type": assigneeType,
    "assignee_name": assigneeName,
}
if originSessionID != "" {
    body["origin_session_id"] = originSessionID
}

复用已有机制

能力 已有机制 来源
通知消息格式 <system-notification> _engine_runtime.py 后台 Bash 任务通知
空闲会话唤醒 engine.chat() + SocketBridge _on_background_task_completed 模式
对话中会话注入 append_pending_user_message() chat:append Socket.IO 事件链路
浏览器通知 agent-board:board-task:changed 已有 Socket.IO 事件
前端渲染 turn:start/patch/end SocketBridge 投影事件
异步调度模式 asyncio.create_task() _on_background_task_completed 中的 _background_wakeup_tasks

不引入任何新 Socket.IO 事件或协议变更,SDK 调用方无需适配。

用户可见效果

场景 1:用户开着浏览器观察

看板

任务卡片状态从 "执行中" 自动变为 "待审核"(已有 agent-board:board-task:changed

对话

前台智能体的对话框中出现新的对话轮次:智能体收到系统通知,自动审阅结果并向用户汇报

体验

用户看到整个流程是自动衔接的 — 任务分配、执行、结果回传、审阅,全部由系统驱动

场景 2:用户关闭了浏览器

服务端

通知消息写入前台会话的历史记录,智能体的响应也持久化

再次打开

用户重新打开浏览器,加载会话历史时能看到完整的通知和智能体响应

Codex Review 反馈及处理

问题 Codex 反馈 处理方式
finally 阻塞 直接 await engine.chat() 会阻塞 finally 块,每个符合条件的 session 都要跑完整个 LLM turn 改用 asyncio.create_task() fire-and-forget 调度通知,finally 块立即返回
TOCTOU 竞态 检查 _has_running_chat() 时空闲,但 engine.chat() 调用时已有其他 chat 启动,通知被丢弃 捕获 RuntimeError("already running") 后降级为 append_pending_user_message(),确保通知不丢失
历史会话膨胀 遍历所有 project sessions 会触发大量无效 LLM turn,成本和延迟线性放大 改为点对点通知:task 记录 origin_session_id,只通知创建者;回退时只取最近的一个 conversation session
多任务同时完成 两个任务同时完成并尝试唤醒同一个 session,第二个被静默丢弃 TOCTOU 降级机制覆盖了此场景:第二个通知走 append_pending_user_message,在下一次工具调用后被消费
模式一致性 _on_background_task_completed 有去重和 recheck 机制,建议在 host 层封装 暂不在 host 层封装:看板通知是 server 层的 agent_board 功能,不属于 engine 核心职责。如果后续有更多跨会话通知需求,再统一抽象
pending message 时机 如果注入时 loop 已进入收尾阶段,通知会滞留到下一次 chat 可接受的延迟:通知会在下一轮用户消息或后台唤醒时被消费。比丢失好得多

边界情况处理

场景 处理方式
origin_session_id 为空(历史任务或 UI 创建) 回退到项目 conversation session 遍历,取最近一个
目标会话正在对话中 append_pending_user_message() 注入消息队列
目标会话空闲 asyncio.create_task 调度 engine.chat()
目标会话已过期/不存在 session_manager.index.get() 返回 None,跳过
TOCTOU 竞态(判断空闲后被抢占) 捕获 RuntimeError("already running"),降级为 append_pending_user_message
多任务同时完成通知同一会话 第一个启动 chat,后续的降级为 pending message 排队
无头任务执行失败 通知消息中包含 err_msg,前台智能体据此决定下一步
通知本身失败 catch 异常并 log,不影响任务状态更新和看板广播

不做什么

实现计划

  1. AgentBoardTask model + schema 新增 origin_session_id 字段
  2. CLI blade kanban create 自动传入 BLADE_SESSION_ID
  3. REST API create_board_task 接受并存储 origin_session_id
  4. 新增 server/routes/_agent_board_notify.py
  5. 修改 _run_task_headless finally 块,asyncio.create_task 调度通知
  6. 添加测试用例(空闲唤醒 / 对话中注入 / TOCTOU 降级 / origin 为空回退)