diff --git a/common/common.cpp b/common/common.cpp index 65103c3c294d3..0c573c57f9e02 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -218,6 +218,36 @@ void gpt_params_handle_model_default(gpt_params & params) { } } +void postprocess_cpu_params(cpu_params& cpuparams, const cpu_params* role_model) { + int32_t n_set = 0; + + if (cpuparams.n_threads < 0) { + // Assuming everything about cpuparams is invalid + if (role_model != nullptr) { + cpuparams = *role_model; + } else { + cpuparams.n_threads = std::thread::hardware_concurrency(); + } + } + + for (int32_t i = 0; i < GGML_N_CORES_MAX; i++) { + if (cpuparams.cpumask[i]) { + n_set++; + } + } + + if (n_set == 0) { + // You hit the jackpot! + memset(&cpuparams.cpumask[0], 1, GGML_N_CORES_MAX); + n_set = GGML_N_CORES_MAX; + } + + if (n_set < cpuparams.n_threads) { + // Not enough set bits, may experience performance issues. + fprintf(stderr, "warn: Not enough set bits in CPU mask (%d) to satisfy requested thread count: %d\n", n_set, cpuparams.n_threads); + } +} + bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) { bool invalid_param = false; std::string arg; @@ -237,6 +267,11 @@ bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) { } } + postprocess_cpu_params(params.cpuparams, nullptr); + postprocess_cpu_params(params.cpuparams_batch, ¶ms.cpuparams); + postprocess_cpu_params(params.draft_cpuparams, ¶ms.cpuparams); + postprocess_cpu_params(params.draft_cpuparams_batch, ¶ms.cpuparams_batch); + if (params.prompt_cache_all && (params.interactive || params.interactive_first || params.instruct)) { @@ -280,6 +315,79 @@ bool gpt_params_parse(int argc, char ** argv, gpt_params & params) { return result; } +bool parse_cpu_range(const std::string & range, bool (&boolmask)[GGML_N_CORES_MAX]) { + size_t dash_loc = range.find('-'); + if (dash_loc == std::string::npos) { + fprintf(stderr, "Format of CPU range is invalid! 
Expected []-[].\n"); + return false; + } + + size_t start_i; + size_t end_i; + + if (dash_loc == 0) { + start_i = 0; + } else { + start_i = std::stoull(range.substr(0, dash_loc)); + if (start_i >= GGML_N_CORES_MAX) { + fprintf(stderr, "Start index out of bounds!\n"); + return false; + } + } + + if (dash_loc == range.length() - 1) { + end_i = GGML_N_CORES_MAX - 1; + } else { + end_i = std::stoull(range.substr(dash_loc + 1)); + if (end_i >= GGML_N_CORES_MAX) { + fprintf(stderr, "End index out of bounds!\n"); + return false; + } + } + + for (size_t i = start_i; i <= end_i; i++) { + boolmask[i] = true; + } + + return true; +} + +bool parse_cpu_mask(const std::string & mask, bool (&boolmask)[GGML_N_CORES_MAX]) { + // Discard potential 0x prefix + size_t start_i = 0; + if (mask.length() >= 2 && mask.substr(0, 2) == "0x") { + start_i = 2; + } + + size_t num_digits = mask.length() - start_i; + if (num_digits > 128) num_digits = 128; + + size_t end_i = num_digits + start_i; + + for (size_t i = start_i, n = (num_digits*4 - 1); i < end_i; i++, n-=4) { + char c = mask.at(i); + int8_t id = c; + + if ((c >= '0' && c <= '9')) { + id -= '0'; + } else if (c >= 'a' && c <= 'f') { + id -= 'a' - 10; + } else if (c >= 'A' && c <= 'F') { + id -= 'A' - 10; + } else { + fprintf(stderr, "Invalid hex character '%c' at position %d\n", c, int32_t(i)); + return false; + } + + boolmask[ n ] = boolmask[ n ] || ((id & 8) != 0); + boolmask[n - 1] = boolmask[n - 1] || ((id & 4) != 0); + boolmask[n - 2] = boolmask[n - 2] || ((id & 2) != 0); + boolmask[n - 3] = boolmask[n - 3] || ((id & 1) != 0); + } + + return true; +} + bool gpt_params_find_arg(int argc, char ** argv, const std::string & arg, gpt_params & params, int & i, bool & invalid_param) { llama_sampling_params & sparams = params.sparams; @@ -298,43 +406,187 @@ bool gpt_params_find_arg(int argc, char ** argv, const std::string & arg, gpt_pa invalid_param = true; return true; } - params.n_threads = std::stoi(argv[i]); - if (params.n_threads <= 0) { - params.n_threads = std::thread::hardware_concurrency(); + params.cpuparams.n_threads = std::stoi(argv[i]); + if (params.cpuparams.n_threads <= 0) { + params.cpuparams.n_threads = std::thread::hardware_concurrency(); } return true; } + if (arg == "-C" || arg == "--cpu-mask") { + if (++i >= argc) { + invalid_param = true; + return true; + } + std::string mask = argv[i]; + params.cpuparams.mask_valid = true; + invalid_param = !parse_cpu_mask(mask, params.cpuparams.cpumask); + return true; + } + if (arg == "-Cr" || arg == "--cpu-range") { + if (++i >= argc) { + invalid_param = true; + return true; + } + std::string range = argv[i]; + params.cpuparams.mask_valid = true; + invalid_param = !parse_cpu_range(range, params.cpuparams.cpumask); + return true; + } + if (arg == "--prio") { + if (++i >= argc) { + invalid_param = true; + return true; + } + params.cpuparams.priority = std::stoul(argv[i]); + return true; + } + if (arg == "--cpu-strict") { + params.cpuparams.strict_cpu = true; + return true; + } + if (arg == "--poll") { + params.cpuparams.poll = true; + return true; + } if (arg == "-tb" || arg == "--threads-batch") { if (++i >= argc) { invalid_param = true; return true; } - params.n_threads_batch = std::stoi(argv[i]); - if (params.n_threads_batch <= 0) { - params.n_threads_batch = std::thread::hardware_concurrency(); + params.cpuparams_batch.n_threads = std::stoi(argv[i]); + if (params.cpuparams_batch.n_threads <= 0) { + params.cpuparams_batch.n_threads = std::thread::hardware_concurrency(); } return true; } + if (arg == 
"-Cb" || arg == "--cpu-mask-batch") { + if (++i >= argc) { + invalid_param = true; + return true; + } + std::string mask = argv[i]; + params.cpuparams_batch.mask_valid = true; + invalid_param = !parse_cpu_mask(mask, params.cpuparams_batch.cpumask); + return true; + } + if (arg == "-Crb" || arg == "--cpu-range_batch") { + if (++i >= argc) { + invalid_param = true; + return true; + } + std::string range = argv[i]; + params.cpuparams_batch.mask_valid = true; + invalid_param = !parse_cpu_range(range, params.cpuparams_batch.cpumask); + return true; + } + if (arg == "--prio-batch") { + if (++i >= argc) { + invalid_param = true; + return true; + } + params.cpuparams_batch.priority = std::stoul(argv[i]); + return true; + } + if (arg == "--cpu-strict-batch") { + params.cpuparams_batch.strict_cpu = true; + return true; + } + if (arg == "--poll-batch") { + params.cpuparams_batch.poll = true; + return true; + } if (arg == "-td" || arg == "--threads-draft") { if (++i >= argc) { invalid_param = true; return true; } - params.n_threads_draft = std::stoi(argv[i]); - if (params.n_threads_draft <= 0) { - params.n_threads_draft = std::thread::hardware_concurrency(); + params.draft_cpuparams.n_threads = std::stoi(argv[i]); + if (params.draft_cpuparams.n_threads <= 0) { + params.draft_cpuparams.n_threads = std::thread::hardware_concurrency(); + } + return true; + } + if (arg == "-Cd" || arg == "--cpu-mask-draft") { + if (++i >= argc) { + invalid_param = true; + return true; + } + std::string mask = argv[i]; + params.draft_cpuparams.mask_valid = true; + invalid_param = !parse_cpu_mask(mask, params.draft_cpuparams.cpumask); + return true; + } + if (arg == "-Crd" || arg == "--cpu-range-draft") { + if (++i >= argc) { + invalid_param = true; + return true; + } + std::string range = argv[i]; + params.draft_cpuparams.mask_valid = true; + invalid_param = !parse_cpu_range(range, params.draft_cpuparams.cpumask); + return true; + } + if (arg == "--prio-draft") { + if (++i >= argc) { + invalid_param = true; + return true; + } + params.draft_cpuparams.priority = std::stoul(argv[i]); + return true; + } + if (arg == "--cpu-strict-draft") { + params.draft_cpuparams.strict_cpu = true; + return true; + } + if (arg == "--poll-draft") { + params.draft_cpuparams.poll = true; + return true; + } + if (arg == "-tdb" || arg == "--threads-draft-batch") { + if (++i >= argc) { + invalid_param = true; + return true; + } + params.draft_cpuparams_batch.n_threads = std::stoi(argv[i]); + if (params.draft_cpuparams_batch.n_threads <= 0) { + params.draft_cpuparams_batch.n_threads = std::thread::hardware_concurrency(); } return true; } - if (arg == "-tbd" || arg == "--threads-batch-draft") { + if (arg == "-Cdb" || arg == "--cpu-mask-draft-batch") { if (++i >= argc) { invalid_param = true; return true; } - params.n_threads_batch_draft = std::stoi(argv[i]); - if (params.n_threads_batch_draft <= 0) { - params.n_threads_batch_draft = std::thread::hardware_concurrency(); + std::string mask = argv[i]; + params.draft_cpuparams_batch.mask_valid = true; + invalid_param = !parse_cpu_mask(mask, params.draft_cpuparams_batch.cpumask); + return true; + } + if (arg == "-Crdb" || arg == "--cpu-range-draft-batch") { + if (++i >= argc) { + invalid_param = true; + return true; } + std::string range = argv[i]; + params.draft_cpuparams_batch.mask_valid = true; + invalid_param = !parse_cpu_range(range, params.draft_cpuparams_batch.cpumask); + return true; + } + if (arg == "--prio-draft-batch") { + if (++i >= argc) { + invalid_param = true; + return true; + } + 
params.draft_cpuparams_batch.priority = std::stoul(argv[i]); + return true; + } + if (arg == "--cpu-strict-draft-batch") { + params.draft_cpuparams_batch.strict_cpu = true; + return true; + } + if (arg == "--poll-draft_batch") { + params.draft_cpuparams_batch.poll = true; return true; } if (arg == "-p" || arg == "--prompt") { @@ -1378,13 +1630,41 @@ void gpt_params_print_usage(int /*argc*/, char ** argv, const gpt_params & param printf(" (can be specified more than once for multiple prompts).\n"); printf(" --color colorise output to distinguish prompt and user input from generations\n"); printf(" -s SEED, --seed SEED RNG seed (default: -1, use random seed for < 0)\n"); - printf(" -t N, --threads N number of threads to use during generation (default: %d)\n", params.n_threads); + printf(" -t N, --threads N number of threads to use during generation (default: %d)\n", params.cpuparams.n_threads); printf(" -tb N, --threads-batch N\n"); printf(" number of threads to use during batch and prompt processing (default: same as --threads)\n"); + printf(" -C M, --cpu-mask M CPU affinity mask: arbitrarily long hex. Takes precedence over cpu-range (default: \"\")\n"); + printf(" -Cr --cpu-range lo-hi Ranges of CPUs for affinity (alternative to --cpu-mask)\n"); + printf(" -Cb M, --cpu-mask-batch M\n"); + printf(" CPU affinity mask: arbitrarily long hex. Takes precedence over cpu-range (default: \"\")\n"); + printf(" -Crb --cpu-range-batch lo-hi\n"); + printf(" Ranges of CPUs for affinity (alternative to --cpu-mask)\n"); + printf(" --cpu-strict Use strict CPU placement (default: %u)\n", (unsigned) params.cpuparams.strict_cpu); + printf(" --cpu-strict-batch Use strict CPU placement (default: %u)\n", (unsigned) params.cpuparams.strict_cpu); + printf(" --priority N Set process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.cpuparams.priority); + printf(" --priority-batch N Set process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.cpuparams.priority); + printf(" --poll Use polling to wait for work (default: %u)\n", (unsigned) params.cpuparams.poll); + printf(" --poll-batch Use polling to wait for work (default: %u)\n", (unsigned) params.cpuparams.poll); printf(" -td N, --threads-draft N"); - printf(" number of threads to use during generation (default: same as --threads)\n"); - printf(" -tbd N, --threads-batch-draft N\n"); - printf(" number of threads to use during batch and prompt processing (default: same as --threads-draft)\n"); + printf(" number of threads to use during generation for draft model (default: same as --threads)\n"); + printf(" -tdb N, --threads-draft-batch N\n"); + printf(" number of threads to use during batch and prompt processing for draft model (default: same as --threads-draft)\n"); + printf(" -Cd M, --cpu-mask-draft M\n"); + printf(" Draft model CPU affinity mask. Takes precedence over cpu-range-draft (default: \"\")\n"); + printf(" -Crd --cpu-range-draft lo-hi\n"); + printf(" Ranges of CPUs for affinity (alternative to --cpu-mask-draft)\n"); + printf(" -Cdb M, --cpu-mask-draft-batch M\n"); + printf(" Draft model CPU affinity mask. 
Takes precedence over cpu-range-draft (default: \"\")\n"); + printf(" -Crdb --cpu-range-draft-batch lo-hi\n"); + printf(" Ranges of CPUs for affinity (alternative to --cpu-mask-draft)\n"); + printf(" --cpu-strict-draft Use strict CPU placement for draft model (default: %u)\n", (unsigned) params.draft_cpuparams.strict_cpu); + printf(" --cpu-strict-draft-batch\n"); + printf(" Use strict CPU placement for draft model (default: %u)\n", (unsigned) params.draft_cpuparams.strict_cpu); + printf(" --priority-draft N Set draft process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.draft_cpuparams.priority); + printf(" --priority-draft-batch N\n"); + printf(" Set draft process/thread priority : 0-normal, 1-medium, 2-high, 3-realtime (default: %d)\n", params.draft_cpuparams.priority); + printf(" --poll-draft Use polling to wait for draft model work (default: %u)\n", (unsigned) params.draft_cpuparams.poll); + printf(" --poll-draft-batch Use polling to wait for draft model work (default: %u)\n", (unsigned) params.draft_cpuparams.poll); printf(" -p PROMPT, --prompt PROMPT\n"); printf(" prompt to start generation with (default: empty)\n"); printf(" -e, --escape process prompt escapes sequences (\\n, \\r, \\t, \\', \\\", \\\\)\n"); @@ -1556,9 +1836,9 @@ void gpt_params_print_usage(int /*argc*/, char ** argv, const gpt_params & param std::string gpt_params_get_system_info(const gpt_params & params) { std::ostringstream os; - os << "system_info: n_threads = " << params.n_threads; - if (params.n_threads_batch != -1) { - os << " (n_threads_batch = " << params.n_threads_batch << ")"; + os << "system_info: n_threads = " << params.cpuparams.n_threads; + if (params.cpuparams_batch.n_threads != -1) { + os << " (n_threads_batch = " << params.cpuparams_batch.n_threads << ")"; } os << " / " << std::thread::hardware_concurrency() << " | " << llama_print_system_info(); @@ -1952,7 +2232,7 @@ std::tuple llama_init_from_gpt_par ((i > 0) || params.lora_base.empty()) ? NULL : params.lora_base.c_str(), - params.n_threads); + params.cpuparams.n_threads); if (err != 0) { fprintf(stderr, "%s: error: failed to apply lora adapter\n", __func__); llama_free(lctx); @@ -2037,8 +2317,9 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param cparams.n_seq_max = params.n_parallel; cparams.n_batch = params.n_batch; cparams.n_ubatch = params.n_ubatch; - cparams.n_threads = params.n_threads; - cparams.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch; + cparams.n_threads = params.cpuparams.n_threads; + cparams.n_threads_batch = params.cpuparams_batch.n_threads == -1 ? 
+ params.cpuparams.n_threads : params.cpuparams_batch.n_threads; cparams.seed = params.seed; cparams.logits_all = params.logits_all; cparams.embeddings = params.embedding; @@ -2063,6 +2344,22 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param return cparams; } +struct ggml_threadpool_params ggml_threadpool_params_from_cpu_params(const cpu_params & params) { + struct ggml_threadpool_params tpp; + + tpp.mask_specified = params.mask_valid; + if (params.mask_valid) { + std::memcpy(&tpp.cpumask, ¶ms.cpumask, GGML_N_CORES_MAX); + } + + tpp.n_threads = params.n_threads; + tpp.prio = params.priority; + tpp.poll = params.poll; + tpp.strict_cpu = params.strict_cpu; + + return tpp; +} + #ifdef LLAMA_USE_CURL static bool starts_with(const std::string & str, const std::string & prefix) { @@ -2981,7 +3278,7 @@ void yaml_dump_non_result_info(FILE * stream, const gpt_params & params, const l yaml_dump_vector_float(stream, "tensor_split", tensor_split_vector); fprintf(stream, "tfs: %f # default: 1.0\n", sparams.tfs_z); - fprintf(stream, "threads: %d # default: %u\n", params.n_threads, std::thread::hardware_concurrency()); + fprintf(stream, "threads: %d # default: %u\n", params.cpuparams.n_threads, std::thread::hardware_concurrency()); fprintf(stream, "top_k: %d # default: 40\n", sparams.top_k); fprintf(stream, "top_p: %f # default: 0.95\n", sparams.top_p); fprintf(stream, "min_p: %f # default: 0.0\n", sparams.min_p); diff --git a/common/common.h b/common/common.h index 264504830a7f0..079fae03300e4 100644 --- a/common/common.h +++ b/common/common.h @@ -52,13 +52,18 @@ int32_t cpu_get_num_math(); // CLI argument parsing // +struct cpu_params { + int32_t n_threads = -1; + bool cpumask[GGML_N_CORES_MAX] = {false}; // CPU affinity mask. + bool mask_valid = false; // Default: any CPU + int32_t priority = 0; // Scheduling prio : (0 - normal, 1 - medium, 2 - high, 3 - realtime) + bool strict_cpu = false; // Use strict CPU placement + bool poll = false; // Use polling (busywait) to wait for work +}; + struct gpt_params { uint32_t seed = LLAMA_DEFAULT_SEED; // RNG seed - int32_t n_threads = cpu_get_num_math(); - int32_t n_threads_draft = -1; - int32_t n_threads_batch = -1; // number of threads to use for batch processing (-1 = use n_threads) - int32_t n_threads_batch_draft = -1; int32_t n_predict = -1; // new tokens to predict int32_t n_ctx = 512; // context size int32_t n_batch = 2048; // logical batch size for prompt processing (must be >=32 to use BLAS) @@ -91,6 +96,11 @@ struct gpt_params { ggml_backend_sched_eval_callback cb_eval = nullptr; void * cb_eval_user_data = nullptr; + struct cpu_params cpuparams; + struct cpu_params cpuparams_batch; + struct cpu_params draft_cpuparams; + struct cpu_params draft_cpuparams_batch; + ggml_numa_strategy numa = GGML_NUMA_STRATEGY_DISABLED; enum llama_rope_scaling_type rope_scaling_type = LLAMA_ROPE_SCALING_TYPE_UNSPECIFIED; @@ -189,6 +199,10 @@ bool gpt_params_parse (int argc, char ** argv, gpt_params & params); bool gpt_params_find_arg (int argc, char ** argv, const std::string & arg, gpt_params & params, int & i, bool & invalid_param); void gpt_params_print_usage(int argc, char ** argv, const gpt_params & params); +bool parse_cpu_range(const std::string& range, bool(&boolmask)[GGML_N_CORES_MAX]); +bool parse_cpu_mask(const std::string& mask, bool(&boolmask)[GGML_N_CORES_MAX]); +void postprocess_cpu_params(cpu_params& cpuparams, const cpu_params* role_model = nullptr); + std::string gpt_params_get_system_info(const gpt_params & params); // 
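The common.h hunk above declares the new `parse_cpu_mask()` / `parse_cpu_range()` helpers and the `cpu_params` struct they fill in. To make the hex-mask convention concrete, here is a small standalone sketch of the digit-to-core mapping that `parse_cpu_mask()` in common.cpp implements: the rightmost hex digit covers cores 0-3, and each digit's least significant bit selects the lowest core in its group of four. `decode_hex_mask` and `N_CORES_MAX` are illustrative stand-ins, not names from this patch.

```cpp
// Standalone sketch of the hex-mask decoding idea behind parse_cpu_mask().
#include <cstdio>
#include <string>

constexpr int N_CORES_MAX = 512; // stand-in for GGML_N_CORES_MAX

static bool decode_hex_mask(const std::string & mask, bool (&cores)[N_CORES_MAX]) {
    const size_t start = (mask.rfind("0x", 0) == 0) ? 2 : 0; // optional "0x" prefix
    size_t bit = (mask.size() - start) * 4;                  // one past the highest bit covered
    for (size_t i = start; i < mask.size(); ++i) {
        const char c = mask[i];
        int v;
        if      (c >= '0' && c <= '9') { v = c - '0'; }
        else if (c >= 'a' && c <= 'f') { v = c - 'a' + 10; }
        else if (c >= 'A' && c <= 'F') { v = c - 'A' + 10; }
        else { return false; }                               // reject non-hex characters
        bit -= 4;                                            // this digit covers cores [bit, bit+3]
        for (int b = 0; b < 4; ++b) {
            if (bit + b < (size_t) N_CORES_MAX) {
                cores[bit + b] = cores[bit + b] || ((v >> b) & 1);
            }
        }
    }
    return true;
}

int main() {
    bool cores[N_CORES_MAX] = {false};
    decode_hex_mask("0xA", cores); // 0xA = 1010b -> cores 1 and 3
    for (int i = 0; i < 4; ++i) {
        printf("core %d: %s\n", i, cores[i] ? "on" : "off");
    }
    return 0;
}
```

In the patch itself, `postprocess_cpu_params()` then falls back to enabling every core when no bit is set and warns when fewer bits are set than the requested thread count.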
@@ -220,8 +234,9 @@ std::string fs_get_cache_directory(); // TODO: avoid tuplue, use struct std::tuple llama_init_from_gpt_params(gpt_params & params); -struct llama_model_params llama_model_params_from_gpt_params (const gpt_params & params); -struct llama_context_params llama_context_params_from_gpt_params(const gpt_params & params); +struct llama_model_params llama_model_params_from_gpt_params (const gpt_params & params); +struct llama_context_params llama_context_params_from_gpt_params (const gpt_params & params); +struct ggml_threadpool_params ggml_threadpool_params_from_cpu_params(const cpu_params & params); struct llama_model * llama_load_model_from_url(const char * model_url, const char * path_model, const struct llama_model_params & params); struct llama_model * llama_load_model_from_hf(const char * repo, const char * file, const char * path_model, const struct llama_model_params & params); diff --git a/examples/baby-llama/baby-llama.cpp b/examples/baby-llama/baby-llama.cpp index bf0125e753746..5b233e352a95a 100644 --- a/examples/baby-llama/baby-llama.cpp +++ b/examples/baby-llama/baby-llama.cpp @@ -19,7 +19,7 @@ constexpr float rms_norm_eps = 5e-6f; #endif static void ggml_graph_compute_helper(std::vector & buf, ggml_cgraph * graph, int n_threads) { - struct ggml_cplan plan = ggml_graph_plan(graph, n_threads); + struct ggml_cplan plan = ggml_graph_plan(graph, n_threads, nullptr); if (plan.work_size > 0) { buf.resize(plan.work_size); diff --git a/examples/batched-bench/batched-bench.cpp b/examples/batched-bench/batched-bench.cpp index 2924d8116f44f..bd4a4c4ade40a 100644 --- a/examples/batched-bench/batched-bench.cpp +++ b/examples/batched-bench/batched-bench.cpp @@ -119,8 +119,9 @@ int main(int argc, char ** argv) { ctx_params.n_ubatch = n_ubatch; ctx_params.flash_attn = flash_attn; - ctx_params.n_threads = params.n_threads; - ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch; + ctx_params.n_threads = params.cpuparams.n_threads; + ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ? + params.cpuparams.n_threads : params.cpuparams_batch.n_threads; // ensure enough sequences are available ctx_params.n_seq_max = *std::max_element(n_pl.begin(), n_pl.end()); diff --git a/examples/batched/batched.cpp b/examples/batched/batched.cpp index 591bc6e57645c..55f11c522c764 100644 --- a/examples/batched/batched.cpp +++ b/examples/batched/batched.cpp @@ -83,8 +83,9 @@ int main(int argc, char ** argv) { ctx_params.n_ctx = n_kv_req; ctx_params.n_batch = std::max(n_len, n_parallel); ctx_params.n_seq_max = n_parallel; - ctx_params.n_threads = params.n_threads; - ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch; + ctx_params.n_threads = params.cpuparams.n_threads; + ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ? 
+ params.cpuparams.n_threads : params.cpuparams_batch.n_threads; llama_context * ctx = llama_new_context_with_model(model, ctx_params); diff --git a/examples/benchmark/benchmark-matmult.cpp b/examples/benchmark/benchmark-matmult.cpp index 47cb16c69d536..e78f6b388ef6e 100644 --- a/examples/benchmark/benchmark-matmult.cpp +++ b/examples/benchmark/benchmark-matmult.cpp @@ -21,7 +21,7 @@ #endif static void ggml_graph_compute_helper(std::vector & buf, ggml_cgraph * graph, int n_threads) { - struct ggml_cplan plan = ggml_graph_plan(graph, n_threads); + struct ggml_cplan plan = ggml_graph_plan(graph, n_threads, nullptr); if (plan.work_size > 0) { buf.resize(plan.work_size); diff --git a/examples/export-lora/export-lora.cpp b/examples/export-lora/export-lora.cpp index 08413f57e4c3a..0cf05a085472d 100644 --- a/examples/export-lora/export-lora.cpp +++ b/examples/export-lora/export-lora.cpp @@ -344,7 +344,7 @@ static bool apply_lora(struct ggml_tensor * tensor, struct lora_data * lora, int ggml_gallocr_alloc_graph(alloc, gf); - struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads); + struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, nullptr); static std::vector data_work; data_work.resize(cplan.work_size); cplan.work_data = data_work.data(); diff --git a/examples/finetune/finetune.cpp b/examples/finetune/finetune.cpp index 22425730f20eb..047868a8bbe68 100644 --- a/examples/finetune/finetune.cpp +++ b/examples/finetune/finetune.cpp @@ -1818,7 +1818,7 @@ int main(int argc, char ** argv) { opt_cb_data.millis_per_iter = 0.0; // measure required memory for work buffer - size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads).work_size + GGML_OBJECT_SIZE; + size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads, nullptr).work_size + GGML_OBJECT_SIZE; printf("%s: work_size = %zu bytes (%.1f MB)\n", __func__, max_work_size, (float) max_work_size / (1024.0f*1024.0f)); // context for work buffer diff --git a/examples/llama-bench/llama-bench.cpp b/examples/llama-bench/llama-bench.cpp index c008904476d3e..460dc5e759347 100644 --- a/examples/llama-bench/llama-bench.cpp +++ b/examples/llama-bench/llama-bench.cpp @@ -187,10 +187,20 @@ struct cmd_params { std::vector use_mmap; std::vector embeddings; ggml_numa_strategy numa; + cpu_params cpuparams; int reps; bool verbose; output_formats output_format; }; +// +//static const cpu_params default_cpuparams( +// int32_t(std::thread::hardware_concurrency()), +// {false}, +// false, +// 1, +// false, +// false +//); static const cmd_params cmd_params_defaults = { /* model */ {"models/7B/ggml-model-q4_0.gguf"}, @@ -212,6 +222,7 @@ static const cmd_params cmd_params_defaults = { /* use_mmap */ {true}, /* embeddings */ {false}, /* numa */ GGML_NUMA_STRATEGY_DISABLED, + /* cpuparams */ {}, /* reps */ 5, /* verbose */ false, /* output_format */ MARKDOWN @@ -239,6 +250,11 @@ static void print_usage(int /* argc */, char ** argv) { printf(" -fa, --flash-attn <0|1> (default: %s)\n", join(cmd_params_defaults.flash_attn, ",").c_str()); printf(" -mmp, --mmap <0|1> (default: %s)\n", join(cmd_params_defaults.use_mmap, ",").c_str()); printf(" --numa (default: disabled)\n"); + printf(" -mt, --max-threads (default: %d)\n", cmd_params_defaults.cpuparams.n_threads); + printf(" -C, --cpu-mask (default: 0x0)\n"); + printf(" --cpu-strict <0|1> (default: %d)\n", cmd_params_defaults.cpuparams.strict_cpu); + printf(" --priority <0|1|2|3> (default: %d)\n", cmd_params_defaults.cpuparams.priority); + printf(" --poll <0|1> (default: %d)\n", 
cmd_params_defaults.cpuparams.poll); printf(" -embd, --embeddings <0|1> (default: %s)\n", join(cmd_params_defaults.embeddings, ",").c_str()); printf(" -ts, --tensor-split (default: 0)\n"); printf(" -r, --repetitions (default: %d)\n", cmd_params_defaults.reps); @@ -275,7 +291,7 @@ static ggml_type ggml_type_from_name(const std::string & s) { } -static cmd_params parse_cmd_params(int argc, char ** argv) { +static cmd_params parse_cmd_params(int argc, char** argv) { cmd_params params; std::string arg; bool invalid_param = false; @@ -326,7 +342,7 @@ static cmd_params parse_cmd_params(int argc, char ** argv) { invalid_param = true; break; } - params.n_pg.push_back({std::stoi(p[0]), std::stoi(p[1])}); + params.n_pg.push_back({ std::stoi(p[0]), std::stoi(p[1]) }); } else if (arg == "-b" || arg == "--batch-size") { if (++i >= argc) { invalid_param = true; @@ -348,7 +364,7 @@ static cmd_params parse_cmd_params(int argc, char ** argv) { } auto p = split(argv[i], split_delim); std::vector types; - for (const auto & t : p) { + for (const auto& t : p) { ggml_type gt = ggml_type_from_name(t); if (gt == GGML_TYPE_COUNT) { invalid_param = true; @@ -364,7 +380,7 @@ static cmd_params parse_cmd_params(int argc, char ** argv) { } auto p = split(argv[i], split_delim); std::vector types; - for (const auto & t : p) { + for (const auto& t : p) { ggml_type gt = ggml_type_from_name(t); if (gt == GGML_TYPE_COUNT) { invalid_param = true; @@ -400,7 +416,7 @@ static cmd_params parse_cmd_params(int argc, char ** argv) { } auto p = split(argv[i], split_delim); std::vector modes; - for (const auto & m : p) { + for (const auto& m : p) { llama_split_mode mode; if (m == "none") { mode = LLAMA_SPLIT_MODE_NONE; @@ -434,11 +450,36 @@ static cmd_params parse_cmd_params(int argc, char ** argv) { break; } else { std::string value(argv[i]); - /**/ if (value == "distribute" || value == "" ) { params.numa = GGML_NUMA_STRATEGY_DISTRIBUTE; } - else if (value == "isolate") { params.numa = GGML_NUMA_STRATEGY_ISOLATE; } - else if (value == "numactl") { params.numa = GGML_NUMA_STRATEGY_NUMACTL; } + /**/ if (value == "distribute" || value == "") { params.numa = GGML_NUMA_STRATEGY_DISTRIBUTE; } + else if (value == "isolate") { params.numa = GGML_NUMA_STRATEGY_ISOLATE; } + else if (value == "numactl") { params.numa = GGML_NUMA_STRATEGY_NUMACTL; } else { invalid_param = true; break; } } + + } else if (arg == "-mt" || arg == "--max-threads") { + if (++i >= argc) { + invalid_param = true; + break; + } + params.cpuparams.n_threads = std::stoi(argv[i]); + } else if (arg == "-C" || arg == "--cpu-mask") { + if (++i >= argc) { + invalid_param = true; + break; + } + std::string mask = argv[i]; + params.cpuparams.mask_valid = true; + invalid_param = !parse_cpu_mask(mask, params.cpuparams.cpumask); + } else if (arg == "--prio") { + if (++i >= argc) { + invalid_param = true; + break; + } + params.cpuparams.priority = std::stoul(argv[i]); + } else if (arg == "--cpu-strict") { + params.cpuparams.strict_cpu = true; + } else if (arg == "--poll") { + params.cpuparams.poll = true; } else if (arg == "-fa" || arg == "--flash-attn") { if (++i >= argc) { invalid_param = true; @@ -1234,8 +1275,7 @@ struct sql_printer : public printer { } }; -static void test_prompt(llama_context * ctx, int n_prompt, int n_past, int n_batch, int n_threads) { - llama_set_n_threads(ctx, n_threads, n_threads); +static void test_prompt(llama_context * ctx, int n_prompt, int n_past, int n_batch) { const llama_model * model = llama_get_model(ctx); const int32_t n_vocab = 
llama_n_vocab(model); @@ -1257,9 +1297,7 @@ static void test_prompt(llama_context * ctx, int n_prompt, int n_past, int n_bat llama_synchronize(ctx); } -static void test_gen(llama_context * ctx, int n_gen, int n_past, int n_threads) { - llama_set_n_threads(ctx, n_threads, n_threads); - +static void test_gen(llama_context * ctx, int n_gen, int n_past) { const llama_model * model = llama_get_model(ctx); const int32_t n_vocab = llama_n_vocab(model); @@ -1330,6 +1368,23 @@ int main(int argc, char ** argv) { llama_model * lmodel = nullptr; const cmd_params_instance * prev_inst = nullptr; + postprocess_cpu_params(params.cpuparams); + + struct ggml_threadpool_params tpp; + tpp.n_threads = params.cpuparams.n_threads; + tpp.mask_specified = params.cpuparams.mask_valid; + tpp.strict_cpu = params.cpuparams.strict_cpu; + tpp.prio = params.cpuparams.priority; + tpp.poll = params.cpuparams.poll; + + std::memcpy(&tpp.cpumask[0], ¶ms.cpuparams.cpumask[0], GGML_N_CORES_MAX); + + struct ggml_compute_threadpool* threadpool = ggml_create_threadpool(&tpp); + if (!threadpool) { + LOG_TEE("%s: threadpool create failed : n_threads %d\n", __func__, tpp.n_threads); + exit(1); + } + for (const auto & inst : params_instances) { // keep the same model between tests when possible if (!lmodel || !prev_inst || !inst.equal_mparams(*prev_inst)) { @@ -1356,13 +1411,16 @@ int main(int argc, char ** argv) { llama_kv_cache_clear(ctx); + llama_set_n_threads(ctx, t.n_threads, t.n_threads); + llama_attach_threadpool(ctx, threadpool); + // warmup run if (t.n_prompt > 0) { - //test_prompt(ctx, std::min(t.n_batch, std::min(t.n_prompt, 32)), 0, t.n_batch, t.n_threads); - test_prompt(ctx, t.n_prompt, 0, t.n_batch, t.n_threads); + //test_prompt(ctx, std::min(t.n_batch, std::min(t.n_prompt, 32)), 0, t.n_batch); + test_prompt(ctx, t.n_prompt, 0, t.n_batch); } if (t.n_gen > 0) { - test_gen(ctx, 1, 0, t.n_threads); + test_gen(ctx, 1, 0); } for (int i = 0; i < params.reps; i++) { @@ -1371,10 +1429,10 @@ int main(int argc, char ** argv) { uint64_t t_start = get_time_ns(); if (t.n_prompt > 0) { - test_prompt(ctx, t.n_prompt, 0, t.n_batch, t.n_threads); + test_prompt(ctx, t.n_prompt, 0, t.n_batch); } if (t.n_gen > 0) { - test_gen(ctx, t.n_gen, t.n_prompt, t.n_threads); + test_gen(ctx, t.n_gen, t.n_prompt); } uint64_t t_ns = get_time_ns() - t_start; @@ -1386,7 +1444,9 @@ int main(int argc, char ** argv) { llama_print_timings(ctx); llama_free(ctx); + } + ggml_release_threadpool(threadpool); llama_free_model(lmodel); diff --git a/examples/llava/llava-cli.cpp b/examples/llava/llava-cli.cpp index c974900f21e20..4f90667baaefc 100644 --- a/examples/llava/llava-cli.cpp +++ b/examples/llava/llava-cli.cpp @@ -126,14 +126,14 @@ static struct llava_image_embed * load_image(llava_context * ctx_llava, gpt_para if (!params->image.empty()) { LOG_TEE("using base64 encoded image instead of command line image path\n"); } - embed = llava_image_embed_make_with_prompt_base64(ctx_llava->ctx_clip, params->n_threads, prompt); + embed = llava_image_embed_make_with_prompt_base64(ctx_llava->ctx_clip, params->cpuparams.n_threads, prompt); if (!embed) { LOG_TEE("%s: can't load image from prompt\n", __func__); return NULL; } params->prompt = remove_image_from_prompt(prompt); } else { - embed = llava_image_embed_make_with_filename(ctx_llava->ctx_clip, params->n_threads, fname.c_str()); + embed = llava_image_embed_make_with_filename(ctx_llava->ctx_clip, params->cpuparams.n_threads, fname.c_str()); if (!embed) { fprintf(stderr, "%s: is %s really an image file?\n", __func__, 
fname.c_str()); return NULL; diff --git a/examples/main/main.cpp b/examples/main/main.cpp index 44949ba869e70..23415766b9b5c 100644 --- a/examples/main/main.cpp +++ b/examples/main/main.cpp @@ -202,11 +202,38 @@ int main(int argc, char ** argv) { ctx_guidance = llama_new_context_with_model(model, lparams); } + LOG("%s: llama threadpool init = n_threads = %d\n", + __func__, + (int32_t) params.cpuparams.n_threads + ); + struct ggml_threadpool_params tpp_batch = + ggml_threadpool_params_from_cpu_params(params.cpuparams_batch); + struct ggml_threadpool_params tpp = + ggml_threadpool_params_from_cpu_params(params.cpuparams); + + struct ggml_compute_threadpool * threadpool_batch = ggml_create_threadpool(&tpp_batch); + if (!threadpool_batch) { + LOG_TEE("%s: batch threadpool create failed : n_threads %d\n", __func__, tpp_batch.n_threads); + exit(1); + } + struct ggml_compute_threadpool * threadpool = ggml_create_threadpool(&tpp); + if (!threadpool) { + LOG_TEE("%s: threadpool create failed : n_threads %d\n", __func__, tpp.n_threads); + exit(1); + } + if (model == NULL) { LOG_TEE("%s: error: unable to load model\n", __func__); return 1; } + llama_attach_batch_threadpool(ctx, threadpool_batch); + llama_attach_threadpool(ctx, threadpool); + if (ctx_guidance) { + llama_attach_batch_threadpool(ctx_guidance, threadpool_batch); + llama_attach_threadpool(ctx_guidance, threadpool); + } + const int n_ctx_train = llama_n_ctx_train(model); const int n_ctx = llama_n_ctx(ctx); LOG("n_ctx: %d\n", n_ctx); @@ -963,6 +990,9 @@ int main(int argc, char ** argv) { llama_sampling_free(ctx_sampling); llama_backend_free(); + ggml_release_threadpool(threadpool); + ggml_release_threadpool(threadpool_batch); + #ifndef LOG_DISABLE_LOGS LOG_TEE("Log end\n"); #endif // LOG_DISABLE_LOGS diff --git a/examples/passkey/passkey.cpp b/examples/passkey/passkey.cpp index f2ef9ca10d4a2..04fe7b56be536 100644 --- a/examples/passkey/passkey.cpp +++ b/examples/passkey/passkey.cpp @@ -94,8 +94,9 @@ int main(int argc, char ** argv) { ctx_params.seed = seed; ctx_params.n_ctx = llama_n_ctx_train(model)*n_grp + n_keep; ctx_params.n_batch = 512; - ctx_params.n_threads = params.n_threads; - ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch; + ctx_params.n_threads = params.cpuparams.n_threads; + ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ? + params.cpuparams.n_threads : params.cpuparams_batch.n_threads; GGML_ASSERT(ctx_params.n_batch % n_grp == 0 && "n_batch must be divisible by n_grp"); diff --git a/examples/server/server.cpp b/examples/server/server.cpp index e9904263d53c7..7bd72661fff7e 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -2329,7 +2329,7 @@ static void server_print_usage(const char * argv0, const gpt_params & params, co printf("options:\n"); printf(" -h, --help show this help message and exit\n"); printf(" -v, --verbose verbose output (default: %s)\n", server_verbose ? 
"enabled" : "disabled"); - printf(" -t N, --threads N number of threads to use during computation (default: %d)\n", params.n_threads); + printf(" -t N, --threads N number of threads to use during computation (default: %d)\n", params.cpuparams.n_threads); printf(" -tb N, --threads-batch N number of threads to use during batch and prompt processing (default: same as --threads)\n"); printf(" --threads-http N number of threads in the http server pool to process requests (default: max(hardware concurrency - 1, --parallel N + 2))\n"); printf(" -c N, --ctx-size N size of the prompt context (default: %d)\n", params.n_ctx); @@ -2612,7 +2612,7 @@ static void server_params_parse(int argc, char ** argv, server_params & sparams, invalid_param = true; break; } - params.n_threads = std::stoi(argv[i]); + params.cpuparams.n_threads = std::stoi(argv[i]); } else if (arg == "--grp-attn-n" || arg == "-gan") { if (++i >= argc) { invalid_param = true; @@ -2632,7 +2632,7 @@ static void server_params_parse(int argc, char ** argv, server_params & sparams, invalid_param = true; break; } - params.n_threads_batch = std::stoi(argv[i]); + params.cpuparams_batch.n_threads = std::stoi(argv[i]); } else if (arg == "--threads-http") { if (++i >= argc) { invalid_param = true; @@ -2943,8 +2943,8 @@ int main(int argc, char ** argv) { }); LOG_INFO("system info", { - {"n_threads", params.n_threads}, - {"n_threads_batch", params.n_threads_batch}, + {"n_threads", params.cpuparams.n_threads}, + {"n_threads_batch", params.cpuparams_batch.n_threads}, {"total_threads", std::thread::hardware_concurrency()}, {"system_info", llama_print_system_info()}, }); diff --git a/examples/simple/simple.cpp b/examples/simple/simple.cpp index b0f8e0fdc4987..a50a4dc3b52dc 100644 --- a/examples/simple/simple.cpp +++ b/examples/simple/simple.cpp @@ -53,8 +53,9 @@ int main(int argc, char ** argv) { ctx_params.seed = 1234; ctx_params.n_ctx = 2048; - ctx_params.n_threads = params.n_threads; - ctx_params.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch; + ctx_params.n_threads = params.cpuparams.n_threads; + ctx_params.n_threads_batch = params.cpuparams_batch.n_threads == -1 ? 
+ params.cpuparams.n_threads : params.cpuparams_batch.n_threads; llama_context * ctx = llama_new_context_with_model(model, ctx_params); diff --git a/examples/speculative/speculative.cpp b/examples/speculative/speculative.cpp index 12e46fbc91a24..d99b67200714b 100644 --- a/examples/speculative/speculative.cpp +++ b/examples/speculative/speculative.cpp @@ -24,6 +24,14 @@ struct seq_draft { struct llama_sampling_context * ctx_sampling; }; +static void switch_active_threadpool( + ggml_compute_threadpool_t cur, + ggml_compute_threadpool_t nxt +) { + ggml_pause_threadpool(cur); + ggml_resume_threadpool(nxt); +} + int main(int argc, char ** argv) { gpt_params params; @@ -67,13 +75,19 @@ int main(int argc, char ** argv) { // load the target model std::tie(model_tgt, ctx_tgt) = llama_init_from_gpt_params(params); + ggml_threadpool_params tpp_tgt = ggml_threadpool_params_from_cpu_params(params.cpuparams); + ggml_compute_threadpool * threadpool_tgt = ggml_create_threadpool(&tpp_tgt); + if (!threadpool_tgt) { + LOG_TEE("%s: target threadpool create failed : n_threads %d\n", __func__, tpp_tgt.n_threads); + exit(1); + } + // load the draft model params.model = params.model_draft; params.n_gpu_layers = params.n_gpu_layers_draft; - if (params.n_threads_draft > 0) { - params.n_threads = params.n_threads_draft; + if (params.draft_cpuparams.n_threads > 0) { + params.cpuparams = params.draft_cpuparams; } - params.n_threads_batch = params.n_threads_batch_draft; std::tie(model_dft, ctx_dft) = llama_init_from_gpt_params(params); const bool vocab_type_tgt = llama_vocab_type(model_tgt); @@ -98,6 +112,17 @@ int main(int argc, char ** argv) { return 1; } + ggml_threadpool_params tpp_dft = ggml_threadpool_params_from_cpu_params(params.draft_cpuparams); + ggml_compute_threadpool * threadpool_dft = ggml_create_threadpool(&tpp_dft); + if (!threadpool_dft) { + LOG_TEE("%s: draft threadpool create failed : n_threads %d\n", __func__, tpp_dft.n_threads); + exit(1); + } + + llama_attach_threadpool(ctx_tgt, threadpool_tgt); + llama_attach_threadpool(ctx_dft, threadpool_dft); + ggml_pause_threadpool(threadpool_dft); + { const int n_vocab_tgt = llama_n_vocab(model_tgt); const int n_vocab_dft = llama_n_vocab(model_dft); @@ -153,6 +178,7 @@ int main(int argc, char ** argv) { // eval the prompt with both models llama_decode(ctx_tgt, llama_batch_get_one( inp.data(), n_input - 1, 0, 0)); llama_decode(ctx_tgt, llama_batch_get_one(&inp.back(), 1, n_input - 1, 0)); + switch_active_threadpool(threadpool_tgt, threadpool_dft); llama_decode(ctx_dft, llama_batch_get_one( inp.data(), n_input, 0, 0)); const auto t_enc_end = ggml_time_us(); @@ -542,6 +568,7 @@ int main(int argc, char ** argv) { // evaluate the drafted tokens on the draft model llama_decode(ctx_dft, batch_dft); + ++n_past_cur; ++n_drafted; @@ -549,6 +576,7 @@ int main(int argc, char ** argv) { break; } } + switch_active_threadpool(threadpool_dft, threadpool_tgt); // evaluate the target model on the drafted tokens { @@ -559,6 +587,8 @@ int main(int argc, char ** argv) { // LOG("target batch: %s\n", LOG_BATCH_TOSTR_PRETTY(ctx_tgt, batch_tgt).c_str()); llama_decode(ctx_tgt, batch_tgt); + switch_active_threadpool(threadpool_tgt, threadpool_dft); + ++n_past_tgt; } @@ -608,6 +638,9 @@ int main(int argc, char ** argv) { llama_backend_free(); + ggml_release_threadpool(threadpool_tgt); + ggml_release_threadpool(threadpool_dft); + fprintf(stderr, "\n\n"); return 0; diff --git a/examples/train-text-from-scratch/train-text-from-scratch.cpp 
b/examples/train-text-from-scratch/train-text-from-scratch.cpp index e2f85c68297b8..7994ccd7ef038 100644 --- a/examples/train-text-from-scratch/train-text-from-scratch.cpp +++ b/examples/train-text-from-scratch/train-text-from-scratch.cpp @@ -1211,7 +1211,7 @@ int main(int argc, char ** argv) { opt_cb_data.millis_per_iter = 0.0; // measure required memory for work buffer - size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads).work_size + GGML_OBJECT_SIZE; + size_t max_work_size = ggml_graph_plan(gb, params.common.n_threads, nullptr).work_size + GGML_OBJECT_SIZE; printf("%s: work_size = %zu bytes (%.1f MB)\n", __func__, max_work_size, (float) max_work_size / (1024.0f*1024.0f)); // context for work buffer diff --git a/ggml-alloc.h b/ggml-alloc.h index 434c13b34a929..cd85b6ee70560 100644 --- a/ggml-alloc.h +++ b/ggml-alloc.h @@ -7,8 +7,9 @@ extern "C" { #endif typedef struct ggml_backend_buffer_type * ggml_backend_buffer_type_t; -typedef struct ggml_backend_buffer * ggml_backend_buffer_t; -typedef struct ggml_backend * ggml_backend_t; +typedef struct ggml_backend_buffer * ggml_backend_buffer_t; +typedef struct ggml_backend * ggml_backend_t; +typedef struct ggml_compute_threadpool * ggml_compute_threadpool_t; // Tensor allocator struct ggml_tallocr { diff --git a/ggml-backend-impl.h b/ggml-backend-impl.h index f121e1de420fa..950711dd5ebd5 100644 --- a/ggml-backend-impl.h +++ b/ggml-backend-impl.h @@ -97,6 +97,7 @@ extern "C" { // compute graph with a plan enum ggml_status (*GGML_CALL graph_plan_compute)(ggml_backend_t backend, ggml_backend_graph_plan_t plan); + // compute graph without a plan (async) enum ggml_status (*GGML_CALL graph_compute) (ggml_backend_t backend, struct ggml_cgraph * cgraph); diff --git a/ggml-backend.c b/ggml-backend.c index 9e35ce98d7ace..9565e55d37ea9 100644 --- a/ggml-backend.c +++ b/ggml-backend.c @@ -254,7 +254,10 @@ void ggml_backend_synchronize(ggml_backend_t backend) { backend->iface.synchronize(backend); } -ggml_backend_graph_plan_t ggml_backend_graph_plan_create(ggml_backend_t backend, struct ggml_cgraph * cgraph) { +ggml_backend_graph_plan_t ggml_backend_graph_plan_create( + ggml_backend_t backend, + const struct ggml_cgraph * cgraph +) { GGML_ASSERT(backend->iface.graph_plan_create != NULL); return backend->iface.graph_plan_create(backend, cgraph); @@ -266,19 +269,28 @@ void ggml_backend_graph_plan_free(ggml_backend_t backend, ggml_backend_graph_pla backend->iface.graph_plan_free(backend, plan); } -enum ggml_status ggml_backend_graph_plan_compute(ggml_backend_t backend, ggml_backend_graph_plan_t plan) { +enum ggml_status ggml_backend_graph_plan_compute( + ggml_backend_t backend, + ggml_backend_graph_plan_t plan +) { GGML_ASSERT(backend->iface.graph_plan_compute != NULL); return backend->iface.graph_plan_compute(backend, plan); } -enum ggml_status ggml_backend_graph_compute(ggml_backend_t backend, struct ggml_cgraph * cgraph) { +enum ggml_status ggml_backend_graph_compute( + ggml_backend_t backend, + struct ggml_cgraph * cgraph +) { enum ggml_status err = ggml_backend_graph_compute_async(backend, cgraph); ggml_backend_synchronize(backend); return err; } -enum ggml_status ggml_backend_graph_compute_async(ggml_backend_t backend, struct ggml_cgraph * cgraph) { +enum ggml_status ggml_backend_graph_compute_async( + ggml_backend_t backend, + struct ggml_cgraph * cgraph +) { return backend->iface.graph_compute(backend, cgraph); } @@ -726,7 +738,9 @@ ggml_backend_buffer_type_t ggml_backend_cpu_hbm_buffer_type(void) { #endif struct 
ggml_backend_cpu_context { - int n_threads; + int n_threads; + ggml_compute_threadpool_t threadpool; + void * work_data; size_t work_size; @@ -758,12 +772,15 @@ struct ggml_backend_plan_cpu { struct ggml_cgraph cgraph; }; -GGML_CALL static ggml_backend_graph_plan_t ggml_backend_cpu_graph_plan_create(ggml_backend_t backend, const struct ggml_cgraph * cgraph) { +GGML_CALL static ggml_backend_graph_plan_t ggml_backend_cpu_graph_plan_create( + ggml_backend_t backend, + const struct ggml_cgraph * cgraph +) { struct ggml_backend_cpu_context * cpu_ctx = (struct ggml_backend_cpu_context *)backend->context; struct ggml_backend_plan_cpu * cpu_plan = malloc(sizeof(struct ggml_backend_plan_cpu)); - cpu_plan->cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads); + cpu_plan->cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads, cpu_ctx->threadpool); cpu_plan->cgraph = *cgraph; // FIXME: deep copy if (cpu_plan->cplan.work_size > 0) { @@ -797,10 +814,13 @@ GGML_CALL static enum ggml_status ggml_backend_cpu_graph_plan_compute(ggml_backe GGML_UNUSED(backend); } -GGML_CALL static enum ggml_status ggml_backend_cpu_graph_compute(ggml_backend_t backend, struct ggml_cgraph * cgraph) { +GGML_CALL static enum ggml_status ggml_backend_cpu_graph_compute( + ggml_backend_t backend, + struct ggml_cgraph * cgraph +) { struct ggml_backend_cpu_context * cpu_ctx = (struct ggml_backend_cpu_context *)backend->context; - struct ggml_cplan cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads); + struct ggml_cplan cplan = ggml_graph_plan(cgraph, cpu_ctx->n_threads, cpu_ctx->threadpool); if (cpu_ctx->work_size < cplan.work_size) { free(cpu_ctx->work_data); @@ -869,6 +889,7 @@ ggml_backend_t ggml_backend_cpu_init(void) { } ctx->n_threads = GGML_DEFAULT_N_THREADS; + ctx->threadpool = NULL; ctx->work_data = NULL; ctx->work_size = 0; ctx->abort_callback = NULL; @@ -899,6 +920,13 @@ void ggml_backend_cpu_set_n_threads(ggml_backend_t backend_cpu, int n_threads) { ctx->n_threads = n_threads; } +void ggml_backend_cpu_set_threadpool(ggml_backend_t backend_cpu, ggml_compute_threadpool_t threadpool) { + GGML_ASSERT(ggml_backend_is_cpu(backend_cpu)); + + struct ggml_backend_cpu_context * ctx = (struct ggml_backend_cpu_context *)backend_cpu->context; + ctx->threadpool = threadpool; +} + void ggml_backend_cpu_set_abort_callback(ggml_backend_t backend_cpu, ggml_abort_callback abort_callback, void * abort_callback_data) { GGML_ASSERT(ggml_backend_is_cpu(backend_cpu)); @@ -1825,13 +1853,19 @@ bool ggml_backend_sched_alloc_graph(ggml_backend_sched_t sched, struct ggml_cgra return true; } -enum ggml_status ggml_backend_sched_graph_compute(ggml_backend_sched_t sched, struct ggml_cgraph * graph) { +enum ggml_status ggml_backend_sched_graph_compute( + ggml_backend_sched_t sched, + struct ggml_cgraph * graph +) { enum ggml_status err = ggml_backend_sched_graph_compute_async(sched, graph); ggml_backend_sched_synchronize(sched); return err; } -enum ggml_status ggml_backend_sched_graph_compute_async(ggml_backend_sched_t sched, struct ggml_cgraph * graph) { +enum ggml_status ggml_backend_sched_graph_compute_async( + ggml_backend_sched_t sched, + struct ggml_cgraph * graph +) { if (!sched->is_reset && !sched->is_alloc) { ggml_backend_sched_reset(sched); } diff --git a/ggml-backend.h b/ggml-backend.h index 744b6a77457d7..ba3a898cda29e 100644 --- a/ggml-backend.h +++ b/ggml-backend.h @@ -67,12 +67,21 @@ extern "C" { GGML_API void ggml_backend_synchronize(ggml_backend_t backend); - GGML_API ggml_backend_graph_plan_t ggml_backend_graph_plan_create(ggml_backend_t 
backend, struct ggml_cgraph * cgraph); - GGML_API void ggml_backend_graph_plan_free (ggml_backend_t backend, ggml_backend_graph_plan_t plan); - - GGML_API enum ggml_status ggml_backend_graph_plan_compute (ggml_backend_t backend, ggml_backend_graph_plan_t plan); - GGML_API enum ggml_status ggml_backend_graph_compute (ggml_backend_t backend, struct ggml_cgraph * cgraph); - GGML_API enum ggml_status ggml_backend_graph_compute_async(ggml_backend_t backend, struct ggml_cgraph * cgraph); + GGML_API ggml_backend_graph_plan_t ggml_backend_graph_plan_create( + ggml_backend_t backend, + const struct ggml_cgraph * cgraph); + + GGML_API void ggml_backend_graph_plan_free (ggml_backend_t backend, ggml_backend_graph_plan_t plan); + + GGML_API enum ggml_status ggml_backend_graph_plan_compute ( + ggml_backend_t backend, + ggml_backend_graph_plan_t plan); + GGML_API enum ggml_status ggml_backend_graph_compute( + ggml_backend_t backend, + struct ggml_cgraph * cgraph); + GGML_API enum ggml_status ggml_backend_graph_compute_async( + ggml_backend_t backend, + struct ggml_cgraph * cgraph); GGML_API bool ggml_backend_supports_op(ggml_backend_t backend, const struct ggml_tensor * op); GGML_API bool ggml_backend_offload_op(ggml_backend_t backend, const struct ggml_tensor * op); @@ -100,6 +109,7 @@ extern "C" { GGML_API GGML_CALL bool ggml_backend_is_cpu (ggml_backend_t backend); GGML_API void ggml_backend_cpu_set_n_threads (ggml_backend_t backend_cpu, int n_threads); + GGML_API void ggml_backend_cpu_set_threadpool (ggml_backend_t backend_cpu, ggml_compute_threadpool_t threadpool); GGML_API void ggml_backend_cpu_set_abort_callback(ggml_backend_t backend_cpu, ggml_abort_callback abort_callback, void * abort_callback_data); // Create a backend buffer from an existing pointer diff --git a/ggml.c b/ggml.c index f479dc3e1a8f5..264023d6ba10b 100644 --- a/ggml.c +++ b/ggml.c @@ -1749,30 +1749,104 @@ struct ggml_context_container { struct ggml_context context; }; -struct ggml_compute_state_shared { - const struct ggml_cgraph* cgraph; - const struct ggml_cplan* cplan; +// +// thread data +// - int64_t perf_node_start_cycles; - int64_t perf_node_start_time_us; +typedef pthread_t ggml_thread_t; + +#if defined(_WIN32) + +typedef CONDITION_VARIABLE ggml_cond_t; +typedef SRWLOCK ggml_mutex_t; + +#define ggml_mutex_init(m) InitializeSRWLock(m) +#define ggml_mutex_destroy(m) +#define ggml_mutex_lock(m) AcquireSRWLockExclusive(m) +#define ggml_mutex_unlock(m) ReleaseSRWLockExclusive(m) +#define ggml_mutex_lock_shared(m) AcquireSRWLockShared(m) +#define ggml_mutex_unlock_shared(m) ReleaseSRWLockShared(m) + +#define ggml_cond_init(c) InitializeConditionVariable(c) +#define ggml_cond_destroy(c) +#define ggml_cond_wait(c, m) SleepConditionVariableSRW(c, m, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED) +#define ggml_cond_broadcast(c) WakeAllConditionVariable(c) + +#define ggml_thread_create pthread_create +#define ggml_thread_join pthread_join + +#else + +typedef pthread_cond_t ggml_cond_t; +typedef pthread_mutex_t ggml_mutex_t; + +#define ggml_mutex_init(m) pthread_mutex_init(m, NULL) +#define ggml_mutex_destroy(m) pthread_mutex_destroy(m) +#define ggml_mutex_lock(m) pthread_mutex_lock(m) +#define ggml_mutex_unlock(m) pthread_mutex_unlock(m) +#define ggml_mutex_lock_shared(m) pthread_mutex_lock(m) +#define ggml_mutex_unlock_shared(m) pthread_mutex_unlock(m) + +#define ggml_lock_init(x) UNUSED(x) +#define ggml_lock_destroy(x) UNUSED(x) +#if defined(__x86_64__) || (defined(_MSC_VER) && defined(_M_AMD64)) +#define ggml_lock_lock(x) 
_mm_pause() +#else +#define ggml_lock_lock(x) UNUSED(x) +#endif +#define ggml_lock_unlock(x) UNUSED(x) + +#define GGML_LOCK_INITIALIZER 0 +#define ggml_cond_init(c) pthread_cond_init(c, NULL) +#define ggml_cond_destroy(c) pthread_cond_destroy(c) +#define ggml_cond_wait(c, m) pthread_cond_wait(c, m) +#define ggml_cond_broadcast(c) pthread_cond_broadcast(c) + +#define ggml_thread_create pthread_create +#define ggml_thread_join pthread_join + +#endif - const int n_threads; +// Threadpool def +struct ggml_compute_threadpool { // synchronization primitives - atomic_int n_active; // num active threads - atomic_int node_n; // active graph node - atomic_int node_task; // active graph node task phase + atomic_int n_ready; // number of ready threads (inter-graph sync) + atomic_int n_active; // number of active threads (intra-graph sync) + atomic_int node_n; // active graph node + atomic_int node_task; // active graph node task phase + volatile bool stop; // Used for stopping the threadpool altogether + volatile bool pause; // Used for pausing the threadpool or individual threads + + struct ggml_cgraph * cgraph; + struct ggml_cplan * cplan; + + struct ggml_compute_state * workers; // per thread state + int32_t n_threads_max; // number of threads in the pool + int32_t n_threads_cur; // number of threads used in the current graph + + bool poll; // Use polling (busywait) // TODO + int32_t prio; // Scheduling priority + + ggml_mutex_t mutex; // mutex for cond.var + ggml_cond_t cond; // cond.var for waiting for new work + + int64_t perf_node_start_cycles; + int64_t perf_node_start_time_us; ggml_abort_callback abort_callback; // abort ggml_graph_compute when true - void* abort_callback_data; + void * abort_callback_data; atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads. }; +// Per-thread state struct ggml_compute_state { ggml_thread_t thrd; + bool cpumask[GGML_N_CORES_MAX]; + bool mask_specified; int ith; - struct ggml_compute_state_shared* shared; + struct ggml_compute_threadpool * threadpool; enum ggml_status ec; }; @@ -12512,7 +12586,7 @@ UseGgmlGemm1:; return; } // Every thread starts at ith, so the first unprocessed chunk is nth. This save a bit of coordination right at the start. 
- atomic_store(&state->shared->current_chunk, nth); + atomic_store(&state->threadpool->current_chunk, nth); if (src1->type != vec_dot_type) { char * wdata = params->wdata; const size_t row_size = ggml_row_size(vec_dot_type, ne10); @@ -12634,7 +12708,7 @@ UseGgmlGemm2:; break; } - current_chunk = atomic_fetch_add(&state->shared->current_chunk, 1); + current_chunk = atomic_fetch_add(&state->threadpool->current_chunk, 1); } #ifdef GGML_PERF @@ -18919,65 +18993,6 @@ void ggml_graph_clear(struct ggml_cgraph * cgraph) { memset(cgraph->visited_hash_table.keys, 0, cgraph->visited_hash_table.size * sizeof(struct ggml_tensor *)); } -// -// thread data -// -// synchronization is done via busy loops -// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops -// - -#ifdef __APPLE__ - -//#include -// -//typedef os_unfair_lock ggml_lock_t; -// -//#define ggml_lock_init(x) UNUSED(x) -//#define ggml_lock_destroy(x) UNUSED(x) -//#define ggml_lock_lock os_unfair_lock_lock -//#define ggml_lock_unlock os_unfair_lock_unlock -// -//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT - -typedef int ggml_lock_t; - -#define ggml_lock_init(x) UNUSED(x) -#define ggml_lock_destroy(x) UNUSED(x) -#define ggml_lock_lock(x) UNUSED(x) -#define ggml_lock_unlock(x) UNUSED(x) - -#define GGML_LOCK_INITIALIZER 0 - -#define ggml_thread_create pthread_create -#define ggml_thread_join pthread_join - -#else - -//typedef pthread_spinlock_t ggml_lock_t; - -//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE) -//#define ggml_lock_destroy pthread_spin_destroy -//#define ggml_lock_lock pthread_spin_lock -//#define ggml_lock_unlock pthread_spin_unlock - -typedef int ggml_lock_t; - -#define ggml_lock_init(x) UNUSED(x) -#define ggml_lock_destroy(x) UNUSED(x) -#if defined(__x86_64__) || (defined(_MSC_VER) && defined(_M_AMD64)) -#define ggml_lock_lock(x) _mm_pause() -#else -#define ggml_lock_lock(x) UNUSED(x) -#endif -#define ggml_lock_unlock(x) UNUSED(x) - -#define GGML_LOCK_INITIALIZER 0 - -#define ggml_thread_create pthread_create -#define ggml_thread_join pthread_join - -#endif - // Android's libc implementation "bionic" does not support setting affinity #if defined(__gnu_linux__) static void set_numa_thread_affinity(int thread_n) { @@ -19052,9 +19067,10 @@ static void set_numa_thread_affinity(int thread_n) { UNUSED(thread_n); } static void clear_numa_thread_affinity(void) {} #endif -static void ggml_graph_compute_perf_stats_node(struct ggml_tensor * node, const struct ggml_compute_state_shared * st) { - int64_t cycles_cur = ggml_perf_cycles() - st->perf_node_start_cycles; - int64_t time_us_cur = ggml_perf_time_us() - st->perf_node_start_time_us; + +static void ggml_graph_compute_perf_stats_node(struct ggml_tensor * node, const struct ggml_compute_threadpool * tp) { + int64_t cycles_cur = ggml_perf_cycles() - tp->perf_node_start_cycles; + int64_t time_us_cur = ggml_perf_time_us() - tp->perf_node_start_time_us; node->perf_runs++; node->perf_cycles += cycles_cur; @@ -19308,16 +19324,346 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_ return n_tasks; } +static thread_ret_t ggml_graph_compute_secondary_thread(void* data); + +enum { + SCHED_PRIO_NORMAL, + SCHED_PRIO_MEDIUM, + SCHED_PRIO_HIGH, + SCHED_PRIO_REALTIME +}; + +#if defined(_WIN32) +#include "windows.h" + +// TODO: support > 64 CPUs +static bool __thread_affinity(bool * mask) { + HANDLE h = GetCurrentThread(); + uint64_t bitmask = 0ULL; + + assert(GGML_N_CORES_MAX >= 64); 
+
+    for (int32_t i = 0; i < 8; i++) {
+        int32_t idx = i * 8;
+        uint8_t val = 0;
+        val |= mask[idx + 0] << 0;
+        val |= mask[idx + 1] << 1;
+        val |= mask[idx + 2] << 2;
+        val |= mask[idx + 3] << 3;
+        val |= mask[idx + 4] << 4;
+        val |= mask[idx + 5] << 5;
+        val |= mask[idx + 6] << 6;
+        val |= mask[idx + 7] << 7;
+        bitmask |= (uint64_t)val << idx;
+    }
+
+    for (int32_t i = 64; i < GGML_N_CORES_MAX; i++) {
+        if (mask[i]) {
+            fprintf(stderr, "warn: setting thread-affinity for > 64 CPUs isn't supported on windows!\n");
+            break;
+        }
+    }
+
+    DWORD_PTR m = (DWORD_PTR)bitmask;
+
+    m = SetThreadAffinityMask(h, m);
+
+    return m != 0;
+}
+
+static bool __process_priority(int32_t prio) {
+    DWORD p = NORMAL_PRIORITY_CLASS;
+
+    switch (prio) {
+        case SCHED_PRIO_NORMAL:   p = NORMAL_PRIORITY_CLASS;       break;
+        case SCHED_PRIO_MEDIUM:   p = ABOVE_NORMAL_PRIORITY_CLASS; break;
+        case SCHED_PRIO_HIGH:     p = HIGH_PRIORITY_CLASS;         break;
+        case SCHED_PRIO_REALTIME: p = REALTIME_PRIORITY_CLASS;     break;
+    }
+
+    return SetPriorityClass(GetCurrentProcess(), p);
+}
+
+static bool __thread_priority(int32_t prio) {
+    DWORD p = NORMAL_PRIORITY_CLASS;
+
+    switch (prio) {
+        case SCHED_PRIO_NORMAL:   p = THREAD_PRIORITY_NORMAL;        break;
+        case SCHED_PRIO_MEDIUM:   p = THREAD_PRIORITY_ABOVE_NORMAL;  break;
+        case SCHED_PRIO_HIGH:     p = THREAD_PRIORITY_HIGHEST;       break;
+        case SCHED_PRIO_REALTIME: p = THREAD_PRIORITY_TIME_CRITICAL; break;
+    }
+
+    return SetThreadPriority(GetCurrentThread(), p);
+
+}
+
+#elif defined(__APPLE__)
+#include <sys/types.h>
+#include <sys/resource.h>
+
+static bool __thread_affinity(const bool * mask) {
+    UNUSED(mask);
+    return true;
+}
+
+static bool __process_priority(int32_t prio) {
+    int32_t p = 0;
+
+    switch (prio) {
+        case SCHED_PRIO_NORMAL:   p =   0; break;
+        case SCHED_PRIO_MEDIUM:   p =  -5; break;
+        case SCHED_PRIO_HIGH:     p = -10; break;
+        case SCHED_PRIO_REALTIME: p = -20; break;
+    }
+
+    int32_t r = setpriority(PRIO_PROCESS, 0, p);
+    return r != -1;
+}
+
+static bool __thread_priority(int32_t prio) {
+    UNUSED(prio);
+    return true;
+}
+
+#else // posix?
+
+#ifndef __USE_GNU
+#define __USE_GNU
+#endif
+#include <sched.h>
+
+static bool __thread_affinity(const bool * mask) {
+    cpu_set_t cpuset;
+    int32_t   err;
+
+    CPU_ZERO(&cpuset);
+
+    for (uint32_t i = 0; i < GGML_N_CORES_MAX; i++) {
+        if (mask[i]) {
+            CPU_SET(i, &cpuset);
+        }
+    }
+
+#ifdef __ANDROID__
+    err = sched_setaffinity(0, sizeof(cpuset), &cpuset);
+    if (err < 0) {
+        err = errno;
+    }
+#else
+    err = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
+#endif
+    if (err != 0) {
+        //fprintf(stderr, "warn: failed to set affinity mask 0x%llx (err %d: %s)\n", (unsigned long long)mask, err, strerror(err));
+        return false;
+    }
+
+    return true;
+}
+
+static bool __process_priority(int32_t prio) {
+    struct sched_param p;
+    int32_t policy = SCHED_OTHER;
+
+    switch (prio) {
+        case SCHED_PRIO_NORMAL:   policy = SCHED_OTHER; p.sched_priority = 0;  break;
+        case SCHED_PRIO_MEDIUM:   policy = SCHED_FIFO;  p.sched_priority = 40; break;
+        case SCHED_PRIO_HIGH:     policy = SCHED_FIFO;  p.sched_priority = 80; break;
+        case SCHED_PRIO_REALTIME: policy = SCHED_FIFO;  p.sched_priority = 90; break;
+    }
+
+    int32_t err = sched_setscheduler(0, policy, &p);
+    if (err != 0) {
+        //fprintf(stderr, "warn: failed to set process priority %d (err %d)\n", prio, err);
+        return false;
+    }
+
+    return true;
+}
+
+static bool __thread_priority(int32_t prio) {
+    struct sched_param p;
+    int32_t policy = SCHED_OTHER;
+    switch (prio) {
+        case SCHED_PRIO_NORMAL:   policy = SCHED_OTHER; p.sched_priority = 0;  break;
+        case SCHED_PRIO_MEDIUM:   policy = SCHED_FIFO;  p.sched_priority = 40; break;
+        case SCHED_PRIO_HIGH:     policy = SCHED_FIFO;  p.sched_priority = 80; break;
+        case SCHED_PRIO_REALTIME: policy = SCHED_FIFO;  p.sched_priority = 90; break;
+    }
+
+    int32_t err = pthread_setschedparam(pthread_self(), policy, &p);
+    if (err != 0) {
+        //fprintf(stderr, "warn: failed to set thread priority %d (err %d)\n", prio, err);
+        return false;
+    }
+
+    return true;
+}
+
+#endif
+
+#if defined(__aarch64__) && ( defined(__clang__) || defined(__GNUC__) )
+static inline void __cpu_relax(void) {
+    __asm__ volatile("yield" ::: "memory");
+}
+#elif defined(__x86_64__)
+static inline void __cpu_relax(void) {
+    _mm_pause();
+}
+#else
+static inline void __cpu_relax(void) {;}
+#endif
+
+static void __cpumask_next(const bool * global_mask, bool * local_mask, bool strict, int32_t* iter) {
+    if (!global_mask) {
+        memset(local_mask, 1, GGML_N_CORES_MAX);
+        return;
+    }
+    if (!strict) {
+        memcpy(local_mask, global_mask, GGML_N_CORES_MAX);
+        return;
+    } else {
+        memset(local_mask, 0, GGML_N_CORES_MAX);
+        int32_t base_idx = *iter;
+        for (int32_t i = 0; i < GGML_N_CORES_MAX; i++) {
+            int32_t idx = base_idx + i;
+            if (idx >= GGML_N_CORES_MAX) {
+                // Just a cheaper modulo
+                idx -= GGML_N_CORES_MAX;
+            }
+            if (global_mask[idx]) {
+                local_mask[idx] = 1;
+                *iter = idx + 1;
+                return;
+            }
+        }
+    }
+}
+
+struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
+    struct ggml_compute_threadpool * threadpool =
+        GGML_ALIGNED_MALLOC(sizeof(struct ggml_compute_threadpool));
+
+    {
+        threadpool->n_ready         = 1; // the main thread is "ready"
+        threadpool->n_active        = 0;
+        threadpool->node_n          = 0;
+        threadpool->node_task       = GGML_TASK_TYPE_FINALIZE;
+        threadpool->stop            = false;
+        threadpool->pause           = true;
+        threadpool->cgraph          = NULL;
+        threadpool->cplan           = NULL;
+        threadpool->workers         = NULL;
+        threadpool->n_threads_max   = tpp->n_threads;
+        threadpool->n_threads_cur   = 0;
+        threadpool->poll            = tpp->poll;
+        threadpool->prio            = tpp->prio;
+
+
threadpool->perf_node_start_cycles = 0ULL; + threadpool->perf_node_start_time_us = 0ULL; + + threadpool->abort_callback = NULL; + threadpool->abort_callback_data = NULL; + threadpool->current_chunk = 0; + } + + ggml_mutex_init(&threadpool->mutex); + ggml_cond_init(&threadpool->cond); + + struct ggml_compute_state * workers = + GGML_ALIGNED_MALLOC(sizeof(struct ggml_compute_state) * tpp->n_threads); + + threadpool->workers = workers; + + int cpumask_iter = 0; + + __process_priority(tpp->prio); + __thread_priority(tpp->prio); + + for (int j = 0; j < tpp->n_threads; j++) { + workers[j] = (struct ggml_compute_state) { + .thrd = 0, + .ith = j, + .threadpool = threadpool, + .ec = GGML_STATUS_SUCCESS, + .mask_specified = false + }; + + if (tpp->mask_specified) { + __cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter); + workers[j].mask_specified = true; + } + + // Spin threads for all secondary workers + if (j > 0) { + int32_t rc = ggml_thread_create( + &workers[j].thrd, + NULL, + ggml_graph_compute_secondary_thread, + &workers[j] + ); + GGML_ASSERT(rc == 0); + } + } + + // Ensure all threads entered the compute loop before returning. + while (atomic_load(&threadpool->n_ready) != threadpool->n_threads_max) { ; } + + return threadpool; +} + +void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) { + if (!threadpool) return; + + struct ggml_compute_state* workers = threadpool->workers; + const int32_t n_threads = threadpool->n_threads_max; + + // Don't really need to lock in the polling mode but it doesn't hurt + ggml_mutex_lock(&threadpool->mutex); + threadpool->n_threads_cur = n_threads; + threadpool->stop = true; + threadpool->pause = false; + + ggml_cond_broadcast(&threadpool->cond); + ggml_mutex_unlock(&threadpool->mutex); + + for (int32_t j = 1; j < n_threads; j++) { + int32_t rc = ggml_thread_join(workers[j].thrd, NULL); + GGML_ASSERT(rc == GGML_EXIT_SUCCESS || rc == GGML_EXIT_ABORTED); + UNUSED(rc); + } + + GGML_ALIGNED_FREE(workers); + + ggml_mutex_destroy(&threadpool->mutex); + ggml_cond_destroy(&threadpool->cond); + + GGML_ALIGNED_FREE(threadpool); +} + +void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) { + GGML_PRINT_DEBUG("Pausing threadpool\n"); + threadpool->pause = true; +} + +void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) { + ggml_mutex_lock(&threadpool->mutex); + GGML_PRINT_DEBUG("Resuming threadpool\n"); + threadpool->pause = false; + ggml_cond_broadcast(&threadpool->cond); + ggml_mutex_unlock(&threadpool->mutex); +} + static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) { // wait for other threads to finish const int last_node_n = * node_n; while (true) { if (do_yield) { - sched_yield(); + __cpu_relax(); } - * node_n = atomic_load(&state->shared->node_n); + * node_n = atomic_load(&state->threadpool->node_n); if (* node_n != last_node_n) break; #if defined(__SSE3__) // Tell the processor we're spinning. It's a processor hint for spinlocks. @@ -19332,10 +19678,10 @@ static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_co while (true) { if (do_yield) { - sched_yield(); + __cpu_relax(); } - * task_phase = atomic_load(&state->shared->node_task); + * task_phase = atomic_load(&state->threadpool->node_task); if (* task_phase != last_task_phase) break; #if defined(__SSE3__) // Tell the processor we're spinning. It's a processor hint for spinlocks. 
@@ -19344,13 +19690,13 @@ static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_co } } -static thread_ret_t ggml_graph_compute_thread(void * data) { - struct ggml_compute_state * state = (struct ggml_compute_state *) data; +static thread_ret_t ggml_graph_compute_thread(struct ggml_compute_state * state) { + struct ggml_compute_threadpool * threadpool = state->threadpool; - const struct ggml_cgraph * cgraph = state->shared->cgraph; - const struct ggml_cplan * cplan = state->shared->cplan; + const struct ggml_cgraph * cgraph = threadpool->cgraph; + const struct ggml_cplan * cplan = threadpool->cplan; - const int n_threads = state->shared->n_threads; + const int n_threads = threadpool->n_threads_cur; set_numa_thread_affinity(state->ith); @@ -19359,12 +19705,12 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { while (true) { if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { - state->shared->node_n += 1; + threadpool->node_n += 1; state->ec = GGML_STATUS_ABORTED; return 0; } - if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { + if (atomic_fetch_sub(&threadpool->n_active, 1) == 1) { // all other threads are finished and spinning // do finalize and init here so we don't have synchronize again struct ggml_compute_params params = { @@ -19379,20 +19725,20 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { /* FINALIZE */ struct ggml_tensor * node = cgraph->nodes[node_n]; if (GGML_OP_HAS_FINALIZE[node->op]) { - params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads); + params.nth = ggml_get_n_tasks(node, n_threads, threadpool->n_threads_cur); ggml_compute_forward(¶ms, node, state); } - ggml_graph_compute_perf_stats_node(node, state->shared); + ggml_graph_compute_perf_stats_node(node, threadpool); } // distribute new work or execute it direct if 1T while (++node_n < cgraph->n_nodes) { GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes); struct ggml_tensor * node = cgraph->nodes[node_n]; - const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads); + const int n_tasks = ggml_get_n_tasks(node, n_threads, threadpool->n_threads_cur); - state->shared->perf_node_start_cycles = ggml_perf_cycles(); - state->shared->perf_node_start_time_us = ggml_perf_time_us(); + threadpool->perf_node_start_cycles = ggml_perf_cycles(); + threadpool->perf_node_start_time_us = ggml_perf_time_us(); params.nth = n_tasks; @@ -19413,7 +19759,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { ggml_compute_forward(¶ms, node, state); } - ggml_graph_compute_perf_stats_node(node, state->shared); + ggml_graph_compute_perf_stats_node(node, threadpool); } else { break; } @@ -19424,9 +19770,9 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { } task_phase = GGML_TASK_TYPE_INIT; - atomic_store(&state->shared->n_active, n_threads); - atomic_store(&state->shared->node_n, node_n); - atomic_store(&state->shared->node_task, task_phase); + atomic_store(&threadpool->n_active, n_threads); + atomic_store(&threadpool->node_n, node_n); + atomic_store(&threadpool->node_task, task_phase); } else { ggml_graph_compute_thread_sync_node(&node_n, state, false); ggml_graph_compute_thread_sync_task(&task_phase, state, false); @@ -19437,7 +19783,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { /* INIT & COMPUTE */ struct ggml_tensor * node = cgraph->nodes[node_n]; - const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads); + const int n_tasks = 
ggml_get_n_tasks(node, n_threads, threadpool->n_threads_cur); struct ggml_compute_params params = { /*.type =*/ GGML_TASK_TYPE_INIT, @@ -19453,12 +19799,11 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { } } - if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { + if (atomic_fetch_sub(&threadpool->n_active, 1) == 1) { task_phase = GGML_TASK_TYPE_COMPUTE; - atomic_store(&state->shared->n_active, n_threads); - atomic_store(&state->shared->node_task, task_phase); - } - else { + atomic_store(&threadpool->n_active, n_threads); + atomic_store(&threadpool->node_task, task_phase); + } else { // TODO: this sched_yield can have significant impact on the performance - either positive or negative // depending on the workload and the operating system. // since it is not clear what is the best approach, it should potentially become user-configurable @@ -19473,10 +19818,10 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { ggml_compute_forward(¶ms, node, state); } - if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { + if (atomic_fetch_sub(&threadpool->n_active, 1) == 1) { task_phase = GGML_TASK_TYPE_FINALIZE; - atomic_store(&state->shared->n_active, n_threads); - atomic_store(&state->shared->node_task, task_phase); + atomic_store(&threadpool->n_active, n_threads); + atomic_store(&threadpool->node_task, task_phase); } else { ggml_graph_compute_thread_sync_task(&task_phase, state, false); @@ -19486,9 +19831,88 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { return 0; } -struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) { - if (n_threads <= 0) { - n_threads = GGML_DEFAULT_N_THREADS; +static inline int32_t ggml_graph_compute_check_for_work(struct ggml_compute_state * state) { + int32_t node_n; + struct ggml_compute_threadpool * threadpool = state->threadpool; + + do { + if (threadpool->poll) { + node_n = atomic_load(&threadpool->node_n); + if (node_n != -1) { + // No new work. Yield, and keep polling. + sched_yield(); + node_n = atomic_load(&threadpool->node_n); + } + } else { + ggml_mutex_lock_shared(&threadpool->mutex); + node_n = atomic_load(&threadpool->node_n); + if (node_n != -1 && !threadpool->stop && !threadpool->pause) { + // No new work. Wait for the signal. + ggml_cond_wait(&threadpool->cond, &threadpool->mutex); + node_n = atomic_load(&threadpool->node_n); + } + ggml_mutex_unlock_shared(&threadpool->mutex); + } + } while (state->ith >= threadpool->n_threads_cur); + return node_n; +} + +static thread_ret_t ggml_graph_compute_secondary_thread(void* data) { + struct ggml_compute_state * state = (struct ggml_compute_state *) data; + struct ggml_compute_threadpool * threadpool = state->threadpool; + + __thread_priority(threadpool->prio); + if (state->mask_specified) + __thread_affinity(state->cpumask); + + // Indicate that we're ready to go + atomic_fetch_add(&threadpool->n_ready, 1); + + while (true) { + // Check if we need to sleep + while (threadpool->pause) { + GGML_PRINT_DEBUG("thread #%d inside pause loop\n", state->ith); + ggml_mutex_lock_shared(&threadpool->mutex); + if (threadpool->pause) { + ggml_cond_wait(&threadpool->cond, &threadpool->mutex); + } + GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith); + ggml_mutex_unlock_shared(&threadpool->mutex); + } + // This needs to be checked for after the cond_wait + if (threadpool->stop) break; + + // Check if there is new work + // node_n == -1 means we have a fresh graph to compute on. + // Only the main thread sets node_n back to -1. 
+ + int32_t node_n = ggml_graph_compute_check_for_work(state); + if (node_n == -1) { + atomic_fetch_sub(&threadpool->n_ready, 1); + + int64_t ret = (int64_t) ggml_graph_compute_thread(state); + if (ret == GGML_EXIT_ABORTED) + return (thread_ret_t) ret; + + if (ret != GGML_EXIT_SUCCESS && ret != GGML_EXIT_ABORTED) { + fprintf(stderr, "ggml_graph_compute_thread exited with an unexpected error: %lld\n", (long long int) ret); + GGML_ASSERT(false); + } + + atomic_fetch_add(&threadpool->n_ready, 1); + } + } + + return (thread_ret_t) 0; +} + +struct ggml_cplan ggml_graph_plan( + const struct ggml_cgraph * cgraph, + int32_t n_threads, + struct ggml_compute_threadpool * threadpool +) { + if (threadpool == NULL) { + //GGML_PRINT("WARNING: Threadpool is not specified. Will create a disposable threadpool\n"); } size_t work_size = 0; @@ -19660,12 +20084,13 @@ struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threa } if (work_size > 0) { - work_size += CACHE_LINE_SIZE*(n_threads - 1); + work_size += CACHE_LINE_SIZE*(n_threads); } - cplan.n_threads = MIN(max_tasks, n_threads); - cplan.work_size = work_size; - cplan.work_data = NULL; + cplan.threadpool = threadpool; + cplan.n_threads = MIN(max_tasks, n_threads); + cplan.work_size = work_size; + cplan.work_data = NULL; return cplan; } @@ -19680,63 +20105,84 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl } } - const int n_threads = cplan->n_threads; + const int64_t perf_start_cycles = ggml_perf_cycles(); + const int64_t perf_start_time_us = ggml_perf_time_us(); - struct ggml_compute_state_shared state_shared = { - /*.cgraph =*/ cgraph, - /*.cgraph_plan =*/ cplan, - /*.perf_node_start_cycles =*/ 0, - /*.perf_node_start_time_us =*/ 0, - /*.n_threads =*/ n_threads, - /*.n_active =*/ n_threads, - /*.node_n =*/ -1, - /*.node_task =*/ GGML_TASK_TYPE_FINALIZE, - /*.abort_callback =*/ NULL, - /*.abort_callback_data =*/ NULL, - /*.current_chunk; =*/ 0, - }; - struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads); + const int n_threads = cplan->n_threads; + struct ggml_compute_threadpool * threadpool = cplan->threadpool; + bool disposable_threadpool = false; + + if (threadpool == NULL) { + // GGML_PRINT("NOTE: Threadpool is not specified. Will create a disposable threadpool\n"); + struct ggml_threadpool_params tpp = { + .mask_specified = false, + .n_threads = n_threads, + .strict_cpu = false, + .prio = 1, + .poll = false + }; - // create thread pool - if (n_threads > 1) { - for (int j = 1; j < n_threads; ++j) { - workers[j] = (struct ggml_compute_state) { - .thrd = 0, - .ith = j, - .shared = &state_shared, - .ec = GGML_STATUS_SUCCESS, - }; + threadpool = ggml_create_threadpool(&tpp); + ggml_resume_threadpool(threadpool); + disposable_threadpool = true; + } else if (n_threads > threadpool->n_threads_max) { + GGML_PRINT("WARNING: Requesting more threads that the threadpool contains. 
Expect a bad time.\n"); + } - const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]); - GGML_ASSERT(rc == 0); - UNUSED(rc); - } + // Initialize worker ordering + for (int j = 0; j < n_threads; ++j) { + threadpool->workers[j].ith = j; } - workers[0].ith = 0; - workers[0].shared = &state_shared; - workers[0].ec = GGML_STATUS_SUCCESS; + // Update main thread affinity to match the current threadpool + if (threadpool->workers[0].mask_specified) + __thread_affinity(threadpool->workers[0].cpumask); - const int64_t perf_start_cycles = ggml_perf_cycles(); - const int64_t perf_start_time_us = ggml_perf_time_us(); + // Set up work + threadpool->cgraph = cgraph; + threadpool->cplan = cplan; + threadpool->n_active = n_threads; + threadpool->n_threads_cur = n_threads; + + atomic_store(&threadpool->node_n, -1); + + // Kick threadpool + if (!threadpool->poll && n_threads > 1) { + ggml_mutex_lock(&threadpool->mutex); + ggml_cond_broadcast(&threadpool->cond); + ggml_mutex_unlock(&threadpool->mutex); + } + + int compute_status = GGML_STATUS_SUCCESS; - // this is a work thread too - ggml_graph_compute_thread(&workers[0]); - enum ggml_status compute_status = workers[0].ec; + // The main-thread is a work thread too. Start computing... + if (n_threads > 1) { + atomic_fetch_sub(&threadpool->n_ready, 1); + } + compute_status = (size_t) ggml_graph_compute_thread(&threadpool->workers[0]); // don't leave affinity set on the main thread clear_numa_thread_affinity(); - // join or kill thread pool + // Wait for all other threads to finish if (n_threads > 1) { - for (int j = 1; j < n_threads; j++) { - const int rc = ggml_thread_join(workers[j].thrd, NULL); - GGML_ASSERT(rc == 0); - if (workers[j].ec != GGML_STATUS_SUCCESS) - compute_status = workers[j].ec; + atomic_fetch_add(&threadpool->n_ready, 1); + // wait for thread pool + while (atomic_load(&threadpool->n_ready) < threadpool->n_threads_max) { + __cpu_relax(); + } + } + + for (int j = 0; j < n_threads; ++j) { + if (threadpool->workers[j].ec != GGML_STATUS_SUCCESS) { + compute_status = threadpool->workers[j].ec; } } + if (disposable_threadpool) { + ggml_release_threadpool(threadpool); + } + // performance stats (graph) { int64_t perf_cycles_cur = ggml_perf_cycles() - perf_start_cycles; @@ -19757,8 +20203,12 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl return compute_status; } -enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct ggml_cgraph * cgraph, int n_threads) { - struct ggml_cplan cplan = ggml_graph_plan(cgraph, n_threads); +enum ggml_status ggml_graph_compute_with_ctx( + struct ggml_context * ctx, + struct ggml_cgraph * cgraph, + int32_t n_threads +) { + struct ggml_cplan cplan = ggml_graph_plan(cgraph, n_threads, NULL); struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size); @@ -20569,7 +21019,7 @@ static enum ggml_opt_result ggml_opt_adam( float * pf = params.past > 0 ? 
opt->adam.pf->data : NULL; // past function values - struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads); + struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads, NULL); struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size); cplan.work_data = (uint8_t *)ctx->mem_buffer + obj->offs; @@ -20916,7 +21366,7 @@ static enum ggml_opt_result ggml_opt_lbfgs( opt->iter = iter; } - struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads); + struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads, NULL); struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size); cplan.work_data = (uint8_t *)ctx->mem_buffer + obj->offs; diff --git a/ggml.h b/ggml.h index f38699698b1e9..75b97a62f127c 100644 --- a/ggml.h +++ b/ggml.h @@ -274,6 +274,8 @@ #define GGML_UNREACHABLE() ((void) 0) #endif +#define GGML_N_CORES_MAX 512 + // used to copy the number of elements and stride in bytes of tensors into local variables. // main purpose is to reduce code duplication and improve readability. // @@ -609,6 +611,8 @@ extern "C" { // If it returns true, the computation is aborted typedef bool (*ggml_abort_callback)(void * data); + struct ggml_compute_threadpool; + // the compute plan that needs to be prepared for ggml_graph_compute() // since https://github.com/ggerganov/ggml/issues/287 struct ggml_cplan { @@ -616,6 +620,7 @@ extern "C" { uint8_t * work_data; // work buffer, to be allocated by caller before calling to `ggml_graph_compute()` int n_threads; + struct ggml_compute_threadpool * threadpool; // abort ggml_graph_compute when true ggml_abort_callback abort_callback; @@ -653,6 +658,15 @@ extern "C" { int64_t perf_time_us; }; + struct ggml_threadpool_params { + bool cpumask[GGML_N_CORES_MAX]; + bool mask_specified; + int32_t n_threads; + int32_t prio; + bool poll; + bool strict_cpu; + }; + // scratch buffer struct ggml_scratch { size_t offs; @@ -2038,10 +2052,19 @@ extern "C" { GGML_API size_t ggml_graph_overhead(void); GGML_API size_t ggml_graph_overhead_custom(size_t size, bool grads); + GGML_API struct ggml_compute_threadpool* ggml_create_threadpool (struct ggml_threadpool_params * params); + GGML_API void ggml_release_threadpool (struct ggml_compute_threadpool * threadpool); + GGML_API int32_t ggml_threadpool_get_n_threads(struct ggml_compute_threadpool * threadpool); + GGML_API void ggml_pause_threadpool (struct ggml_compute_threadpool * threadpool); + GGML_API void ggml_resume_threadpool (struct ggml_compute_threadpool * threadpool); + // ggml_graph_plan() has to be called before ggml_graph_compute() // when plan.work_size > 0, caller must allocate memory for plan.work_data - GGML_API struct ggml_cplan ggml_graph_plan (const struct ggml_cgraph * cgraph, int n_threads /*= GGML_DEFAULT_N_THREADS*/); - GGML_API enum ggml_status ggml_graph_compute ( struct ggml_cgraph * cgraph, struct ggml_cplan * cplan); + GGML_API struct ggml_cplan ggml_graph_plan( + const struct ggml_cgraph * cgraph, + int n_threads, + struct ggml_compute_threadpool * threadpool); + GGML_API enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan); // same as ggml_graph_compute() but the work data is allocated as a part of the context // note: the drawback of this API is that you must have ensured that the context has enough memory for the work data GGML_API enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct ggml_cgraph * cgraph, int n_threads); diff --git a/llama.cpp 
b/llama.cpp index 40d2ec2c967f2..cd0ba2ede3401 100644 --- a/llama.cpp +++ b/llama.cpp @@ -2304,6 +2304,9 @@ struct llama_context { #endif ggml_backend_t backend_cpu = nullptr; + ggml_compute_threadpool_t threadpool = nullptr; + ggml_compute_threadpool_t threadpool_batch = nullptr; + const llama_model & model; // key + value cache for the self attention @@ -11942,9 +11945,15 @@ static size_t llama_output_reserve(llama_context & lctx, size_t n_outputs) { static void llama_graph_compute( - llama_context & lctx, - ggml_cgraph * gf, - int n_threads) { + llama_context & lctx, + ggml_cgraph * gf, + int n_threads, + ggml_compute_threadpool * threadpool) { +#ifdef GGML_USE_MPI + const int64_t n_layer = lctx.model.hparams.n_layer; + ggml_mpi_graph_compute_pre(lctx.ctx_mpi, gf, n_layer); +#endif + #ifdef GGML_USE_METAL if (ggml_backend_is_metal(lctx.backend_metal)) { ggml_backend_metal_set_n_cb(lctx.backend_metal, n_threads); @@ -11953,6 +11962,7 @@ static void llama_graph_compute( if (lctx.backend_cpu != nullptr) { ggml_backend_cpu_set_n_threads(lctx.backend_cpu, n_threads); + ggml_backend_cpu_set_threadpool(lctx.backend_cpu, threadpool); ggml_backend_cpu_set_abort_callback(lctx.backend_cpu, lctx.abort_callback, lctx.abort_callback_data); } @@ -12079,7 +12089,32 @@ static int llama_decode_internal( lctx.n_outputs = n_outputs_new; } - int n_threads = n_tokens == 1 ? cparams.n_threads : cparams.n_threads_batch; + int n_threads; + ggml_compute_threadpool* threadpool = nullptr; // nullptr -> disposable threadpool + if (n_tokens == 1) { + if (lctx.threadpool_batch) { + ggml_pause_threadpool(lctx.threadpool_batch); + } + if (lctx.threadpool) { + ggml_resume_threadpool(lctx.threadpool); + threadpool = lctx.threadpool; + } + n_threads = cparams.n_threads; + + } else { + if (lctx.threadpool && !lctx.threadpool_batch) { + ggml_pause_threadpool(lctx.threadpool); + } + if (lctx.threadpool_batch) { + ggml_resume_threadpool(lctx.threadpool_batch); + threadpool = lctx.threadpool_batch; + } else if (lctx.threadpool) { + ggml_resume_threadpool(lctx.threadpool); + threadpool = lctx.threadpool; + } + n_threads = cparams.n_threads_batch; + } + GGML_ASSERT(n_threads > 0); // helpers for smoother batch API transition @@ -12192,7 +12227,7 @@ static int llama_decode_internal( llama_set_inputs(lctx, u_batch); - llama_graph_compute(lctx, gf, n_threads); + llama_graph_compute(lctx, gf, n_threads, threadpool); // update the kv ring buffer { @@ -12516,7 +12551,7 @@ static void llama_kv_cache_defrag_internal(struct llama_context & lctx) { ggml_cgraph * gf = llama_build_graph_defrag(lctx, ids); - llama_graph_compute(lctx, gf, lctx.cparams.n_threads); + llama_graph_compute(lctx, gf, lctx.cparams.n_threads, lctx.threadpool); #endif //const int64_t t_end = ggml_time_us(); @@ -12538,7 +12573,7 @@ static void llama_kv_cache_update_internal(struct llama_context & lctx) { llama_set_k_shift(lctx); - llama_graph_compute(lctx, gf, lctx.cparams.n_threads); + llama_graph_compute(lctx, gf, lctx.cparams.n_threads, lctx.threadpool); need_reserve = true; } @@ -12564,7 +12599,7 @@ static void llama_kv_cache_update_internal(struct llama_context & lctx) { llama_set_s_copy(lctx); - llama_graph_compute(lctx, gf, lctx.cparams.n_threads); + llama_graph_compute(lctx, gf, lctx.cparams.n_threads, lctx.threadpool); need_reserve = true; } @@ -16136,6 +16171,31 @@ void llama_numa_init(enum ggml_numa_strategy numa) { } } +void llama_attach_threadpool( + struct llama_context * ctx, + ggml_compute_threadpool_t threadpool) { + ctx->threadpool = threadpool; +} + 
+void llama_attach_batch_threadpool(
+        struct llama_context * ctx,
+        ggml_compute_threadpool_t threadpool_batch) {
+    ctx->threadpool_batch = threadpool_batch;
+}
+
+void llama_detach_threadpool(struct llama_context * ctx) {
+    ctx->threadpool = nullptr;
+}
+
+void llama_detach_batch_threadpool(struct llama_context * ctx) {
+    ctx->threadpool_batch = nullptr;
+}
+
+void llama_detach_threadpools(struct llama_context * ctx) {
+    llama_detach_threadpool(ctx);
+    llama_detach_batch_threadpool(ctx);
+}
+
 void llama_backend_free(void) {
     ggml_quantize_free();
 }
diff --git a/llama.h b/llama.h
index 95105c28e5e42..e2bfbfa9bdfb6 100644
--- a/llama.h
+++ b/llama.h
@@ -391,6 +391,16 @@ extern "C" {
     //optional:
     LLAMA_API void llama_numa_init(enum ggml_numa_strategy numa);
 
+    LLAMA_API void llama_attach_threadpool(
+               struct llama_context * ctx,
+            ggml_compute_threadpool_t threadpool);
+    LLAMA_API void llama_attach_batch_threadpool(
+               struct llama_context * ctx,
+            ggml_compute_threadpool_t threadpool);
+    LLAMA_API void llama_detach_threadpool(struct llama_context * ctx);
+    LLAMA_API void llama_detach_batch_threadpool(struct llama_context * ctx);
+    LLAMA_API void llama_detach_threadpools(struct llama_context * ctx);
+
     // Call once at the end of the program - currently only used for MPI
     LLAMA_API void llama_backend_free(void);
 
diff --git a/tests/test-rope.cpp b/tests/test-rope.cpp
index 26c1f42dc0e95..7e55cca579866 100644
--- a/tests/test-rope.cpp
+++ b/tests/test-rope.cpp
@@ -113,7 +113,7 @@ static struct ggml_tensor * get_random_tensor_f32(
 }
 
 static void ggml_graph_compute_helper(std::vector<uint8_t> & buf, ggml_cgraph * graph, int n_threads) {
-    struct ggml_cplan plan = ggml_graph_plan(graph, n_threads);
+    struct ggml_cplan plan = ggml_graph_plan(graph, n_threads, nullptr);
 
     if (plan.work_size > 0) {
         buf.resize(plan.work_size);
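
Usage sketch (not part of the patch): driving the new ggml threadpool API directly. It assumes a graph gf already built by the caller and uses only the entry points added in this diff (ggml_create_threadpool, ggml_resume_threadpool, the three-argument ggml_graph_plan, ggml_graph_compute, ggml_release_threadpool); the helper name compute_with_threadpool and the plain malloc for the work buffer are illustrative, not part of the change. Note that ggml_create_threadpool returns the pool in the paused state, so it must be resumed before the first compute.

    #include "ggml.h"
    #include <stdlib.h>

    static enum ggml_status compute_with_threadpool(struct ggml_cgraph * gf, int32_t n_threads) {
        struct ggml_threadpool_params tpp = {
            /*.cpumask        =*/ { false },   // no explicit mask
            /*.mask_specified =*/ false,       // workers are not pinned
            /*.n_threads      =*/ n_threads,
            /*.prio           =*/ 0,           // "normal" scheduling priority
            /*.poll           =*/ false,       // sleep on the condition variable between graphs
            /*.strict_cpu     =*/ false,
        };

        struct ggml_compute_threadpool * tp = ggml_create_threadpool(&tpp);
        ggml_resume_threadpool(tp); // pools are created paused

        struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, tp);
        if (cplan.work_size > 0) {
            cplan.work_data = (uint8_t *) malloc(cplan.work_size);
        }

        enum ggml_status status = ggml_graph_compute(gf, &cplan);

        free(cplan.work_data);
        ggml_release_threadpool(tp);
        return status;
    }

Passing a NULL threadpool to ggml_graph_plan()/ggml_graph_compute() keeps working: ggml_graph_compute() creates a disposable pool for that call and releases it afterwards, which is what the optimizer paths and test-rope.cpp above rely on.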
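A corresponding sketch at the llama.cpp level, using the new llama.h prototypes. The context ctx and the two parameter structs tpp_gen / tpp_batch are assumed to be set up by the caller, and ggml_compute_threadpool_t is the pointer typedef used by the new prototypes; llama_decode() itself resumes whichever pool matches the batch size and pauses the other one, as implemented in llama_decode_internal above, so no explicit resume is needed here.

    // tpp_gen / tpp_batch: ggml_threadpool_params prepared by the caller, e.g. fewer
    // threads with strict pinning for single-token generation and more threads for
    // prompt/batch processing.
    ggml_compute_threadpool_t tp_gen   = ggml_create_threadpool(&tpp_gen);
    ggml_compute_threadpool_t tp_batch = ggml_create_threadpool(&tpp_batch);

    llama_attach_threadpool(ctx, tp_gen);         // used when n_tokens == 1
    llama_attach_batch_threadpool(ctx, tp_batch); // used for larger batches

    // ... llama_decode(ctx, batch) picks one pool per call and pauses the other ...

    llama_detach_threadpools(ctx);
    ggml_release_threadpool(tp_batch);
    ggml_release_threadpool(tp_gen);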
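For reference, the effect of strict_cpu on worker placement (see __cpumask_next above): with strict_cpu enabled each worker is pinned to a single core taken round-robin from the global mask, while without it every worker simply inherits the whole mask. A shortened illustration with a hypothetical 4-entry mask (the real array has GGML_N_CORES_MAX entries):

    // global_mask = { 1, 0, 1, 1 }        // cores 0, 2 and 3 allowed
    //
    // strict_cpu == true:
    //   worker 0 -> { 1, 0, 0, 0 }        // pinned to core 0
    //   worker 1 -> { 0, 0, 1, 0 }        // pinned to core 2
    //   worker 2 -> { 0, 0, 0, 1 }        // pinned to core 3
    //   worker 3 -> { 1, 0, 0, 0 }        // wraps around to core 0
    //
    // strict_cpu == false:
    //   every worker -> { 1, 0, 1, 1 }    // scheduler may move it within the mask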