Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions system/lib/libc/dynlink.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,13 @@ static void thread_sync_done(void* arg) {
free(info);
}

// Proxying queue specially for handling code loading (dlopen) events.
// Initialized by the main thread on the first call to
// `_emscripten_proxy_dlsync` below, and processed by background threads
// that call `_emscripten_process_dlopen_queue` during futex_wait (i.e. whenever
// they block).
static em_proxying_queue * _Atomic dlopen_proxying_queue = NULL;
static thread_local bool processing_queue = false;

void _emscripten_process_dlopen_queue() {
if (dlopen_proxying_queue && !processing_queue) {
if (!processing_queue) {
assert(!emscripten_is_main_runtime_thread());
processing_queue = true;
emscripten_proxy_execute_queue(dlopen_proxying_queue);
emscripten_proxy_execute_queue(_emscripten_proxy_dlopen_queue());
processing_queue = false;
}
}
Expand All @@ -379,9 +373,6 @@ void _emscripten_process_dlopen_queue() {
// manages the worker pool.
int _emscripten_proxy_dlsync_async(pthread_t target_thread, em_promise_t promise) {
assert(emscripten_is_main_runtime_thread());
if (!dlopen_proxying_queue) {
dlopen_proxying_queue = em_proxying_queue_create();
}

struct promise_result* info = malloc(sizeof(struct promise_result));
if (!info) {
Expand All @@ -391,7 +382,7 @@ int _emscripten_proxy_dlsync_async(pthread_t target_thread, em_promise_t promise
.promise = promise,
.result = false,
};
int rtn = emscripten_proxy_callback(dlopen_proxying_queue,
int rtn = emscripten_proxy_callback(_emscripten_proxy_dlopen_queue(),
target_thread,
do_thread_sync,
thread_sync_done,
Expand All @@ -403,21 +394,17 @@ int _emscripten_proxy_dlsync_async(pthread_t target_thread, em_promise_t promise
emscripten_promise_resolve(promise, EM_PROMISE_FULFILL, NULL);
emscripten_promise_destroy(promise);
free(info);
} else {
// Wake up the target thread in case it's blocked in futex_wait
_emscripten_thread_notify(target_thread);
}
return rtn;
}

int _emscripten_proxy_dlsync(pthread_t target_thread) {
assert(emscripten_is_main_runtime_thread());
if (!dlopen_proxying_queue) {
dlopen_proxying_queue = em_proxying_queue_create();
}
int result;
if (!emscripten_proxy_sync(
dlopen_proxying_queue, target_thread, do_thread_sync_out, &result)) {
if (!emscripten_proxy_sync(_emscripten_proxy_dlopen_queue(),
target_thread,
do_thread_sync_out,
&result)) {
return 0;
}
return result;
Expand Down
4 changes: 2 additions & 2 deletions system/lib/pthread/emscripten_yield.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ void _emscripten_thread_crashed() {
// Notify the main thread that the calling thread has crashed. The will bring
// down the whole program next time the main thread calls `_emscripten_yield`.
crashed_thread_id = pthread_self();
// Force the main runtime thread to wake up in case it is waiting in
// `_emscripten_thread_notify`.
// Force the main runtime thread to wake up in case it is blocked in
// `emscripten_futex_wait`.
_emscripten_thread_notify(emscripten_main_runtime_thread_id());
}

Expand Down
36 changes: 30 additions & 6 deletions system/lib/pthread/proxying.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ em_proxying_queue* emscripten_proxy_get_system_queue(void) {
return &system_proxying_queue;
}

#ifdef EMSCRIPTEN_DYNAMIC_LINKING
// Proxying queue specially for handling code loading (dlopen) events.
// Processed by background threads that call `_emscripten_process_dlopen_queue`
// during futex_wait (i.e. whenever they block).
static em_proxying_queue dlopen_proxying_queue = {
.mutex = PTHREAD_MUTEX_INITIALIZER,
.task_queues = NULL,
.size = 0,
.capacity = 0,
};

em_proxying_queue* _emscripten_proxy_dlopen_queue(void) {
return &dlopen_proxying_queue;
}
#endif

em_proxying_queue* em_proxying_queue_create(void) {
// Allocate the new queue.
em_proxying_queue* q = malloc(sizeof(em_proxying_queue));
Expand Down Expand Up @@ -149,6 +165,9 @@ void emscripten_proxy_execute_queue(em_proxying_queue* q) {
static bool do_proxy(em_proxying_queue* q, pthread_t target_thread, task t) {
assert(q != NULL);
pthread_mutex_lock(&q->mutex);
#ifdef EMSCRIPTEN_DYNAMIC_LINKING
bool is_dlopen_queue = q == &dlopen_proxying_queue;
#endif
bool is_system_queue = q == &system_proxying_queue;
if (is_system_queue) {
system_queue_in_use = true;
Expand All @@ -163,12 +182,17 @@ static bool do_proxy(em_proxying_queue* q, pthread_t target_thread, task t) {
}

bool ret = em_task_queue_send(tasks, t);
// When proxying work to the main thread using the system queue we have a
// special case in that we need to wake the target thread in case it is in
// `emscripten_futex_wait`.
if (ret && is_system_queue &&
pthread_equal(target_thread, emscripten_main_runtime_thread_id())) {
DBG("waking main runtime thread using _emscripten_thread_notify");

// When proxying work to the dlopen or system queue we may have to wake the
// target thread in case it is blocked in `emscripten_futex_wait`.
bool needs_notify =
#ifdef EMSCRIPTEN_DYNAMIC_LINKING
is_dlopen_queue ||
#endif
(is_system_queue &&
pthread_equal(target_thread, emscripten_main_runtime_thread_id()));
if (ret && needs_notify) {
DBG("waking target thread using _emscripten_thread_notify");
_emscripten_thread_notify(target_thread);
}
return ret;
Expand Down
3 changes: 3 additions & 0 deletions system/lib/pthread/threading_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#pragma once

#include <emscripten/proxying.h>

#include <inttypes.h>
#include <pthread.h>
#include <stdbool.h>
Expand Down Expand Up @@ -82,6 +84,7 @@ int _emscripten_thread_is_valid(pthread_t thread);
void _emscripten_thread_exit_joinable(pthread_t thread);
void _emscripten_thread_exit(void* result);
void _emscripten_process_dlopen_queue(void);
em_proxying_queue* _emscripten_proxy_dlopen_queue(void);

#if !defined(__EMSCRIPTEN_PTHREADS__) || defined(NDEBUG)
#define emscripten_set_current_thread_status(newStatus)
Expand Down
8 changes: 4 additions & 4 deletions test/codesize/test_codesize_minimal_pthreads.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"a.out.js": 7143,
"a.out.js.gz": 3542,
"a.out.nodebug.wasm": 19037,
"a.out.nodebug.wasm.gz": 8787,
"total": 26180,
"total_gz": 12329,
"a.out.nodebug.wasm": 19036,
"a.out.nodebug.wasm.gz": 8786,
"total": 26179,
"total_gz": 12328,
"sent": [
"a (memory)",
"b (exit)",
Expand Down
8 changes: 4 additions & 4 deletions test/codesize/test_codesize_minimal_pthreads_memgrowth.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"a.out.js": 7551,
"a.out.js.gz": 3745,
"a.out.nodebug.wasm": 19038,
"a.out.nodebug.wasm.gz": 8788,
"total": 26589,
"total_gz": 12533,
"a.out.nodebug.wasm": 19037,
"a.out.nodebug.wasm.gz": 8787,
"total": 26588,
"total_gz": 12532,
"sent": [
"a (memory)",
"b (exit)",
Expand Down
59 changes: 25 additions & 34 deletions test/core/pthread/test_pthread_dlopen.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <unistd.h>
#include <emscripten/threading.h>

#define NUM_THREADS 8

typedef int* (*sidey_data_type)();
typedef int (*func_t)();
typedef func_t (*sidey_func_type)();
Expand All @@ -16,18 +18,17 @@ static sidey_func_type p_side_func_address;
static int* expected_data_addr;
static func_t expected_func_addr;

static atomic_bool started = false;
static atomic_bool ready = false;
pthread_barrier_t started;

static void* thread_main(void* arg) {
printf("in thread_main\n");
started = true;
int id = (int)(intptr_t)arg;

while (!ready) {
printf("yielding ..\n");
sched_yield();
usleep(1000*100);
}
printf("thread %d: in thread_main\n", id);

// Wait until all threads + main have reached the barrier
pthread_barrier_wait(&started);

usleep(1000000);

int* data_addr = p_side_data_address();
assert(data_addr == expected_data_addr);
Expand All @@ -36,20 +37,24 @@ static void* thread_main(void* arg) {
assert(expected_func_addr == func_addr);
assert(func_addr() == 43);

printf("thread_main done\n");
printf("thread %d: thread_main done\n", id);
return 0;
}

int main() {
printf("in main\n");

// Start a thread before loading the shared library
pthread_t t;
int rc = pthread_create(&t, NULL, thread_main, NULL);
assert(rc == 0);
pthread_barrier_init(&started, NULL, NUM_THREADS + 1);
pthread_t threads[NUM_THREADS];

// Spin until the thread has started
while (!started) {}
// Start threads before loading the shared library
for (int i = 0; i < NUM_THREADS; ++i) {
int rc = pthread_create(&threads[i], NULL, thread_main, (void*)(intptr_t)i);
assert(rc == 0);

rc = pthread_detach(threads[i]);
assert(rc == 0);
}

printf("loading dylib\n");
void* handle = dlopen("libside.so", RTLD_NOW|RTLD_GLOBAL);
Expand All @@ -71,25 +76,11 @@ int main() {
printf("p_side_func_address -> %p\n", expected_func_addr);
assert(expected_func_addr() == 43);

ready = true;

printf("joining\n");
rc = pthread_join(t, NULL);
assert(rc == 0);
printf("done join\n");

printf("starting second & third thread\n");
pthread_t t2, t3;
rc = pthread_create(&t2, NULL, thread_main, NULL);
assert(rc == 0);
rc = pthread_create(&t3, NULL, thread_main, NULL);
assert(rc == 0);
rc = pthread_join(t2, NULL);
assert(rc == 0);
rc = pthread_join(t3, NULL);
assert(rc == 0);
printf("starting second & third thread\n");
// Wait until all threads execute their entry points
pthread_barrier_wait(&started);

dlclose(handle);

printf("done\n");
return 0;
}
4 changes: 4 additions & 0 deletions test/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3503,6 +3503,10 @@ def test_dlopen_blocking(self):
# But with PROXY_TO_PTHEAD it does work, since we can do blocking and sync XHR in a worker.
self.btest_exit('other/test_dlopen_blocking.c', cflags=['-sMAIN_MODULE=2', '-sPROXY_TO_PTHREAD', '-pthread', '-Wno-experimental', '-sAUTOLOAD_DYLIBS=0', 'libside.so'])

def test_pthread_dlopen(self):
self.emcc('core/pthread/test_pthread_dlopen_side.c', ['-o', 'libside.so', '-sSIDE_MODULE', '-pthread', '-Wno-experimental'])
self.btest_exit('core/pthread/test_pthread_dlopen.c', cflags=['-sMAIN_MODULE=2', '-sEXIT_RUNTIME', '-sPTHREAD_POOL_SIZE=8', '-pthread', '-Wno-experimental', 'libside.so'])

# verify that dynamic linking works in all kinds of in-browser environments.
# don't mix different kinds in a single test.
@parameterized({
Expand Down
3 changes: 1 addition & 2 deletions test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9309,9 +9309,8 @@ def test_pthread_dlopen(self):
self.cflags += ['--embed-file', 'libside.so@libside.so']
self.prep_dlfcn_main()
self.set_setting('EXIT_RUNTIME')
self.set_setting('PROXY_TO_PTHREAD')
self.do_runf('core/pthread/test_pthread_dlopen.c',
['side module ctor', 'done join', 'side module atexit'],
['side module ctor', 'done', 'side module atexit'],
assert_all=True)

@needs_dylink
Expand Down
Loading