fix(antigravity): improve streaming retry logic and implement true SSE streaming

- Add isRetryableResponse() to detect SUBSCRIPTION_REQUIRED 403 errors for retry handling
- Remove JSDoc comments from isRetryableError() for clarity
- Add debug logging for request/response details (streaming flag, status, content-type)
- Refactor transformStreamingResponse() to use TransformStream for true streaming
  - Replace buffering approach with incremental chunk processing
  - Implement createSseTransformStream() for line-by-line transformation
  - Reduces memory footprint and Time-To-First-Byte (TTFB)
- Update SSE content-type detection to include alt=sse URL parameter
- Simplify response transformation logic for non-streaming path
- Add more granular debug logging for thought signature extraction

🤖 GENERATED WITH ASSISTANCE OF [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode)
This commit is contained in:
YeonGyu-Kim
2025-12-13 04:50:11 +09:00
parent abd90bbc9c
commit 5608bd0ef9
2 changed files with 96 additions and 77 deletions

View File

@@ -63,16 +63,24 @@ function debugLog(message: string): void {
} }
} }
/**
* Check if an error is a retryable network/server error
*/
function isRetryableError(status: number): boolean { function isRetryableError(status: number): boolean {
// 4xx client errors (except 429 rate limit) are not retryable if (status === 0) return true
// 5xx server errors are retryable if (status === 429) return true
// Network errors (status 0) are retryable if (status >= 500 && status < 600) return true
if (status === 0) return true // Network error return false
if (status === 429) return true // Rate limit }
if (status >= 500 && status < 600) return true // Server errors
async function isRetryableResponse(response: Response): Promise<boolean> {
if (isRetryableError(response.status)) return true
if (response.status === 403) {
try {
const text = await response.clone().text()
if (text.includes("SUBSCRIPTION_REQUIRED") || text.includes("Gemini Code Assist license")) {
debugLog(`[RETRY] 403 SUBSCRIPTION_REQUIRED detected, will retry with next endpoint`)
return true
}
} catch {}
}
return false return false
} }
@@ -145,6 +153,8 @@ async function attemptFetch(
thoughtSignature, thoughtSignature,
}) })
debugLog(`[REQ] streaming=${transformed.streaming}, url=${transformed.url}`)
const response = await fetch(transformed.url, { const response = await fetch(transformed.url, {
method: init.method || "POST", method: init.method || "POST",
headers: transformed.headers, headers: transformed.headers,
@@ -152,7 +162,11 @@ async function attemptFetch(
signal: init.signal, signal: init.signal,
}) })
if (!response.ok && isRetryableError(response.status)) { debugLog(
`[RESP] status=${response.status} content-type=${response.headers.get("content-type") ?? ""} url=${response.url}`
)
if (!response.ok && (await isRetryableResponse(response))) {
debugLog(`Endpoint failed: ${endpoint} (status: ${response.status}), trying next`) debugLog(`Endpoint failed: ${endpoint} (status: ${response.status}), trying next`)
return null return null
} }
@@ -223,18 +237,14 @@ async function transformResponseWithThinking(
result = await transformResponse(response) result = await transformResponse(response)
} }
if (streaming) {
return result.response
}
try { try {
const text = await result.response.clone().text() const text = await result.response.clone().text()
debugLog(`[TSIG][RESP] Response text length: ${text.length}`) debugLog(`[TSIG][RESP] Response text length: ${text.length}`)
if (streaming) {
const signature = extractSignatureFromSsePayload(text)
debugLog(`[TSIG][RESP] SSE signature extracted: ${signature ? "yes" : "no"}`)
if (signature) {
setThoughtSignature(fetchInstanceId, signature)
debugLog(`[TSIG][STORE] Stored signature for ${fetchInstanceId}: ${signature.substring(0, 30)}...`)
}
} else {
const parsed = JSON.parse(text) as GeminiResponseBody const parsed = JSON.parse(text) as GeminiResponseBody
debugLog(`[TSIG][RESP] Parsed keys: ${Object.keys(parsed).join(", ")}`) debugLog(`[TSIG][RESP] Parsed keys: ${Object.keys(parsed).join(", ")}`)
debugLog(`[TSIG][RESP] Has candidates: ${!!parsed.candidates}, count: ${parsed.candidates?.length ?? 0}`) debugLog(`[TSIG][RESP] Has candidates: ${!!parsed.candidates}, count: ${parsed.candidates?.length ?? 0}`)
@@ -259,7 +269,6 @@ async function transformResponseWithThinking(
}) })
} }
} }
}
} catch {} } catch {}
return result.response return result.response

