diff --git a/src/auth/antigravity/fetch.ts b/src/auth/antigravity/fetch.ts index 80f6f2f..7d6a58c 100644 --- a/src/auth/antigravity/fetch.ts +++ b/src/auth/antigravity/fetch.ts @@ -21,9 +21,19 @@ import { ANTIGRAVITY_ENDPOINT_FALLBACKS } from "./constants" import { fetchProjectContext, clearProjectContextCache } from "./project" import { isTokenExpired, refreshAccessToken, parseStoredToken, formatTokenForStorage } from "./token" import { transformRequest } from "./request" -import { transformResponse, transformStreamingResponse, isStreamingResponse } from "./response" +import { + transformResponse, + transformStreamingResponse, + isStreamingResponse, + extractSignatureFromSsePayload, +} from "./response" import { normalizeToolsForGemini, type OpenAITool } from "./tools" import { extractThinkingBlocks, shouldIncludeThinking, transformResponseThinking } from "./thinking" +import { + getThoughtSignature, + setThoughtSignature, + getOrCreateSessionId, +} from "./thought-signature-store" import type { AntigravityTokens } from "./types" /** @@ -65,14 +75,22 @@ function isRetryableError(status: number): boolean { return false } -async function attemptFetch( - endpoint: string, - url: string, - init: RequestInit, - accessToken: string, - projectId: string, +interface AttemptFetchOptions { + endpoint: string + url: string + init: RequestInit + accessToken: string + projectId: string + sessionId: string modelName?: string + thoughtSignature?: string +} + +async function attemptFetch( + options: AttemptFetchOptions ): Promise { + const { endpoint, url, init, accessToken, projectId, sessionId, modelName, thoughtSignature } = + options debugLog(`Trying endpoint: ${endpoint}`) try { @@ -99,14 +117,16 @@ async function attemptFetch( } } - const transformed = transformRequest( + const transformed = transformRequest({ url, - parsedBody, + body: parsedBody, accessToken, projectId, + sessionId, modelName, - endpoint - ) + endpointOverride: endpoint, + thoughtSignature, + }) const response = await fetch(transformed.url, { method: init.method || "POST", @@ -129,16 +149,56 @@ async function attemptFetch( } } -/** - * Transform response with thinking extraction if applicable - */ +interface GeminiResponsePart { + thoughtSignature?: string + thought_signature?: string + functionCall?: Record + text?: string + [key: string]: unknown +} + +interface GeminiResponseCandidate { + content?: { + parts?: GeminiResponsePart[] + [key: string]: unknown + } + [key: string]: unknown +} + +interface GeminiResponseBody { + candidates?: GeminiResponseCandidate[] + [key: string]: unknown +} + +function extractSignatureFromResponse(parsed: GeminiResponseBody): string | undefined { + if (!parsed.candidates || !Array.isArray(parsed.candidates)) { + return undefined + } + + for (const candidate of parsed.candidates) { + const parts = candidate.content?.parts + if (!parts || !Array.isArray(parts)) { + continue + } + + for (const part of parts) { + const sig = part.thoughtSignature || part.thought_signature + if (sig && typeof sig === "string") { + return sig + } + } + } + + return undefined +} + async function transformResponseWithThinking( response: Response, - modelName: string + modelName: string, + fetchInstanceId: string ): Promise { const streaming = isStreamingResponse(response) - // Transform response based on streaming mode let result if (streaming) { result = await transformStreamingResponse(response) @@ -146,26 +206,37 @@ async function transformResponseWithThinking( result = await transformResponse(response) } - // Apply thinking extraction for high-thinking models - if (!streaming && shouldIncludeThinking(modelName)) { - try { - const text = await result.response.clone().text() - const parsed = JSON.parse(text) as Record + try { + const text = await result.response.clone().text() - // Extract and transform thinking blocks - const thinkingResult = extractThinkingBlocks(parsed) - if (thinkingResult.hasThinking) { - const transformed = transformResponseThinking(parsed) - return new Response(JSON.stringify(transformed), { - status: result.response.status, - statusText: result.response.statusText, - headers: result.response.headers, - }) + if (streaming) { + const signature = extractSignatureFromSsePayload(text) + if (signature) { + setThoughtSignature(fetchInstanceId, signature) + debugLog(`[STREAMING] Stored thought signature for instance ${fetchInstanceId}`) + } + } else { + const parsed = JSON.parse(text) as GeminiResponseBody + + const signature = extractSignatureFromResponse(parsed) + if (signature) { + setThoughtSignature(fetchInstanceId, signature) + debugLog(`Stored thought signature for instance ${fetchInstanceId}`) + } + + if (shouldIncludeThinking(modelName)) { + const thinkingResult = extractThinkingBlocks(parsed) + if (thinkingResult.hasThinking) { + const transformed = transformResponseThinking(parsed) + return new Response(JSON.stringify(transformed), { + status: result.response.status, + statusText: result.response.statusText, + headers: result.response.headers, + }) + } } - } catch { - // If thinking extraction fails, return original transformed response } - } + } catch {} return result.response } @@ -207,9 +278,9 @@ export function createAntigravityFetch( clientId?: string, clientSecret?: string ): (url: string, init?: RequestInit) => Promise { - // Cache for current token state let cachedTokens: AntigravityTokens | null = null let cachedProjectId: string | null = null + const fetchInstanceId = crypto.randomUUID() return async (url: string, init: RequestInit = {}): Promise => { debugLog(`Intercepting request to: ${url}`) @@ -304,18 +375,23 @@ export function createAntigravityFetch( } const maxEndpoints = Math.min(ANTIGRAVITY_ENDPOINT_FALLBACKS.length, 3) + const sessionId = getOrCreateSessionId(fetchInstanceId) + const thoughtSignature = getThoughtSignature(fetchInstanceId) + debugLog(`[TSIG][GET] sessionId=${sessionId}, signature=${thoughtSignature ? thoughtSignature.substring(0, 20) + "..." : "none"}`) for (let i = 0; i < maxEndpoints; i++) { const endpoint = ANTIGRAVITY_ENDPOINT_FALLBACKS[i] - const response = await attemptFetch( + const response = await attemptFetch({ endpoint, url, init, - cachedTokens.access_token, + accessToken: cachedTokens.access_token, projectId, - modelName - ) + sessionId, + modelName, + thoughtSignature, + }) if (response === "pass-through") { debugLog("Non-string body detected, passing through with auth headers") @@ -328,7 +404,12 @@ export function createAntigravityFetch( if (response) { debugLog(`Success with endpoint: ${endpoint}`) - return transformResponseWithThinking(response, modelName || "") + const transformedResponse = await transformResponseWithThinking( + response, + modelName || "", + fetchInstanceId + ) + return transformedResponse } } diff --git a/src/auth/antigravity/index.ts b/src/auth/antigravity/index.ts index ab61965..8d2d4d3 100644 --- a/src/auth/antigravity/index.ts +++ b/src/auth/antigravity/index.ts @@ -1,5 +1,3 @@ -// Antigravity auth module barrel export - export * from "./types" export * from "./constants" export * from "./oauth" @@ -9,5 +7,6 @@ export * from "./request" export * from "./response" export * from "./tools" export * from "./thinking" +export * from "./thought-signature-store" export * from "./fetch" export * from "./plugin" diff --git a/src/auth/antigravity/request.ts b/src/auth/antigravity/request.ts index 72bcd46..dda38cd 100644 --- a/src/auth/antigravity/request.ts +++ b/src/auth/antigravity/request.ts @@ -132,15 +132,11 @@ function generateRequestId(): string { return `agent-${crypto.randomUUID()}` } -function generateSessionId(): string { - const n = Math.floor(Math.random() * 9_000_000_000_000_000_000) - return `-${n}` -} - export function wrapRequestBody( body: Record, projectId: string, - modelName: string + modelName: string, + sessionId: string ): AntigravityRequestBody { const requestPayload = { ...body } delete requestPayload.model @@ -152,11 +148,69 @@ export function wrapRequestBody( requestId: generateRequestId(), request: { ...requestPayload, - sessionId: generateSessionId(), + sessionId, }, } } +interface ContentPart { + functionCall?: Record + thoughtSignature?: string + [key: string]: unknown +} + +interface ContentBlock { + role?: string + parts?: ContentPart[] + [key: string]: unknown +} + +function debugLog(message: string): void { + if (process.env.ANTIGRAVITY_DEBUG === "1") { + console.log(`[antigravity-request] ${message}`) + } +} + +export function injectThoughtSignatureIntoFunctionCalls( + body: Record, + signature: string | undefined +): Record { + debugLog(`[TSIG][INJECT] signature=${signature ? signature.substring(0, 20) + "..." : "none"}`) + + if (!signature) { + return body + } + + const contents = body.contents as ContentBlock[] | undefined + if (!contents || !Array.isArray(contents)) { + debugLog(`[TSIG][INJECT] no contents array found`) + return body + } + + let injectedCount = 0 + const modifiedContents = contents.map((content) => { + if (!content.parts || !Array.isArray(content.parts)) { + return content + } + + const modifiedParts = content.parts.map((part) => { + if (part.functionCall && !part.thoughtSignature) { + injectedCount++ + return { + ...part, + thoughtSignature: signature, + } + } + return part + }) + + return { ...content, parts: modifiedParts } + }) + + debugLog(`[TSIG][INJECT] injected signature into ${injectedCount} functionCall(s)`) + return { ...body, contents: modifiedContents } +} + /** * Detect if request is for streaming. * Checks both action name and request body for stream flag. @@ -183,48 +237,45 @@ export function isStreamingRequest( return false } -/** - * Transform an OpenAI-format request to Antigravity format. - * This is the main transformation function used by the fetch interceptor. - * - * @param url - Original request URL - * @param body - Original request body (OpenAI format) - * @param accessToken - OAuth access token for Authorization - * @param projectId - GCP project ID for wrapper - * @param modelName - Model name to use (overrides body.model if provided) - * @param endpointOverride - Optional endpoint override (uses first fallback if not provided) - * @returns Transformed request with URL, headers, body, and streaming flag - */ -export function transformRequest( - url: string, - body: Record, - accessToken: string, - projectId: string, - modelName?: string, +export interface TransformRequestOptions { + url: string + body: Record + accessToken: string + projectId: string + sessionId: string + modelName?: string endpointOverride?: string -): TransformedRequest { - // Determine model name (parameter override > body > URL) + thoughtSignature?: string +} + +export function transformRequest(options: TransformRequestOptions): TransformedRequest { + const { + url, + body, + accessToken, + projectId, + sessionId, + modelName, + endpointOverride, + thoughtSignature, + } = options + const effectiveModel = modelName || extractModelFromBody(body) || extractModelFromUrl(url) || "gemini-3-pro-preview" - // Determine if streaming const streaming = isStreamingRequest(url, body) - - // Determine action (default to appropriate generate action) const action = streaming ? "streamGenerateContent" : "generateContent" - // Build URL const endpoint = endpointOverride || getDefaultEndpoint() const transformedUrl = buildAntigravityUrl(endpoint, action, streaming) - // Build headers const headers = buildRequestHeaders(accessToken) if (streaming) { headers["Accept"] = "text/event-stream" } - // Wrap body in Antigravity format - const wrappedBody = wrapRequestBody(body, projectId, effectiveModel) + const bodyWithSignature = injectThoughtSignatureIntoFunctionCalls(body, thoughtSignature) + const wrappedBody = wrapRequestBody(bodyWithSignature, projectId, effectiveModel, sessionId) return { url: transformedUrl, diff --git a/src/auth/antigravity/response.ts b/src/auth/antigravity/response.ts index 60aedeb..6b73c89 100644 --- a/src/auth/antigravity/response.ts +++ b/src/auth/antigravity/response.ts @@ -465,6 +465,61 @@ export function isStreamingResponse(response: Response): boolean { return contentType.includes("text/event-stream") } +/** + * Extract thought signature from SSE payload text + * + * Looks for thoughtSignature in SSE events: + * data: { "response": { "candidates": [{ "content": { "parts": [{ "thoughtSignature": "..." }] } }] } } + * + * Returns the last found signature (most recent in the stream). + * + * @param payload - SSE payload text + * @returns Last thought signature if found + */ +export function extractSignatureFromSsePayload(payload: string): string | undefined { + const lines = payload.split("\n") + let lastSignature: string | undefined + + for (const line of lines) { + if (!line.startsWith("data:")) { + continue + } + + const json = line.slice(5).trim() + if (!json || json === "[DONE]") { + continue + } + + try { + const parsed = JSON.parse(json) as Record + + // Check in response wrapper (Antigravity format) + const response = (parsed.response || parsed) as Record + const candidates = response.candidates as Array> | undefined + + if (candidates && Array.isArray(candidates)) { + for (const candidate of candidates) { + const content = candidate.content as Record | undefined + const parts = content?.parts as Array> | undefined + + if (parts && Array.isArray(parts)) { + for (const part of parts) { + const sig = (part.thoughtSignature || part.thought_signature) as string | undefined + if (sig && typeof sig === "string") { + lastSignature = sig + } + } + } + } + } + } catch { + // Continue to next line if parsing fails + } + } + + return lastSignature +} + /** * Extract usage from SSE payload text * diff --git a/src/auth/antigravity/thought-signature-store.ts b/src/auth/antigravity/thought-signature-store.ts new file mode 100644 index 0000000..17b8804 --- /dev/null +++ b/src/auth/antigravity/thought-signature-store.ts @@ -0,0 +1,97 @@ +/** + * Thought Signature Store + * + * Stores and retrieves thought signatures for multi-turn conversations. + * Gemini 3 Pro requires thought_signature on function call content blocks + * in subsequent requests to maintain reasoning continuity. + * + * Key responsibilities: + * - Store the latest thought signature per session + * - Provide signature for injection into function call requests + * - Clear signatures when sessions end + */ + +/** + * In-memory store for thought signatures indexed by session ID + */ +const signatureStore = new Map() + +/** + * In-memory store for session IDs per fetch instance + * Used to maintain consistent sessionId across multi-turn conversations + */ +const sessionIdStore = new Map() + +/** + * Store a thought signature for a session + * + * @param sessionKey - Unique session identifier (typically fetch instance ID) + * @param signature - The thought signature from model response + */ +export function setThoughtSignature(sessionKey: string, signature: string): void { + if (sessionKey && signature) { + signatureStore.set(sessionKey, signature) + } +} + +/** + * Retrieve the stored thought signature for a session + * + * @param sessionKey - Unique session identifier + * @returns The stored signature or undefined if not found + */ +export function getThoughtSignature(sessionKey: string): string | undefined { + return signatureStore.get(sessionKey) +} + +/** + * Clear the thought signature for a session + * + * @param sessionKey - Unique session identifier + */ +export function clearThoughtSignature(sessionKey: string): void { + signatureStore.delete(sessionKey) +} + +/** + * Store or retrieve a persistent session ID for a fetch instance + * + * @param fetchInstanceId - Unique identifier for the fetch instance + * @param sessionId - Optional session ID to store (if not provided, returns existing or generates new) + * @returns The session ID for this fetch instance + */ +export function getOrCreateSessionId(fetchInstanceId: string, sessionId?: string): string { + if (sessionId) { + sessionIdStore.set(fetchInstanceId, sessionId) + return sessionId + } + + const existing = sessionIdStore.get(fetchInstanceId) + if (existing) { + return existing + } + + const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER) + const newSessionId = `-${n}` + sessionIdStore.set(fetchInstanceId, newSessionId) + return newSessionId +} + +/** + * Clear the session ID for a fetch instance + * + * @param fetchInstanceId - Unique identifier for the fetch instance + */ +export function clearSessionId(fetchInstanceId: string): void { + sessionIdStore.delete(fetchInstanceId) +} + +/** + * Clear all stored data for a fetch instance (signature + session ID) + * + * @param fetchInstanceId - Unique identifier for the fetch instance + */ +export function clearFetchInstanceData(fetchInstanceId: string): void { + signatureStore.delete(fetchInstanceId) + sessionIdStore.delete(fetchInstanceId) +}