perf(skill-loader): add blocking discovery API with worker threads
Implement synchronous skill discovery using Node.js Worker Threads and Atomics.wait for blocking operations. Allows synchronous API access while leveraging async operations internally via dedicated worker thread. Changes: - blocking.ts: Main blocking discovery function using SharedArrayBuffer and MessagePort - discover-worker.ts: Worker thread implementation for async skill discovery - blocking.test.ts: Comprehensive test suite with BDD comments 🤖 GENERATED WITH ASSISTANCE OF [OhMyOpenCode](https://github.com/code-yeongyu/oh-my-opencode)
This commit is contained in:
210
src/features/opencode-skill-loader/blocking.test.ts
Normal file
210
src/features/opencode-skill-loader/blocking.test.ts
Normal file
@@ -0,0 +1,210 @@
|
||||
import { describe, it, expect, beforeEach, afterEach } from "bun:test"
|
||||
import { mkdirSync, writeFileSync, rmSync } from "fs"
|
||||
import { join } from "path"
|
||||
import { tmpdir } from "os"
|
||||
import { discoverAllSkillsBlocking } from "./blocking"
|
||||
import type { SkillScope } from "./types"
|
||||
|
||||
const TEST_DIR = join(tmpdir(), `blocking-test-${Date.now()}`)
|
||||
|
||||
beforeEach(() => {
|
||||
mkdirSync(TEST_DIR, { recursive: true })
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
rmSync(TEST_DIR, { recursive: true, force: true })
|
||||
})
|
||||
|
||||
describe("discoverAllSkillsBlocking", () => {
|
||||
it("returns skills synchronously from valid directories", () => {
|
||||
// #given valid skill directory
|
||||
const skillDir = join(TEST_DIR, "skills")
|
||||
mkdirSync(skillDir, { recursive: true })
|
||||
|
||||
const skillMdPath = join(skillDir, "test-skill.md")
|
||||
writeFileSync(
|
||||
skillMdPath,
|
||||
`---
|
||||
name: test-skill
|
||||
description: A test skill
|
||||
---
|
||||
This is test skill content.`
|
||||
)
|
||||
|
||||
const dirs = [skillDir]
|
||||
const scopes: SkillScope[] = ["opencode-project"]
|
||||
|
||||
// #when discoverAllSkillsBlocking called
|
||||
const skills = discoverAllSkillsBlocking(dirs, scopes)
|
||||
|
||||
// #then returns skills synchronously
|
||||
expect(skills).toBeArray()
|
||||
expect(skills.length).toBe(1)
|
||||
expect(skills[0].name).toBe("test-skill")
|
||||
expect(skills[0].definition.description).toContain("test skill")
|
||||
})
|
||||
|
||||
it("returns empty array for empty directories", () => {
|
||||
// #given empty directory
|
||||
const emptyDir = join(TEST_DIR, "empty")
|
||||
mkdirSync(emptyDir, { recursive: true })
|
||||
|
||||
const dirs = [emptyDir]
|
||||
const scopes: SkillScope[] = ["opencode-project"]
|
||||
|
||||
// #when discoverAllSkillsBlocking called
|
||||
const skills = discoverAllSkillsBlocking(dirs, scopes)
|
||||
|
||||
// #then returns empty array
|
||||
expect(skills).toBeArray()
|
||||
expect(skills.length).toBe(0)
|
||||
})
|
||||
|
||||
it("returns empty array for non-existent directories", () => {
|
||||
// #given non-existent directory
|
||||
const nonExistentDir = join(TEST_DIR, "does-not-exist")
|
||||
|
||||
const dirs = [nonExistentDir]
|
||||
const scopes: SkillScope[] = ["opencode-project"]
|
||||
|
||||
// #when discoverAllSkillsBlocking called
|
||||
const skills = discoverAllSkillsBlocking(dirs, scopes)
|
||||
|
||||
// #then returns empty array (no throw)
|
||||
expect(skills).toBeArray()
|
||||
expect(skills.length).toBe(0)
|
||||
})
|
||||
|
||||
it("handles multiple directories with mixed content", () => {
|
||||
// #given multiple directories with valid and invalid skills
|
||||
const dir1 = join(TEST_DIR, "dir1")
|
||||
const dir2 = join(TEST_DIR, "dir2")
|
||||
mkdirSync(dir1, { recursive: true })
|
||||
mkdirSync(dir2, { recursive: true })
|
||||
|
||||
writeFileSync(
|
||||
join(dir1, "skill1.md"),
|
||||
`---
|
||||
name: skill1
|
||||
description: First skill
|
||||
---
|
||||
Skill 1 content.`
|
||||
)
|
||||
|
||||
writeFileSync(
|
||||
join(dir2, "skill2.md"),
|
||||
`---
|
||||
name: skill2
|
||||
description: Second skill
|
||||
---
|
||||
Skill 2 content.`
|
||||
)
|
||||
|
||||
const dirs = [dir1, dir2]
|
||||
const scopes: SkillScope[] = ["opencode-project"]
|
||||
|
||||
// #when discoverAllSkillsBlocking called
|
||||
const skills = discoverAllSkillsBlocking(dirs, scopes)
|
||||
|
||||
// #then returns all valid skills
|
||||
expect(skills).toBeArray()
|
||||
expect(skills.length).toBe(2)
|
||||
|
||||
const skillNames = skills.map(s => s.name).sort()
|
||||
expect(skillNames).toEqual(["skill1", "skill2"])
|
||||
})
|
||||
|
||||
it("skips invalid YAML files", () => {
|
||||
// #given directory with invalid YAML
|
||||
const skillDir = join(TEST_DIR, "skills")
|
||||
mkdirSync(skillDir, { recursive: true })
|
||||
|
||||
const validSkillPath = join(skillDir, "valid.md")
|
||||
writeFileSync(
|
||||
validSkillPath,
|
||||
`---
|
||||
name: valid-skill
|
||||
description: Valid skill
|
||||
---
|
||||
Valid skill content.`
|
||||
)
|
||||
|
||||
const invalidSkillPath = join(skillDir, "invalid.md")
|
||||
writeFileSync(
|
||||
invalidSkillPath,
|
||||
`---
|
||||
name: invalid skill
|
||||
description: [ invalid yaml
|
||||
---
|
||||
Invalid content.`
|
||||
)
|
||||
|
||||
const dirs = [skillDir]
|
||||
const scopes: SkillScope[] = ["opencode-project"]
|
||||
|
||||
// #when discoverAllSkillsBlocking called
|
||||
const skills = discoverAllSkillsBlocking(dirs, scopes)
|
||||
|
||||
// #then skips invalid, returns valid
|
||||
expect(skills).toBeArray()
|
||||
expect(skills.length).toBe(1)
|
||||
expect(skills[0].name).toBe("valid-skill")
|
||||
})
|
||||
|
||||
it("handles directory-based skills with SKILL.md", () => {
|
||||
// #given directory-based skill structure
|
||||
const skillsDir = join(TEST_DIR, "skills")
|
||||
const mySkillDir = join(skillsDir, "my-skill")
|
||||
mkdirSync(mySkillDir, { recursive: true })
|
||||
|
||||
const skillMdPath = join(mySkillDir, "SKILL.md")
|
||||
writeFileSync(
|
||||
skillMdPath,
|
||||
`---
|
||||
name: my-skill
|
||||
description: Directory-based skill
|
||||
---
|
||||
This is a directory-based skill.`
|
||||
)
|
||||
|
||||
const dirs = [skillsDir]
|
||||
const scopes: SkillScope[] = ["opencode-project"]
|
||||
|
||||
// #when discoverAllSkillsBlocking called
|
||||
const skills = discoverAllSkillsBlocking(dirs, scopes)
|
||||
|
||||
// #then returns skill from SKILL.md
|
||||
expect(skills).toBeArray()
|
||||
expect(skills.length).toBe(1)
|
||||
expect(skills[0].name).toBe("my-skill")
|
||||
})
|
||||
|
||||
it("processes large skill sets without timeout", () => {
|
||||
// #given directory with many skills (20+)
|
||||
const skillDir = join(TEST_DIR, "many-skills")
|
||||
mkdirSync(skillDir, { recursive: true })
|
||||
|
||||
const skillCount = 25
|
||||
for (let i = 0; i < skillCount; i++) {
|
||||
const skillPath = join(skillDir, `skill-${i}.md`)
|
||||
writeFileSync(
|
||||
skillPath,
|
||||
`---
|
||||
name: skill-${i}
|
||||
description: Skill number ${i}
|
||||
---
|
||||
Content for skill ${i}.`
|
||||
)
|
||||
}
|
||||
|
||||
const dirs = [skillDir]
|
||||
const scopes: SkillScope[] = ["opencode-project"]
|
||||
|
||||
// #when discoverAllSkillsBlocking called
|
||||
const skills = discoverAllSkillsBlocking(dirs, scopes)
|
||||
|
||||
// #then completes without timeout
|
||||
expect(skills).toBeArray()
|
||||
expect(skills.length).toBe(skillCount)
|
||||
})
|
||||
})
|
||||
62
src/features/opencode-skill-loader/blocking.ts
Normal file
62
src/features/opencode-skill-loader/blocking.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import { Worker, MessageChannel, receiveMessageOnPort } from "worker_threads"
|
||||
import type { LoadedSkill, SkillScope } from "./types"
|
||||
|
||||
interface WorkerInput {
|
||||
dirs: string[]
|
||||
scopes: SkillScope[]
|
||||
}
|
||||
|
||||
interface WorkerOutputSuccess {
|
||||
ok: true
|
||||
skills: LoadedSkill[]
|
||||
}
|
||||
|
||||
interface WorkerOutputError {
|
||||
ok: false
|
||||
error: { message: string; stack?: string }
|
||||
}
|
||||
|
||||
type WorkerOutput = WorkerOutputSuccess | WorkerOutputError
|
||||
|
||||
const TIMEOUT_MS = 30000
|
||||
|
||||
export function discoverAllSkillsBlocking(dirs: string[], scopes: SkillScope[]): LoadedSkill[] {
|
||||
const signal = new Int32Array(new SharedArrayBuffer(4))
|
||||
const { port1, port2 } = new MessageChannel()
|
||||
|
||||
const worker = new Worker(new URL("./discover-worker.ts", import.meta.url), {
|
||||
workerData: { signal }
|
||||
})
|
||||
|
||||
worker.postMessage({ port: port2 }, [port2])
|
||||
|
||||
const input: WorkerInput = { dirs, scopes }
|
||||
port1.postMessage(input)
|
||||
|
||||
const waitResult = Atomics.wait(signal, 0, 0, TIMEOUT_MS)
|
||||
|
||||
if (waitResult === "timed-out") {
|
||||
worker.terminate()
|
||||
port1.close()
|
||||
throw new Error(`Worker timeout after ${TIMEOUT_MS}ms`)
|
||||
}
|
||||
|
||||
const message = receiveMessageOnPort(port1)
|
||||
|
||||
worker.terminate()
|
||||
port1.close()
|
||||
|
||||
if (!message) {
|
||||
throw new Error("Worker did not return result")
|
||||
}
|
||||
|
||||
const output = message.message as WorkerOutput
|
||||
|
||||
if (output.ok === false) {
|
||||
const error = new Error(output.error.message)
|
||||
error.stack = output.error.stack
|
||||
throw error
|
||||
}
|
||||
|
||||
return output.skills
|
||||
}
|
||||
59
src/features/opencode-skill-loader/discover-worker.ts
Normal file
59
src/features/opencode-skill-loader/discover-worker.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { workerData, parentPort } from "worker_threads"
|
||||
import type { MessagePort } from "worker_threads"
|
||||
import { discoverSkillsInDirAsync } from "./async-loader"
|
||||
import type { LoadedSkill, SkillScope } from "./types"
|
||||
|
||||
interface WorkerInput {
|
||||
dirs: string[]
|
||||
scopes: SkillScope[]
|
||||
}
|
||||
|
||||
interface WorkerOutputSuccess {
|
||||
ok: true
|
||||
skills: LoadedSkill[]
|
||||
}
|
||||
|
||||
interface WorkerOutputError {
|
||||
ok: false
|
||||
error: { message: string; stack?: string }
|
||||
}
|
||||
|
||||
type WorkerOutput = WorkerOutputSuccess | WorkerOutputError
|
||||
|
||||
const { signal } = workerData as { signal: Int32Array }
|
||||
|
||||
if (!parentPort) {
|
||||
throw new Error("Worker must be run with parentPort")
|
||||
}
|
||||
|
||||
parentPort.once("message", (data: { port: MessagePort }) => {
|
||||
const { port } = data
|
||||
|
||||
port.on("message", async (input: WorkerInput) => {
|
||||
try {
|
||||
const results = await Promise.all(
|
||||
input.dirs.map(dir => discoverSkillsInDirAsync(dir))
|
||||
)
|
||||
|
||||
const skills = results.flat()
|
||||
|
||||
const output: WorkerOutputSuccess = { ok: true, skills }
|
||||
|
||||
port.postMessage(output)
|
||||
Atomics.store(signal, 0, 1)
|
||||
Atomics.notify(signal, 0)
|
||||
} catch (error: unknown) {
|
||||
const output: WorkerOutputError = {
|
||||
ok: false,
|
||||
error: {
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
},
|
||||
}
|
||||
|
||||
port.postMessage(output)
|
||||
Atomics.store(signal, 0, 1)
|
||||
Atomics.notify(signal, 0)
|
||||
}
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user