Skip to content

Commit a2be429

Browse files
authored
refactor(daemon): share a single opencode runtime across instances (#29)
1 parent 7e9f5f2 commit a2be429

4 files changed

Lines changed: 186 additions & 91 deletions

File tree

packages/daemon/src/__tests__/daemon.test.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,85 @@ describe("Daemon streaming", () => {
451451
expect(job.outputText).toBe("reply:plain");
452452
});
453453

454+
test("two instances stream independently without cross-leak", async () => {
455+
registry.streamingDeltas = ["x", "y", "z"];
456+
registry.deltaIntervalMs = 20;
457+
458+
const alpha = await daemon.handleRequest(
459+
req({
460+
id: "instance-alpha",
461+
method: "instance.create",
462+
params: { name: "Alpha", directory: "/tmp/alpha" },
463+
}),
464+
);
465+
const beta = await daemon.handleRequest(
466+
req({
467+
id: "instance-beta",
468+
method: "instance.create",
469+
params: { name: "Beta", directory: "/tmp/beta" },
470+
}),
471+
);
472+
const alphaInstance = expectSuccess(alpha, "instance.create").instance;
473+
const betaInstance = expectSuccess(beta, "instance.create").instance;
474+
475+
const submitAlpha = await daemon.handleRequest(
476+
req({
477+
id: "submit-alpha",
478+
method: "job.submit",
479+
params: {
480+
instanceId: alphaInstance.id,
481+
session: { type: "new" },
482+
task: { type: "prompt", prompt: "alpha-prompt" },
483+
},
484+
}),
485+
);
486+
const submitBeta = await daemon.handleRequest(
487+
req({
488+
id: "submit-beta",
489+
method: "job.submit",
490+
params: {
491+
instanceId: betaInstance.id,
492+
session: { type: "new" },
493+
task: { type: "prompt", prompt: "beta-prompt" },
494+
},
495+
}),
496+
);
497+
const alphaJobId = expectSuccess(submitAlpha, "job.submit").job.id;
498+
const betaJobId = expectSuccess(submitBeta, "job.submit").job.id;
499+
500+
const alphaEvents: Array<{ type: string }> = [];
501+
const betaEvents: Array<{ type: string }> = [];
502+
const unsubA = daemon.subscribeJob(alphaJobId, (e) => alphaEvents.push(e));
503+
const unsubB = daemon.subscribeJob(betaJobId, (e) => betaEvents.push(e));
504+
505+
await Bun.sleep(200);
506+
unsubA();
507+
unsubB();
508+
509+
const alphaJob = expectSuccess(
510+
await daemon.handleRequest(
511+
req({ id: "g-a", method: "job.get", params: { jobId: alphaJobId } }),
512+
),
513+
"job.get",
514+
).job;
515+
const betaJob = expectSuccess(
516+
await daemon.handleRequest(
517+
req({ id: "g-b", method: "job.get", params: { jobId: betaJobId } }),
518+
),
519+
"job.get",
520+
).job;
521+
522+
expect(alphaJob.outputText).toBe("xyz");
523+
expect(betaJob.outputText).toBe("xyz");
524+
expect(alphaEvents.some((e) => e.type === "delta")).toBe(true);
525+
expect(betaEvents.some((e) => e.type === "delta")).toBe(true);
526+
expect(alphaEvents[alphaEvents.length - 1]?.type).toBe("done");
527+
expect(betaEvents[betaEvents.length - 1]?.type).toBe("done");
528+
529+
expect(registry.directoriesStarted).toContain("/tmp/alpha");
530+
expect(registry.directoriesStarted).toContain("/tmp/beta");
531+
});
532+
454533
test("late subscriber gets snapshot of accumulated text and continues without duplicates", async () => {
455534
registry.streamingDeltas = ["alpha ", "beta ", "gamma ", "delta"];
456535
registry.deltaIntervalMs = 25;

packages/daemon/src/__tests__/helpers.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ export class FakeOpencodeRegistry implements OpencodeRuntimeManager {
6161
prompt: string;
6262
}> = [];
6363
readonly abortCalls: Array<{ instanceId: string; sessionId: string }> = [];
64+
readonly directoriesStarted: string[] = [];
65+
readonly disposeCalls: Array<{ directory?: string }> = [];
6466
globalMaxConcurrent = 0;
6567
/** Configurable per-test: an array of text deltas the fake will emit
6668
* via the wired onEvent handler before returning the final response
@@ -84,7 +86,11 @@ export class FakeOpencodeRegistry implements OpencodeRuntimeManager {
8486
this.onEvent?.(instanceId, event);
8587
}
8688

87-
async ensureStarted(instanceId: string): Promise<ManagedOpencodeRuntime> {
89+
async ensureStarted(
90+
instanceId: string,
91+
directory: string,
92+
): Promise<ManagedOpencodeRuntime> {
93+
this.directoriesStarted.push(directory);
8894
const existing = this.runtimes.get(instanceId);
8995
if (existing) {
9096
return existing;
@@ -93,7 +99,10 @@ export class FakeOpencodeRegistry implements OpencodeRuntimeManager {
9399
const runtime: ManagedOpencodeRuntime = {
94100
client: {
95101
instance: {
96-
dispose: async () => undefined,
102+
dispose: async (parameters) => {
103+
this.disposeCalls.push({ directory: parameters?.directory });
104+
return undefined;
105+
},
97106
},
98107
session: {
99108
create: async () => {
@@ -204,7 +213,16 @@ export class FakeOpencodeRegistry implements OpencodeRuntimeManager {
204213
this.runtimes.clear();
205214
}
206215

207-
async queryProviders(_directory?: string, _refresh?: boolean) {
216+
async queryProviders(
217+
directories: string[],
218+
_directory?: string,
219+
refresh?: boolean,
220+
) {
221+
if (refresh) {
222+
for (const dir of directories) {
223+
this.disposeCalls.push({ directory: dir });
224+
}
225+
}
208226
return { providers: [], connected: [] };
209227
}
210228
}

packages/daemon/src/opencode.ts

Lines changed: 78 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export interface ProviderListResult {
5050
export interface OpencodeRuntimeClient {
5151
session: OpencodeSessionClient;
5252
instance: {
53-
dispose(): Promise<unknown>;
53+
dispose(parameters?: { directory?: string }): Promise<unknown>;
5454
};
5555
provider: {
5656
list(parameters?: { directory?: string }): Promise<ProviderListResult>;
@@ -69,33 +69,42 @@ export interface ManagedOpencodeRuntime {
6969
export type OpencodeRuntimeEvent = OpencodeEvent;
7070

7171
export interface OpencodeRuntimeManager {
72-
ensureStarted(instanceId: string): Promise<ManagedOpencodeRuntime>;
72+
ensureStarted(
73+
instanceId: string,
74+
directory: string,
75+
): Promise<ManagedOpencodeRuntime>;
7376
get(instanceId: string): ManagedOpencodeRuntime | undefined;
7477
isRunning(instanceId: string): boolean;
7578
stop(instanceId: string): Promise<void>;
7679
stopAll(): Promise<void>;
77-
/** Register the handler that receives every event surfaced by managed
78-
* runtimes. Called by the Daemon during construction so that wiring is
80+
/** Register the handler that receives every event surfaced by the shared
81+
* runtime. Called by the Daemon during construction so that wiring is
7982
* uniform regardless of whether the registry was injected or default. */
8083
setOnEvent(
8184
handler: (instanceId: string, event: OpencodeRuntimeEvent) => void,
8285
): void;
8386
queryProviders(
87+
directories: string[],
8488
directory?: string,
8589
refresh?: boolean,
8690
): Promise<ProviderListResult>;
8791
}
8892

89-
interface RuntimeEntry {
90-
runtime?: ManagedOpencodeRuntime;
91-
starting?: Promise<ManagedOpencodeRuntime>;
93+
interface SharedRuntime extends ManagedOpencodeRuntime {
94+
rawEventSubscribe(parameters: {
95+
directory: string;
96+
}): Promise<{ stream: AsyncIterable<OpencodeRuntimeEvent> }>;
9297
}
9398

94-
const SYSTEM_INSTANCE_ID = "__system__";
99+
interface InstanceSubscription {
100+
directory: string;
101+
cancel(): void;
102+
}
95103

96104
export class OpencodeRegistry implements OpencodeRuntimeManager {
97-
private readonly runtimes = new Map<string, RuntimeEntry>();
98-
private readonly eventSubscriptions = new Map<string, { cancel(): void }>();
105+
private shared?: SharedRuntime;
106+
private sharedStarting?: Promise<SharedRuntime>;
107+
private readonly subscriptions = new Map<string, InstanceSubscription>();
99108
private onEvent?: (instanceId: string, event: OpencodeRuntimeEvent) => void;
100109

101110
setOnEvent(
@@ -104,25 +113,38 @@ export class OpencodeRegistry implements OpencodeRuntimeManager {
104113
this.onEvent = handler;
105114
}
106115

107-
async ensureStarted(instanceId: string): Promise<ManagedOpencodeRuntime> {
108-
const entry = this.runtimes.get(instanceId);
109-
if (entry?.runtime) {
110-
return entry.runtime;
111-
}
116+
async ensureStarted(
117+
instanceId: string,
118+
directory: string,
119+
): Promise<ManagedOpencodeRuntime> {
120+
const runtime = await this.ensureSharedRuntime();
112121

113-
if (entry?.starting) {
114-
return entry.starting;
122+
const existing = this.subscriptions.get(instanceId);
123+
if (!existing) {
124+
const events = await runtime.rawEventSubscribe({ directory });
125+
const subscription = this.consumeEvents(instanceId, events);
126+
this.subscriptions.set(instanceId, {
127+
directory,
128+
cancel: subscription.cancel,
129+
});
115130
}
116131

117-
const starting = createOpencode().then(async ({ client, server }) => {
118-
const events = await client.event.subscribe();
119-
const subscription = this.consumeEvents(instanceId, events);
120-
this.eventSubscriptions.set(instanceId, subscription);
132+
return runtime;
133+
}
134+
135+
private async ensureSharedRuntime(): Promise<SharedRuntime> {
136+
if (this.shared) {
137+
return this.shared;
138+
}
139+
if (this.sharedStarting) {
140+
return this.sharedStarting;
141+
}
121142

122-
const runtime: ManagedOpencodeRuntime = {
143+
const starting = createOpencode({ port: 0 }).then(({ client, server }) => {
144+
const runtime: SharedRuntime = {
123145
client: {
124146
instance: {
125-
dispose: () => client.instance.dispose(),
147+
dispose: (parameters) => client.instance.dispose(parameters),
126148
},
127149
session: {
128150
create: async (parameters) => {
@@ -184,17 +206,19 @@ export class OpencodeRegistry implements OpencodeRuntimeManager {
184206
},
185207
},
186208
server,
209+
rawEventSubscribe: async (parameters) => {
210+
return client.event.subscribe(parameters);
211+
},
187212
};
188-
this.runtimes.set(instanceId, { runtime });
213+
this.shared = runtime;
189214
return runtime;
190215
});
191216

192-
this.runtimes.set(instanceId, { starting });
217+
this.sharedStarting = starting;
193218
try {
194219
return await starting;
195-
} catch (error) {
196-
this.runtimes.delete(instanceId);
197-
throw error;
220+
} finally {
221+
this.sharedStarting = undefined;
198222
}
199223
}
200224

@@ -223,84 +247,52 @@ export class OpencodeRegistry implements OpencodeRuntimeManager {
223247
}
224248

225249
get(instanceId: string): ManagedOpencodeRuntime | undefined {
226-
return this.runtimes.get(instanceId)?.runtime;
250+
return this.subscriptions.has(instanceId) ? this.shared : undefined;
227251
}
228252

229253
isRunning(instanceId: string): boolean {
230-
return this.runtimes.has(instanceId) && Boolean(this.get(instanceId));
254+
return this.subscriptions.has(instanceId);
231255
}
232256

233257
async stop(instanceId: string): Promise<void> {
234-
this.eventSubscriptions.get(instanceId)?.cancel();
235-
this.eventSubscriptions.delete(instanceId);
236-
237-
const entry = this.runtimes.get(instanceId);
238-
if (!entry) {
258+
const subscription = this.subscriptions.get(instanceId);
259+
if (!subscription) {
239260
return;
240261
}
241-
242-
try {
243-
const runtime =
244-
entry.runtime ?? (entry.starting ? await entry.starting : undefined);
245-
await runtime?.client.instance.dispose();
246-
runtime?.server.close();
247-
} finally {
248-
this.runtimes.delete(instanceId);
249-
}
262+
subscription.cancel();
263+
this.subscriptions.delete(instanceId);
250264
}
251265

252266
async stopAll(): Promise<void> {
253-
await Promise.allSettled(
254-
[...this.runtimes.keys()].map((instanceId) => this.stop(instanceId)),
255-
);
256-
}
257-
258-
/**
259-
* Get or create a long-lived system runtime for provider queries and other
260-
* lightweight operations that don't belong to a user-created instance.
261-
*/
262-
private ensureSystemRuntime(): Promise<ManagedOpencodeRuntime> {
263-
return this.ensureStarted(SYSTEM_INSTANCE_ID);
264-
}
267+
for (const subscription of this.subscriptions.values()) {
268+
subscription.cancel();
269+
}
270+
this.subscriptions.clear();
265271

266-
private async healthCheck(runtime: ManagedOpencodeRuntime): Promise<boolean> {
267-
return runtime.client.ping();
272+
const runtime = this.shared;
273+
this.shared = undefined;
274+
if (runtime) {
275+
try {
276+
runtime.server.close();
277+
} catch {
278+
// best-effort shutdown
279+
}
280+
}
268281
}
269282

270283
async queryProviders(
284+
directories: string[],
271285
directory?: string,
272286
refresh?: boolean,
273287
): Promise<ProviderListResult> {
274-
// When refresh is requested, dispose the runtime's internal instance so
275-
// OpenCode re-reads auth.json and rebuilds its provider cache.
288+
const runtime = await this.ensureSharedRuntime();
276289
if (refresh) {
277-
const runtime = await this.ensureSystemRuntime();
278-
try {
279-
await runtime.client.instance.dispose();
280-
} catch {
281-
// dispose may fail if the instance was already gone — ignore
282-
}
283-
return runtime.client.provider.list({ directory });
284-
}
285-
286-
// Prefer an existing user runtime if one is available
287-
for (const [id, entry] of this.runtimes.entries()) {
288-
if (id !== SYSTEM_INSTANCE_ID && entry.runtime) {
289-
try {
290-
return await entry.runtime.client.provider.list({ directory });
291-
} catch {}
292-
}
290+
await Promise.allSettled(
291+
directories.map((dir) =>
292+
runtime.client.instance.dispose({ directory: dir }),
293+
),
294+
);
293295
}
294-
295-
// Fall back to the long-lived system runtime
296-
let runtime = await this.ensureSystemRuntime();
297-
298-
// Health check — restart if the system instance died
299-
if (!(await this.healthCheck(runtime))) {
300-
this.runtimes.delete(SYSTEM_INSTANCE_ID);
301-
runtime = await this.ensureSystemRuntime();
302-
}
303-
304296
return runtime.client.provider.list({ directory });
305297
}
306298
}

0 commit comments

Comments
 (0)