Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cf/src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

function nextWrite(fn) {
if (!socket) {
// Connection was closed, cannot write
chunk = nextWriteTimer = null
return false
}
const x = socket.write(chunk, fn)
nextWriteTimer !== null && clearImmediate(nextWriteTimer)
chunk = nextWriteTimer = null
Expand Down Expand Up @@ -436,6 +441,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

async function closed(hadError) {
terminated = true // Mark connection as terminated to prevent further query attempts
incoming = Buffer.alloc(0)
remaining = 0
incomings = null
Expand Down
5 changes: 4 additions & 1 deletion cf/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,13 @@ function Postgres(a, b) {
let savepoints = 0
, connection
, prepare = null
, closed = false

try {
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute()
return await Promise.race([
scope(connection, fn),
new Promise((_, reject) => connection.onclose = reject)
new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e)))
])
} catch (error) {
throw error
Expand Down Expand Up @@ -290,6 +291,8 @@ function Postgres(a, b) {
}

function handler(q) {
if (closed)
return q.reject(Errors.connection('CONNECTION_CLOSED', options))
q.catch(e => uncaughtError || (uncaughtError = e))
c.queue === full
? queries.push(q)
Expand Down
6 changes: 6 additions & 0 deletions cjs/src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

function nextWrite(fn) {
if (!socket) {
// Connection was closed, cannot write
chunk = nextWriteTimer = null
return false
}
const x = socket.write(chunk, fn)
nextWriteTimer !== null && clearImmediate(nextWriteTimer)
chunk = nextWriteTimer = null
Expand Down Expand Up @@ -434,6 +439,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

async function closed(hadError) {
terminated = true // Mark connection as terminated to prevent further query attempts
incoming = Buffer.alloc(0)
remaining = 0
incomings = null
Expand Down
5 changes: 4 additions & 1 deletion cjs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,13 @@ function Postgres(a, b) {
let savepoints = 0
, connection
, prepare = null
, closed = false

try {
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute()
return await Promise.race([
scope(connection, fn),
new Promise((_, reject) => connection.onclose = reject)
new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e)))
])
} catch (error) {
throw error
Expand Down Expand Up @@ -289,6 +290,8 @@ function Postgres(a, b) {
}

function handler(q) {
if (closed)
return q.reject(Errors.connection('CONNECTION_CLOSED', options))
q.catch(e => uncaughtError || (uncaughtError = e))
c.queue === full
? queries.push(q)
Expand Down
46 changes: 46 additions & 0 deletions cjs/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2638,3 +2638,49 @@ t('query during copy error', async() => {
await sql`drop table test`
]
})

t('idle_in_transaction_session_timeout causes CONNECTION_CLOSED', { timeout: 5 }, async() => {
const sql = postgres(options)
const error = await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
await txSql`SELECT 1`
}).catch(e => e)
await sql.end()
return [true, error.code === 'CONNECTION_CLOSED' || error.code === 'CONNECTION_DESTROYED']
})

t('txSql fails after idle_in_transaction_session_timeout even if callback continues', { timeout: 5 }, async() => {
const sql = postgres(options)
let failed = false
await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
try { await txSql`SELECT 1` } catch (e) {
failed = e.code === 'CONNECTION_CLOSED' || e.code === 'CONNECTION_DESTROYED'
}
}).catch(() => { /* ignore */ })
await new Promise(r => setTimeout(r, 1000))
await sql.end()
return [true, failed]
})

t('txSql fails even when pool connection reconnects after idle timeout', { timeout: 10 }, async() => {
const sql = postgres({ ...options, max: 1 })
// Queue pool queries to trigger reconnect after connection closes
for (let i = 0; i < 3; i++)
setTimeout(() => sql`SELECT ${i}`.catch(() => { /* ignore */ }), 1000 + i * 100)

let txSqlSucceeded = false
await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
try {
await txSql`SELECT 'should fail'`
txSqlSucceeded = true
} catch { /* ignore */ }
}).catch(() => { /* ignore */ })
await new Promise(r => setTimeout(r, 1000))
await sql.end()
return [false, txSqlSucceeded]
})
6 changes: 6 additions & 0 deletions deno/src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

function nextWrite(fn) {
if (!socket) {
// Connection was closed, cannot write
chunk = nextWriteTimer = null
return false
}
const x = socket.write(chunk, fn)
nextWriteTimer !== null && clearImmediate(nextWriteTimer)
chunk = nextWriteTimer = null
Expand Down Expand Up @@ -437,6 +442,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

async function closed(hadError) {
terminated = true // Mark connection as terminated to prevent further query attempts
incoming = Buffer.alloc(0)
remaining = 0
incomings = null
Expand Down
5 changes: 4 additions & 1 deletion deno/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,13 @@ function Postgres(a, b) {
let savepoints = 0
, connection
, prepare = null
, closed = false

try {
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute()
return await Promise.race([
scope(connection, fn),
new Promise((_, reject) => connection.onclose = reject)
new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e)))
])
} catch (error) {
throw error
Expand Down Expand Up @@ -290,6 +291,8 @@ function Postgres(a, b) {
}

function handler(q) {
if (closed)
return q.reject(Errors.connection('CONNECTION_CLOSED', options))
q.catch(e => uncaughtError || (uncaughtError = e))
c.queue === full
? queries.push(q)
Expand Down
46 changes: 46 additions & 0 deletions deno/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2641,4 +2641,50 @@ t('query during copy error', async() => {
]
})

t('idle_in_transaction_session_timeout causes CONNECTION_CLOSED', { timeout: 5 }, async() => {
const sql = postgres(options)
const error = await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
await txSql`SELECT 1`
}).catch(e => e)
await sql.end()
return [true, error.code === 'CONNECTION_CLOSED' || error.code === 'CONNECTION_DESTROYED']
})

