import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { logger } from 'hono/logger';
import { serve } from '@hono/node-server';
import { Registry, Counter, Histogram, Gauge, collectDefaultMetrics } from 'prom-client';

const app = new Hono();

// Dedicated registry so that only this service's metrics (plus the default
// process metrics) are exposed on /metrics.
const register = new Registry();
collectDefaultMetrics({ register });

const metrics = {
  eventsReceived: new Counter({
    name: 'log_ingest_events_total',
    help: 'Total events received',
    labelNames: ['stack_name', 'event_type'],
    registers: [register],
  }),
  eventProcessingDuration: new Histogram({
    name: 'log_ingest_processing_duration_seconds',
    help: 'Event processing duration',
    labelNames: ['stack_name'],
    buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
    registers: [register],
  }),
  lokiPushErrors: new Counter({
    name: 'log_ingest_loki_errors_total',
    help: 'Loki push errors',
    registers: [register],
  }),
  activeStacks: new Gauge({
    name: 'log_ingest_active_stacks',
    help: 'Number of active stacks sending events',
    registers: [register],
  }),
};

// `||` (not `??`) on purpose: an empty LOKI_URL also falls back to the default.
const LOKI_URL = process.env.LOKI_URL || 'http://loki:3100';

/** Shape of a single event accepted by the ingest endpoints. */
interface LogEvent {
  timestamp?: string;
  stack_name: string;
  session_id?: string;
  event_type:
    | 'session_start'
    | 'session_end'
    | 'message'
    | 'tool_use'
    | 'error'
    | 'mcp_connect'
    | 'mcp_disconnect';
  data?: {
    role?: 'user' | 'assistant' | 'system';
    model?: string;
    agent?: string;
    tool?: string;
    tokens_in?: number;
    tokens_out?: number;
    duration_ms?: number;
    success?: boolean;
    error_code?: string;
    error_message?: string;
    content_length?: number;
    content_hash?: string;
    mcp_server?: string;
  };
}

/** One entry of the `streams` array sent to the Loki push endpoint. */
interface LokiStream {
  stream: Record<string, string>;
  values: [string, string][];
}

// Distinct stack names seen since process start. Nothing in this file removes
// entries, so the gauge derived from it counts stacks seen, not stacks that
// are still sending.
const activeStacksSet = new Set<string>();

/**
 * Checks that a parsed JSON value carries the two required fields.
 *
 * Only the presence of non-empty string `stack_name` and `event_type` is
 * verified; `event_type` is NOT checked against the union declared on
 * `LogEvent`, and optional fields are not inspected.
 */
function isLogEvent(value: unknown): value is LogEvent {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.stack_name === 'string' &&
    candidate.stack_name !== '' &&
    typeof candidate.event_type === 'string' &&
    candidate.event_type !== ''
  );
}

/**
 * Converts an optional event timestamp into the nanosecond epoch string used
 * as the first element of a Loki value tuple.
 *
 * A missing or unparseable timestamp falls back to the current time; without
 * the fallback `BigInt(NaN)` would throw and fail the whole request.
 */
function toLokiTimestamp(timestamp: string | undefined): string {
  const parsedMillis = timestamp ? new Date(timestamp).getTime() : Number.NaN;
  const millis = Number.isNaN(parsedMillis) ? Date.now() : parsedMillis;
  return (BigInt(millis) * BigInt(1_000_000)).toString();
}

/** Builds the label set under which an event is grouped into a stream. */
function buildLabels(event: LogEvent): Record<string, string> {
  // Key insertion order matters: the serialized labels are used as the
  // grouping key in pushToLoki.
  const labels: Record<string, string> = {
    job: 'ai-stack-events',
    stack_name: event.stack_name,
    event_type: event.event_type,
  };
  if (event.session_id) labels.session_id = event.session_id;
  if (event.data?.model) labels.model = event.data.model;
  if (event.data?.agent) labels.agent = event.data.agent;
  if (event.data?.tool) labels.tool = event.data.tool;
  return labels;
}

/**
 * Groups events by label set and POSTs them to `${LOKI_URL}/loki/api/v1/push`.
 *
 * Each event becomes one `[timestamp, JSON line]` tuple. An empty input
 * returns without making a request.
 *
 * @param events Events to forward; callers are expected to have validated them.
 * @throws Error when Loki answers with a non-2xx status (the message contains
 *   the status code and response body). Network errors from `fetch` propagate.
 */
