Skip to content
Snippets Groups Projects
Commit 8e70e8b2 authored by David Goulet's avatar David Goulet :panda_face:
Browse files

Merge branch 'maint-0.4.8'

parents 072e3994 dd25a8c7
No related branches found
No related tags found
No related merge requests found
Pipeline #262677 failed
o Minor bugfixes (threads, memory):
- Rework start and exit of worker threads.
- Improvements in cleanup of resources used by threads.
Fixes bug 40991; bugfix on 0.4.8.13-dev.
o Minor bugfix (conflux):
- Avoid a non fatal assert when describing a conflux circuit on the control
port after being prepped to be freed. Fixes bug 41037; bugfix on 0.4.8.15.
......@@ -28,8 +28,9 @@ CIRCUIT_IS_CONFLUX(const circuit_t *circ)
tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_LINKED);
return true;
} else {
tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_LINKED);
tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_UNLINKED);
/* We don't assert on purposes here because we can end up in this branch
* with circ->conflux being NULL but for a conflux purpose. This happens in
* the about_to_free() code path. */
return false;
}
}
......
......@@ -78,6 +78,8 @@ struct threadpool_t {
/** Number of elements in threads. */
int n_threads;
/** Number of elements to be created in threads. */
int n_threads_max;
/** Mutex to protect all the above fields. */
tor_mutex_t lock;
......@@ -88,6 +90,11 @@ struct threadpool_t {
void *(*new_thread_state_fn)(void*);
void (*free_thread_state_fn)(void*);
void *new_thread_state_arg;
/** Used for signalling the worker threads to exit. */
int exit;
/** Mutex for controlling worker threads' startup and exit. */
tor_mutex_t control_lock;
};
/** Used to put a workqueue_priority_t value into a bitfield. */
......@@ -270,13 +277,34 @@ worker_thread_extract_next_work(workerthread_t *thread)
static void
worker_thread_main(void *thread_)
{
static int n_worker_threads_running = 0;
workerthread_t *thread = thread_;
threadpool_t *pool = thread->in_pool;
workqueue_entry_t *work;
workqueue_reply_t result;
tor_mutex_acquire(&pool->control_lock);
log_debug(LD_GENERAL, "Worker thread %u/%u has started [TID: %lu].",
n_worker_threads_running + 1, pool->n_threads_max,
tor_get_thread_id());
if (++n_worker_threads_running == pool->n_threads_max)
tor_cond_signal_one(&pool->condition);
tor_mutex_release(&pool->control_lock);
/* Wait until all worker threads have started.
* pool->lock must be prelocked here. */
tor_mutex_acquire(&pool->lock);
log_debug(LD_GENERAL, "Worker thread has entered the work loop [TID: %lu].",
tor_get_thread_id());
while (1) {
/* Exit thread when signaled to exit */
if (pool->exit)
goto exit;
/* lock must be held at this point. */
while (worker_thread_has_work(thread)) {
/* lock must be held at this point. */
......@@ -290,11 +318,12 @@ worker_thread_main(void *thread_)
workqueue_reply_t r = update_fn(thread->state, arg);
if (r != WQ_RPL_REPLY) {
return;
}
tor_mutex_acquire(&pool->lock);
/* We may need to exit the thread. */
if (r != WQ_RPL_REPLY)
goto exit;
continue;
}
work = worker_thread_extract_next_work(thread);
......@@ -309,11 +338,11 @@ worker_thread_main(void *thread_)
/* Queue the reply for the main thread. */
queue_reply(thread->reply_queue, work);
/* We may need to exit the thread. */
if (result != WQ_RPL_REPLY) {
return;
}
tor_mutex_acquire(&pool->lock);
/* We may need to exit the thread. */
if (result != WQ_RPL_REPLY)
goto exit;
}
/* At this point the lock is held, and there is no work in this thread's
* queue. */
......@@ -325,6 +354,19 @@ worker_thread_main(void *thread_)
log_warn(LD_GENERAL, "Fail tor_cond_wait.");
}
}
exit:
/* At this point pool->lock must be held */
log_debug(LD_GENERAL, "Worker thread %u/%u has exited [TID: %lu].",
pool->n_threads_max - n_worker_threads_running + 1,
pool->n_threads_max, tor_get_thread_id());
if (--n_worker_threads_running == 0)
/* Let the main thread know, the last worker thread has exited. */
tor_mutex_release(&pool->control_lock);
tor_mutex_release(&pool->lock);
}
/** Put a reply on the reply queue. The reply must not currently be on
......@@ -516,12 +558,17 @@ threadpool_start_threads(threadpool_t *pool, int n)
if (n > MAX_THREADS)
n = MAX_THREADS;
tor_mutex_acquire(&pool->control_lock);
tor_mutex_acquire(&pool->lock);
if (pool->n_threads < n)
pool->threads = tor_reallocarray(pool->threads,
sizeof(workerthread_t*), n);
int status = 0;
pool->n_threads_max = n;
log_debug(LD_GENERAL, "Starting worker threads...");
while (pool->n_threads < n) {
/* For half of our threads, we'll choose lower priorities permissively;
* for the other half, we'll stick more strictly to higher priorities.
......@@ -536,16 +583,80 @@ threadpool_start_threads(threadpool_t *pool, int n)
//LCOV_EXCL_START
tor_assert_nonfatal_unreached();
pool->free_thread_state_fn(state);
tor_mutex_release(&pool->lock);
return -1;
status = -1;
goto check_status;
//LCOV_EXCL_STOP
}
thr->index = pool->n_threads;
pool->threads[pool->n_threads++] = thr;
}
struct timeval tv = {.tv_sec = 30, .tv_usec = 0};
/* Wait for the last launched thread to confirm us, it has started.
* Wait max 30 seconds */
status = tor_cond_wait(&pool->condition, &pool->control_lock, &tv);
check_status:
switch (status) {
case 0:
log_debug(LD_GENERAL, "Starting worker threads finished.");
break;
case -1:
log_warn(LD_GENERAL, "Failed to confirm worker threads' start up.");
break;
case 1:
log_warn(LD_GENERAL, "Failed to confirm worker threads' "
"start up after timeout.");
FALLTHROUGH;
default:
status = -1;
}
log_debug(LD_GENERAL, "Signaled the worker threads to enter the work loop.");
/* If we had an error, let the worker threads (if any) exit directly. */
if (status != 0) {
pool->exit = 1;
log_debug(LD_GENERAL, "Signaled the worker threads to exit...");
}
/* Let worker threads enter the work loop. */
tor_mutex_release(&pool->lock);
return 0;
/* pool->control_lock stays locked. This is required for the main thread
* to wait for the worker threads to exit on shutdown. */
return status;
}
/** Stop all worker threads */
static void
threadpool_stop_threads(threadpool_t *pool)
{
tor_mutex_acquire(&pool->lock);
if (pool->exit == 0) {
/* Signal the worker threads to exit */
pool->exit = 1;
/* If worker threads are waiting for work, let them continue to exit */
tor_cond_signal_all(&pool->condition);
log_debug(LD_GENERAL, "Signaled worker threads to exit. "
"Waiting for them to exit...");
}
tor_mutex_release(&pool->lock);
/* Wait until all worker threads have exited.
* pool->control_lock must be prelocked here. */
tor_mutex_acquire(&pool->control_lock);
/* Unlock required, else main thread hangs on mutex uninit. */
tor_mutex_release(&pool->control_lock);
/* If this message appears in the log before all threads have confirmed
* their exit, then pool->control_lock wasn't prelocked for some reason. */
log_debug(LD_GENERAL, "All worker threads have exited.");
}
/**
......@@ -566,6 +677,9 @@ threadpool_new(int n_threads,
pool = tor_malloc_zero(sizeof(threadpool_t));
tor_mutex_init_nonrecursive(&pool->lock);
tor_cond_init(&pool->condition);
tor_mutex_init_nonrecursive(&pool->control_lock);
pool->exit = 0;
unsigned i;
for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
TOR_TAILQ_INIT(&pool->work[i]);
......@@ -579,8 +693,6 @@ threadpool_new(int n_threads,
if (threadpool_start_threads(pool, n_threads) < 0) {
//LCOV_EXCL_START
tor_assert_nonfatal_unreached();
tor_cond_uninit(&pool->condition);
tor_mutex_uninit(&pool->lock);
threadpool_free(pool);
return NULL;
//LCOV_EXCL_STOP
......@@ -598,6 +710,14 @@ threadpool_free_(threadpool_t *pool)
if (!pool)
return;
threadpool_stop_threads(pool);
log_debug(LD_GENERAL, "Beginning to clean up...");
tor_cond_uninit(&pool->condition);
tor_mutex_uninit(&pool->lock);
tor_mutex_uninit(&pool->control_lock);
if (pool->threads) {
for (int i = 0; i != pool->n_threads; ++i)
workerthread_free(pool->threads[i]);
......@@ -605,21 +725,35 @@ threadpool_free_(threadpool_t *pool)
tor_free(pool->threads);
}
if (pool->update_args)
pool->free_update_arg_fn(pool->update_args);
if (pool->update_args) {
if (!pool->free_update_arg_fn)
log_warn(LD_GENERAL, "Freeing pool->update_args not possible. "
"pool->free_update_arg_fn is not set.");
else
pool->free_update_arg_fn(pool->update_args);
}
if (pool->reply_event) {
tor_event_del(pool->reply_event);
tor_event_free(pool->reply_event);
if (tor_event_del(pool->reply_event) == -1)
log_warn(LD_GENERAL, "libevent error: deleting reply event failed.");
else
tor_event_free(pool->reply_event);
}
if (pool->reply_queue)
replyqueue_free(pool->reply_queue);
if (pool->new_thread_state_arg)
pool->free_thread_state_fn(pool->new_thread_state_arg);
if (pool->new_thread_state_arg) {
if (!pool->free_thread_state_fn)
log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. "
"pool->free_thread_state_fn is not set.");
else
pool->free_thread_state_fn(pool->new_thread_state_arg);
}
tor_free(pool);
log_debug(LD_GENERAL, "Cleanup finished.");
}
/** Return the reply queue associated with a given thread pool. */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment