refactor(session-recovery): process entire message history for empty/thinking block recovery
- Scan all non-final assistant messages for empty content, orphan thinking blocks, and disabled thinking - Add storage utility functions: findMessagesWithThinkingBlocks, findMessagesWithOrphanThinking, stripThinkingParts, prependThinkingPart - Fix: Previously only processed single failed message, now handles multiple broken messages in history - Improve: Use filesystem-based recovery instead of unreliable SDK APIs
This commit is contained in:
@@ -1,6 +1,13 @@
|
|||||||
import type { PluginInput } from "@opencode-ai/plugin"
|
import type { PluginInput } from "@opencode-ai/plugin"
|
||||||
import type { createOpencodeClient } from "@opencode-ai/sdk"
|
import type { createOpencodeClient } from "@opencode-ai/sdk"
|
||||||
import { findFirstEmptyMessage, injectTextPart } from "./storage"
|
import {
|
||||||
|
findEmptyMessages,
|
||||||
|
findMessagesWithOrphanThinking,
|
||||||
|
findMessagesWithThinkingBlocks,
|
||||||
|
injectTextPart,
|
||||||
|
prependThinkingPart,
|
||||||
|
stripThinkingParts,
|
||||||
|
} from "./storage"
|
||||||
import type { MessageData } from "./types"
|
import type { MessageData } from "./types"
|
||||||
|
|
||||||
type Client = ReturnType<typeof createOpencodeClient>
|
type Client = ReturnType<typeof createOpencodeClient>
|
||||||
@@ -109,76 +116,46 @@ async function recoverToolResultMissing(
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function recoverThinkingBlockOrder(
|
async function recoverThinkingBlockOrder(
|
||||||
client: Client,
|
_client: Client,
|
||||||
sessionID: string,
|
sessionID: string,
|
||||||
failedAssistantMsg: MessageData,
|
_failedAssistantMsg: MessageData,
|
||||||
directory: string
|
_directory: string
|
||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
const messageID = failedAssistantMsg.info?.id
|
const orphanMessages = findMessagesWithOrphanThinking(sessionID)
|
||||||
if (!messageID) {
|
|
||||||
|
if (orphanMessages.length === 0) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
const existingParts = failedAssistantMsg.parts || []
|
let anySuccess = false
|
||||||
const patchedParts: MessagePart[] = [{ type: "thinking", thinking: "" } as ThinkingPart, ...existingParts]
|
for (const messageID of orphanMessages) {
|
||||||
|
if (prependThinkingPart(sessionID, messageID)) {
|
||||||
|
anySuccess = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
return anySuccess
|
||||||
// @ts-expect-error - Experimental API
|
|
||||||
await client.message?.update?.({
|
|
||||||
path: { id: messageID },
|
|
||||||
body: { parts: patchedParts },
|
|
||||||
})
|
|
||||||
return true
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// @ts-expect-error - Experimental API
|
|
||||||
await client.session.patch?.({
|
|
||||||
path: { id: sessionID },
|
|
||||||
body: { messageID, parts: patchedParts },
|
|
||||||
})
|
|
||||||
return true
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
return await fallbackRevertStrategy(client, sessionID, failedAssistantMsg, directory)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function recoverThinkingDisabledViolation(
|
async function recoverThinkingDisabledViolation(
|
||||||
client: Client,
|
_client: Client,
|
||||||
sessionID: string,
|
sessionID: string,
|
||||||
failedAssistantMsg: MessageData
|
_failedAssistantMsg: MessageData
|
||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
const messageID = failedAssistantMsg.info?.id
|
const messagesWithThinking = findMessagesWithThinkingBlocks(sessionID)
|
||||||
if (!messageID) {
|
|
||||||
|
if (messagesWithThinking.length === 0) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
const existingParts = failedAssistantMsg.parts || []
|
let anySuccess = false
|
||||||
const strippedParts = existingParts.filter((p) => p.type !== "thinking" && p.type !== "redacted_thinking")
|
for (const messageID of messagesWithThinking) {
|
||||||
|
if (stripThinkingParts(messageID)) {
|
||||||
if (strippedParts.length === 0) {
|
anySuccess = true
|
||||||
return false
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
return anySuccess
|
||||||
// @ts-expect-error - Experimental API
|
|
||||||
await client.message?.update?.({
|
|
||||||
path: { id: messageID },
|
|
||||||
body: { parts: strippedParts },
|
|
||||||
})
|
|
||||||
return true
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// @ts-expect-error - Experimental API
|
|
||||||
await client.session.patch?.({
|
|
||||||
path: { id: sessionID },
|
|
||||||
body: { messageID, parts: strippedParts },
|
|
||||||
})
|
|
||||||
return true
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function recoverEmptyContentMessage(
|
async function recoverEmptyContentMessage(
|
||||||
@@ -187,10 +164,22 @@ async function recoverEmptyContentMessage(
|
|||||||
failedAssistantMsg: MessageData,
|
failedAssistantMsg: MessageData,
|
||||||
_directory: string
|
_directory: string
|
||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
const emptyMessageID = findFirstEmptyMessage(sessionID) || failedAssistantMsg.info?.id
|
const emptyMessageIDs = findEmptyMessages(sessionID)
|
||||||
if (!emptyMessageID) return false
|
|
||||||
|
|
||||||
return injectTextPart(sessionID, emptyMessageID, "(interrupted)")
|
if (emptyMessageIDs.length === 0) {
|
||||||
|
const fallbackID = failedAssistantMsg.info?.id
|
||||||
|
if (!fallbackID) return false
|
||||||
|
return injectTextPart(sessionID, fallbackID, "(interrupted)")
|
||||||
|
}
|
||||||
|
|
||||||
|
let anySuccess = false
|
||||||
|
for (const messageID of emptyMessageIDs) {
|
||||||
|
if (injectTextPart(sessionID, messageID, "(interrupted)")) {
|
||||||
|
anySuccess = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return anySuccess
|
||||||
}
|
}
|
||||||
|
|
||||||
async function fallbackRevertStrategy(
|
async function fallbackRevertStrategy(
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { existsSync, mkdirSync, readdirSync, readFileSync, writeFileSync } from "node:fs"
|
import { existsSync, mkdirSync, readdirSync, readFileSync, unlinkSync, writeFileSync } from "node:fs"
|
||||||
import { join } from "node:path"
|
import { join } from "node:path"
|
||||||
import { MESSAGE_STORAGE, PART_STORAGE, THINKING_TYPES, META_TYPES } from "./constants"
|
import { MESSAGE_STORAGE, PART_STORAGE, THINKING_TYPES, META_TYPES } from "./constants"
|
||||||
import type { StoredMessageMeta, StoredPart, StoredTextPart } from "./types"
|
import type { StoredMessageMeta, StoredPart, StoredTextPart } from "./types"
|
||||||
@@ -136,3 +136,100 @@ export function findFirstEmptyMessage(sessionID: string): string | null {
|
|||||||
const emptyIds = findEmptyMessages(sessionID)
|
const emptyIds = findEmptyMessages(sessionID)
|
||||||
return emptyIds.length > 0 ? emptyIds[0] : null
|
return emptyIds.length > 0 ? emptyIds[0] : null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function findMessagesWithThinkingBlocks(sessionID: string): string[] {
|
||||||
|
const messages = readMessages(sessionID)
|
||||||
|
const result: string[] = []
|
||||||
|
|
||||||
|
for (let i = 0; i < messages.length; i++) {
|
||||||
|
const msg = messages[i]
|
||||||
|
if (msg.role !== "assistant") continue
|
||||||
|
|
||||||
|
const isLastMessage = i === messages.length - 1
|
||||||
|
if (isLastMessage) continue
|
||||||
|
|
||||||
|
const parts = readParts(msg.id)
|
||||||
|
const hasThinking = parts.some((p) => THINKING_TYPES.has(p.type))
|
||||||
|
if (hasThinking) {
|
||||||
|
result.push(msg.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
export function findMessagesWithOrphanThinking(sessionID: string): string[] {
|
||||||
|
const messages = readMessages(sessionID)
|
||||||
|
const result: string[] = []
|
||||||
|
|
||||||
|
for (let i = 0; i < messages.length; i++) {
|
||||||
|
const msg = messages[i]
|
||||||
|
if (msg.role !== "assistant") continue
|
||||||
|
|
||||||
|
const isLastMessage = i === messages.length - 1
|
||||||
|
if (isLastMessage) continue
|
||||||
|
|
||||||
|
const parts = readParts(msg.id)
|
||||||
|
if (parts.length === 0) continue
|
||||||
|
|
||||||
|
const sortedParts = [...parts].sort((a, b) => a.id.localeCompare(b.id))
|
||||||
|
const firstPart = sortedParts[0]
|
||||||
|
|
||||||
|
const hasThinking = parts.some((p) => THINKING_TYPES.has(p.type))
|
||||||
|
const firstIsThinking = THINKING_TYPES.has(firstPart.type)
|
||||||
|
|
||||||
|
if (hasThinking && !firstIsThinking) {
|
||||||
|
result.push(msg.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
export function prependThinkingPart(sessionID: string, messageID: string): boolean {
|
||||||
|
const partDir = join(PART_STORAGE, messageID)
|
||||||
|
|
||||||
|
if (!existsSync(partDir)) {
|
||||||
|
mkdirSync(partDir, { recursive: true })
|
||||||
|
}
|
||||||
|
|
||||||
|
const partId = `prt_0000000000_thinking`
|
||||||
|
const part = {
|
||||||
|
id: partId,
|
||||||
|
sessionID,
|
||||||
|
messageID,
|
||||||
|
type: "thinking",
|
||||||
|
thinking: "",
|
||||||
|
synthetic: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
writeFileSync(join(partDir, `${partId}.json`), JSON.stringify(part, null, 2))
|
||||||
|
return true
|
||||||
|
} catch {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function stripThinkingParts(messageID: string): boolean {
|
||||||
|
const partDir = join(PART_STORAGE, messageID)
|
||||||
|
if (!existsSync(partDir)) return false
|
||||||
|
|
||||||
|
let anyRemoved = false
|
||||||
|
for (const file of readdirSync(partDir)) {
|
||||||
|
if (!file.endsWith(".json")) continue
|
||||||
|
try {
|
||||||
|
const filePath = join(partDir, file)
|
||||||
|
const content = readFileSync(filePath, "utf-8")
|
||||||
|
const part = JSON.parse(content) as StoredPart
|
||||||
|
if (THINKING_TYPES.has(part.type)) {
|
||||||
|
unlinkSync(filePath)
|
||||||
|
anyRemoved = true
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return anyRemoved
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user