llm-wiki source · src-nanobot-architecture v0.2.0 · 2026-05-20

nanobot 架构与设计思路

HKUDS 出品的极简个人 AI Agent 框架(Python ≥3.11, MIT, v0.2.0)— 事件驱动 8 态状态机 · 17 渠道 · 7+ 厂商 · MCP · ~16k 行可读小内核

architecture ai-agent llm python chat-bot mcp
原文 & 仓库
raw 分析:raw/nanobot-architecture-analysis.md · 上游: HKUDS/nanobot · 本次 ingest 来源:googs1025/nanobot(fork)

一句话定位

nanobot 是 HKUDS 的极简个人 AI Agent 框架,定位为「Claude Code / Codex / OpenClaw 风格的轻量级长跑 Agent」。 它通过一个事件驱动的 8 态 Agent 状态机,把 17 个聊天渠道、7+ 家 LLM 厂商、MCP、技能 / 记忆 / Cron / Heartbeat 编织成一个 ~16k 行的可读小内核—— 核心路径让开发者「能看懂、能改」,而周边能力(channel / provider / skill)通过 pkgutil 自动发现 + entry_points 插件机制接入,主代码不需要动。

核心架构图

flowchart TB subgraph EXT["💬 Chat Platforms (外部)"] direction LR P1[Telegram] ~~~ P2[Discord] ~~~ P3[Slack] ~~~ P4[Feishu] P5[WeChat / QQ / DingTalk / WhatsApp / WeCom / MSTeams] P6[Email · Matrix · WebSocket(WebUI) · CLI · ...] end subgraph CH["📡 Channels Layer · nanobot/channels/"] direction TB REG["registry.py
pkgutil.iter_modules
+ entry_points
⇒ built-in shadows ext."] MGR["manager.py — ChannelManager
_init_channels / start_all
_dispatch_outbound:
• coalesce _stream_delta
• _reasoning_* gated by show_reasoning
• SHA1 fingerprint 去重
• 指数退避 1s / 2s / 4s"] BASE["base.py — BaseChannel(ABC)
start / stop / send /
send_delta / send_reasoning_*
pairing.py: DM 未授权 → 配对码"] REG -->|discover_all| MGR end subgraph BUS["🚌 Message Bus · nanobot/bus/queue.py"] direction LR QIN["asyncio.Queue<InboundMessage>"] QOUT["asyncio.Queue<OutboundMessage>"] end subgraph LOOP["🧠 AgentLoop · nanobot/agent/loop.py · 事件驱动状态机"] direction TB RUN["run() consume_inbound"] PRIO{"priority cmd?
/stop /restart"} ACTIVE{"session 有
活动 task?"} INJ[["pending_queue
mid-turn 注入"]] DISP["_dispatch(msg)
per-session asyncio.Lock
+ Semaphore(default 3)"] subgraph SM["state machine · _TRANSITIONS table"] direction LR S1((RESTORE)) -->|ok| S2((COMPACT)) S2 -->|ok| S3((COMMAND)) S3 -.->|shortcut| S8((DONE)) S3 -->|dispatch| S4((BUILD)) S4 -->|ok| S5((RUN)) S5 -->|ok| S6((SAVE)) S6 -->|ok| S7((RESPOND)) end HOLD["持有:
ContextBuilder · ToolRegistry · AgentRunner
SubagentManager · SessionManager · Consolidator
AutoCompact · Dream · CommandRouter
mcp_stacks · _pending_queues · _active_tasks"] RUN --> PRIO PRIO -->|yes| S8 PRIO -->|no| ACTIVE ACTIVE -->|yes| INJ ACTIVE -->|no| DISP DISP --> S1 S5 -.->|checkpoint| RUNNER[AgentRunner] S7 -.->|OutMsg| QOUT end subgraph PROV["🔌 Providers Layer · nanobot/providers/"] direction TB FACT["factory.make_provider(config)
_make_provider_core() backend switch:
anthropic · azure_openai · bedrock · github_copilot
openai_codex · openai_compat(默认) · openai_responses"] FALL["FallbackProvider(primary, [fallbacks], factory)
• 请求级 failover
• circuit breaker · 3 fail × 60s cooldown
• has_streamed → 已吐字就放弃 failover
• _NON_FALLBACK (auth/quota/content_filter) 直接返回"] ABS["LLMProvider(ABC)
chat / chat_stream
+ 内置重试 · 结构化错误码"] FACT --> FALL FALL --> ABS end subgraph SUP["🛠️ 支撑设施 · 与 AgentLoop 平级"] direction LR C1["cron/service.py
at / every / cron
FileLock 持久化"] C2["heartbeat/service.py
2-phase LLM 决定
skip/run"] C3["api/server.py
OpenAI-Compatible API
+ SSE 流"] C4["cli/commands.py
Typer CLI
onboard/agent/gateway/..."] C5["nanobot.py
Programmatic facade
Nanobot.from_config().run()"] C6["agent/tools/
read_file/exec/grep
web_*/notebook/spawn/mcp"] C7["skills/
cron · github · memory
summarize · tmux · ..."] end EXT -->|inbound| CH CH -->|publish_inbound| QIN QIN -->|consume_inbound| LOOP LOOP -->|provider.chat / chat_stream| PROV PROV -.->|LLMResponse| LOOP LOOP -->|publish_outbound| QOUT QOUT --> CH CH -->|outbound| EXT LOOP -.->|平级协作| SUP classDef ext fill:#1b2333,stroke:#3a4256,color:#cbd5e1; classDef bus fill:#1a2d2a,stroke:#2f5d54,color:#a7e3d8; classDef loop fill:#241a2e,stroke:#5a3e72,color:#d9b9f0; classDef prov fill:#2e1a1a,stroke:#7a3e3e,color:#f0b9b9; classDef chs fill:#1a2440,stroke:#3e5a8a,color:#b9c8f0; class P1,P2,P3,P4,P5,P6 ext; class QIN,QOUT bus; class RUN,PRIO,ACTIVE,INJ,DISP,HOLD,RUNNER loop; class S1,S2,S3,S4,S5,S6,S7,S8 loop; class FACT,FALL,ABS prov; class REG,MGR,BASE chs;
图 1 · nanobot 整体架构(Channel ↔ Bus ↔ AgentLoop ↔ Provider,支撑设施平级旁挂)
📐 查看原始 ASCII 图(与上图等价)
┌──────────────────────────────────────────────────────────────────────────────┐
│                            Chat Platforms (外部)                              │
│  Telegram · Discord · Slack · Feishu · WeChat · QQ · Email · Matrix ·         │
│  DingTalk · WhatsApp · WeCom · MSTeams · WebSocket(WebUI) · CLI · ...         │
└────────────┬───────────────────────────────────────────────────▲─────────────┘
             │ inbound                                            │ outbound
┌────────────▼────────────────────────────────────────────────────┴─────────────┐
│  Channels Layer  (nanobot/channels/)                                           │
│  ┌──────────────────────────┐   ┌─────────────────────────────────────────┐   │
│  │  registry.py             │   │  manager.py — ChannelManager            │   │
│  │  pkgutil.iter_modules    │──▶│   _init_channels(): discover_all()      │   │
│  │  + entry_points          │   │   start_all(): channel.start()×N        │   │
│  │  ⇒ built-in shadows ext. │   │   _dispatch_outbound(): coalesce+retry  │   │
│  └──────────────────────────┘   │     - _stream_delta 合并                │   │
│                                 │     - _reasoning_* 仅在 show_reasoning  │   │
│                                 │     - fingerprint 去重 _send_with_retry │   │
│  base.py — BaseChannel(ABC)     │     - 指数退避 1s/2s/4s                 │   │
│  start / stop / send /          └─────────────────────────────────────────┘   │
│  send_delta / send_reasoning_*    pairing.py: DM 未授权 → 配对码              │
└────────────┬───────────────────────────────────────────────────▲──────────────┘
             │ publish_inbound                                    │ publish_outbound
┌────────────▼────────────────────────────────────────────────────┴──────────────┐
│  Message Bus  (nanobot/bus/queue.py)                                           │
│      asyncio.Queue[InboundMessage]   asyncio.Queue[OutboundMessage]            │
└────────────┬───────────────────────────────────────────────────▲───────────────┘
             │ consume_inbound                                    │
┌────────────▼────────────────────────────────────────────────────┴───────────────┐
│  AgentLoop  (nanobot/agent/loop.py)  — 事件驱动状态机                            │
│                                                                                  │
│   run() ──▶ inbound → priority cmd? ─yes─▶ inline dispatch (/stop /restart)      │
│                       │                                                          │
│                       no                                                         │
│                       ▼                                                          │
│              session 有活动 task? ─yes─▶ pending_queue (mid-turn 注入)            │
│                       │                                                          │
│                       no                                                         │
│                       ▼                                                          │
│   _dispatch(msg) ── per-session asyncio.Lock + 全局 Semaphore(默认 3)            │
│        │                                                                         │
│        ▼  state machine (_TRANSITIONS table)                                     │
│   ┌──────────┐ ok ┌─────────┐ ok ┌─────────┐ shortcut ┌──────┐                   │
│   │ RESTORE  │───▶│ COMPACT │───▶│ COMMAND │─────────▶│ DONE │                   │
│   └──────────┘    └─────────┘    └─────────┘          └──────┘                   │
│                                       │ dispatch                                 │
│                                       ▼                                          │
│                                  ┌────────┐ ok ┌─────┐ ok ┌──────┐ ok ┌───────┐  │
│                                  │ BUILD  │───▶│ RUN │───▶│ SAVE │───▶│RESPOND│  │
│                                  └────────┘    └──┬──┘    └──────┘    └───┬───┘  │
│                                                   │ checkpoint            │      │
│                                                   ▼                       ▼      │
│                                            AgentRunner               OutMsg→Bus  │
│                                                                                  │
│   持有:ContextBuilder · ToolRegistry · AgentRunner · SubagentManager            │
│          SessionManager · Consolidator · AutoCompact · Dream · CommandRouter     │
│          mcp_stacks · _pending_queues · _active_tasks · _concurrency_gate        │
└────────────┬─────────────────────────────────────────────────────────────────────┘
             │ provider.chat / chat_stream
┌────────────▼───────────────────────────────────────────────────────────────────┐
│  Providers Layer  (nanobot/providers/)                                          │
│     factory.make_provider(config)                                               │
│       └─▶ _make_provider_core() — backend switch:                               │
│             anthropic | azure_openai | bedrock | github_copilot                 │
│             openai_codex | openai_compat (默认) | openai_responses              │
│       └─▶ FallbackProvider(primary, [fallbacks], factory)                       │
│             - 请求级 failover;circuit breaker (3 fail × 60s cooldown)          │
│             - has_streamed → 已吐字就放弃失败转移,避免重复输出                  │
│             - _NON_FALLBACK 错误(auth/quota/content_filter)直接返回            │
│   LLMProvider(ABC): chat / chat_stream / 内置重试政策 + 结构化错误码            │
└────────────────────────────────────────────────────────────────────────────────┘

模块分层

层 / 模块职责
CLI / SDK 入口Typer CLI(onboard / agent / gateway / ...)+ 程序化外观 Nanobot.from_config().run()
渠道层BaseChannel 抽象;registry.discover_all 用 pkgutil + entry_points 自动发现;ChannelManager 启动 / 路由 / 重试 / 流式合并 / 去重
消息总线两条 asyncio.QueueInboundMessage / OutboundMessage dataclass,channel 与 agent 解耦的唯一桥梁
Agent 内核AgentLoop 8 态状态机;AgentRunner provider-agnostic tool-using 循环;checkpoint / mid-turn 注入
上下文构建ContextBuilder 拼 system prompt;SkillsLoader workspace + builtin 合并;MemoryStore + Consolidator + Dream 两阶段记忆
工具集ToolRegistry 注册 filesystem / shell / web / search / mcp / notebook / spawn / message / ...,按 OpenAI tool-call schema 暴露
Provider 层LLMProvider ABC + 内置重试;make_provider 工厂;FallbackProvider 请求级 failover + 熔断;spec 注册表标记 OAuth/local/direct
会话Session 持久化到 ~/.nanobot/sessions/<key>.jsongoal_state 支撑 /goal 长目标
命令路由三档优先级:priority(/stop)、exact、prefix;14 个内置斜杠命令(/new /model /history /goal /dream* /pairing /help /status /restart
调度 & 主动唤起CronService(at/every/cron + ZoneInfo + FileLock);HeartbeatService 周期 LLM 决定 skip/run
OpenAI 兼容 API把 nanobot 当成上游 LLM 暴露给外部工具,支持 SSE 流
WebUIWebUI 编译产物随 wheel 发布;WebSocket 渠道托管 + 静态文件

分层关键约束

关键数据流

Telegram 用户消息从触发到回复的端到端路径:

sequenceDiagram autonumber participant U as 👤 Telegram 用户 participant TC as TelegramChannel
(channels/base.py:199) participant BUS as MessageBus
(bus/queue.py) participant AL as AgentLoop.run()
(loop.py:789) participant DP as _dispatch + SM
(loop.py:864) participant AR as AgentRunner.run()
(runner.py:112) participant FP as FallbackProvider participant CM as ChannelManager
_dispatch_outbound
(channels/manager.py:275) U->>TC: msg Note over TC: is_allowed()
allowFrom / 配对码兜底
supports_streaming → meta._wants_stream TC->>BUS: publish_inbound(InboundMessage) BUS->>AL: consume_inbound alt priority cmd (/stop /restart /status) AL-->>DP: 立即派发
取消 active_tasks else session 有 pending AL-->>AL: put → pending_queue
(mid-turn 注入) else 新任务 AL->>DP: asyncio.create_task(_dispatch) end Note over DP: Lock(session) ∩ Semaphore(3)
注册 pending_queue DP->>DP: RESTORE → COMPACT → COMMAND DP->>DP: BUILD → RUN DP->>AR: spec loop for iteration AR->>FP: chat / chat_stream Note over FP: transparently failover
has_streamed 防拼接错乱 FP-->>AR: LLMResponse alt has_tool_calls AR->>AR: 并行 / 串行执行工具
_emit_checkpoint 持久化
on_progress / on_stream → bus AR->>AR: _try_drain_injections
把 pending_queue 新消息插入 else finish_reason == "stop" AR-->>DP: break end end DP->>DP: SAVE: session.add_message
sessions.save · consolidator DP->>BUS: RESPOND: publish_outbound(OutboundMessage) BUS->>CM: consume_outbound Note over CM: _coalesce_stream_deltas 合并连续 delta
SHA1 指纹去重
_reasoning_* gated by show_reasoning
_send_with_retry 1s/2s/4s CM->>TC: send / send_delta / send_reasoning_delta TC->>U: reply
图 2 · 端到端数据流(用户消息 → 渠道 → 总线 → Agent 状态机 → Runner ↔ Provider → 回流)
📐 查看原始 ASCII 图(与上图等价)
[Telegram 用户] ──msg──▶ TelegramChannel._handle_message()  (channels/base.py:199)
                            │   ├─ is_allowed() → allowFrom / 配对码兜底
                            │   └─ supports_streaming → meta["_wants_stream"]=True
                            ▼
                       bus.publish_inbound(InboundMessage)        (bus/queue.py)
                            ▼
                       AgentLoop.run() consume_inbound            (loop.py:789)
                            ├─ priority cmd? → 直接派发 (/stop /restart /status)
                            ├─ session 有 pending? → put 到该 session 的注入队列
                            └─ asyncio.create_task(_dispatch(msg))
                            ▼
                       _dispatch():  Lock(session) ∩ Semaphore(3)  (loop.py:864)
                            │   注册 pending_queue → 接收 mid-turn 注入
                            ▼
                       状态机:RESTORE → COMPACT → COMMAND → BUILD → RUN → SAVE → RESPOND
                            │
                            ▼ RUN 阶段
                       AgentRunner.run(spec)            (agent/runner.py:112)
                            │
                  ┌─────────┴─────────┐
                  │   for iteration:   │
                  │     provider.chat / chat_stream → LLMResponse
                  │     ├─ FallbackProvider 在此 transparently failover
                  │     ├─ has_tool_calls? → 并行 / 串行执行工具
                  │     │     - 工具结果回填到 messages
                  │     │     - 每次执行后 _emit_checkpoint (持久化到 session.metadata)
                  │     │     - on_progress / on_stream → bus.publish_outbound
                  │     ├─ _try_drain_injections → 把 pending_queue 里的用户新消息
                  │     │     插入到当前对话末尾(保持 role 交替)
                  │     └─ finish_reason == "stop" → break
                  └─────────┬─────────┘
                            ▼
                       SAVE: session.add_message / sessions.save / consolidator
                       RESPOND: assemble OutboundMessage → bus.publish_outbound
                            ▼
                       ChannelManager._dispatch_outbound()       (channels/manager.py:275)
                            ├─ _coalesce_stream_deltas: 合并连续 _stream_delta
                            ├─ _should_suppress_outbound: SHA1 指纹去重
                            ├─ _reasoning_delta/_end → 仅在 channel.show_reasoning=True
                            └─ _send_with_retry: 1s/2s/4s 指数退避
                            ▼
                       TelegramChannel.send / send_delta / send_reasoning_delta
                            ▼
                       [Telegram 用户]

中断与恢复路径

flowchart TD U["👤 用户发送 /stop"] --> R["AgentLoop.run() 检到 priority cmd"] R --> D["commands.dispatch_priority(cmd_stop)"] D --> C["取消该 session 的 active_tasks"] C --> E["_dispatch() 收到 CancelledError"] E --> S1["session = sessions.get_or_create(key)"] S1 --> S2["_restore_runtime_checkpoint(session)"] S2 -. 读取 .-> META[("session.metadata
runtime_checkpoint:
• phase (final_response / tool_pending)
• iteration
• assistant_message (已生成的部分回复)
• completed_tool_results
• pending_tool_calls")] S2 --> S3["_clear_pending_user_turn(session)"] S3 --> S4["sessions.save(session)"] S4 --> F["finally: 把 pending_queue 里残留的
InboundMessage 重新 publish_inbound 回总线"] F --> N["✉️ 下一次 inbound"] N --> RESTORE["_state_restore (loop.py:1220)"] RESTORE --> H["读 runtime_checkpoint + pending_user_turn
把上次中断的上下文物化进 history"] H --> CONT["新消息接着这段历史继续推理"] CKPT[["💾 每次工具执行后
_emit_checkpoint 已写入"]] -. 提前持久化 .-> META classDef hot fill:#3a1f1f,stroke:#7a3e3e,color:#f0b9b9; classDef cold fill:#1f2a3a,stroke:#3e5a7a,color:#b9d4f0; classDef store fill:#2a2a1f,stroke:#7a7a3e,color:#f0e8b9; class U,R,D,C,E hot; class S1,S2,S3,S4,F,N,RESTORE,H,CONT cold; class META,CKPT store;
图 3 · 中断恢复(checkpoint 持续写入 → /stop 取消 → restore 续接 — 把"恢复"做成状态机一等公民)
📐 查看原始 ASCII 图(与上图等价)
用户发送 /stop ──▶ AgentLoop.run() 检到 priority cmd
                    └─▶ commands.dispatch_priority(cmd_stop)
                          └─▶ 取消该 session 的 active_tasks
                                  │
                                  ▼
                          _dispatch() 收到 CancelledError
                                  │
                                  ├─ session = sessions.get_or_create(key)
                                  ├─ _restore_runtime_checkpoint(session)
                                  │   ↑ runtime_checkpoint 在每次工具执行后 _emit_checkpoint 时
                                  │     已经写进了 session.metadata,包含:
                                  │     · phase (final_response / tool_pending)
                                  │     · iteration
                                  │     · assistant_message(已生成的部分回复)
                                  │     · completed_tool_results
                                  │     · pending_tool_calls
                                  ├─ _clear_pending_user_turn(session)
                                  └─ sessions.save(session)
                                  ▼
                          finally: 把 pending_queue 里残留的 InboundMessage
                                   重新 publish_inbound 回总线(不丢消息)

下一次 inbound 时 ──▶ _state_restore (loop.py:1220) 读 runtime_checkpoint
                        + pending_user_turn,把上次中断的上下文物化进 history,
                        新消息接着这段历史继续推理。

补充

设计决策与哲学

关键组件深入解读

AgentLoop 状态机(nanobot/agent/loop.py)

AgentLoop 是 ~1600 行的核心类,构造时一次性装配整个 agent 运行所需的所有协作者。 run() 是无限循环:从 bus 拿 InboundMessage → priority 命令短路 → 检查 session 是否已有 task(有则路由到 pending_queue 做 mid-turn 注入)→ 否则 asyncio.create_task(_dispatch(msg))_dispatch 在锁 + 信号量保护下进入状态机,状态机由 _TRANSITIONS 表驱动。

最有意思的细节是 checkpoint:runner 在每次工具执行后把当前轮 phase / iteration / assistant_message / completed_tool_results / pending_tool_calls 写进 session.metadata["runtime_checkpoint"]。 一旦 task 被 /stop 取消,_dispatch 的 except 分支会调 _restore_runtime_checkpoint 把"半成品"物化回 session 历史;下次 inbound 时 _state_restore 读出来继续。 这把「中断恢复」从异常处理变成了状态机一等公民。

FallbackProvider(nanobot/providers/fallback_provider.py)

273 行的 FallbackProvider 是一个标准的装饰器模式:实现 LLMProvider 接口,内部持有 primary + 若干 fallback preset + provider_factory 回调。 chat_streamhas_streamed: list[bool] = [False] 通过包装 on_content_delta 回调追踪是否已经向用户吐字——这是判断能否安全 failover 的核心信号。 _should_fallback 是一个细致的多维分类器:HTTP 状态码(400/401/403/404/422 → 不 fallback;408/409/429 + 5xx → fallback)→ error_kind / error_type / error_code 集合匹配 → 错误文本 token 兜底。