Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 108 additions & 92 deletions mempalace/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import os
import shutil
import sqlite3
import tempfile
from collections import defaultdict
from contextlib import closing
from datetime import datetime


Expand Down Expand Up @@ -51,61 +53,65 @@ def extract_drawers_from_sqlite(db_path: str) -> list:

Works regardless of which ChromaDB version created the database.
Returns list of dicts with 'id', 'document', and 'metadata' keys.
"""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row

# Get all embedding IDs and their documents
rows = conn.execute(
"""
SELECT e.embedding_id,
MAX(CASE WHEN em.key = 'chroma:document' THEN em.string_value END) as document
FROM embeddings e
JOIN embedding_metadata em ON em.id = e.id
GROUP BY e.embedding_id
The connection is wrapped in ``contextlib.closing`` so an exception
during extraction does not leak the SQLite handle. On Windows that
would leave a file lock on ``chroma.sqlite3`` and prevent the rest
of the migration from touching the palace directory.
"""
).fetchall()
with closing(sqlite3.connect(db_path)) as conn:
conn.row_factory = sqlite3.Row

drawers = []
for row in rows:
embedding_id = row["embedding_id"]
document = row["document"]
if not document:
continue

# Get metadata for this embedding
meta_rows = conn.execute(
# Get all embedding IDs and their documents
rows = conn.execute(
"""
SELECT em.key, em.string_value, em.int_value, em.float_value, em.bool_value
FROM embedding_metadata em
JOIN embeddings e ON e.id = em.id
WHERE e.embedding_id = ?
AND em.key NOT LIKE 'chroma:%'
""",
(embedding_id,),
SELECT e.embedding_id,
MAX(CASE WHEN em.key = 'chroma:document' THEN em.string_value END) as document
FROM embeddings e
JOIN embedding_metadata em ON em.id = e.id
GROUP BY e.embedding_id
"""
).fetchall()

metadata = {}
for mr in meta_rows:
key = mr["key"]
if mr["string_value"] is not None:
metadata[key] = mr["string_value"]
elif mr["int_value"] is not None:
metadata[key] = mr["int_value"]
elif mr["float_value"] is not None:
metadata[key] = mr["float_value"]
elif mr["bool_value"] is not None:
metadata[key] = bool(mr["bool_value"])

drawers.append(
{
"id": embedding_id,
"document": document,
"metadata": metadata,
}
)
drawers = []
for row in rows:
embedding_id = row["embedding_id"]
document = row["document"]
if not document:
continue

# Get metadata for this embedding
meta_rows = conn.execute(
"""
SELECT em.key, em.string_value, em.int_value, em.float_value, em.bool_value
FROM embedding_metadata em
JOIN embeddings e ON e.id = em.id
WHERE e.embedding_id = ?
AND em.key NOT LIKE 'chroma:%'
""",
(embedding_id,),
).fetchall()

metadata = {}
for mr in meta_rows:
key = mr["key"]
if mr["string_value"] is not None:
metadata[key] = mr["string_value"]
elif mr["int_value"] is not None:
metadata[key] = mr["int_value"]
elif mr["float_value"] is not None:
metadata[key] = mr["float_value"]
elif mr["bool_value"] is not None:
metadata[key] = bool(mr["bool_value"])

drawers.append(
{
"id": embedding_id,
"document": document,
"metadata": metadata,
}
)

conn.close()
return drawers


Expand Down Expand Up @@ -226,53 +232,63 @@ def migrate(palace_path: str, dry_run: bool = False, confirm: bool = False):
print(f"\n Backing up to {backup_path}...")
shutil.copytree(palace_path, backup_path)

# Build fresh palace in a temp directory (avoids chromadb reading old state)
import tempfile

# Build fresh palace in a temp directory (avoids chromadb reading old state).
# Wrap the whole import-and-swap dance in try/finally so the temp dir is
# cleaned up if any of the chromadb writes, the verify count, or the
# rename fails — without try/finally a crashed migration leaves a partial
# palace dir under the system temp root that the user has to find by hand.
temp_palace = tempfile.mkdtemp(prefix="mempalace_migrate_")
print(f" Creating fresh palace in {temp_palace}...")
fresh_backend = ChromaBackend()
col = fresh_backend.get_or_create_collection(temp_palace, "mempalace_drawers")

# Re-import in batches
batch_size = 500
imported = 0
for i in range(0, len(drawers), batch_size):
batch = drawers[i : i + batch_size]
col.add(
ids=[d["id"] for d in batch],
documents=[d["document"] for d in batch],
metadatas=[d["metadata"] for d in batch],
)
imported += len(batch)
print(f" Imported {imported}/{len(drawers)} drawers...")

# Verify before swapping
final_count = col.count()
del col
del fresh_backend

# Swap: rename old palace aside, then move new one into place.
# This avoids a window where both old and new are missing.
print(" Swapping old palace for migrated version...")
stale_path = palace_path + ".old"
if os.path.exists(stale_path):
shutil.rmtree(stale_path)
os.replace(palace_path, stale_path)
try:
os.replace(temp_palace, palace_path)
except OSError as e:
# EXDEV = temp lives on a different filesystem; fall back to copy+delete.
# Anything else is a real error — don't mask it with shutil.move.
if getattr(e, "errno", None) != errno.EXDEV:
_restore_stale_palace(palace_path, stale_path)
raise
print(f" Creating fresh palace in {temp_palace}...")
fresh_backend = ChromaBackend()
col = fresh_backend.get_or_create_collection(temp_palace, "mempalace_drawers")

