Fix race conditions in threadpool when dealing with dynamic/frequent n_threads changes (llama/17748)
* tests: update barrier test to check for race condition in active threads
* cpu: combine n_graph and n_threads into a single atomic update
* tests: add multi-graph test for test_barrier
commit a2886fba48
parent cd9b8c6d18
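The core of the change is packing the graph counter and the active-thread count into the single atomic n_graph word (graph id in the upper bits, thread count in the lower 16 bits), so both values are published and read with one atomic operation. Below is a minimal stand-alone sketch of that scheme, reusing the mask/shift constants this diff introduces; the main() driver and the example thread count are illustrative only, not library code.

// Sketch: encode/decode the packed n_graph word introduced by this commit.
#include <stdatomic.h>
#include <stdio.h>

#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
#define GGML_THREADPOOL_N_THREADS_BITS (16)

int main(void) {
    atomic_int n_graph = 0;

    // writer (kickoff): bump the graph id and publish the new thread count
    // in one indivisible store
    int graph_id  = atomic_load_explicit(&n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
    int n_threads = 8; // example thread count for the next graph
    int packed    = ((graph_id + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
    atomic_store_explicit(&n_graph, packed, memory_order_seq_cst);

    // reader (worker): a single load yields a consistent (graph id, n_threads) pair
    int v = atomic_load_explicit(&n_graph, memory_order_relaxed);
    printf("graph=%d n_threads=%d\n", v >> GGML_THREADPOOL_N_THREADS_BITS, (int)(v & GGML_THREADPOOL_N_THREADS_MASK));
    return 0;
}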
@@ -187,6 +187,9 @@ typedef void * thread_ret_t;
 
 typedef pthread_t ggml_thread_t;
 
+#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
+#define GGML_THREADPOOL_N_THREADS_BITS (16)
+
 #if defined(__APPLE__)
 #include <unistd.h>
 #include <mach/mach.h>
@@ -449,7 +452,7 @@ struct ggml_threadpool {
     struct ggml_cplan * cplan;
 
     // synchronization primitives
-    atomic_int n_graph;       // incremented when there is work to be done (i.e each graph)
+    atomic_int n_graph;       // updated when there is work to be done (i.e each graph) holds graph and active thread counts.
     atomic_int GGML_CACHE_ALIGN n_barrier;
     atomic_int GGML_CACHE_ALIGN n_barrier_passed;
     atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
@@ -457,12 +460,10 @@ struct ggml_threadpool {
     // these are atomic as an annotation for thread-sanitizer
     atomic_bool stop;         // Used for stopping the threadpool altogether
     atomic_bool pause;        // Used for pausing the threadpool or individual threads
     atomic_int  abort;        // Used for aborting processing of a graph
 
     struct ggml_compute_state * workers; // per thread state
-    int         n_threads_max; // number of threads in the pool
-    atomic_int  n_threads_cur; // number of threads used in the current graph
-
+    int         n_threads;    // Number of threads in the pool
     int32_t     prio;         // Scheduling priority
     uint32_t    poll;         // Polling level (0 - no polling)
 
@@ -539,7 +540,7 @@ struct ggml_state {
 static struct ggml_state g_state = {0};
 
 void ggml_barrier(struct ggml_threadpool * tp) {
-    int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+    int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK;
     if (n_threads == 1) {
         return;
     }
@@ -556,7 +557,7 @@ void ggml_barrier(struct ggml_threadpool * tp) {
         // last thread
         atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
 
-        // exit barrier (fill seq-cst fence)
+        // exit barrier (full seq-cst fence)
        atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
         return;
     }
@@ -2628,7 +2629,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask
 void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
     if (!threadpool) return;
 
-    const int n_threads = threadpool->n_threads_max;
+    const int n_threads = threadpool->n_threads;
 
 #ifndef GGML_USE_OPENMP
     struct ggml_compute_state* workers = threadpool->workers;
@@ -2704,7 +2705,7 @@ struct ggml_cplan ggml_graph_plan(
         //GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
     }
     if (n_threads <= 0) {
-        n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
+        n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS;
     }
 
 #if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__)
@@ -2912,12 +2913,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
     struct ggml_compute_params params = {
         /*.ith       =*/ state->ith,
-        /*.nth       =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
+        /*.nth       =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK,
         /*.wsize     =*/ cplan->work_size,
         /*.wdata     =*/ cplan->work_data,
         /*.threadpool=*/ tp,
     };
 
+    GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) {
         struct ggml_tensor * node = cgraph->nodes[node_n];
 
@@ -2939,6 +2942,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
         }
     }
 
+    GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     ggml_barrier(state->threadpool);
 
     return 0;
@@ -2946,27 +2951,23 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
 #ifndef GGML_USE_OPENMP
 
-// check if thread is active
-static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
-    struct ggml_threadpool * threadpool = state->threadpool;
-    int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
-    return (state->ith < n_threads);
-}
-
 // check if thread is ready to proceed (exit from polling or sleeping)
+// returns true if loops should exit, sets state->pending to indicate new work
 static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
     // check for new graph/work
-    int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
-    if (new_graph != state->last_graph) {
-        state->pending    = ggml_graph_compute_thread_active(state);
-        state->last_graph = new_graph;
+    int n_graph   = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
+    int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK;
+    if (n_graph != state->last_graph) {
+        state->pending    = (state->ith < n_threads);
+        state->last_graph = n_graph;
         return true;
     }
 
-    return state->pending;
+    return false;
 }
 
 // sync thread state after polling
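For reference, here is a stand-alone model of the updated ready check above; thread_ready and its parameters are simplified stand-ins, not the library API. The point is that one relaxed load of the packed word yields the graph id and the thread count together, so a worker can never pair a new graph id with a stale active-thread count, which is exactly the race the separate n_threads_cur variable allowed.

// Stand-alone model of the ready check against the packed n_graph word.
#include <stdatomic.h>
#include <stdbool.h>

#define N_THREADS_MASK (0xffffU) // stands in for GGML_THREADPOOL_N_THREADS_MASK

static bool thread_ready(atomic_int * n_graph_packed, int ith, int * last_graph, bool * pending) {
    int n_graph   = atomic_load_explicit(n_graph_packed, memory_order_relaxed);
    int n_threads = (int)(n_graph & N_THREADS_MASK);
    if (n_graph != *last_graph) {
        *pending    = (ith < n_threads); // this thread participates only if its index is below the count
        *last_graph = n_graph;
        return true;
    }
    return false;
}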
@@ -2983,11 +2984,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
-    // Skip polling for unused threads
-    if (!ggml_graph_compute_thread_active(state)) {
-        return state->pending;
-    }
-
     // This seems to make 0 ... 100 a decent range for polling level across modern processors.
     // Perhaps, we can adjust it dynamically based on load and things.
     const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
@@ -3049,7 +3045,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
         ggml_graph_compute_check_for_work(state);
         if (state->pending) {
             state->pending = false;
-
             ggml_graph_compute_thread(state);
         }
     }
@@ -3064,14 +3059,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int
 
     ggml_mutex_lock(&threadpool->mutex);
 
-    GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+    // Update the number of active threads and the graph count
+    int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
+    n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
 
-    // Update the number of active threads
-    atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+    GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph);
 
     // Indicate the graph is ready to be processed
     // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
-    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
+    atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst);
 
     if (threadpool->pause) {
         // Update main thread prio and affinity to match the threadpool settings
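The kickoff now publishes the graph id and thread count with one seq-cst store, replacing the old relaxed store to n_threads_cur plus a separate fetch_add on n_graph, which a polling worker could observe out of step. Below is a runnable toy version of that handshake, assuming a C11 <threads.h> implementation; the worker loop, constants, and thread counts are all illustrative, not library code.

// Toy kickoff/worker handshake over a single packed atomic word.
#include <stdatomic.h>
#include <stdio.h>
#include <threads.h>

#define N_THREADS_BITS 16
#define N_THREADS_MASK 0xffffU

static atomic_int n_graph; // packed: (graph id << 16) | n_threads

static int worker(void * arg) {
    (void) arg;
    int last = 0;
    for (;;) {
        int v = atomic_load_explicit(&n_graph, memory_order_relaxed);
        if (v != last) {
            last = v;
            // both fields come from the same load, so the pair is always consistent
            printf("graph %d uses %d threads\n", v >> N_THREADS_BITS, (int)(v & N_THREADS_MASK));
            if ((v >> N_THREADS_BITS) == 3) return 0;
        }
    }
}

int main(void) {
    thrd_t t;
    thrd_create(&t, worker, NULL);
    for (int g = 1; g <= 3; g++) {
        int n_threads = 4 - g; // vary the thread count per graph
        // one store publishes the (graph id, n_threads) pair indivisibly
        atomic_store_explicit(&n_graph, (g << N_THREADS_BITS) | n_threads, memory_order_seq_cst);
    }
    thrd_join(t, NULL);
    return 0;
}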
@@ -3109,8 +3105,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
     threadpool->pause           = tpp->paused;
     threadpool->abort           = -1;
     threadpool->workers         = NULL;
-    threadpool->n_threads_max   = tpp->n_threads;
-    threadpool->n_threads_cur   = tpp->n_threads;
+    threadpool->n_threads       = tpp->n_threads;
     threadpool->poll            = tpp->poll;
     threadpool->prio            = tpp->prio;
     threadpool->ec              = GGML_STATUS_SUCCESS;
@@ -3205,7 +3200,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             {
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
-                atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+                atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed);
             }
 
             // Apply thread CPU mask and priority
@@ -3218,13 +3213,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             ggml_graph_compute_thread(&threadpool->workers[ith]);
         }
     } else {
-        atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed);
+        atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
         ggml_graph_compute_thread(&threadpool->workers[0]);
     }
 #else
-    if (n_threads > threadpool->n_threads_max) {
-        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
-        n_threads = threadpool->n_threads_max;
+    if (n_threads > threadpool->n_threads) {
+        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads);
+        n_threads = threadpool->n_threads;
     }
 
     // Kick all threads to start the new graph