View File

@@ -339,31 +339,39 @@ export function transformStreamingPayload(payload: string): string {
.join("\n") .join("\n")
} }
function createSseTransformStream(): TransformStream<Uint8Array, Uint8Array> {
const decoder = new TextDecoder()
const encoder = new TextEncoder()
let buffer = ""
return new TransformStream({
transform(chunk, controller) {
buffer += decoder.decode(chunk, { stream: true })
const lines = buffer.split("\n")
buffer = lines.pop() || ""
for (const line of lines) {
const transformed = transformSseLine(line)
controller.enqueue(encoder.encode(transformed + "\n"))
}
},
flush(controller) {
if (buffer) {
const transformed = transformSseLine(buffer)
controller.enqueue(encoder.encode(transformed))
}
},
})
}
/** /**
* Transforms a streaming SSE response from Antigravity to OpenAI format. * Transforms a streaming SSE response from Antigravity to OpenAI format.
* *
* **⚠️ CURRENT IMPLEMENTATION: BUFFERING** * Uses TransformStream to process SSE chunks incrementally as they arrive.
* This implementation reads the entire stream into memory before transforming. * Each line is transformed immediately and yielded to the client.
* While functional, it does not preserve true streaming characteristics:
* - Blocks until entire response is received
* - Consumes memory proportional to response size
* - Increases Time-To-First-Byte (TTFB)
*
* **TODO: Future Enhancement**
* Implement true streaming using ReadableStream transformation:
* - Parse SSE chunks incrementally
* - Transform and yield chunks as they arrive
* - Reduce memory footprint and TTFB
*
* For streaming responses (current buffered approach):
* - Unwraps the `response` field from each SSE event
* - Returns transformed SSE text as new Response
* - Extracts usage metadata from headers
*
* Note: Does NOT handle thinking block extraction (Task 10)
* *
* @param response - The SSE response from Antigravity API * @param response - The SSE response from Antigravity API
* @returns TransformResult with transformed response and metadata * @returns TransformResult with transformed streaming response
*/ */
export async function transformStreamingResponse(response: Response): Promise<TransformResult> { export async function transformStreamingResponse(response: Response): Promise<TransformResult> {
const headers = new Headers(response.headers) const headers = new Headers(response.headers)
@@ -402,7 +410,8 @@ export async function transformStreamingResponse(response: Response): Promise<Tr
// Check content type // Check content type
const contentType = response.headers.get("content-type") ?? "" const contentType = response.headers.get("content-type") ?? ""
const isEventStream = contentType.includes("text/event-stream") const isEventStream =
contentType.includes("text/event-stream") || response.url.includes("alt=sse")
if (!isEventStream) { if (!isEventStream) {
// Not SSE, delegate to non-streaming transform // Not SSE, delegate to non-streaming transform
@@ -434,24 +443,25 @@ export async function transformStreamingResponse(response: Response): Promise<Tr
} }
} }
// Handle SSE stream if (!response.body) {
// NOTE: Current implementation buffers entire stream - see JSDoc for details return { response, usage }
try { }
const text = await response.text()
const transformed = transformStreamingPayload(text) headers.delete("content-length")
headers.delete("content-encoding")
headers.set("content-type", "text/event-stream; charset=utf-8")
const transformStream = createSseTransformStream()
const transformedBody = response.body.pipeThrough(transformStream)
return { return {
response: new Response(transformed, { response: new Response(transformedBody, {
status: response.status, status: response.status,
statusText: response.statusText, statusText: response.statusText,
headers, headers,
}), }),
usage, usage,
} }
} catch {
// If reading fails, return original response
return { response, usage }
}
} }
/** /**
@@ -462,7 +472,7 @@ export async function transformStreamingResponse(response: Response): Promise<Tr
*/ */
export function isStreamingResponse(response: Response): boolean { export function isStreamingResponse(response: Response): boolean {
const contentType = response.headers.get("content-type") ?? "" const contentType = response.headers.get("content-type") ?? ""
return contentType.includes("text/event-stream") return contentType.includes("text/event-stream") || response.url.includes("alt=sse")
} }
/** /**