diff --git a/assets/oh-my-opencode.schema.json b/assets/oh-my-opencode.schema.json index bafe370..9b47beb 100644 --- a/assets/oh-my-opencode.schema.json +++ b/assets/oh-my-opencode.schema.json @@ -1658,6 +1658,35 @@ "type": "string" } } + }, + "background_task": { + "type": "object", + "properties": { + "defaultConcurrency": { + "type": "number", + "minimum": 1 + }, + "providerConcurrency": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": { + "type": "number", + "minimum": 1 + } + }, + "modelConcurrency": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": { + "type": "number", + "minimum": 1 + } + } + } } } } \ No newline at end of file diff --git a/src/config/schema.ts b/src/config/schema.ts index 6f2097c..8984284 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -232,6 +232,12 @@ export const RalphLoopConfigSchema = z.object({ state_dir: z.string().optional(), }) +export const BackgroundTaskConfigSchema = z.object({ + defaultConcurrency: z.number().min(1).optional(), + providerConcurrency: z.record(z.string(), z.number().min(1)).optional(), + modelConcurrency: z.record(z.string(), z.number().min(1)).optional(), +}) + export const OhMyOpenCodeConfigSchema = z.object({ $schema: z.string().optional(), disabled_mcps: z.array(AnyMcpNameSchema).optional(), @@ -248,11 +254,13 @@ export const OhMyOpenCodeConfigSchema = z.object({ auto_update: z.boolean().optional(), skills: SkillsConfigSchema.optional(), ralph_loop: RalphLoopConfigSchema.optional(), + background_task: BackgroundTaskConfigSchema.optional(), }) export type OhMyOpenCodeConfig = z.infer export type AgentOverrideConfig = z.infer export type AgentOverrides = z.infer +export type BackgroundTaskConfig = z.infer export type AgentName = z.infer export type HookName = z.infer export type BuiltinCommandName = z.infer diff --git a/src/features/background-agent/concurrency.test.ts b/src/features/background-agent/concurrency.test.ts new file mode 100644 index 0000000..677440e --- /dev/null +++ b/src/features/background-agent/concurrency.test.ts @@ -0,0 +1,351 @@ +import { describe, test, expect, beforeEach } from "bun:test" +import { ConcurrencyManager } from "./concurrency" +import type { BackgroundTaskConfig } from "../../config/schema" + +describe("ConcurrencyManager.getConcurrencyLimit", () => { + test("should return model-specific limit when modelConcurrency is set", () => { + // #given + const config: BackgroundTaskConfig = { + modelConcurrency: { "anthropic/claude-sonnet-4-5": 5 } + } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(5) + }) + + test("should return provider limit when providerConcurrency is set for model provider", () => { + // #given + const config: BackgroundTaskConfig = { + providerConcurrency: { anthropic: 3 } + } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(3) + }) + + test("should return provider limit even when modelConcurrency exists but doesn't match", () => { + // #given + const config: BackgroundTaskConfig = { + modelConcurrency: { "google/gemini-3-pro": 5 }, + providerConcurrency: { anthropic: 3 } + } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(3) + }) + + test("should return default limit when defaultConcurrency is set", () => { + // #given + const config: BackgroundTaskConfig = { + defaultConcurrency: 2 + } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(2) + }) + + test("should return default 5 when no config provided", () => { + // #given + const manager = new ConcurrencyManager() + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(5) + }) + + test("should return default 5 when config exists but no concurrency settings", () => { + // #given + const config: BackgroundTaskConfig = {} + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(5) + }) + + test("should prioritize model-specific over provider-specific over default", () => { + // #given + const config: BackgroundTaskConfig = { + modelConcurrency: { "anthropic/claude-sonnet-4-5": 10 }, + providerConcurrency: { anthropic: 5 }, + defaultConcurrency: 2 + } + const manager = new ConcurrencyManager(config) + + // #when + const modelLimit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + const providerLimit = manager.getConcurrencyLimit("anthropic/claude-opus-4-5") + const defaultLimit = manager.getConcurrencyLimit("google/gemini-3-pro") + + // #then + expect(modelLimit).toBe(10) + expect(providerLimit).toBe(5) + expect(defaultLimit).toBe(2) + }) + + test("should handle models without provider part", () => { + // #given + const config: BackgroundTaskConfig = { + providerConcurrency: { "custom-model": 4 } + } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("custom-model") + + // #then + expect(limit).toBe(4) + }) + + test("should return Infinity when defaultConcurrency is 0", () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 0 } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("any-model") + + // #then + expect(limit).toBe(Infinity) + }) + + test("should return Infinity when providerConcurrency is 0", () => { + // #given + const config: BackgroundTaskConfig = { + providerConcurrency: { anthropic: 0 } + } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(Infinity) + }) + + test("should return Infinity when modelConcurrency is 0", () => { + // #given + const config: BackgroundTaskConfig = { + modelConcurrency: { "anthropic/claude-sonnet-4-5": 0 } + } + const manager = new ConcurrencyManager(config) + + // #when + const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5") + + // #then + expect(limit).toBe(Infinity) + }) +}) + +describe("ConcurrencyManager.acquire/release", () => { + let manager: ConcurrencyManager + + beforeEach(() => { + // #given + const config: BackgroundTaskConfig = {} + manager = new ConcurrencyManager(config) + }) + + test("should allow acquiring up to limit", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 2 } + manager = new ConcurrencyManager(config) + + // #when + await manager.acquire("model-a") + await manager.acquire("model-a") + + // #then - both resolved without waiting + expect(true).toBe(true) + }) + + test("should allow acquires up to default limit of 5", async () => { + // #given - no config = default limit of 5 + + // #when + await manager.acquire("model-a") + await manager.acquire("model-a") + await manager.acquire("model-a") + await manager.acquire("model-a") + await manager.acquire("model-a") + + // #then - all 5 resolved + expect(true).toBe(true) + }) + + test("should queue when limit reached", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + manager = new ConcurrencyManager(config) + await manager.acquire("model-a") + + // #when + let resolved = false + const waitPromise = manager.acquire("model-a").then(() => { resolved = true }) + + // Give microtask queue a chance to run + await Promise.resolve() + + // #then - should still be waiting + expect(resolved).toBe(false) + + // #when - release + manager.release("model-a") + await waitPromise + + // #then - now resolved + expect(resolved).toBe(true) + }) + + test("should queue multiple tasks and process in order", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + manager = new ConcurrencyManager(config) + await manager.acquire("model-a") + + // #when + const order: string[] = [] + const task1 = manager.acquire("model-a").then(() => { order.push("1") }) + const task2 = manager.acquire("model-a").then(() => { order.push("2") }) + const task3 = manager.acquire("model-a").then(() => { order.push("3") }) + + // Give microtask queue a chance to run + await Promise.resolve() + + // #then - none resolved yet + expect(order).toEqual([]) + + // #when - release one at a time + manager.release("model-a") + await task1 + expect(order).toEqual(["1"]) + + manager.release("model-a") + await task2 + expect(order).toEqual(["1", "2"]) + + manager.release("model-a") + await task3 + expect(order).toEqual(["1", "2", "3"]) + }) + + test("should handle independent models separately", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + manager = new ConcurrencyManager(config) + await manager.acquire("model-a") + + // #when - acquire different model + const resolved = await Promise.race([ + manager.acquire("model-b").then(() => "resolved"), + Promise.resolve("timeout").then(() => "timeout") + ]) + + // #then - different model should resolve immediately + expect(resolved).toBe("resolved") + }) + + test("should allow re-acquiring after release", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + manager = new ConcurrencyManager(config) + + // #when + await manager.acquire("model-a") + manager.release("model-a") + await manager.acquire("model-a") + + // #then + expect(true).toBe(true) + }) + + test("should handle release when no acquire", () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 2 } + manager = new ConcurrencyManager(config) + + // #when - release without acquire + manager.release("model-a") + + // #then - should not throw + expect(true).toBe(true) + }) + + test("should handle release when no prior acquire", () => { + // #given - default config + + // #when - release without acquire + manager.release("model-a") + + // #then - should not throw + expect(true).toBe(true) + }) + + test("should handle multiple acquires and releases correctly", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 3 } + manager = new ConcurrencyManager(config) + + // #when + await manager.acquire("model-a") + await manager.acquire("model-a") + await manager.acquire("model-a") + + // Release all + manager.release("model-a") + manager.release("model-a") + manager.release("model-a") + + // Should be able to acquire again + await manager.acquire("model-a") + + // #then + expect(true).toBe(true) + }) + + test("should use model-specific limit for acquire", async () => { + // #given + const config: BackgroundTaskConfig = { + modelConcurrency: { "anthropic/claude-sonnet-4-5": 2 }, + defaultConcurrency: 5 + } + manager = new ConcurrencyManager(config) + await manager.acquire("anthropic/claude-sonnet-4-5") + await manager.acquire("anthropic/claude-sonnet-4-5") + + // #when + let resolved = false + const waitPromise = manager.acquire("anthropic/claude-sonnet-4-5").then(() => { resolved = true }) + + // Give microtask queue a chance to run + await Promise.resolve() + + // #then - should be waiting (model-specific limit is 2) + expect(resolved).toBe(false) + + // Cleanup + manager.release("anthropic/claude-sonnet-4-5") + await waitPromise + }) +}) diff --git a/src/features/background-agent/concurrency.ts b/src/features/background-agent/concurrency.ts new file mode 100644 index 0000000..e9d24b8 --- /dev/null +++ b/src/features/background-agent/concurrency.ts @@ -0,0 +1,66 @@ +import type { BackgroundTaskConfig } from "../../config/schema" + +export class ConcurrencyManager { + private config?: BackgroundTaskConfig + private counts: Map = new Map() + private queues: Map void>> = new Map() + + constructor(config?: BackgroundTaskConfig) { + this.config = config + } + + getConcurrencyLimit(model: string): number { + const modelLimit = this.config?.modelConcurrency?.[model] + if (modelLimit !== undefined) { + return modelLimit === 0 ? Infinity : modelLimit + } + const provider = model.split('/')[0] + const providerLimit = this.config?.providerConcurrency?.[provider] + if (providerLimit !== undefined) { + return providerLimit === 0 ? Infinity : providerLimit + } + const defaultLimit = this.config?.defaultConcurrency + if (defaultLimit !== undefined) { + return defaultLimit === 0 ? Infinity : defaultLimit + } + return 5 + } + + async acquire(model: string): Promise { + const limit = this.getConcurrencyLimit(model) + if (limit === Infinity) { + return + } + + const current = this.counts.get(model) ?? 0 + if (current < limit) { + this.counts.set(model, current + 1) + return + } + + return new Promise((resolve) => { + const queue = this.queues.get(model) ?? [] + queue.push(resolve) + this.queues.set(model, queue) + }) + } + + release(model: string): void { + const limit = this.getConcurrencyLimit(model) + if (limit === Infinity) { + return + } + + const queue = this.queues.get(model) + if (queue && queue.length > 0) { + const next = queue.shift()! + this.counts.set(model, this.counts.get(model) ?? 0) + next() + } else { + const current = this.counts.get(model) ?? 0 + if (current > 0) { + this.counts.set(model, current - 1) + } + } + } +} diff --git a/src/features/background-agent/index.ts b/src/features/background-agent/index.ts index d4d1c84..26fece8 100644 --- a/src/features/background-agent/index.ts +++ b/src/features/background-agent/index.ts @@ -1,2 +1,3 @@ export * from "./types" export { BackgroundManager } from "./manager" +export { ConcurrencyManager } from "./concurrency" diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index a224384..3b33b47 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -6,6 +6,8 @@ import type { LaunchInput, } from "./types" import { log } from "../../shared/logger" +import { ConcurrencyManager } from "./concurrency" +import type { BackgroundTaskConfig } from "../../config/schema" import { findNearestMessageWithFields, MESSAGE_STORAGE, @@ -60,12 +62,14 @@ export class BackgroundManager { private client: OpencodeClient private directory: string private pollingInterval?: ReturnType + private concurrencyManager: ConcurrencyManager - constructor(ctx: PluginInput) { + constructor(ctx: PluginInput, config?: BackgroundTaskConfig) { this.tasks = new Map() this.notifications = new Map() this.client = ctx.client this.directory = ctx.directory + this.concurrencyManager = new ConcurrencyManager(config) } async launch(input: LaunchInput): Promise { @@ -73,6 +77,10 @@ export class BackgroundManager { throw new Error("Agent parameter is required") } + const model = input.agent + + await this.concurrencyManager.acquire(model) + const createResult = await this.client.session.create({ body: { parentID: input.parentSessionID, @@ -102,6 +110,7 @@ export class BackgroundManager { lastUpdate: new Date(), }, parentModel: input.parentModel, + model, } this.tasks.set(task.id, task) @@ -132,6 +141,9 @@ export class BackgroundManager { existingTask.error = errorMessage } existingTask.completedAt = new Date() + if (existingTask.model) { + this.concurrencyManager.release(existingTask.model) + } this.markForNotification(existingTask) this.notifyParentSession(existingTask) } @@ -253,6 +265,9 @@ export class BackgroundManager { task.error = "Session deleted" } + if (task.model) { + this.concurrencyManager.release(task.model) + } this.tasks.delete(task.id) this.clearNotificationsForTask(task.id) subagentSessions.delete(sessionID) @@ -352,6 +367,9 @@ export class BackgroundManager { } catch (error) { log("[background-agent] prompt failed:", String(error)) } finally { + if (task.model) { + this.concurrencyManager.release(task.model) + } // Always clean up both maps to prevent memory leaks this.clearNotificationsForTask(taskId) this.tasks.delete(taskId) @@ -391,6 +409,9 @@ export class BackgroundManager { task.status = "error" task.error = "Task timed out after 30 minutes" task.completedAt = new Date() + if (task.model) { + this.concurrencyManager.release(task.model) + } this.clearNotificationsForTask(taskId) this.tasks.delete(taskId) subagentSessions.delete(task.sessionID) diff --git a/src/features/background-agent/types.ts b/src/features/background-agent/types.ts index 7ba29ec..8a697a0 100644 --- a/src/features/background-agent/types.ts +++ b/src/features/background-agent/types.ts @@ -27,6 +27,7 @@ export interface BackgroundTask { error?: string progress?: TaskProgress parentModel?: { providerID: string; modelID: string } + model?: string } export interface LaunchInput {