# Re-import in batches
batch_size = 500
imported = 0
for i in range(0, len(drawers), batch_size):
batch = drawers[i : i + batch_size]
col.add(
ids=[d["id"] for d in batch],
documents=[d["document"] for d in batch],
metadatas=[d["metadata"] for d in batch],
)
imported += len(batch)
print(f" Imported {imported}/{len(drawers)} drawers...")

# Verify before swapping
final_count = col.count()
del col
del fresh_backend

# Swap: rename old palace aside, then move new one into place.
# This avoids a window where both old and new are missing.
print(" Swapping old palace for migrated version...")
stale_path = palace_path + ".old"
if os.path.exists(stale_path):
shutil.rmtree(stale_path)
os.replace(palace_path, stale_path)
try:
shutil.move(temp_palace, palace_path)
except Exception:
_restore_stale_palace(palace_path, stale_path)
raise
shutil.rmtree(stale_path, ignore_errors=True)
os.replace(temp_palace, palace_path)
except OSError as e:
# EXDEV = temp lives on a different filesystem; fall back to copy+delete.
# Anything else is a real error — don't mask it with shutil.move.
if getattr(e, "errno", None) != errno.EXDEV:
_restore_stale_palace(palace_path, stale_path)
raise
try:
shutil.move(temp_palace, palace_path)
except Exception:
_restore_stale_palace(palace_path, stale_path)
raise
shutil.rmtree(stale_path, ignore_errors=True)
finally:
# On the happy path os.replace/shutil.move consumed temp_palace, so
# the directory no longer exists at the temp location — the existence
# guard makes this a no-op then. On any failure path it actually
# removes the orphan.
if os.path.exists(temp_palace):
shutil.rmtree(temp_palace, ignore_errors=True)

print("\n Migration complete.")
print(f" Drawers migrated: {final_count}")
Expand Down
83 changes: 82 additions & 1 deletion tests/test_migrate.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Tests for destructive-operation safety in mempalace.migrate."""

import os
import sqlite3
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

from mempalace.migrate import _restore_stale_palace, migrate
from mempalace.migrate import _restore_stale_palace, extract_drawers_from_sqlite, migrate


def test_migrate_requires_palace_database(tmp_path, capsys):
Expand Down Expand Up @@ -101,3 +102,83 @@ def test_restore_stale_palace_logs_and_swallows_on_failure(tmp_path, capsys):
assert "CRITICAL" in out
assert os.fspath(palace_path) in out
assert os.fspath(stale_path) in out


def _make_minimal_chromadb_sqlite(tmp_path):
"""Build a SQLite file with the minimal schema extract_drawers_from_sqlite reads."""
db = tmp_path / "chroma.sqlite3"
conn = sqlite3.connect(str(db))
conn.executescript(
"""
CREATE TABLE embeddings (id INTEGER PRIMARY KEY, embedding_id TEXT);
CREATE TABLE embedding_metadata (
id INTEGER, key TEXT,
string_value TEXT, int_value INTEGER,
float_value REAL, bool_value INTEGER
);
INSERT INTO embeddings VALUES (1, 'd-001');
INSERT INTO embedding_metadata VALUES (1, 'chroma:document', 'hello', NULL, NULL, NULL);
INSERT INTO embedding_metadata VALUES (1, 'wing', 'personal', NULL, NULL, NULL);
INSERT INTO embedding_metadata VALUES (1, 'room', '2026-04-26', NULL, NULL, NULL);
"""
)
conn.commit()
conn.close()
return str(db)


def test_extract_drawers_returns_drawers(tmp_path):
db_path = _make_minimal_chromadb_sqlite(tmp_path)
drawers = extract_drawers_from_sqlite(db_path)
assert len(drawers) == 1
assert drawers[0]["id"] == "d-001"
assert drawers[0]["document"] == "hello"
assert drawers[0]["metadata"] == {"wing": "personal", "room": "2026-04-26"}


def test_migrate_cleans_temp_palace_on_chromadb_failure(tmp_path):
"""If chromadb fails after the temp palace is created, mkdtemp's
directory must be removed — without try/finally it leaked into the
system temp root forever."""
import tempfile as _tempfile

palace_dir = tmp_path / "palace"
palace_dir.mkdir()
(palace_dir / "chroma.sqlite3").write_text("db")

captured_temp_paths = []
real_mkdtemp = _tempfile.mkdtemp

def tracking_mkdtemp(*args, **kwargs):
path = real_mkdtemp(*args, **kwargs)
captured_temp_paths.append(path)
return path

failing_backend = MagicMock()
# First ChromaBackend().get_collection() must raise so we drop into
# the SQL-extraction path; the second ChromaBackend().get_or_create_collection()
# raises to trigger the cleanup we are testing.
failing_backend.get_collection.side_effect = Exception("unreadable")
failing_backend.get_or_create_collection.side_effect = RuntimeError("chromadb boom")

import mempalace.backends.chroma as _chroma_mod

with (
patch("mempalace.migrate.detect_chromadb_version", return_value="0.5.x"),
patch(
"mempalace.migrate.extract_drawers_from_sqlite",
return_value=[{"id": "id1", "document": "doc", "metadata": {"wing": "w", "room": "r"}}],
),
patch("builtins.input", return_value="y"),
patch("mempalace.migrate.shutil.copytree"),
patch("mempalace.migrate.tempfile.mkdtemp", side_effect=tracking_mkdtemp),
patch.object(_chroma_mod, "ChromaBackend", return_value=failing_backend),
):
try:
migrate(str(palace_dir), confirm=True)
except Exception:
pass

assert captured_temp_paths, "mkdtemp was never called — flow short-circuited"
for p in captured_temp_paths:
assert not os.path.exists(p), f"temp palace was not cleaned up: {p}"