diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 895d09caee4998..c50fd823505fc4 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -501,6 +501,27 @@ class NodeWorker extends EventEmitter { process.nextTick(() => process.emit("worker", this)); } + // Whether the message promise should keep the event loop alive. + // True when the worker is refed OR when there are "message" listeners + // (matching Node.js behavior where message listeners keep the event + // loop alive even after worker.unref()). + #messageRefed = true; + + #updateMessageRef() { + const shouldRef = this.#refed || this.listenerCount("message") > 0; + if (shouldRef === this.#messageRefed) { + return; + } + this.#messageRefed = shouldRef; + if (this.#messagePromise) { + if (shouldRef) { + core.refOpPromise(this.#messagePromise); + } else { + core.unrefOpPromise(this.#messagePromise); + } + } + } + [privateWorkerRef](ref) { if (ref === this.#refed) { return; @@ -511,17 +532,12 @@ class NodeWorker extends EventEmitter { if (this.#controlPromise) { core.refOpPromise(this.#controlPromise); } - if (this.#messagePromise) { - core.refOpPromise(this.#messagePromise); - } } else { if (this.#controlPromise) { core.unrefOpPromise(this.#controlPromise); } - if (this.#messagePromise) { - core.unrefOpPromise(this.#messagePromise); - } } + this.#updateMessageRef(); } #handleError(err) { @@ -605,7 +621,7 @@ class NodeWorker extends EventEmitter { #pollMessages = async () => { while (this.#status !== "TERMINATED") { this.#messagePromise = op_host_recv_message(this.#id); - if (!this.#refed) { + if (!this.#messageRefed) { core.unrefOpPromise(this.#messagePromise); } const data = await this.#messagePromise; @@ -710,6 +726,32 @@ class NodeWorker extends EventEmitter { this[privateWorkerRef](false); } + on(event, listener) { + super.on(event, listener); + if (event === "message") this.#updateMessageRef(); + return this; + } + + addListener(event, listener) { + return this.on(event, listener); + } + + off(event, listener) { + super.off(event, listener); + if (event === "message") this.#updateMessageRef(); + return this; + } + + removeListener(event, listener) { + return this.off(event, listener); + } + + removeAllListeners(event?) { + super.removeAllListeners(event); + if (event === undefined || event === "message") this.#updateMessageRef(); + return this; + } + cpuUsage(prevValue?: { user: number; system: number }) { if (prevValue != null && !NumberIsNaN(prevValue)) { validateObject(prevValue, "prevValue"); diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index b40a24015a0d6d..c1eb96eef10fdf 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -967,3 +967,32 @@ Deno.test({ await worker.terminate(); }, }); + +Deno.test({ + name: + "[node/worker_threads] unref with message listener keeps event loop alive", + async fn() { + // Regression test: worker.unref() should not prevent the process from + // receiving messages when there are active "message" listeners. + const worker = new workerThreads.Worker( + ` + const { parentPort } = require('worker_threads'); + parentPort.on('message', () => { + const sharedArrayBuffer = new SharedArrayBuffer(12); + parentPort.postMessage(sharedArrayBuffer); + }); + `, + { eval: true }, + ); + + worker.unref(); + + const messagePromise = once(worker, "message"); + worker.postMessage("go"); + const [msg] = await messagePromise as [SharedArrayBuffer]; + assert(msg instanceof SharedArrayBuffer); + assertEquals(msg.byteLength, 12); + + await worker.terminate(); + }, +});