Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions src/plugin/lcm-doctor-apply.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { DatabaseSync } from "node:sqlite";
import { acquireTransactionLock } from "../transaction-mutex.js";
import { formatTimestamp } from "../compaction.js";
import type { LcmConfig } from "../db/config.js";
import type { LcmSummarizeFn } from "../summarize.js";
Expand Down Expand Up @@ -139,26 +140,31 @@ export async function applyScopedDoctorRepair(params: {
}

if (repairedSummaryIds.length > 0) {
params.db.exec("BEGIN IMMEDIATE");
const release = await acquireTransactionLock(params.db);
try {
for (const summaryId of repairedSummaryIds) {
const override = overrides.get(summaryId);
if (!override) {
continue;
params.db.exec("BEGIN IMMEDIATE");
try {
for (const summaryId of repairedSummaryIds) {
const override = overrides.get(summaryId);
if (!override) {
continue;
}
params.db
.prepare(
`UPDATE summaries
SET content = ?, token_count = ?
WHERE summary_id = ?`,
)
.run(override.content, override.tokenCount, summaryId);
updateSummaryFts(params.db, summaryId, override.content);
}
params.db
.prepare(
`UPDATE summaries
SET content = ?, token_count = ?
WHERE summary_id = ?`,
)
.run(override.content, override.tokenCount, summaryId);
updateSummaryFts(params.db, summaryId, override.content);
params.db.exec("COMMIT");
} catch (error) {
params.db.exec("ROLLBACK");
throw error;
}
params.db.exec("COMMIT");
} catch (error) {
params.db.exec("ROLLBACK");
throw error;
} finally {
release();
}
}

Expand Down
20 changes: 13 additions & 7 deletions src/store/conversation-store.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { DatabaseSync } from "node:sqlite";
import { randomUUID } from "node:crypto";
import { acquireTransactionLock } from "../transaction-mutex.js";
import { sanitizeFts5Query } from "./fts5-sanitize.js";
import { buildLikeSearchPlan, containsCjk, createFallbackSnippet } from "./full-text-fallback.js";
import { parseUtcTimestamp, parseUtcTimestampOrNull } from "./parse-utc-timestamp.js";
Expand Down Expand Up @@ -270,14 +271,19 @@ export class ConversationStore {
// ── Transaction helpers ──────────────────────────────────────────────────

async withTransaction<T>(operation: () => Promise<T> | T): Promise<T> {
this.db.exec("BEGIN IMMEDIATE");
const release = await acquireTransactionLock(this.db);
try {
const result = await operation();
this.db.exec("COMMIT");
return result;
} catch (error) {
this.db.exec("ROLLBACK");
throw error;
this.db.exec("BEGIN IMMEDIATE");
try {
const result = await operation();
this.db.exec("COMMIT");
return result;
} catch (error) {
this.db.exec("ROLLBACK");
throw error;
}
} finally {
release();
}
}

Expand Down
94 changes: 50 additions & 44 deletions src/store/summary-store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { DatabaseSync } from "node:sqlite";
import { acquireTransactionLock } from "../transaction-mutex.js";
import { sanitizeFts5Query } from "./fts5-sanitize.js";
import { buildLikeSearchPlan, containsCjk, createFallbackSnippet } from "./full-text-fallback.js";
import { parseUtcTimestamp, parseUtcTimestampOrNull } from "./parse-utc-timestamp.js";
Expand Down Expand Up @@ -921,54 +922,59 @@ export class SummaryStore {
}): Promise<void> {
const { conversationId, startOrdinal, endOrdinal, summaryId } = input;

this.db.exec("BEGIN");
const release = await acquireTransactionLock(this.db);
try {
// 1. Delete context items in the range [startOrdinal, endOrdinal]
this.db
.prepare(
`DELETE FROM context_items
WHERE conversation_id = ?
AND ordinal >= ?
AND ordinal <= ?`,
)
.run(conversationId, startOrdinal, endOrdinal);

// 2. Insert the replacement summary item at startOrdinal
this.db
.prepare(
`INSERT INTO context_items (conversation_id, ordinal, item_type, summary_id)
VALUES (?, ?, 'summary', ?)`,
)
.run(conversationId, startOrdinal, summaryId);

// 3. Resequence all ordinals to maintain contiguity (no gaps).
// Fetch current items, then update ordinals in order.
const items = this.db
.prepare(
`SELECT ordinal FROM context_items
WHERE conversation_id = ?
ORDER BY ordinal`,
)
.all(conversationId) as unknown as { ordinal: number }[];
this.db.exec("BEGIN");
try {
// 1. Delete context items in the range [startOrdinal, endOrdinal]
this.db
.prepare(
`DELETE FROM context_items
WHERE conversation_id = ?
AND ordinal >= ?
AND ordinal <= ?`,
)
.run(conversationId, startOrdinal, endOrdinal);

// 2. Insert the replacement summary item at startOrdinal
this.db
.prepare(
`INSERT INTO context_items (conversation_id, ordinal, item_type, summary_id)
VALUES (?, ?, 'summary', ?)`,
)
.run(conversationId, startOrdinal, summaryId);

// 3. Resequence all ordinals to maintain contiguity (no gaps).
// Fetch current items, then update ordinals in order.
const items = this.db
.prepare(
`SELECT ordinal FROM context_items
WHERE conversation_id = ?
ORDER BY ordinal`,
)
.all(conversationId) as unknown as { ordinal: number }[];

const updateStmt = this.db.prepare(
`UPDATE context_items
SET ordinal = ?
WHERE conversation_id = ? AND ordinal = ?`,
);

const updateStmt = this.db.prepare(
`UPDATE context_items
SET ordinal = ?
WHERE conversation_id = ? AND ordinal = ?`,
);
// Use negative temp ordinals first to avoid unique constraint conflicts
for (let i = 0; i < items.length; i++) {
updateStmt.run(-(i + 1), conversationId, items[i].ordinal);
}
for (let i = 0; i < items.length; i++) {
updateStmt.run(i, conversationId, -(i + 1));
}

// Use negative temp ordinals first to avoid unique constraint conflicts
for (let i = 0; i < items.length; i++) {
updateStmt.run(-(i + 1), conversationId, items[i].ordinal);
this.db.exec("COMMIT");
} catch (err) {
this.db.exec("ROLLBACK");
throw err;
}
for (let i = 0; i < items.length; i++) {
updateStmt.run(i, conversationId, -(i + 1));
}

this.db.exec("COMMIT");
} catch (err) {
this.db.exec("ROLLBACK");
throw err;
} finally {
release();
}
}

Expand Down
69 changes: 69 additions & 0 deletions src/transaction-mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Per-database async transaction mutex.
*
* Hotfix for https://github.com/Martian-Engineering/lossless-claw/issues/260
*
* Problem: Multiple async operations (from different sessions) share one
* synchronous DatabaseSync handle. SQLite does not support nested transactions.
* When two async code paths both try to BEGIN while an earlier BEGIN is still
* in-flight (awaiting async work inside the transaction), the second BEGIN
* fails with "cannot start a transaction within a transaction".
*
* Solution: A per-database async mutex that serializes all explicit transaction
* entry points. Uses a WeakMap keyed on the DatabaseSync instance so each
* database gets its own queue, and databases are garbage-collected normally.
*/

import type { DatabaseSync } from "node:sqlite";

interface MutexState {
/** Tail of the promise chain — each acquirer appends to this. */
tail: Promise<void>;
}

const mutexMap = new WeakMap<DatabaseSync, MutexState>();

function getOrCreateMutex(db: DatabaseSync): MutexState {
let state = mutexMap.get(db);
if (!state) {
state = { tail: Promise.resolve() };
mutexMap.set(db, state);
}
return state;
}

/**
* Acquire exclusive async access to the database for a transaction.
*
* Usage:
* const release = await acquireTransactionLock(this.db);
* try {
* this.db.exec("BEGIN IMMEDIATE");
* // ... do work ...
* this.db.exec("COMMIT");
* } catch (err) {
* this.db.exec("ROLLBACK");
* throw err;
* } finally {
* release();
* }
*
* Returns a release function that MUST be called in a finally block.
*/
Comment thread
100yenadmin marked this conversation as resolved.
export function acquireTransactionLock(db: DatabaseSync): Promise<() => void> {
const mutex = getOrCreateMutex(db);

let releaseResolve: () => void;
const releasePromise = new Promise<void>((resolve) => {
releaseResolve = resolve;
});

// Capture the current tail — we wait on it
const waitOn = mutex.tail;

// Advance the tail — next acquirer will wait on our release
mutex.tail = releasePromise;

// Wait for the previous holder to release, then return our release fn
return waitOn.then(() => releaseResolve!);
Comment thread
100yenadmin marked this conversation as resolved.
Outdated
}
Loading
Loading