diff --git a/ggml/src/ggml-cpu/ggml-cpu.c b/ggml/src/ggml-cpu/ggml-cpu.c index b468b115..c47511ad 100644 --- a/ggml/src/ggml-cpu/ggml-cpu.c +++ b/ggml/src/ggml-cpu/ggml-cpu.c @@ -187,6 +187,9 @@ typedef void * thread_ret_t; typedef pthread_t ggml_thread_t; +#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU) +#define GGML_THREADPOOL_N_THREADS_BITS (16) + #if defined(__APPLE__) #include #include @@ -449,7 +452,7 @@ struct ggml_threadpool { struct ggml_cplan * cplan; // synchronization primitives - atomic_int n_graph; // incremented when there is work to be done (i.e each graph) + atomic_int n_graph; // updated when there is work to be done (i.e each graph) holds graph and active thread counts. atomic_int GGML_CACHE_ALIGN n_barrier; atomic_int GGML_CACHE_ALIGN n_barrier_passed; atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads. @@ -457,12 +460,10 @@ struct ggml_threadpool { // these are atomic as an annotation for thread-sanitizer atomic_bool stop; // Used for stopping the threadpool altogether atomic_bool pause; // Used for pausing the threadpool or individual threads - atomic_int abort; // Used for aborting processing of a graph + atomic_int abort; // Used for aborting processing of a graph struct ggml_compute_state * workers; // per thread state - int n_threads_max; // number of threads in the pool - atomic_int n_threads_cur; // number of threads used in the current graph - + int n_threads; // Number of threads in the pool int32_t prio; // Scheduling priority uint32_t poll; // Polling level (0 - no polling) @@ -539,7 +540,7 @@ struct ggml_state { static struct ggml_state g_state = {0}; void ggml_barrier(struct ggml_threadpool * tp) { - int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed); + int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK; if (n_threads == 1) { return; } @@ -556,7 +557,7 @@ void ggml_barrier(struct ggml_threadpool * tp) { // last thread atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed); - // exit barrier (fill seq-cst fence) + // exit barrier (full seq-cst fence) atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst); return; } @@ -2628,7 +2629,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask void ggml_threadpool_free(struct ggml_threadpool* threadpool) { if (!threadpool) return; - const int n_threads = threadpool->n_threads_max; + const int n_threads = threadpool->n_threads; #ifndef GGML_USE_OPENMP struct ggml_compute_state* workers = threadpool->workers; @@ -2704,7 +2705,7 @@ struct ggml_cplan ggml_graph_plan( //GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads); } if (n_threads <= 0) { - n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS; + n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS; } #if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__) @@ -2912,12 +2913,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { struct ggml_compute_params params = { /*.ith =*/ state->ith, - /*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed), + /*.nth =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK, /*.wsize =*/ cplan->work_size, /*.wdata =*/ cplan->work_data, /*.threadpool=*/ tp, }; + GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph); + for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) { struct ggml_tensor * node = cgraph->nodes[node_n]; @@ -2939,6 +2942,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { } } + GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph); + ggml_barrier(state->threadpool); return 0; @@ -2946,27 +2951,23 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { #ifndef GGML_USE_OPENMP -// check if thread is active -static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) { - struct ggml_threadpool * threadpool = state->threadpool; - int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed); - return (state->ith < n_threads); -} - // check if thread is ready to proceed (exit from polling or sleeping) +// returns true if loops should exit, sets state->pending to indicate new work static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) { struct ggml_threadpool * threadpool = state->threadpool; if (state->pending || threadpool->stop || threadpool->pause) { return true; } // check for new graph/work - int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed); - if (new_graph != state->last_graph) { - state->pending = ggml_graph_compute_thread_active(state); - state->last_graph = new_graph; + int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed); + int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK; + if (n_graph != state->last_graph) { + state->pending = (state->ith < n_threads); + state->last_graph = n_graph; + return true; } - return state->pending; + return false; } // sync thread state after polling @@ -2983,11 +2984,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) { struct ggml_threadpool * threadpool = state->threadpool; - // Skip polling for unused threads - if (!ggml_graph_compute_thread_active(state)) { - return state->pending; - } - // This seems to make 0 ... 100 a decent range for polling level across modern processors. // Perhaps, we can adjust it dynamically based on load and things. const uint64_t n_rounds = 1024UL * 128 * threadpool->poll; @@ -3049,7 +3045,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { ggml_graph_compute_check_for_work(state); if (state->pending) { state->pending = false; - ggml_graph_compute_thread(state); } } @@ -3064,14 +3059,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int ggml_mutex_lock(&threadpool->mutex); - GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads); + // Update the number of active threads and the graph count + int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS; + n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK); - // Update the number of active threads - atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed); + GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph); // Indicate the graph is ready to be processed // We need the full seq-cst fence here because of the polling threads (used in thread_sync) - atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst); + atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst); if (threadpool->pause) { // Update main thread prio and affinity to match the threadpool settings @@ -3109,8 +3105,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl( threadpool->pause = tpp->paused; threadpool->abort = -1; threadpool->workers = NULL; - threadpool->n_threads_max = tpp->n_threads; - threadpool->n_threads_cur = tpp->n_threads; + threadpool->n_threads = tpp->n_threads; threadpool->poll = tpp->poll; threadpool->prio = tpp->prio; threadpool->ec = GGML_STATUS_SUCCESS; @@ -3205,7 +3200,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl { // update the number of threads from the actual number of threads that we got from OpenMP n_threads = omp_get_num_threads(); - atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed); + atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed); } // Apply thread CPU mask and priority @@ -3218,13 +3213,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl ggml_graph_compute_thread(&threadpool->workers[ith]); } } else { - atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed); + atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed); ggml_graph_compute_thread(&threadpool->workers[0]); } #else - if (n_threads > threadpool->n_threads_max) { - GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max); - n_threads = threadpool->n_threads_max; + if (n_threads > threadpool->n_threads) { + GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads); + n_threads = threadpool->n_threads; } // Kick all threads to start the new graph