From 4debb574020314213e90855617d70594c4595d0e Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Sat, 6 Dec 2025 11:17:55 +0900 Subject: [PATCH] feat(hooks): add pulse-monitor for token stall detection and auto-recovery - Detect token stalls via message.part.updated heartbeat monitoring - Support thinking/reasoning block detection with extended timeout - Auto-recover: abort + 'continue' prompt on 5min stall - Pause monitoring during tool execution --- src/hooks/index.ts | 1 + src/hooks/pulse-monitor.ts | 142 +++++++++++++++++++++++++++++++++++++ src/index.ts | 6 +- 3 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 src/hooks/pulse-monitor.ts diff --git a/src/hooks/index.ts b/src/hooks/index.ts index 7757364..96078ed 100644 --- a/src/hooks/index.ts +++ b/src/hooks/index.ts @@ -4,3 +4,4 @@ export { createSessionNotification } from "./session-notification" export { createSessionRecoveryHook } from "./session-recovery" export { createCommentCheckerHooks } from "./comment-checker" export { createGrepOutputTruncatorHook } from "./grep-output-truncator" +export { createPulseMonitorHook } from "./pulse-monitor" diff --git a/src/hooks/pulse-monitor.ts b/src/hooks/pulse-monitor.ts new file mode 100644 index 0000000..c7c9a35 --- /dev/null +++ b/src/hooks/pulse-monitor.ts @@ -0,0 +1,142 @@ +import type { PluginInput } from "@opencode-ai/plugin" + +export function createPulseMonitorHook(ctx: PluginInput) { + const STANDARD_TIMEOUT = 5 * 60 * 1000 // 5 minutes + const THINKING_TIMEOUT = 5 * 60 * 1000 // 5 minutes + const CHECK_INTERVAL = 5 * 1000 // 5 seconds + + let lastHeartbeat = Date.now() + let isMonitoring = false + let currentSessionID: string | null = null + let monitorTimer: ReturnType | null = null + let isThinking = false + + const startMonitoring = (sessionID: string) => { + if (currentSessionID !== sessionID) { + currentSessionID = sessionID + // Reset thinking state when switching sessions or starting new + isThinking = false + } + + lastHeartbeat = Date.now() + + if (!isMonitoring) { + isMonitoring = true + if (monitorTimer) clearInterval(monitorTimer) + + monitorTimer = setInterval(async () => { + if (!isMonitoring || !currentSessionID) return + + const timeSinceLastHeartbeat = Date.now() - lastHeartbeat + const currentTimeout = isThinking ? THINKING_TIMEOUT : STANDARD_TIMEOUT + + if (timeSinceLastHeartbeat > currentTimeout) { + await recoverStalledSession(currentSessionID, timeSinceLastHeartbeat, isThinking) + } + }, CHECK_INTERVAL) + } + } + + const stopMonitoring = () => { + isMonitoring = false + if (monitorTimer) { + clearInterval(monitorTimer) + monitorTimer = null + } + } + + const updateHeartbeat = (isThinkingUpdate?: boolean) => { + if (isMonitoring) { + lastHeartbeat = Date.now() + if (isThinkingUpdate !== undefined) { + isThinking = isThinkingUpdate + } + } + } + + const recoverStalledSession = async (sessionID: string, stalledDuration: number, wasThinking: boolean) => { + stopMonitoring() + + try { + const durationSec = Math.round(stalledDuration/1000) + const typeStr = wasThinking ? "Thinking" : "Standard" + + // 1. Notify User + await ctx.client.tui.showToast({ + body: { + title: "Pulse Monitor: Cardiac Arrest", + message: `Session stalled (${typeStr}) for ${durationSec}s. Defibrillating...`, + variant: "error", + duration: 5000 + } + }).catch(() => {}) + + // 2. Abort current generation (Defibrillation shock) + await ctx.client.session.abort({ path: { id: sessionID } }).catch(() => {}) + + // 3. Wait a bit for state to settle + await new Promise(resolve => setTimeout(resolve, 1500)) + + // 4. Prompt "continue" to kickstart (CPR) + await ctx.client.session.prompt({ + path: { id: sessionID }, + body: { parts: [{ type: "text", text: "The connection was unstable and stalled. Please continue from where you left off." }] }, + query: { directory: ctx.directory } + }) + + // Resume monitoring + startMonitoring(sessionID) + + } catch (err) { + console.error("[PulseMonitor] Recovery failed:", err) + // If recovery fails, we stop monitoring to avoid loops + stopMonitoring() + } + } + + return { + event: async (input: { event: any }) => { + const { event } = input + const props = event.properties as Record | undefined + + // Monitor both session updates and part updates to capture token flow + if (event.type === "session.updated" || event.type === "message.part.updated") { + // Try to get sessionID from various common locations + const sessionID = props?.info?.id || props?.sessionID + + if (sessionID) { + if (!isMonitoring) startMonitoring(sessionID) + + // Check for thinking indicators in the payload + let thinkingUpdate: boolean | undefined = undefined + + if (event.type === "message.part.updated") { + const part = props?.part + if (part) { + const THINKING_TYPES = ["thinking", "redacted_thinking", "reasoning"] + if (THINKING_TYPES.includes(part.type)) { + thinkingUpdate = true + } else if (part.type === "text" || part.type === "tool_use") { + thinkingUpdate = false + } + } + } + + updateHeartbeat(thinkingUpdate) + } + } else if (event.type === "session.idle" || event.type === "session.error" || event.type === "session.stopped") { + stopMonitoring() + } + }, + "tool.execute.before": async () => { + // Pause monitoring while tool runs locally (tools can take time) + stopMonitoring() + }, + "tool.execute.after": async (input: { sessionID: string }) => { + // Resume monitoring after tool finishes + if (input.sessionID) { + startMonitoring(input.sessionID) + } + } + } +} diff --git a/src/index.ts b/src/index.ts index 2c0ac73..a0cad0f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ import type { Plugin } from "@opencode-ai/plugin" import { createBuiltinAgents } from "./agents" -import { createTodoContinuationEnforcer, createContextWindowMonitorHook, createSessionRecoveryHook, createCommentCheckerHooks, createGrepOutputTruncatorHook } from "./hooks" +import { createTodoContinuationEnforcer, createContextWindowMonitorHook, createSessionRecoveryHook, createCommentCheckerHooks, createGrepOutputTruncatorHook, createPulseMonitorHook } from "./hooks" import { updateTerminalTitle } from "./features/terminal" import { builtinTools } from "./tools" import { createBuiltinMcps } from "./mcp" @@ -43,6 +43,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => { const todoContinuationEnforcer = createTodoContinuationEnforcer(ctx) const contextWindowMonitor = createContextWindowMonitorHook(ctx) const sessionRecovery = createSessionRecoveryHook(ctx) + const pulseMonitor = createPulseMonitorHook(ctx) const commentChecker = createCommentCheckerHooks() const grepOutputTruncator = createGrepOutputTruncatorHook(ctx) @@ -80,6 +81,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => { event: async (input) => { await todoContinuationEnforcer(input) await contextWindowMonitor.event(input) + await pulseMonitor.event(input) const { event } = input const props = event.properties as Record | undefined @@ -172,6 +174,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => { }, "tool.execute.before": async (input, output) => { + await pulseMonitor["tool.execute.before"]() await commentChecker["tool.execute.before"](input, output) if (input.sessionID === mainSessionID) { @@ -186,6 +189,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => { }, "tool.execute.after": async (input, output) => { + await pulseMonitor["tool.execute.after"](input) await grepOutputTruncator["tool.execute.after"](input, output) await contextWindowMonitor["tool.execute.after"](input, output) await commentChecker["tool.execute.after"](input, output)