From 5608bd0ef9f6d134a4f0a459d92715e4c37db252 Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Sat, 13 Dec 2025 04:50:11 +0900 Subject: [PATCH] fix(antigravity): improve streaming retry logic and implement true SSE streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add isRetryableResponse() to detect SUBSCRIPTION_REQUIRED 403 errors for retry handling - Remove JSDoc comments from isRetryableError() for clarity - Add debug logging for request/response details (streaming flag, status, content-type) - Refactor transformStreamingResponse() to use TransformStream for true streaming - Replace buffering approach with incremental chunk processing - Implement createSseTransformStream() for line-by-line transformation - Reduces memory footprint and Time-To-First-Byte (TTFB) - Update SSE content-type detection to include alt=sse URL parameter - Simplify response transformation logic for non-streaming path - Add more granular debug logging for thought signature extraction 🤖 GENERATED WITH ASSISTANCE OF [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode) --- src/auth/antigravity/fetch.ts | 87 ++++++++++++++++++-------------- src/auth/antigravity/response.ts | 86 +++++++++++++++++-------------- 2 files changed, 96 insertions(+), 77 deletions(-) diff --git a/src/auth/antigravity/fetch.ts b/src/auth/antigravity/fetch.ts index 29267ab..5e9472e 100644 --- a/src/auth/antigravity/fetch.ts +++ b/src/auth/antigravity/fetch.ts @@ -63,16 +63,24 @@ function debugLog(message: string): void { } } -/** - * Check if an error is a retryable network/server error - */ function isRetryableError(status: number): boolean { - // 4xx client errors (except 429 rate limit) are not retryable - // 5xx server errors are retryable - // Network errors (status 0) are retryable - if (status === 0) return true // Network error - if (status === 429) return true // Rate limit - if (status >= 500 && status < 600) return true // Server errors + if (status === 0) return true + if (status === 429) return true + if (status >= 500 && status < 600) return true + return false +} + +async function isRetryableResponse(response: Response): Promise { + if (isRetryableError(response.status)) return true + if (response.status === 403) { + try { + const text = await response.clone().text() + if (text.includes("SUBSCRIPTION_REQUIRED") || text.includes("Gemini Code Assist license")) { + debugLog(`[RETRY] 403 SUBSCRIPTION_REQUIRED detected, will retry with next endpoint`) + return true + } + } catch {} + } return false } @@ -145,6 +153,8 @@ async function attemptFetch( thoughtSignature, }) + debugLog(`[REQ] streaming=${transformed.streaming}, url=${transformed.url}`) + const response = await fetch(transformed.url, { method: init.method || "POST", headers: transformed.headers, @@ -152,7 +162,11 @@ async function attemptFetch( signal: init.signal, }) - if (!response.ok && isRetryableError(response.status)) { + debugLog( + `[RESP] status=${response.status} content-type=${response.headers.get("content-type") ?? ""} url=${response.url}` + ) + + if (!response.ok && (await isRetryableResponse(response))) { debugLog(`Endpoint failed: ${endpoint} (status: ${response.status}), trying next`) return null } @@ -223,41 +237,36 @@ async function transformResponseWithThinking( result = await transformResponse(response) } + if (streaming) { + return result.response + } + try { const text = await result.response.clone().text() debugLog(`[TSIG][RESP] Response text length: ${text.length}`) - if (streaming) { - const signature = extractSignatureFromSsePayload(text) - debugLog(`[TSIG][RESP] SSE signature extracted: ${signature ? "yes" : "no"}`) - if (signature) { - setThoughtSignature(fetchInstanceId, signature) - debugLog(`[TSIG][STORE] Stored signature for ${fetchInstanceId}: ${signature.substring(0, 30)}...`) - } + const parsed = JSON.parse(text) as GeminiResponseBody + debugLog(`[TSIG][RESP] Parsed keys: ${Object.keys(parsed).join(", ")}`) + debugLog(`[TSIG][RESP] Has candidates: ${!!parsed.candidates}, count: ${parsed.candidates?.length ?? 0}`) + + const signature = extractSignatureFromResponse(parsed) + debugLog(`[TSIG][RESP] Signature extracted: ${signature ? signature.substring(0, 30) + "..." : "NONE"}`) + if (signature) { + setThoughtSignature(fetchInstanceId, signature) + debugLog(`[TSIG][STORE] Stored signature for ${fetchInstanceId}`) } else { - const parsed = JSON.parse(text) as GeminiResponseBody - debugLog(`[TSIG][RESP] Parsed keys: ${Object.keys(parsed).join(", ")}`) - debugLog(`[TSIG][RESP] Has candidates: ${!!parsed.candidates}, count: ${parsed.candidates?.length ?? 0}`) + debugLog(`[TSIG][WARN] No signature found in response!`) + } - const signature = extractSignatureFromResponse(parsed) - debugLog(`[TSIG][RESP] Signature extracted: ${signature ? signature.substring(0, 30) + "..." : "NONE"}`) - if (signature) { - setThoughtSignature(fetchInstanceId, signature) - debugLog(`[TSIG][STORE] Stored signature for ${fetchInstanceId}`) - } else { - debugLog(`[TSIG][WARN] No signature found in response!`) - } - - if (shouldIncludeThinking(modelName)) { - const thinkingResult = extractThinkingBlocks(parsed) - if (thinkingResult.hasThinking) { - const transformed = transformResponseThinking(parsed) - return new Response(JSON.stringify(transformed), { - status: result.response.status, - statusText: result.response.statusText, - headers: result.response.headers, - }) - } + if (shouldIncludeThinking(modelName)) { + const thinkingResult = extractThinkingBlocks(parsed) + if (thinkingResult.hasThinking) { + const transformed = transformResponseThinking(parsed) + return new Response(JSON.stringify(transformed), { + status: result.response.status, + statusText: result.response.statusText, + headers: result.response.headers, + }) } } } catch {} diff --git a/src/auth/antigravity/response.ts b/src/auth/antigravity/response.ts index 6b73c89..0a8fa68 100644 --- a/src/auth/antigravity/response.ts +++ b/src/auth/antigravity/response.ts @@ -339,31 +339,39 @@ export function transformStreamingPayload(payload: string): string { .join("\n") } +function createSseTransformStream(): TransformStream { + const decoder = new TextDecoder() + const encoder = new TextEncoder() + let buffer = "" + + return new TransformStream({ + transform(chunk, controller) { + buffer += decoder.decode(chunk, { stream: true }) + const lines = buffer.split("\n") + buffer = lines.pop() || "" + + for (const line of lines) { + const transformed = transformSseLine(line) + controller.enqueue(encoder.encode(transformed + "\n")) + } + }, + flush(controller) { + if (buffer) { + const transformed = transformSseLine(buffer) + controller.enqueue(encoder.encode(transformed)) + } + }, + }) +} + /** * Transforms a streaming SSE response from Antigravity to OpenAI format. * - * **⚠️ CURRENT IMPLEMENTATION: BUFFERING** - * This implementation reads the entire stream into memory before transforming. - * While functional, it does not preserve true streaming characteristics: - * - Blocks until entire response is received - * - Consumes memory proportional to response size - * - Increases Time-To-First-Byte (TTFB) - * - * **TODO: Future Enhancement** - * Implement true streaming using ReadableStream transformation: - * - Parse SSE chunks incrementally - * - Transform and yield chunks as they arrive - * - Reduce memory footprint and TTFB - * - * For streaming responses (current buffered approach): - * - Unwraps the `response` field from each SSE event - * - Returns transformed SSE text as new Response - * - Extracts usage metadata from headers - * - * Note: Does NOT handle thinking block extraction (Task 10) + * Uses TransformStream to process SSE chunks incrementally as they arrive. + * Each line is transformed immediately and yielded to the client. * * @param response - The SSE response from Antigravity API - * @returns TransformResult with transformed response and metadata + * @returns TransformResult with transformed streaming response */ export async function transformStreamingResponse(response: Response): Promise { const headers = new Headers(response.headers) @@ -402,7 +410,8 @@ export async function transformStreamingResponse(response: Response): Promise