diff --git a/src/config/schema.ts b/src/config/schema.ts index 4b707d7..c26c188 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -83,6 +83,7 @@ export const HookNameSchema = z.enum([ "start-work", "sisyphus-orchestrator", "todo-codebase-compaction", + "usage-logging", ]) export const BuiltinCommandNameSchema = z.enum([ diff --git a/src/hooks/index.ts b/src/hooks/index.ts index 617670c..d0107dd 100644 --- a/src/hooks/index.ts +++ b/src/hooks/index.ts @@ -30,3 +30,4 @@ export { createTaskResumeInfoHook } from "./task-resume-info"; export { createStartWorkHook } from "./start-work"; export { createSisyphusOrchestratorHook } from "./sisyphus-orchestrator"; export { createTodoCodebaseCompactionInjector, createCustomCompactionHook } from "./todo-codebase-compaction"; +export { createUsageLoggingHook } from "./usage-logging"; diff --git a/src/hooks/usage-logging/index.ts b/src/hooks/usage-logging/index.ts new file mode 100644 index 0000000..2f7346a --- /dev/null +++ b/src/hooks/usage-logging/index.ts @@ -0,0 +1,229 @@ +import { createHash } from 'crypto'; + +interface LogEvent { + timestamp: string; + stack_name: string; + session_id: string; + event_type: string; + data: Record; +} + +interface UsageLoggingOptions { + ingestUrl?: string; + stackName?: string; + batchSize?: number; + flushIntervalMs?: number; +} + +const DEFAULT_INGEST_URL = process.env.LOG_INGEST_URL || 'http://10.100.0.20:3102/ingest'; +const DEFAULT_STACK_NAME = process.env.STACK_NAME || 'unknown'; + +function hashContent(content: string): string { + return createHash('sha256').update(content).digest('hex').substring(0, 16); +} + +function getWordCount(content: string): number { + return content.split(/\s+/).filter(Boolean).length; +} + +export function createUsageLoggingHook(options: UsageLoggingOptions = {}) { + const { + ingestUrl = DEFAULT_INGEST_URL, + stackName = DEFAULT_STACK_NAME, + batchSize = 10, + flushIntervalMs = 5000, + } = options; + + const enabled = process.env.USAGE_LOGGING_ENABLED !== 'false'; + if (!enabled) { + return { event: async () => {} }; + } + + const eventBuffer: LogEvent[] = []; + let flushTimer: ReturnType | null = null; + const sessionStats = new Map(); + + async function flushEvents(): Promise { + if (eventBuffer.length === 0) return; + + const eventsToSend = [...eventBuffer]; + eventBuffer.length = 0; + + try { + const response = await fetch(`${ingestUrl}/batch`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(eventsToSend), + signal: AbortSignal.timeout(5000) + }); + + if (!response.ok) { + console.error(`[UsageLogging] Failed to send events: ${response.status}`); + } + } catch (error) { + console.error('[UsageLogging] Error sending events:', error); + } + } + + function queueEvent(sessionId: string, eventType: string, data: Record): void { + eventBuffer.push({ + timestamp: new Date().toISOString(), + stack_name: stackName, + session_id: sessionId, + event_type: eventType, + data + }); + + if (eventBuffer.length >= batchSize) { + flushEvents(); + } else if (!flushTimer) { + flushTimer = setTimeout(() => { + flushTimer = null; + flushEvents(); + }, flushIntervalMs); + } + } + + function getOrCreateSessionStats(sessionId: string) { + if (!sessionStats.has(sessionId)) { + sessionStats.set(sessionId, { + startTime: Date.now(), + messageCount: 0, + toolUseCount: 0, + tokensIn: 0, + tokensOut: 0 + }); + } + return sessionStats.get(sessionId)!; + } + + return { + event: async (input: { event: { type: string; properties?: Record } }) => { + const { event } = input; + const props = event.properties || {}; + + try { + switch (event.type) { + case 'session.created': { + const info = props.info as { id?: string } | undefined; + if (info?.id) { + getOrCreateSessionStats(info.id); + queueEvent(info.id, 'session_start', { + start_time: Date.now() + }); + } + break; + } + + case 'session.deleted': { + const info = props.info as { id?: string } | undefined; + if (info?.id) { + const stats = sessionStats.get(info.id); + if (stats) { + queueEvent(info.id, 'session_end', { + duration_ms: Date.now() - stats.startTime, + total_messages: stats.messageCount, + total_tool_uses: stats.toolUseCount, + total_tokens_in: stats.tokensIn, + total_tokens_out: stats.tokensOut + }); + sessionStats.delete(info.id); + } + } + break; + } + + case 'message.created': { + const sessionID = props.sessionID as string | undefined; + const message = props.message as { role?: string; content?: string } | undefined; + if (sessionID && message) { + const stats = getOrCreateSessionStats(sessionID); + stats.messageCount++; + + const content = typeof message.content === 'string' + ? message.content + : JSON.stringify(message.content || ''); + + queueEvent(sessionID, 'message', { + role: message.role || 'unknown', + content_hash: hashContent(content), + content_length: content.length, + word_count: getWordCount(content), + message_number: stats.messageCount + }); + } + break; + } + + case 'tool.started': { + const sessionID = props.sessionID as string | undefined; + const tool = props.tool as { name?: string } | undefined; + if (sessionID && tool?.name) { + queueEvent(sessionID, 'tool_start', { + tool: tool.name, + start_time: Date.now() + }); + } + break; + } + + case 'tool.completed': { + const sessionID = props.sessionID as string | undefined; + const tool = props.tool as { name?: string } | undefined; + const result = props.result as { error?: unknown } | undefined; + if (sessionID && tool?.name) { + const stats = getOrCreateSessionStats(sessionID); + stats.toolUseCount++; + + queueEvent(sessionID, 'tool_use', { + tool: tool.name, + success: !result?.error, + error_message: result?.error ? String(result.error) : undefined, + tool_use_number: stats.toolUseCount + }); + } + break; + } + + case 'session.error': { + const sessionID = props.sessionID as string | undefined; + const error = props.error as { message?: string; code?: string } | undefined; + if (sessionID) { + queueEvent(sessionID, 'error', { + error_message: error?.message || 'Unknown error', + error_code: error?.code + }); + } + break; + } + + case 'tokens.used': { + const sessionID = props.sessionID as string | undefined; + const usage = props.usage as { input_tokens?: number; output_tokens?: number } | undefined; + if (sessionID && usage) { + const stats = getOrCreateSessionStats(sessionID); + stats.tokensIn += usage.input_tokens || 0; + stats.tokensOut += usage.output_tokens || 0; + + queueEvent(sessionID, 'tokens', { + tokens_in: usage.input_tokens, + tokens_out: usage.output_tokens, + total_tokens_in: stats.tokensIn, + total_tokens_out: stats.tokensOut + }); + } + break; + } + } + } catch (error) { + console.error('[UsageLogging] Error processing event:', error); + } + } + }; +} diff --git a/src/index.ts b/src/index.ts index e908bfd..24ed1d1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,6 +31,7 @@ import { createStartWorkHook, createSisyphusOrchestratorHook, createPrometheusMdOnlyHook, + createUsageLoggingHook, } from "./hooks"; import { contextCollector, @@ -212,6 +213,13 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => { ? createPrometheusMdOnlyHook(ctx) : null; + const usageLogging = isHookEnabled("usage-logging") + ? createUsageLoggingHook({ + stackName: process.env.STACK_NAME, + ingestUrl: process.env.LOG_INGEST_URL, + }) + : null; + const taskResumeInfo = createTaskResumeInfoHook(); const backgroundManager = new BackgroundManager(ctx); @@ -411,6 +419,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => { await interactiveBashSession?.event(input); await ralphLoop?.event(input); await sisyphusOrchestrator?.handler(input); + await usageLogging?.event(input); const { event } = input; const props = event.properties as Record | undefined;