@@ -84,6 +84,8 @@ def build_index(
index_store.store_signal_type_index(for_signal_type, built_index, checkpoint)
finally:
# Force garbage collection to reclaim memory and attempt to free pages
# explicitly free the built index before reclaiming memory
built_index = None
trim_process_memory(logger, "Indexer")

logger.info(
@@ -115,6 +117,9 @@ def _prepare_index(
index_cls = for_signal_type.get_index_cls()
built_index = index_cls.build(signal_list)

# explicitly free the signal list before returning
signal_list.clear()
Contributor

I'm very suspicious this does anything, because it requires that the local variable is not properly garbage collected when it leaves scope, which stretches my understanding of how refcounting works.

Do we have any evidence that this is needed?

Random googling:

  1. https://stackoverflow.com/questions/28324084/does-deleting-reassigning-a-python-list-allow-for-garbage-collection
  2. https://blog.codingconfessions.com/p/cpython-garbage-collection-internals
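
A minimal sketch of the refcounting point above, assuming CPython's immediate refcount-based reclamation (Payload and build are hypothetical stand-ins, not code from this PR):

import weakref

class Payload:
    """Hypothetical stand-in for a large object such as the built index."""

def build():
    payload = Payload()          # local reference, refcount == 1
    return weakref.ref(payload)  # 'payload' goes out of scope at return

ref = build()
# CPython reclaims the object as soon as its last reference drops at
# function return, so the weak reference is already dead here -- no
# explicit 'payload = None' or '.clear()' is needed for that to happen.
print(ref() is None)  # prints True

The explicit built_index = None / signal_list.clear() lines would only matter if some other reference (for example a traceback frame or a caller-held alias) keeps the object alive past the end of the function.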


# Create checkpoint
checkpoint = SignalTypeIndexBuildCheckpoint.get_empty()
if last_cs is not None:
@@ -495,28 +495,34 @@ def commit_signal_index(
store_start_time = time.time()
# Deep dark magic - direct access postgres large object API
raw_conn = db.engine.raw_connection()
l_obj = raw_conn.lobject(0, "wb", 0, tmpfile.name)
self._log(
"imported tmpfile as lobject oid %d took %s",
l_obj.oid,
duration_to_human_str(time.time() - store_start_time),
)
if self.serialized_index_large_object_oid is not None:
if self.index_lobj_exists():
old_obj = raw_conn.lobject(self.serialized_index_large_object_oid, "n")
self._log("deallocating old lobject %d", old_obj.oid)
old_obj.unlink()
else:
self._log(
"old lobject %d doesn't exist? "
+ "This might be a previous partial failure",
self.serialized_index_large_object_oid,
level=logging.WARNING,
)

self.serialized_index_large_object_oid = l_obj.oid
db.session.add(self)
raw_conn.commit()
try:
l_obj = raw_conn.lobject(0, "wb", 0, tmpfile.name)
self._log(
"imported tmpfile as lobject oid %d took %s",
l_obj.oid,
duration_to_human_str(time.time() - store_start_time),
)
if self.serialized_index_large_object_oid is not None:
if self.index_lobj_exists():
old_obj = raw_conn.lobject(
self.serialized_index_large_object_oid, "n"
)
self._log("deallocating old lobject %d", old_obj.oid)
old_obj.unlink()
else:
self._log(
"old lobject %d doesn't exist? "
+ "This might be a previous partial failure",
self.serialized_index_large_object_oid,
level=logging.WARNING,
)

self.serialized_index_large_object_oid = l_obj.oid
db.session.add(self)
raw_conn.commit()
finally:
# explicitly close the raw connection to free memory
raw_conn.close()
Contributor

This might work too: https://docs.python.org/3/library/contextlib.html#contextlib.closing which lets you use a with statement.

Reading https://docs.sqlalchemy.org/en/21/core/connections.html#working-with-the-dbapi-cursor-directly I think this is very likely missing the close call.
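
A rough sketch of the contextlib.closing suggestion; FakeConnection is a hypothetical stand-in for the DBAPI connection returned by db.engine.raw_connection(), not the real connection object:

import contextlib

class FakeConnection:
    # Hypothetical stand-in; only close() matters for this sketch.
    def close(self) -> None:
        print("connection closed")

def commit_sketch() -> None:
    raw_conn = FakeConnection()
    # contextlib.closing wraps any object with a close() method in a
    # context manager, so the try/finally in this diff could become a
    # 'with' block that closes the connection even if the body raises.
    with contextlib.closing(raw_conn):
        pass  # lobject import / unlink / commit would go here

commit_sketch()  # prints "connection closed"

Either way, the key point from the SQLAlchemy docs is that connections obtained from engine.raw_connection() need to be released with an explicit close, which the try/finally added in this diff now handles.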


self._log(
"commited new index, %d signals %s took %s",
@@ -546,28 +552,35 @@ def load_signal_index(self) -> SignalTypeIndex[int]:
# I'm sorry future debugger finding this comment.
load_start_time = time.time()
raw_conn = db.engine.raw_connection()
l_obj = raw_conn.lobject(oid, "rb")
try:
l_obj = raw_conn.lobject(oid, "rb")

with tempfile.NamedTemporaryFile("rb") as tmpfile:
self._log("importing lobject oid %d to tmpfile %s", l_obj.oid, tmpfile.name)
l_obj.export(tmpfile.name)
tmpfile.seek(0, io.SEEK_END)
self._log(
"downloading %s to tmpfile took %s",
_human_friendly_bytesize(tmpfile.tell()),
duration_to_human_str(time.time() - load_start_time),
)
tmpfile.seek(0)
with tempfile.NamedTemporaryFile("rb") as tmpfile:
self._log(
"importing lobject oid %d to tmpfile %s", l_obj.oid, tmpfile.name
)
l_obj.export(tmpfile.name)
tmpfile.seek(0, io.SEEK_END)
self._log(
"downloading %s to tmpfile took %s",
_human_friendly_bytesize(tmpfile.tell()),
duration_to_human_str(time.time() - load_start_time),
)
tmpfile.seek(0)

deserialize_start = time.time()
index = t.cast(
SignalTypeIndex[int],
SignalTypeIndex.deserialize(t.cast(t.BinaryIO, tmpfile.file)),
)
self._log(
"deserialize took %s",
duration_to_human_str(time.time() - deserialize_start),
)
finally:
# explicitly close the raw connection to free memory
raw_conn.close()

deserialize_start = time.time()
index = t.cast(
SignalTypeIndex[int],
SignalTypeIndex.deserialize(t.cast(t.BinaryIO, tmpfile.file)),
)
self._log(
"deserialize took %s",
duration_to_human_str(time.time() - deserialize_start),
)
self._log(
"loading signal index took %s",
duration_to_human_str(time.time() - load_start_time),