-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwaga-stream.py
More file actions
387 lines (329 loc) · 15.8 KB
/
Copy pathwaga-stream.py
File metadata and controls
387 lines (329 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
waga-stream.py — 跑一次 headless claude,把过程实时刷到一张飞书卡片上。
借鉴 zarazhangrui/feishu-claude-code-bridge 的「流式单卡片」体验,缝进 waga 的
spawn worker:用 `claude -p --output-format stream-json` 跑,逐行解析事件,
把【运行状态 / 正文 / 工具调用 / 最终结果】patch 到同一张飞书交互卡片上,
替代 waga 原来「发一串离散文本」的刷屏式输出。
用法:
python waga-stream.py --name NAME --cwd DIR --sid SID [--first] \
--user OU_XXX "用户消息文本"
行为:
1. 先发一张「运行中」卡片(schema 2.0, streaming_mode),拿到 message_id。
2. spawn claude,逐行读 stream-json,累积状态,节流 patch 卡片(~1.2s 一次)。
3. 跑完 patch 成终态(绿=完成 / 红=错),打印最终结果文本到 stdout。
设计取舍:
· headless 弹不出权限框 → --dangerously-skip-permissions 自动放行(按『正常 session』用)。
· 卡片不放可点按钮:点击要事件回调服务器,waga 是轮询架构收不到回调。停止用
文字 `name: /stop`,卡片只显示状态。
· 按 no-emoji 规则:卡片不内联 emoji,靠头部颜色 + 文字标签表状态。
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
import threading
import time
# Windows 下 Python stdout/stderr 默认 GBK 编码,claude 回复里有 emoji(✅ 等)时
# 最后 print 结果会 UnicodeEncodeError 崩溃(表现为"stream worker 异常")。强制 UTF-8。
for _s in (sys.stdout, sys.stderr):
try:
_s.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# Windows 下 npm 全局命令是 .CMD,必须解析全路径——原生 python 的 subprocess
# 不会按 PATHEXT 查 .CMD,直接传名字会 WinError 2。Git Bash 里 `python` 还是
# WindowsApps 的坏桩,所以本脚本一律用 `py` 启动(见 waga-spawn.sh)。
LARK = shutil.which("lark-cli") or "lark-cli"
CLAUDE = shutil.which("claude") or "claude"
ENV = dict(os.environ, LARK_CLI_NO_PROXY="1")
# 注入给 headless claude 的角色说明(--append-system-prompt)。
# 解决「第一次跟新会话打招呼,它却大谈 /waga-on、om_xxx、监听器」的尴尬:
# headless claude 本身不知道自己被 waga 包着,所以会把底层管线当问题讲给用户。
# 这段告诉它:你是被远程驱动的、管线已替你处理、别提内部机制、开场给标准问候。
SYSTEM_PROMPT = (
"你是一个通过飞书 Waga 被远程访问的 Claude Code 会话。用户在外面用飞书私聊跟你对话,"
"你的回复会被外部管线自动转成飞书卡片发出去——监听器、卡片、表情、om_xxx 消息 id、"
"粘性目标、/waga-on 这些底层管线都已经替你处理好了,你既不需要也不要向用户提起它们,"
"更不要让用户去『挂监听器』或解释这些机制。"
"如果某条消息里夹带了历史聊天记录或 om_ 开头的 id,那是上下文噪音,忽略即可,别去逐条回应。"
"第一次有人跟你打招呼时,给一句简短自然的开场白:表明这是一个全新的会话、可以开始对话了、"
"并简述你能帮上什么,不要罗列内部机制,也不要长篇大论。"
)
# ⚠ 卡片正文用 <font color> 内联上色,但 `<` `>` 经 .CMD→cmd.exe 会被当重定向符
# 搞坏命令(bash 走 sh-shim 没事,python subprocess 走 .CMD 才坏)。绕法:直接用
# node 跑 lark-cli 的 run.js,argv 不过 cmd 就安全。lark_argv() 优先返回 node 直跑。
_NODE = shutil.which("node")
_LARK_RUNJS = os.path.expandvars(r"%APPDATA%\npm\node_modules\@larksuite\cli\scripts\run.js")
def lark_argv(*args):
if _NODE and os.path.exists(_LARK_RUNJS):
return [_NODE, _LARK_RUNJS, *args]
return [LARK, *args]
def _tmpdir():
# Git Bash 的 /tmp 实际就是 %TMP%(实测同一目录);py 用 TMP 才能跟 bash 监听器共享文件。
return os.environ.get("TMP") or os.environ.get("TEMP") or "/tmp"
# 已发卡片登记:每条卡片的 mid 追加到共享文件 waga_sent.txt(行格式 `mid|name`)。
# 监听器据此实现「引用回复某张卡 → 路由到发该卡的 session」(reply_to 路由)。
SENT_FILE = os.path.join(_tmpdir(), "waga_sent.txt")
def record_sent(name, mid):
if not mid:
return
try:
with open(SENT_FILE, "a", encoding="utf-8") as f:
f.write(f"{mid}|{name}\n")
except Exception:
pass
# ---- 卡片渲染 -------------------------------------------------------------
# 统一风格(用户 2026-05-22 拍板「A 内联蓝字」):不用彩色大头条,name 用内联蓝色
# 加粗写在正文最上面,状态用内联彩色小字(绿完成/红失败/灰处理中)标在末尾。轻、像聊天。
STATUS_LABEL = {"running": "运行中", "done": "完成", "error": "失败", "interrupted": "已中断"}
STATUS_COLOR = {"running": "grey", "done": "green", "error": "red", "interrupted": "grey"}
CARD_MD_CAP = 3500 # 卡片正文 markdown 上限,超长 head+tail 截断
def _clip(text, cap=CARD_MD_CAP):
if len(text) <= cap:
return text
head = cap * 2 // 3
tail = cap - head
return text[:head] + "\n\n…(中间省略)…\n\n" + text[-tail:]
def name_line(name):
"""内联蓝色加粗 name —— 所有 waga 卡片的统一开头(替代 [name] 方括号/大头条)。
若设了环境变量 WAGA_ENGINE_LABEL(如 cursor),在 name 后追加一个灰色 ·标签,
用来在飞书侧区分这张卡是哪个引擎拉起的(Cursor vs Claude)。Claude 侧不设=原样。"""
tag = f"<font color='blue'>**{name}**</font>"
label = os.environ.get("WAGA_ENGINE_LABEL", "").strip()
if label:
tag += f" <font color='grey'>· {label}</font>"
# 引擎标签右边再跟当前模型(WAGA_ENGINE_MODEL),如 · cursor · gemini-3.5-flash
model = os.environ.get("WAGA_ENGINE_MODEL", "").strip()
if model:
tag += f" <font color='grey'>· {model}</font>"
return tag
def build_card(name, state, body_text, tools, footer, elapsed):
"""state: running|done|error|interrupted。内联蓝名 + 正文 + 工具 + 内联彩色状态小字。"""
elements = [{"tag": "markdown", "content": name_line(name)}]
if body_text.strip():
elements.append({"tag": "markdown", "content": _clip(body_text.strip())})
if tools:
lines = []
for t in tools[-12:]:
mark = "·" if t.get("running") else "✓" # ✓ 是文字勾,非 emoji
detail = t.get("detail", "")
detail = (" " + detail) if detail else ""
lines.append(f"`{mark}` **{t['name']}**{detail}")
if len(tools) > 12:
lines.insert(0, f"_(前 {len(tools) - 12} 个工具调用已折叠)_")
elements.append({"tag": "hr"})
elements.append({"tag": "markdown", "content": "\n".join(lines)})
# 状态:内联彩色小字(颜色即状态,不用大头条)
if state == "running":
label = footer or "处理中…"
elif state == "done":
label = "完成"
elif state == "error":
label = "失败" + (f":{footer}" if footer else "")
else:
label = "已中断"
# running 不写秒数:步进卡的秒数无法实时跳动,冻着显得假(用户 2026-05-22)。
# 只在终态(done/error/中断)写总耗时,那是准确的最终值。
if elapsed is not None and state != "running":
label = f"{label} · {elapsed:.0f}s"
elements.append({"tag": "hr"})
elements.append({"tag": "markdown",
"content": f"<font color='{STATUS_COLOR[state]}'>{label}</font>"})
summary = body_text.strip().replace("\n", " ")[:40] or STATUS_LABEL[state]
return {
"schema": "2.0",
"config": {
"streaming_mode": state == "running",
"summary": {"content": f"{name}: {summary}"},
},
"body": {"elements": elements},
}
# ---- 飞书 IO --------------------------------------------------------------
def card_send(user_oid, card):
"""发一张交互卡片,返回 message_id 或 None"""
content = json.dumps(card, ensure_ascii=False)
p = subprocess.run(
lark_argv("im", "+messages-send", "--as", "bot", "--user-id", user_oid,
"--msg-type", "interactive", "--content", content),
capture_output=True, text=True, env=ENV, encoding="utf-8", errors="replace",
)
out = (p.stdout or "") + (p.stderr or "")
if '"ok": true' not in out and '"ok":true' not in out:
sys.stderr.write(f"[waga-stream] card_send failed: {out[:300]}\n")
return None
try:
# 抓 message_id
for tok in ('"message_id": "', '"message_id":"'):
i = out.find(tok)
if i >= 0:
j = out.find('"', i + len(tok))
return out[i + len(tok):j]
except Exception:
pass
return None
def card_patch(mid, card):
"""patch 已发出的卡片;失败时打到 stderr(不致命,下一帧会重试)"""
content = json.dumps(card, ensure_ascii=False)
body = json.dumps({"content": content}, ensure_ascii=False)
p = subprocess.run(
lark_argv("api", "PATCH", f"/open-apis/im/v1/messages/{mid}",
"--as", "bot", "--data", body),
capture_output=True, text=True, env=ENV, encoding="utf-8", errors="replace",
)
out = (p.stdout or "") + (p.stderr or "")
if '"code": 0' not in out and '"code":0' not in out:
sys.stderr.write(f"[waga-stream] card_patch failed: {out[:200]}\n")
def reacted_no(mid):
"""用户是否在这张卡上贴了 No 表情(reaction 即动作:No → 终止任务)。"""
if not mid:
return False
p = subprocess.run(
lark_argv("im", "reactions", "list", "--as", "bot",
"--params", json.dumps({"message_id": mid})),
capture_output=True, text=True, env=ENV, encoding="utf-8", errors="replace",
)
return '"No"' in (p.stdout or "")
# ---- stream-json 解析 -----------------------------------------------------
def run(name, cwd, sid, first, user_oid, message):
started = time.time()
body_text = ""
tools = [] # [{name, detail, running}]
footer = ""
mid = card_send(user_oid, build_card(name, "running", "", [], "启动中…", 0))
record_sent(name, mid)
last_patch = 0.0
def maybe_patch(force=False):
nonlocal last_patch
now = time.time()
if not force and now - last_patch < 1.2:
return
last_patch = now
if mid:
card_patch(mid, build_card(name, "running", body_text, tools, footer,
now - started))
cmd = [CLAUDE, "-p", "--output-format", "stream-json", "--verbose",
"--dangerously-skip-permissions",
"--append-system-prompt", SYSTEM_PROMPT]
cmd += (["--session-id", sid] if first else ["--resume", sid])
cmd += [message]
state = "done"
err_msg = ""
result_text = ""
try:
proc = subprocess.Popen(
cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, env=ENV, encoding="utf-8", errors="replace", bufsize=1,
)
except Exception as e:
state, err_msg = "error", f"无法启动 claude: {e}"
if mid:
card_patch(mid, build_card(name, "error", f"**⚠ 失败原因**\n{err_msg}", [], err_msg, 0))
print(err_msg)
return 1
# ⚠ stderr 必须在跑的同时排空:否则 claude 往 stderr 写满管道缓冲(~64KB)会阻塞,
# 整个 headless worker 静默挂死(正是要消灭的"卡住不报错")。后台线程持续 drain。
stderr_lines = []
def drain_err():
try:
for ln in proc.stderr:
stderr_lines.append(ln)
except Exception:
pass
threading.Thread(target=drain_err, daemon=True).start()
# reaction 即动作:后台线程轮询这张卡,用户贴 No → 杀 claude(终止任务)
watch = {"interrupted": False, "done": False}
def watcher():
while not watch["done"]:
if reacted_no(mid):
watch["interrupted"] = True
try:
proc.kill()
except Exception:
pass
return
time.sleep(3)
threading.Thread(target=watcher, daemon=True).start()
for line in proc.stdout:
line = line.strip()
if not line:
continue
try:
ev = json.loads(line)
except Exception:
continue
et = ev.get("type")
if et == "assistant":
for blk in ev.get("message", {}).get("content", []):
bt = blk.get("type")
if bt == "text":
body_text += blk.get("text", "")
elif bt == "tool_use":
tname = blk.get("name", "tool")
inp = blk.get("input", {})
detail = ""
for k in ("command", "file_path", "path", "pattern", "query", "url"):
if k in inp and isinstance(inp[k], str):
detail = inp[k][:80]
break
tools.append({"name": tname, "detail": detail, "running": True})
footer = f"调用 {tname}"
maybe_patch()
elif et == "user":
# tool_result 回来:把对应工具标记完成
for t in reversed(tools):
if t.get("running"):
t["running"] = False
break
footer = "处理结果…"
maybe_patch()
elif et == "result":
result_text = ev.get("result", "") or ""
if ev.get("is_error"):
state = "error"
err_msg = result_text[:800] or "未知错误"
else:
state = "done"
if not body_text.strip() and result_text.strip():
body_text = result_text
proc.wait()
watch["done"] = True
for t in tools:
t["running"] = False
time.sleep(0.1) # 让 drain 线程收尾
stderr_tail = "".join(stderr_lines)[-800:].strip()
if watch["interrupted"]:
state = "interrupted"
elif proc.returncode not in (0, None) and state != "error":
state = "error"
err_msg = stderr_tail or f"claude 退出码 {proc.returncode}"
elif state == "error" and stderr_tail and stderr_tail not in err_msg:
# 已是 error 但 stderr 有更具体内容 → 补上,让"什么原因"看得全
err_msg = (err_msg + "\n" + stderr_tail)[:800]
# 失败时把原因塞进卡片正文(大字可读),而不是只压在末尾小字状态行——
# 用户明确要求"得知道到底什么原因"。
final_body = body_text
if state == "error" and err_msg:
final_body = (body_text + "\n\n**⚠ 失败原因**\n" + err_msg).strip()
if mid:
card_patch(mid, build_card(name, state, final_body,
tools, err_msg if state == "error" else "",
time.time() - started))
# 给 spawn 循环回最终文本
print(result_text or body_text or "(claude 无输出)")
return 0 if state == "done" else 1
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--name", required=True)
ap.add_argument("--cwd", default=os.getcwd())
ap.add_argument("--sid", required=True)
ap.add_argument("--first", action="store_true")
ap.add_argument("--user", required=True)
ap.add_argument("message")
a = ap.parse_args()
sys.exit(run(a.name, a.cwd, a.sid, a.first, a.user, a.message))
if __name__ == "__main__":
main()