t('txSql fails after idle_in_transaction_session_timeout even if callback continues', { timeout: 5 }, async() => {
const sql = postgres(options)
let failed = false
await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
try { await txSql`SELECT 1` } catch (e) {
failed = e.code === 'CONNECTION_CLOSED' || e.code === 'CONNECTION_DESTROYED'
}
}).catch(() => { /* ignore */ })
await new Promise(r => setTimeout(r, 1000))
await sql.end()
return [true, failed]
})

t('txSql fails even when pool connection reconnects after idle timeout', { timeout: 10 }, async() => {
const sql = postgres({ ...options, max: 1 })
// Queue pool queries to trigger reconnect after connection closes
for (let i = 0; i < 3; i++)
setTimeout(() => sql`SELECT ${i}`.catch(() => { /* ignore */ }), 1000 + i * 100)

let txSqlSucceeded = false
await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
try {
await txSql`SELECT 'should fail'`
txSqlSucceeded = true
} catch { /* ignore */ }
}).catch(() => { /* ignore */ })
await new Promise(r => setTimeout(r, 1000))
await sql.end()
return [false, txSqlSucceeded]
})

;globalThis.addEventListener("unload", () => Deno.exit(process.exitCode))
6 changes: 6 additions & 0 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

function nextWrite(fn) {
if (!socket) {
// Connection was closed, cannot write
chunk = nextWriteTimer = null
return false
}
const x = socket.write(chunk, fn)
nextWriteTimer !== null && clearImmediate(nextWriteTimer)
chunk = nextWriteTimer = null
Expand Down Expand Up @@ -434,6 +439,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

async function closed(hadError) {
terminated = true // Mark connection as terminated to prevent further query attempts
incoming = Buffer.alloc(0)
remaining = 0
incomings = null
Expand Down
5 changes: 4 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,13 @@ function Postgres(a, b) {
let savepoints = 0
, connection
, prepare = null
, closed = false

try {
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute()
return await Promise.race([
scope(connection, fn),
new Promise((_, reject) => connection.onclose = reject)
new Promise((_, reject) => connection.onclose = e => (closed = true, reject(e)))
])
} catch (error) {
throw error
Expand Down Expand Up @@ -289,6 +290,8 @@ function Postgres(a, b) {
}

function handler(q) {
if (closed)
return q.reject(Errors.connection('CONNECTION_CLOSED', options))
q.catch(e => uncaughtError || (uncaughtError = e))
c.queue === full
? queries.push(q)
Expand Down
46 changes: 46 additions & 0 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2638,3 +2638,49 @@ t('query during copy error', async() => {
await sql`drop table test`
]
})

t('idle_in_transaction_session_timeout causes CONNECTION_CLOSED', { timeout: 5 }, async() => {
const sql = postgres(options)
const error = await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
await txSql`SELECT 1`
}).catch(e => e)
await sql.end()
return [true, error.code === 'CONNECTION_CLOSED' || error.code === 'CONNECTION_DESTROYED']
})

t('txSql fails after idle_in_transaction_session_timeout even if callback continues', { timeout: 5 }, async() => {
const sql = postgres(options)
let failed = false
await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
try { await txSql`SELECT 1` } catch (e) {
failed = e.code === 'CONNECTION_CLOSED' || e.code === 'CONNECTION_DESTROYED'
}
}).catch(() => { /* ignore */ })
await new Promise(r => setTimeout(r, 1000))
await sql.end()
return [true, failed]
})

t('txSql fails even when pool connection reconnects after idle timeout', { timeout: 10 }, async() => {
const sql = postgres({ ...options, max: 1 })
// Queue pool queries to trigger reconnect after connection closes
for (let i = 0; i < 3; i++)
setTimeout(() => sql`SELECT ${i}`.catch(() => { /* ignore */ }), 1000 + i * 100)

let txSqlSucceeded = false
await sql.begin(async(txSql) => {
await txSql`SET LOCAL idle_in_transaction_session_timeout = '1s'`
await new Promise(r => setTimeout(r, 2000))
try {
await txSql`SELECT 'should fail'`
txSqlSucceeded = true
} catch { /* ignore */ }
}).catch(() => { /* ignore */ })
await new Promise(r => setTimeout(r, 1000))
await sql.end()
return [false, txSqlSucceeded]
})