diff --git a/cf/src/connection.js b/cf/src/connection.js index 8e79170a..30ff5fde 100644 --- a/cf/src/connection.js +++ b/cf/src/connection.js @@ -254,6 +254,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function nextWrite(fn) { + if (!socket) { + // Connection was closed, cannot write + chunk = nextWriteTimer = null + return false + } const x = socket.write(chunk, fn) nextWriteTimer !== null && clearImmediate(nextWriteTimer) chunk = nextWriteTimer = null @@ -436,6 +441,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } async function closed(hadError) { + terminated = true // Mark connection as terminated to prevent further query attempts incoming = Buffer.alloc(0) remaining = 0 incomings = null diff --git a/cf/src/index.js b/cf/src/index.js index ffbe7aef..baa8077a 100644 --- a/cf/src/index.js +++ b/cf/src/index.js @@ -238,12 +238,13 @@ function Postgres(a, b) { let savepoints = 0 , connection , prepare = null + , closed = false try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() return await Promise.race([ scope(connection, fn), - new Promise((_, reject) => connection.onclose = reject) + new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e))) ]) } catch (error) { throw error @@ -290,6 +291,8 @@ function Postgres(a, b) { } function handler(q) { + if (closed) + return q.reject(Errors.connection('CONNECTION_CLOSED', options)) q.catch(e => uncaughtError || (uncaughtError = e)) c.queue === full ? queries.push(q) diff --git a/cjs/src/connection.js b/cjs/src/connection.js index 07f67167..ddfdf4ba 100644 --- a/cjs/src/connection.js +++ b/cjs/src/connection.js @@ -252,6 +252,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function nextWrite(fn) { + if (!socket) { + // Connection was closed, cannot write + chunk = nextWriteTimer = null + return false + } const x = socket.write(chunk, fn) nextWriteTimer !== null && clearImmediate(nextWriteTimer) chunk = nextWriteTimer = null @@ -434,6 +439,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } async function closed(hadError) { + terminated = true // Mark connection as terminated to prevent further query attempts incoming = Buffer.alloc(0) remaining = 0 incomings = null diff --git a/cjs/src/index.js b/cjs/src/index.js index f09c61c7..932f432c 100644 --- a/cjs/src/index.js +++ b/cjs/src/index.js @@ -237,12 +237,13 @@ function Postgres(a, b) { let savepoints = 0 , connection , prepare = null + , closed = false try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() return await Promise.race([ scope(connection, fn), - new Promise((_, reject) => connection.onclose = reject) + new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e))) ]) } catch (error) { throw error @@ -289,6 +290,8 @@ function Postgres(a, b) { } function handler(q) { + if (closed) + return q.reject(Errors.connection('CONNECTION_CLOSED', options)) q.catch(e => uncaughtError || (uncaughtError = e)) c.queue === full ? queries.push(q) diff --git a/cjs/tests/index.js b/cjs/tests/index.js index 85d1aa46..3a73b0b6 100644 --- a/cjs/tests/index.js +++ b/cjs/tests/index.js @@ -2638,3 +2638,49 @@ t('query during copy error', async() => { await sql`drop table test` ] }) + +t('idle_in_transaction_session_timeout causes CONNECTION_CLOSED', { timeout: 5 }, async() => { + const sql = postgres(options) + const error = await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + await txSql`SELECT 1` + }).catch(e => e) + await sql.end() + return [true, error.code === 'CONNECTION_CLOSED' || error.code === 'CONNECTION_DESTROYED'] +}) + +t('txSql fails after idle_in_transaction_session_timeout even if callback continues', { timeout: 5 }, async() => { + const sql = postgres(options) + let failed = false + await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + try { await txSql`SELECT 1` } catch (e) { + failed = e.code === 'CONNECTION_CLOSED' || e.code === 'CONNECTION_DESTROYED' + } + }).catch(() => { /* ignore */ }) + await new Promise(r => setTimeout(r, 1000)) + await sql.end() + return [true, failed] +}) + +t('txSql fails even when pool connection reconnects after idle timeout', { timeout: 10 }, async() => { + const sql = postgres({ ...options, max: 1 }) + // Queue pool queries to trigger reconnect after connection closes + for (let i = 0; i < 3; i++) + setTimeout(() => sql`SELECT ${i}`.catch(() => { /* ignore */ }), 1000 + i * 100) + + let txSqlSucceeded = false + await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + try { + await txSql`SELECT 'should fail'` + txSqlSucceeded = true + } catch { /* ignore */ } + }).catch(() => { /* ignore */ }) + await new Promise(r => setTimeout(r, 1000)) + await sql.end() + return [false, txSqlSucceeded] +}) diff --git a/deno/src/connection.js b/deno/src/connection.js index 796725de..13c8806f 100644 --- a/deno/src/connection.js +++ b/deno/src/connection.js @@ -255,6 +255,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function nextWrite(fn) { + if (!socket) { + // Connection was closed, cannot write + chunk = nextWriteTimer = null + return false + } const x = socket.write(chunk, fn) nextWriteTimer !== null && clearImmediate(nextWriteTimer) chunk = nextWriteTimer = null @@ -437,6 +442,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } async function closed(hadError) { + terminated = true // Mark connection as terminated to prevent further query attempts incoming = Buffer.alloc(0) remaining = 0 incomings = null diff --git a/deno/src/index.js b/deno/src/index.js index b6d23db1..3d1efbe3 100644 --- a/deno/src/index.js +++ b/deno/src/index.js @@ -238,12 +238,13 @@ function Postgres(a, b) { let savepoints = 0 , connection , prepare = null + , closed = false try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() return await Promise.race([ scope(connection, fn), - new Promise((_, reject) => connection.onclose = reject) + new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e))) ]) } catch (error) { throw error @@ -290,6 +291,8 @@ function Postgres(a, b) { } function handler(q) { + if (closed) + return q.reject(Errors.connection('CONNECTION_CLOSED', options)) q.catch(e => uncaughtError || (uncaughtError = e)) c.queue === full ? queries.push(q) diff --git a/deno/tests/index.js b/deno/tests/index.js index cc2a2518..4c2cc98e 100644 --- a/deno/tests/index.js +++ b/deno/tests/index.js @@ -2641,4 +2641,50 @@ t('query during copy error', async() => { ] }) +t('idle_in_transaction_session_timeout causes CONNECTION_CLOSED', { timeout: 5 }, async() => { + const sql = postgres(options) + const error = await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + await txSql`SELECT 1` + }).catch(e => e) + await sql.end() + return [true, error.code === 'CONNECTION_CLOSED' || error.code === 'CONNECTION_DESTROYED'] +}) + +t('txSql fails after idle_in_transaction_session_timeout even if callback continues', { timeout: 5 }, async() => { + const sql = postgres(options) + let failed = false + await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + try { await txSql`SELECT 1` } catch (e) { + failed = e.code === 'CONNECTION_CLOSED' || e.code === 'CONNECTION_DESTROYED' + } + }).catch(() => { /* ignore */ }) + await new Promise(r => setTimeout(r, 1000)) + await sql.end() + return [true, failed] +}) + +t('txSql fails even when pool connection reconnects after idle timeout', { timeout: 10 }, async() => { + const sql = postgres({ ...options, max: 1 }) + // Queue pool queries to trigger reconnect after connection closes + for (let i = 0; i < 3; i++) + setTimeout(() => sql`SELECT ${i}`.catch(() => { /* ignore */ }), 1000 + i * 100) + + let txSqlSucceeded = false + await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + try { + await txSql`SELECT 'should fail'` + txSqlSucceeded = true + } catch { /* ignore */ } + }).catch(() => { /* ignore */ }) + await new Promise(r => setTimeout(r, 1000)) + await sql.end() + return [false, txSqlSucceeded] +}) + ;globalThis.addEventListener("unload", () => Deno.exit(process.exitCode)) \ No newline at end of file diff --git a/src/connection.js b/src/connection.js index 1b1cccde..d4e2e776 100644 --- a/src/connection.js +++ b/src/connection.js @@ -252,6 +252,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function nextWrite(fn) { + if (!socket) { + // Connection was closed, cannot write + chunk = nextWriteTimer = null + return false + } const x = socket.write(chunk, fn) nextWriteTimer !== null && clearImmediate(nextWriteTimer) chunk = nextWriteTimer = null @@ -434,6 +439,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } async function closed(hadError) { + terminated = true // Mark connection as terminated to prevent further query attempts incoming = Buffer.alloc(0) remaining = 0 incomings = null diff --git a/src/index.js b/src/index.js index c7fba3da..dd716fd9 100644 --- a/src/index.js +++ b/src/index.js @@ -237,12 +237,13 @@ function Postgres(a, b) { let savepoints = 0 , connection , prepare = null + , closed = false try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() return await Promise.race([ scope(connection, fn), - new Promise((_, reject) => connection.onclose = reject) + new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e))) ]) } catch (error) { throw error @@ -289,6 +290,8 @@ function Postgres(a, b) { } function handler(q) { + if (closed) + return q.reject(Errors.connection('CONNECTION_CLOSED', options)) q.catch(e => uncaughtError || (uncaughtError = e)) c.queue === full ? queries.push(q) diff --git a/tests/index.js b/tests/index.js index 23e6c4d4..2eed8f89 100644 --- a/tests/index.js +++ b/tests/index.js @@ -2638,3 +2638,49 @@ t('query during copy error', async() => { await sql`drop table test` ] }) + +t('idle_in_transaction_session_timeout causes CONNECTION_CLOSED', { timeout: 5 }, async() => { + const sql = postgres(options) + const error = await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + await txSql`SELECT 1` + }).catch(e => e) + await sql.end() + return [true, error.code === 'CONNECTION_CLOSED' || error.code === 'CONNECTION_DESTROYED'] +}) + +t('txSql fails after idle_in_transaction_session_timeout even if callback continues', { timeout: 5 }, async() => { + const sql = postgres(options) + let failed = false + await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + try { await txSql`SELECT 1` } catch (e) { + failed = e.code === 'CONNECTION_CLOSED' || e.code === 'CONNECTION_DESTROYED' + } + }).catch(() => { /* ignore */ }) + await new Promise(r => setTimeout(r, 1000)) + await sql.end() + return [true, failed] +}) + +t('txSql fails even when pool connection reconnects after idle timeout', { timeout: 10 }, async() => { + const sql = postgres({ ...options, max: 1 }) + // Queue pool queries to trigger reconnect after connection closes + for (let i = 0; i < 3; i++) + setTimeout(() => sql`SELECT ${i}`.catch(() => { /* ignore */ }), 1000 + i * 100) + + let txSqlSucceeded = false + await sql.begin(async(txSql) => { + await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'` + await new Promise(r => setTimeout(r, 2000)) + try { + await txSql`SELECT 'should fail'` + txSqlSucceeded = true + } catch { /* ignore */ } + }).catch(() => { /* ignore */ }) + await new Promise(r => setTimeout(r, 1000)) + await sql.end() + return [false, txSqlSucceeded] +})