async function pushToLoki(events: LogEvent[]): Promise<void> {
  if (events.length === 0) {
    return;
  }

  const streams = new Map<string, LokiStream>();
  for (const event of events) {
    const labels = buildLabels(event);
    const labelKey = JSON.stringify(labels);

    let stream = streams.get(labelKey);
    if (!stream) {
      stream = { stream: labels, values: [] };
      streams.set(labelKey, stream);
    }
    stream.values.push([toLokiTimestamp(event.timestamp), JSON.stringify(event)]);
  }

  const payload = { streams: Array.from(streams.values()) };
  const response = await fetch(`${LOKI_URL}/loki/api/v1/push`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  });

  if (!response.ok) {
    const text = await response.text();
    throw new Error(`Loki push failed: ${response.status} ${text}`);
  }
}

/** Updates the active-stack set and the received-events counter. */
function recordReceived(events: LogEvent[]): void {
  for (const event of events) {
    activeStacksSet.add(event.stack_name);
    metrics.eventsReceived.inc({
      stack_name: event.stack_name,
      event_type: event.event_type,
    });
  }
}

app.use('*', cors());
app.use('*', logger());

// Liveness endpoint: always reports healthy with the current time.
app.get('/health', (c) => {
  return c.json({ status: 'healthy', timestamp: new Date().toISOString() });
});

// Prometheus scrape endpoint; the gauge is refreshed at scrape time.
app.get('/metrics', async (c) => {
  metrics.activeStacks.set(activeStacksSet.size);
  c.header('Content-Type', register.contentType);
  return c.text(await register.metrics());
});

// Accepts a single event or an array of events. The request is rejected as a
// whole (400) if any event lacks the required fields.
app.post('/ingest', async (c) => {
  const startTime = Date.now();

  let body: unknown;
  try {
    body = await c.req.json();
  } catch {
    // Malformed JSON is a client error, not a Loki failure.
    return c.json({ error: 'Invalid JSON body' }, 400);
  }

  const candidates: unknown[] = Array.isArray(body) ? body : [body];
  const events = candidates.filter(isLogEvent);
  // Validate everything before touching metrics so a rejected request does
  // not leave partial counts behind.
  if (events.length !== candidates.length) {
    return c.json({ error: 'Missing required fields: stack_name, event_type' }, 400);
  }

  recordReceived(events);

  try {
    await pushToLoki(events);
  } catch (error) {
    metrics.lokiPushErrors.inc();
    console.error('Ingest error:', error);
    return c.json({ error: 'Failed to process events', details: String(error) }, 500);
  }

  const duration = (Date.now() - startTime) / 1000;
  for (const event of events) {
    metrics.eventProcessingDuration.observe({ stack_name: event.stack_name }, duration);
  }
  return c.json({ success: true, count: events.length });
});

// Accepts an array of events. Events lacking the required fields are dropped
// and reported through `skipped`; the remaining ones are forwarded.
app.post('/ingest/batch', async (c) => {
  const startTime = Date.now();

  let body: unknown;
  try {
    body = await c.req.json();
  } catch {
    // Malformed JSON is a client error, not a Loki failure.
    return c.json({ error: 'Invalid JSON body' }, 400);
  }

  if (!Array.isArray(body)) {
    return c.json({ error: 'Expected array of events' }, 400);
  }

  const candidates: unknown[] = body;
  // Forward only what passed validation, so metrics and Loki stay in agreement.
  const events = candidates.filter(isLogEvent);
  const skipped = candidates.length - events.length;

  recordReceived(events);

  try {
    await pushToLoki(events);
  } catch (error) {
    metrics.lokiPushErrors.inc();
    console.error('Batch ingest error:', error);
    return c.json({ error: 'Failed to process batch', details: String(error) }, 500);
  }

  const duration = (Date.now() - startTime) / 1000;
  metrics.eventProcessingDuration.observe({ stack_name: 'batch' }, duration);
  return c.json({ success: true, count: events.length, skipped });
});

// Fall back to 3000 when PORT is unset, empty, or not a number.
const parsedPort = Number.parseInt(process.env.PORT || '3000', 10);
const port = Number.isNaN(parsedPort) ? 3000 : parsedPort;

console.log(`Log ingest service starting on port ${port}`);
serve({ fetch: app.fetch, port });