feat(background-agent): add model-based concurrency management (#548)
* feat(config): add BackgroundTaskConfigSchema for model concurrency 🤖 GENERATED WITH ASSISTANCE OF OhMyOpenCode (https://github.com/code-yeongyu/oh-my-opencode) * feat(background-agent): add ConcurrencyManager for model-based limits 🤖 GENERATED WITH ASSISTANCE OF [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode) * feat(background-agent): integrate ConcurrencyManager into BackgroundManager 🤖 GENERATED WITH ASSISTANCE OF OhMyOpenCode (https://github.com/code-yeongyu/oh-my-opencode) * test(background-agent): add ConcurrencyManager tests 🤖 GENERATED WITH ASSISTANCE OF [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode) * fix(background-agent): set default concurrency to 5 🤖 Generated with [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode) * feat(background-agent): support 0 as unlimited concurrency Setting concurrency to 0 means unlimited (Infinity). Works for defaultConcurrency, providerConcurrency, and modelConcurrency. 🤖 Generated with [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode)
This commit is contained in:
@@ -1658,6 +1658,35 @@
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"background_task": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"defaultConcurrency": {
|
||||
"type": "number",
|
||||
"minimum": 1
|
||||
},
|
||||
"providerConcurrency": {
|
||||
"type": "object",
|
||||
"propertyNames": {
|
||||
"type": "string"
|
||||
},
|
||||
"additionalProperties": {
|
||||
"type": "number",
|
||||
"minimum": 1
|
||||
}
|
||||
},
|
||||
"modelConcurrency": {
|
||||
"type": "object",
|
||||
"propertyNames": {
|
||||
"type": "string"
|
||||
},
|
||||
"additionalProperties": {
|
||||
"type": "number",
|
||||
"minimum": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -232,6 +232,12 @@ export const RalphLoopConfigSchema = z.object({
|
||||
state_dir: z.string().optional(),
|
||||
})
|
||||
|
||||
export const BackgroundTaskConfigSchema = z.object({
|
||||
defaultConcurrency: z.number().min(1).optional(),
|
||||
providerConcurrency: z.record(z.string(), z.number().min(1)).optional(),
|
||||
modelConcurrency: z.record(z.string(), z.number().min(1)).optional(),
|
||||
})
|
||||
|
||||
export const OhMyOpenCodeConfigSchema = z.object({
|
||||
$schema: z.string().optional(),
|
||||
disabled_mcps: z.array(AnyMcpNameSchema).optional(),
|
||||
@@ -248,11 +254,13 @@ export const OhMyOpenCodeConfigSchema = z.object({
|
||||
auto_update: z.boolean().optional(),
|
||||
skills: SkillsConfigSchema.optional(),
|
||||
ralph_loop: RalphLoopConfigSchema.optional(),
|
||||
background_task: BackgroundTaskConfigSchema.optional(),
|
||||
})
|
||||
|
||||
export type OhMyOpenCodeConfig = z.infer<typeof OhMyOpenCodeConfigSchema>
|
||||
export type AgentOverrideConfig = z.infer<typeof AgentOverrideConfigSchema>
|
||||
export type AgentOverrides = z.infer<typeof AgentOverridesSchema>
|
||||
export type BackgroundTaskConfig = z.infer<typeof BackgroundTaskConfigSchema>
|
||||
export type AgentName = z.infer<typeof AgentNameSchema>
|
||||
export type HookName = z.infer<typeof HookNameSchema>
|
||||
export type BuiltinCommandName = z.infer<typeof BuiltinCommandNameSchema>
|
||||
|
||||
351
src/features/background-agent/concurrency.test.ts
Normal file
351
src/features/background-agent/concurrency.test.ts
Normal file
@@ -0,0 +1,351 @@
|
||||
import { describe, test, expect, beforeEach } from "bun:test"
|
||||
import { ConcurrencyManager } from "./concurrency"
|
||||
import type { BackgroundTaskConfig } from "../../config/schema"
|
||||
|
||||
describe("ConcurrencyManager.getConcurrencyLimit", () => {
|
||||
test("should return model-specific limit when modelConcurrency is set", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
modelConcurrency: { "anthropic/claude-sonnet-4-5": 5 }
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(5)
|
||||
})
|
||||
|
||||
test("should return provider limit when providerConcurrency is set for model provider", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
providerConcurrency: { anthropic: 3 }
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(3)
|
||||
})
|
||||
|
||||
test("should return provider limit even when modelConcurrency exists but doesn't match", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
modelConcurrency: { "google/gemini-3-pro": 5 },
|
||||
providerConcurrency: { anthropic: 3 }
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(3)
|
||||
})
|
||||
|
||||
test("should return default limit when defaultConcurrency is set", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
defaultConcurrency: 2
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(2)
|
||||
})
|
||||
|
||||
test("should return default 5 when no config provided", () => {
|
||||
// #given
|
||||
const manager = new ConcurrencyManager()
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(5)
|
||||
})
|
||||
|
||||
test("should return default 5 when config exists but no concurrency settings", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(5)
|
||||
})
|
||||
|
||||
test("should prioritize model-specific over provider-specific over default", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
modelConcurrency: { "anthropic/claude-sonnet-4-5": 10 },
|
||||
providerConcurrency: { anthropic: 5 },
|
||||
defaultConcurrency: 2
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const modelLimit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
const providerLimit = manager.getConcurrencyLimit("anthropic/claude-opus-4-5")
|
||||
const defaultLimit = manager.getConcurrencyLimit("google/gemini-3-pro")
|
||||
|
||||
// #then
|
||||
expect(modelLimit).toBe(10)
|
||||
expect(providerLimit).toBe(5)
|
||||
expect(defaultLimit).toBe(2)
|
||||
})
|
||||
|
||||
test("should handle models without provider part", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
providerConcurrency: { "custom-model": 4 }
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("custom-model")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(4)
|
||||
})
|
||||
|
||||
test("should return Infinity when defaultConcurrency is 0", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 0 }
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("any-model")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(Infinity)
|
||||
})
|
||||
|
||||
test("should return Infinity when providerConcurrency is 0", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
providerConcurrency: { anthropic: 0 }
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(Infinity)
|
||||
})
|
||||
|
||||
test("should return Infinity when modelConcurrency is 0", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
modelConcurrency: { "anthropic/claude-sonnet-4-5": 0 }
|
||||
}
|
||||
const manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
const limit = manager.getConcurrencyLimit("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #then
|
||||
expect(limit).toBe(Infinity)
|
||||
})
|
||||
})
|
||||
|
||||
describe("ConcurrencyManager.acquire/release", () => {
|
||||
let manager: ConcurrencyManager
|
||||
|
||||
beforeEach(() => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {}
|
||||
manager = new ConcurrencyManager(config)
|
||||
})
|
||||
|
||||
test("should allow acquiring up to limit", async () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 2 }
|
||||
manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
await manager.acquire("model-a")
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// #then - both resolved without waiting
|
||||
expect(true).toBe(true)
|
||||
})
|
||||
|
||||
test("should allow acquires up to default limit of 5", async () => {
|
||||
// #given - no config = default limit of 5
|
||||
|
||||
// #when
|
||||
await manager.acquire("model-a")
|
||||
await manager.acquire("model-a")
|
||||
await manager.acquire("model-a")
|
||||
await manager.acquire("model-a")
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// #then - all 5 resolved
|
||||
expect(true).toBe(true)
|
||||
})
|
||||
|
||||
test("should queue when limit reached", async () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 1 }
|
||||
manager = new ConcurrencyManager(config)
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// #when
|
||||
let resolved = false
|
||||
const waitPromise = manager.acquire("model-a").then(() => { resolved = true })
|
||||
|
||||
// Give microtask queue a chance to run
|
||||
await Promise.resolve()
|
||||
|
||||
// #then - should still be waiting
|
||||
expect(resolved).toBe(false)
|
||||
|
||||
// #when - release
|
||||
manager.release("model-a")
|
||||
await waitPromise
|
||||
|
||||
// #then - now resolved
|
||||
expect(resolved).toBe(true)
|
||||
})
|
||||
|
||||
test("should queue multiple tasks and process in order", async () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 1 }
|
||||
manager = new ConcurrencyManager(config)
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// #when
|
||||
const order: string[] = []
|
||||
const task1 = manager.acquire("model-a").then(() => { order.push("1") })
|
||||
const task2 = manager.acquire("model-a").then(() => { order.push("2") })
|
||||
const task3 = manager.acquire("model-a").then(() => { order.push("3") })
|
||||
|
||||
// Give microtask queue a chance to run
|
||||
await Promise.resolve()
|
||||
|
||||
// #then - none resolved yet
|
||||
expect(order).toEqual([])
|
||||
|
||||
// #when - release one at a time
|
||||
manager.release("model-a")
|
||||
await task1
|
||||
expect(order).toEqual(["1"])
|
||||
|
||||
manager.release("model-a")
|
||||
await task2
|
||||
expect(order).toEqual(["1", "2"])
|
||||
|
||||
manager.release("model-a")
|
||||
await task3
|
||||
expect(order).toEqual(["1", "2", "3"])
|
||||
})
|
||||
|
||||
test("should handle independent models separately", async () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 1 }
|
||||
manager = new ConcurrencyManager(config)
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// #when - acquire different model
|
||||
const resolved = await Promise.race([
|
||||
manager.acquire("model-b").then(() => "resolved"),
|
||||
Promise.resolve("timeout").then(() => "timeout")
|
||||
])
|
||||
|
||||
// #then - different model should resolve immediately
|
||||
expect(resolved).toBe("resolved")
|
||||
})
|
||||
|
||||
test("should allow re-acquiring after release", async () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 1 }
|
||||
manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
await manager.acquire("model-a")
|
||||
manager.release("model-a")
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// #then
|
||||
expect(true).toBe(true)
|
||||
})
|
||||
|
||||
test("should handle release when no acquire", () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 2 }
|
||||
manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when - release without acquire
|
||||
manager.release("model-a")
|
||||
|
||||
// #then - should not throw
|
||||
expect(true).toBe(true)
|
||||
})
|
||||
|
||||
test("should handle release when no prior acquire", () => {
|
||||
// #given - default config
|
||||
|
||||
// #when - release without acquire
|
||||
manager.release("model-a")
|
||||
|
||||
// #then - should not throw
|
||||
expect(true).toBe(true)
|
||||
})
|
||||
|
||||
test("should handle multiple acquires and releases correctly", async () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = { defaultConcurrency: 3 }
|
||||
manager = new ConcurrencyManager(config)
|
||||
|
||||
// #when
|
||||
await manager.acquire("model-a")
|
||||
await manager.acquire("model-a")
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// Release all
|
||||
manager.release("model-a")
|
||||
manager.release("model-a")
|
||||
manager.release("model-a")
|
||||
|
||||
// Should be able to acquire again
|
||||
await manager.acquire("model-a")
|
||||
|
||||
// #then
|
||||
expect(true).toBe(true)
|
||||
})
|
||||
|
||||
test("should use model-specific limit for acquire", async () => {
|
||||
// #given
|
||||
const config: BackgroundTaskConfig = {
|
||||
modelConcurrency: { "anthropic/claude-sonnet-4-5": 2 },
|
||||
defaultConcurrency: 5
|
||||
}
|
||||
manager = new ConcurrencyManager(config)
|
||||
await manager.acquire("anthropic/claude-sonnet-4-5")
|
||||
await manager.acquire("anthropic/claude-sonnet-4-5")
|
||||
|
||||
// #when
|
||||
let resolved = false
|
||||
const waitPromise = manager.acquire("anthropic/claude-sonnet-4-5").then(() => { resolved = true })
|
||||
|
||||
// Give microtask queue a chance to run
|
||||
await Promise.resolve()
|
||||
|
||||
// #then - should be waiting (model-specific limit is 2)
|
||||
expect(resolved).toBe(false)
|
||||
|
||||
// Cleanup
|
||||
manager.release("anthropic/claude-sonnet-4-5")
|
||||
await waitPromise
|
||||
})
|
||||
})
|
||||
66
src/features/background-agent/concurrency.ts
Normal file
66
src/features/background-agent/concurrency.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import type { BackgroundTaskConfig } from "../../config/schema"
|
||||
|
||||
export class ConcurrencyManager {
|
||||
private config?: BackgroundTaskConfig
|
||||
private counts: Map<string, number> = new Map()
|
||||
private queues: Map<string, Array<() => void>> = new Map()
|
||||
|
||||
constructor(config?: BackgroundTaskConfig) {
|
||||
this.config = config
|
||||
}
|
||||
|
||||
getConcurrencyLimit(model: string): number {
|
||||
const modelLimit = this.config?.modelConcurrency?.[model]
|
||||
if (modelLimit !== undefined) {
|
||||
return modelLimit === 0 ? Infinity : modelLimit
|
||||
}
|
||||
const provider = model.split('/')[0]
|
||||
const providerLimit = this.config?.providerConcurrency?.[provider]
|
||||
if (providerLimit !== undefined) {
|
||||
return providerLimit === 0 ? Infinity : providerLimit
|
||||
}
|
||||
const defaultLimit = this.config?.defaultConcurrency
|
||||
if (defaultLimit !== undefined) {
|
||||
return defaultLimit === 0 ? Infinity : defaultLimit
|
||||
}
|
||||
return 5
|
||||
}
|
||||
|
||||
async acquire(model: string): Promise<void> {
|
||||
const limit = this.getConcurrencyLimit(model)
|
||||
if (limit === Infinity) {
|
||||
return
|
||||
}
|
||||
|
||||
const current = this.counts.get(model) ?? 0
|
||||
if (current < limit) {
|
||||
this.counts.set(model, current + 1)
|
||||
return
|
||||
}
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
const queue = this.queues.get(model) ?? []
|
||||
queue.push(resolve)
|
||||
this.queues.set(model, queue)
|
||||
})
|
||||
}
|
||||
|
||||
release(model: string): void {
|
||||
const limit = this.getConcurrencyLimit(model)
|
||||
if (limit === Infinity) {
|
||||
return
|
||||
}
|
||||
|
||||
const queue = this.queues.get(model)
|
||||
if (queue && queue.length > 0) {
|
||||
const next = queue.shift()!
|
||||
this.counts.set(model, this.counts.get(model) ?? 0)
|
||||
next()
|
||||
} else {
|
||||
const current = this.counts.get(model) ?? 0
|
||||
if (current > 0) {
|
||||
this.counts.set(model, current - 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,3 @@
|
||||
export * from "./types"
|
||||
export { BackgroundManager } from "./manager"
|
||||
export { ConcurrencyManager } from "./concurrency"
|
||||
|
||||
@@ -6,6 +6,8 @@ import type {
|
||||
LaunchInput,
|
||||
} from "./types"
|
||||
import { log } from "../../shared/logger"
|
||||
import { ConcurrencyManager } from "./concurrency"
|
||||
import type { BackgroundTaskConfig } from "../../config/schema"
|
||||
import {
|
||||
findNearestMessageWithFields,
|
||||
MESSAGE_STORAGE,
|
||||
@@ -60,12 +62,14 @@ export class BackgroundManager {
|
||||
private client: OpencodeClient
|
||||
private directory: string
|
||||
private pollingInterval?: ReturnType<typeof setInterval>
|
||||
private concurrencyManager: ConcurrencyManager
|
||||
|
||||
constructor(ctx: PluginInput) {
|
||||
constructor(ctx: PluginInput, config?: BackgroundTaskConfig) {
|
||||
this.tasks = new Map()
|
||||
this.notifications = new Map()
|
||||
this.client = ctx.client
|
||||
this.directory = ctx.directory
|
||||
this.concurrencyManager = new ConcurrencyManager(config)
|
||||
}
|
||||
|
||||
async launch(input: LaunchInput): Promise<BackgroundTask> {
|
||||
@@ -73,6 +77,10 @@ export class BackgroundManager {
|
||||
throw new Error("Agent parameter is required")
|
||||
}
|
||||
|
||||
const model = input.agent
|
||||
|
||||
await this.concurrencyManager.acquire(model)
|
||||
|
||||
const createResult = await this.client.session.create({
|
||||
body: {
|
||||
parentID: input.parentSessionID,
|
||||
@@ -102,6 +110,7 @@ export class BackgroundManager {
|
||||
lastUpdate: new Date(),
|
||||
},
|
||||
parentModel: input.parentModel,
|
||||
model,
|
||||
}
|
||||
|
||||
this.tasks.set(task.id, task)
|
||||
@@ -132,6 +141,9 @@ export class BackgroundManager {
|
||||
existingTask.error = errorMessage
|
||||
}
|
||||
existingTask.completedAt = new Date()
|
||||
if (existingTask.model) {
|
||||
this.concurrencyManager.release(existingTask.model)
|
||||
}
|
||||
this.markForNotification(existingTask)
|
||||
this.notifyParentSession(existingTask)
|
||||
}
|
||||
@@ -253,6 +265,9 @@ export class BackgroundManager {
|
||||
task.error = "Session deleted"
|
||||
}
|
||||
|
||||
if (task.model) {
|
||||
this.concurrencyManager.release(task.model)
|
||||
}
|
||||
this.tasks.delete(task.id)
|
||||
this.clearNotificationsForTask(task.id)
|
||||
subagentSessions.delete(sessionID)
|
||||
@@ -352,6 +367,9 @@ export class BackgroundManager {
|
||||
} catch (error) {
|
||||
log("[background-agent] prompt failed:", String(error))
|
||||
} finally {
|
||||
if (task.model) {
|
||||
this.concurrencyManager.release(task.model)
|
||||
}
|
||||
// Always clean up both maps to prevent memory leaks
|
||||
this.clearNotificationsForTask(taskId)
|
||||
this.tasks.delete(taskId)
|
||||
@@ -391,6 +409,9 @@ export class BackgroundManager {
|
||||
task.status = "error"
|
||||
task.error = "Task timed out after 30 minutes"
|
||||
task.completedAt = new Date()
|
||||
if (task.model) {
|
||||
this.concurrencyManager.release(task.model)
|
||||
}
|
||||
this.clearNotificationsForTask(taskId)
|
||||
this.tasks.delete(taskId)
|
||||
subagentSessions.delete(task.sessionID)
|
||||
|
||||
@@ -27,6 +27,7 @@ export interface BackgroundTask {
|
||||
error?: string
|
||||
progress?: TaskProgress
|
||||
parentModel?: { providerID: string; modelID: string }
|
||||
model?: string
|
||||
}
|
||||
|
||||
export interface LaunchInput {
|
||||
|
||||
Reference in New Issue
Block a user