@@ -71,11 +71,6 @@ std::atomic<embedding_state_t> EmbeddingManager::m_state{embedding_state_t::EMBE
7171std::condition_variable EmbeddingManager::m_manager_cv;
7272std::mutex EmbeddingManager::m_manager_mutex;
7373
74- std::condition_variable EmbeddingManager::m_fully_stopped_cv;
75- std::mutex EmbeddingManager::m_fully_stopped_mutex;
76- bool EmbeddingManager::m_fully_stopped{true };
77- std::atomic<bool > EmbeddingManager::m_shutdown_initiated{false };
78-
7974struct ScopedInternalTHD {
8075 THD *thd{nullptr };
8176
@@ -662,7 +657,6 @@ static void *embedding_table_worker_func(void *arg) {
662657 }
663658
664659 DBUG_PRINT (" ml" , (" ML TableWorker [%s]: exiting" , ctx->key .c_str ()));
665- mgr->on_thread_exiting ();
666660 return nullptr ;
667661}
668662
@@ -824,12 +818,11 @@ static void *embedding_manager_func(void *arg) {
824818 }
825819
826820 DBUG_PRINT (" ml" , (" ML EmbeddingManager: event loop finished, exiting." ));
827- mgr->on_thread_exiting ();
828821 return nullptr ;
829822}
830823
831824void EmbeddingManager::start_impl () {
832- if (!m_initialized.load () || m_shutdown_initiated. load (std::memory_order_acquire) ) return ;
825+ if (!m_initialized.load ()) return ;
833826 embedding_state_t expected = embedding_state_t ::EMBEDDING_STATE_EXIT;
834827 if (!m_state.compare_exchange_strong (expected, embedding_state_t ::EMBEDDING_STATE_RUN, std::memory_order_acq_rel,
835828 std::memory_order_acquire)) {
@@ -840,13 +833,6 @@ void EmbeddingManager::start_impl() {
840833 }
841834 }
842835
843- {
844- std::lock_guard<std::mutex> lk (m_fully_stopped_mutex);
845- m_fully_stopped = false ;
846- }
847-
848- m_active_thread_count.fetch_add (1 , std::memory_order_relaxed);
849-
850836 {
851837 std::lock_guard<std::mutex> lk (m_embedder_mutex);
852838 m_embedder = std::make_unique<ML_embedding_row>();
@@ -862,15 +848,36 @@ void EmbeddingManager::start_impl() {
862848 if (my_thread_create (&m_manager_thread, &attr, embedding_manager_func, this ) != 0 ) {
863849 sql_print_error (" [EmbeddingManager] start: failed to create coordinator thread" );
864850 my_thread_attr_destroy (&attr);
865- m_active_thread_count.fetch_sub (1 , std::memory_order_relaxed);
866851 m_state.store (embedding_state_t ::EMBEDDING_STATE_STOP, std::memory_order_release);
867852 return ;
868853 }
869854 my_thread_attr_destroy (&attr);
870855}
871856
857+ void EmbeddingManager::shutdown_impl () {
858+ initiate_shutdown_impl ();
859+
860+ if (m_manager_thread.thread != 0 ) {
861+ my_thread_join (&m_manager_thread, nullptr );
862+ m_manager_thread = {};
863+ }
864+
865+ {
866+ std::lock_guard<std::mutex> lk (m_table_workers_mutex);
867+ for (auto &[key, ctx] : m_table_workers) {
868+ if (ctx->thread .thread != 0 ) {
869+ my_thread_join (&ctx->thread , nullptr );
870+ ctx->thread = {};
871+ }
872+ }
873+ m_table_workers.clear ();
874+ }
875+
876+ m_state.store (embedding_state_t ::EMBEDDING_STATE_EXIT, std::memory_order_release);
877+ m_initialized.store (false );
878+ }
879+
872880void EmbeddingManager::initiate_shutdown_impl () {
873- m_shutdown_initiated.store (true , std::memory_order_release);
874881 embedding_state_t expected = embedding_state_t ::EMBEDDING_STATE_RUN;
875882 if (!m_state.compare_exchange_strong (expected, embedding_state_t ::EMBEDDING_STATE_STOP, std::memory_order_acq_rel,
876883 std::memory_order_acquire)) {
@@ -907,55 +914,6 @@ void EmbeddingManager::initiate_shutdown_impl() {
907914 m_manager_cv.notify_all ();
908915}
909916
910- void EmbeddingManager::on_thread_exiting () {
911- int prev = m_active_thread_count.fetch_sub (1 , std::memory_order_acq_rel);
912- if (prev == 1 ) {
913- std::lock_guard<std::mutex> lk (m_fully_stopped_mutex);
914- m_fully_stopped = true ;
915- m_fully_stopped_cv.notify_all ();
916- DBUG_PRINT (" ml" , (" ML EmbeddingManager: all threads exited — shutdown gate open." ));
917- }
918- }
919-
920- void EmbeddingManager::initiate_shutdown () {
921- auto *mgr = instance ();
922- if (mgr && EmbeddingManager::is_running ()) mgr->initiate_shutdown_impl ();
923- }
924-
925- bool EmbeddingManager::wait_until_fully_stopped (std::chrono::milliseconds timeout) {
926- std::unique_lock<std::mutex> lk (m_fully_stopped_mutex);
927- return m_fully_stopped_cv.wait_for (lk, timeout, [] { return m_fully_stopped; });
928- }
929-
930- void EmbeddingManager::shutdown_impl () {
931- initiate_shutdown_impl ();
932-
933- if (m_manager_thread.thread != 0 ) {
934- my_thread_join (&m_manager_thread, nullptr );
935- m_manager_thread = {};
936- }
937-
938- {
939- std::lock_guard<std::mutex> lk (m_table_workers_mutex);
940- for (auto &[key, ctx] : m_table_workers) {
941- if (ctx->thread .thread != 0 ) {
942- my_thread_join (&ctx->thread , nullptr );
943- ctx->thread = {};
944- }
945- }
946- m_table_workers.clear ();
947- }
948-
949- {
950- std::lock_guard<std::mutex> lk (m_fully_stopped_mutex);
951- m_fully_stopped = true ;
952- }
953- m_fully_stopped_cv.notify_all ();
954-
955- m_state.store (embedding_state_t ::EMBEDDING_STATE_EXIT, std::memory_order_release);
956- m_initialized.store (false );
957- }
958-
959917/* *
960918 * When the pool is below MAX_WORKER_THREADS, spawn a new dedicated worker for
961919 * this (schema, table) key. Once the pool is full, return the existing worker
@@ -977,13 +935,11 @@ TableWorkerContext *EmbeddingManager::get_or_create_worker(const std::string &ke
977935 my_thread_attr_t attr;
978936 my_thread_attr_init (&attr);
979937 my_thread_attr_setdetachstate (&attr, MY_THREAD_CREATE_JOINABLE);
980- m_active_thread_count.fetch_add (1 , std::memory_order_relaxed);
981938 int rc = my_thread_create (&ctx->thread , &attr, embedding_table_worker_func, ctx.get ());
982939 my_thread_attr_destroy (&attr);
983940
984941 if (rc != 0 ) {
985942 DBUG_PRINT (" ml" , (" ML EmbeddingManager: failed to spawn worker for %s" , key.c_str ()));
986- m_active_thread_count.fetch_sub (1 , std::memory_order_relaxed);
987943 return nullptr ;
988944 }
989945
0 commit comments