From 203725aabc0ab9dfbb03e77aa73bd8e6340c3eaf Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 14 Apr 2025 08:56:13 +0200 Subject: [PATCH 1/7] server : use std::move whenever possible --- examples/server/server.cpp | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index d87bda1a0d5af..22a10de393176 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1552,11 +1552,11 @@ struct server_queue { std::condition_variable condition_tasks; // callback functions - std::function callback_new_task; - std::function callback_update_slots; + std::function callback_new_task; + std::function callback_update_slots; // Add a new task to the end of the queue - int post(server_task task, bool front = false) { + int post(server_task & task, bool front = false) { std::unique_lock lock(mutex_tasks); GGML_ASSERT(task.id != -1); // if this is cancel task make sure to clean up pending tasks @@ -1596,7 +1596,7 @@ struct server_queue { } // Add a new task, but defer until one slot is available - void defer(server_task task) { + void defer(server_task & task) { std::unique_lock lock(mutex_tasks); QUE_DBG("defer task, id = %d\n", task.id); queue_tasks_deferred.push_back(std::move(task)); @@ -1611,7 +1611,7 @@ struct server_queue { } // Register function to process a new task - void on_new_task(std::function callback) { + void on_new_task(std::function callback) { callback_new_task = std::move(callback); } @@ -1660,12 +1660,12 @@ struct server_queue { lock.unlock(); break; } - server_task task = queue_tasks.front(); + server_task task = std::move(queue_tasks.front()); queue_tasks.pop_front(); lock.unlock(); QUE_DBG("processing task, id = %d\n", task.id); - callback_new_task(std::move(task)); + callback_new_task(task); } // all tasks in the current loop is processed, slots data is now ready @@ -2004,7 +2004,7 @@ struct server_context { slot.reset(); - slots.push_back(slot); + slots.push_back(std::move(slot)); } default_generation_settings_for_props = slots[0].to_json(); @@ -2547,7 +2547,7 @@ struct server_context { server_task task(SERVER_TASK_TYPE_CANCEL); task.id_target = id_task; queue_results.remove_waiting_task_id(id_task); - cancel_tasks.push_back(task); + cancel_tasks.push_back(std::move(task)); } // push to beginning of the queue, so it has highest priority queue_tasks.post(cancel_tasks, true); @@ -2637,7 +2637,7 @@ struct server_context { // Functions to process the task // - void process_single_task(server_task task) { + void process_single_task(server_task & task) { switch (task.type) { case SERVER_TASK_TYPE_COMPLETION: case SERVER_TASK_TYPE_INFILL: @@ -3965,7 +3965,7 @@ int main(int argc, char ** argv) { task.params.oaicompat_cmpl_id = completion_id; // oaicompat_model is already populated by params_from_json_cmpl - tasks.push_back(task); + tasks.push_back(std::move(task)); } } catch (const std::exception & e) { res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST)); @@ -4280,7 +4280,7 @@ int main(int argc, char ** argv) { // OAI-compat task.params.oaicompat = oaicompat; - tasks.push_back(task); + tasks.push_back(std::move(task)); } ctx_server.queue_results.add_waiting_tasks(tasks); @@ -4376,7 +4376,7 @@ int main(int argc, char ** argv) { task.id = ctx_server.queue_tasks.get_new_id(); task.index = i; task.prompt_tokens = format_rerank(ctx_server.vocab, tokenized_query, tokenized_docs[i]); - tasks.push_back(task); + tasks.push_back(std::move(task)); } ctx_server.queue_results.add_waiting_tasks(tasks); @@ -4582,7 +4582,7 @@ int main(int argc, char ** argv) { common_chat_templates_source(ctx_server.chat_templates.get()), common_chat_format_example(ctx_server.chat_templates.get(), ctx_server.params_base.use_jinja).c_str()); - ctx_server.queue_tasks.on_new_task([&ctx_server](const server_task & task) { + ctx_server.queue_tasks.on_new_task([&ctx_server](server_task & task) { ctx_server.process_single_task(task); }); From b7aea032eb85725ca4150bbc8963b88088ba8b1d Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 14 Apr 2025 19:10:56 +0200 Subject: [PATCH 2/7] use r-value ref --- examples/server/server.cpp | 60 +++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 22a10de393176..eebc123f374fb 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1552,11 +1552,11 @@ struct server_queue { std::condition_variable condition_tasks; // callback functions - std::function callback_new_task; + std::function callback_new_task; std::function callback_update_slots; // Add a new task to the end of the queue - int post(server_task & task, bool front = false) { + int post(server_task && task, bool front = false) { std::unique_lock lock(mutex_tasks); GGML_ASSERT(task.id != -1); // if this is cancel task make sure to clean up pending tasks @@ -1565,16 +1565,16 @@ struct server_queue { } QUE_DBG("new task, id = %d, front = %d\n", task.id, front); if (front) { - queue_tasks.push_front(std::move(task)); + queue_tasks.push_front(task); } else { - queue_tasks.push_back(std::move(task)); + queue_tasks.push_back(task); } condition_tasks.notify_one(); return task.id; } // multi-task version of post() - int post(std::vector & tasks, bool front = false) { + int post(std::vector && tasks, bool front = false) { std::unique_lock lock(mutex_tasks); for (auto & task : tasks) { if (task.id == -1) { @@ -1596,10 +1596,10 @@ struct server_queue { } // Add a new task, but defer until one slot is available - void defer(server_task & task) { + void defer(server_task && task) { std::unique_lock lock(mutex_tasks); QUE_DBG("defer task, id = %d\n", task.id); - queue_tasks_deferred.push_back(std::move(task)); + queue_tasks_deferred.push_back(task); condition_tasks.notify_one(); } @@ -1611,7 +1611,7 @@ struct server_queue { } // Register function to process a new task - void on_new_task(std::function callback) { + void on_new_task(std::function callback) { callback_new_task = std::move(callback); } @@ -1665,7 +1665,7 @@ struct server_queue { lock.unlock(); QUE_DBG("processing task, id = %d\n", task.id); - callback_new_task(task); + callback_new_task(std::move(task)); } // all tasks in the current loop is processed, slots data is now ready @@ -2105,7 +2105,7 @@ struct server_context { return true; } - bool launch_slot_with_task(server_slot & slot, const server_task & task) { + bool launch_slot_with_task(server_slot & slot, const server_task && task) { slot.reset(); slot.id_task = task.id; slot.index = task.index; @@ -2550,7 +2550,7 @@ struct server_context { cancel_tasks.push_back(std::move(task)); } // push to beginning of the queue, so it has highest priority - queue_tasks.post(cancel_tasks, true); + queue_tasks.post(std::move(cancel_tasks), true); } // receive the results from task(s) @@ -2637,7 +2637,7 @@ struct server_context { // Functions to process the task // - void process_single_task(server_task & task) { + void process_single_task(server_task && task) { switch (task.type) { case SERVER_TASK_TYPE_COMPLETION: case SERVER_TASK_TYPE_INFILL: @@ -2651,17 +2651,17 @@ struct server_context { if (slot == nullptr) { // if no slot is available, we defer this task for processing later SRV_DBG("no slot is available, defer task, id_task = %d\n", task.id); - queue_tasks.defer(task); + queue_tasks.defer(std::move(task)); break; } if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); - queue_tasks.defer(task); + queue_tasks.defer(std::move(task)); break; } - if (!launch_slot_with_task(*slot, task)) { + if (!launch_slot_with_task(*slot, std::move(task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", task.id); break; } @@ -2740,7 +2740,7 @@ struct server_context { if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); - queue_tasks.defer(task); + queue_tasks.defer(std::move(task)); break; } @@ -2776,7 +2776,7 @@ struct server_context { if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); - queue_tasks.defer(task); + queue_tasks.defer(std::move(task)); break; } @@ -2819,7 +2819,7 @@ struct server_context { if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); - queue_tasks.defer(task); + queue_tasks.defer(std::move(task)); break; } @@ -2871,7 +2871,7 @@ struct server_context { server_task task(SERVER_TASK_TYPE_NEXT_RESPONSE); task.id = queue_tasks.get_new_id(); - queue_tasks.post(task); + queue_tasks.post(std::move(task)); } // apply context-shift if needed @@ -3636,7 +3636,7 @@ int main(int argc, char ** argv) { server_task task(SERVER_TASK_TYPE_METRICS); task.id = ctx_server.queue_tasks.get_new_id(); ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task, true); // high-priority task + ctx_server.queue_tasks.post(std::move(task), true); // high-priority task // get the result server_task_result_ptr result = ctx_server.queue_results.recv(task.id); @@ -3674,7 +3674,7 @@ int main(int argc, char ** argv) { task.metrics_reset_bucket = true; ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task, true); // high-priority task + ctx_server.queue_tasks.post(std::move(task), true); // high-priority task // get the result server_task_result_ptr result = ctx_server.queue_results.recv(task.id); @@ -3782,7 +3782,7 @@ int main(int argc, char ** argv) { task.slot_action.filepath = filepath; ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task); + ctx_server.queue_tasks.post(std::move(task)); server_task_result_ptr result = ctx_server.queue_results.recv(task.id); ctx_server.queue_results.remove_waiting_task_id(task.id); @@ -3811,7 +3811,7 @@ int main(int argc, char ** argv) { task.slot_action.filepath = filepath; ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task); + ctx_server.queue_tasks.post(std::move(task)); server_task_result_ptr result = ctx_server.queue_results.recv(task.id); ctx_server.queue_results.remove_waiting_task_id(task.id); @@ -3831,7 +3831,7 @@ int main(int argc, char ** argv) { task.slot_action.slot_id = id_slot; ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task); + ctx_server.queue_tasks.post(std::move(task)); server_task_result_ptr result = ctx_server.queue_results.recv(task.id); ctx_server.queue_results.remove_waiting_task_id(task.id); @@ -3973,7 +3973,7 @@ int main(int argc, char ** argv) { } ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(tasks); + ctx_server.queue_tasks.post(std::move(tasks)); bool stream = json_value(data, "stream", false); const auto task_ids = server_task::get_list_id(tasks); @@ -4284,7 +4284,7 @@ int main(int argc, char ** argv) { } ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(tasks); + ctx_server.queue_tasks.post(std::move(tasks)); // get the result std::unordered_set task_ids = server_task::get_list_id(tasks); @@ -4380,7 +4380,7 @@ int main(int argc, char ** argv) { } ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(tasks); + ctx_server.queue_tasks.post(std::move(tasks)); // get the result std::unordered_set task_ids = server_task::get_list_id(tasks); @@ -4435,7 +4435,7 @@ int main(int argc, char ** argv) { task.id = ctx_server.queue_tasks.get_new_id(); task.set_lora = parse_lora_request(ctx_server.params_base.lora_adapters, body); ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task); + ctx_server.queue_tasks.post(std::move(task)); server_task_result_ptr result = ctx_server.queue_results.recv(task.id); ctx_server.queue_results.remove_waiting_task_id(task.id); @@ -4582,8 +4582,8 @@ int main(int argc, char ** argv) { common_chat_templates_source(ctx_server.chat_templates.get()), common_chat_format_example(ctx_server.chat_templates.get(), ctx_server.params_base.use_jinja).c_str()); - ctx_server.queue_tasks.on_new_task([&ctx_server](server_task & task) { - ctx_server.process_single_task(task); + ctx_server.queue_tasks.on_new_task([&ctx_server](server_task && task) { + ctx_server.process_single_task(std::move(task)); }); ctx_server.queue_tasks.on_update_slots([&ctx_server]() { From d8656a140fc3faaf44adceb203f312434c0606b7 Mon Sep 17 00:00:00 2001 From: Xuan-Son Nguyen Date: Tue, 15 Apr 2025 08:54:34 +0200 Subject: [PATCH 3/7] Apply suggestions from code review Co-authored-by: Georgi Gerganov --- examples/server/server.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index eebc123f374fb..3490561a7a838 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1552,8 +1552,8 @@ struct server_queue { std::condition_variable condition_tasks; // callback functions - std::function callback_new_task; - std::function callback_update_slots; + std::function callback_new_task; + std::function callback_update_slots; // Add a new task to the end of the queue int post(server_task && task, bool front = false) { @@ -1611,7 +1611,7 @@ struct server_queue { } // Register function to process a new task - void on_new_task(std::function callback) { + void on_new_task(std::function callback) { callback_new_task = std::move(callback); } From 9487165dfca38f3f7ad8c9173d080c5e4f255a2e Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 15 Apr 2025 09:12:13 +0200 Subject: [PATCH 4/7] make task creation scoped --- examples/server/server.cpp | 234 ++++++++++++++++++++----------------- 1 file changed, 127 insertions(+), 107 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 3490561a7a838..83bd0b8ae1871 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -3633,14 +3633,17 @@ int main(int argc, char ** argv) { } // request slots data using task queue - server_task task(SERVER_TASK_TYPE_METRICS); - task.id = ctx_server.queue_tasks.get_new_id(); - ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(std::move(task), true); // high-priority task + int task_id = ctx_server.queue_tasks.get_new_id(); + { + server_task task(SERVER_TASK_TYPE_METRICS); + task.id = task_id; + ctx_server.queue_results.add_waiting_task_id(task_id); + ctx_server.queue_tasks.post(std::move(task), true); // high-priority task + } // get the result - server_task_result_ptr result = ctx_server.queue_results.recv(task.id); - ctx_server.queue_results.remove_waiting_task_id(task.id); + server_task_result_ptr result = ctx_server.queue_results.recv(task_id); + ctx_server.queue_results.remove_waiting_task_id(task_id); if (result->is_error()) { res_error(res, result->to_json()); @@ -3669,16 +3672,17 @@ int main(int argc, char ** argv) { } // request slots data using task queue - server_task task(SERVER_TASK_TYPE_METRICS); - task.id = ctx_server.queue_tasks.get_new_id(); - task.metrics_reset_bucket = true; - - ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(std::move(task), true); // high-priority task + int task_id = ctx_server.queue_tasks.get_new_id(); + { + server_task task(SERVER_TASK_TYPE_METRICS); + task.id = task_id; + ctx_server.queue_results.add_waiting_task_id(task_id); + ctx_server.queue_tasks.post(std::move(task), true); // high-priority task + } // get the result - server_task_result_ptr result = ctx_server.queue_results.recv(task.id); - ctx_server.queue_results.remove_waiting_task_id(task.id); + server_task_result_ptr result = ctx_server.queue_results.recv(task_id); + ctx_server.queue_results.remove_waiting_task_id(task_id); if (result->is_error()) { res_error(res, result->to_json()); @@ -3775,17 +3779,20 @@ int main(int argc, char ** argv) { } std::string filepath = params.slot_save_path + filename; - server_task task(SERVER_TASK_TYPE_SLOT_SAVE); - task.id = ctx_server.queue_tasks.get_new_id(); - task.slot_action.slot_id = id_slot; - task.slot_action.filename = filename; - task.slot_action.filepath = filepath; + int task_id = ctx_server.queue_tasks.get_new_id(); + { + server_task task(SERVER_TASK_TYPE_SLOT_SAVE); + task.id = ctx_server.queue_tasks.get_new_id(); + task.slot_action.slot_id = id_slot; + task.slot_action.filename = filename; + task.slot_action.filepath = filepath; - ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(std::move(task)); + ctx_server.queue_results.add_waiting_task_id(task_id); + ctx_server.queue_tasks.post(std::move(task)); + } - server_task_result_ptr result = ctx_server.queue_results.recv(task.id); - ctx_server.queue_results.remove_waiting_task_id(task.id); + server_task_result_ptr result = ctx_server.queue_results.recv(task_id); + ctx_server.queue_results.remove_waiting_task_id(task_id); if (result->is_error()) { res_error(res, result->to_json()); @@ -3804,17 +3811,20 @@ int main(int argc, char ** argv) { } std::string filepath = params.slot_save_path + filename; - server_task task(SERVER_TASK_TYPE_SLOT_RESTORE); - task.id = ctx_server.queue_tasks.get_new_id(); - task.slot_action.slot_id = id_slot; - task.slot_action.filename = filename; - task.slot_action.filepath = filepath; + int task_id = ctx_server.queue_tasks.get_new_id(); + { + server_task task(SERVER_TASK_TYPE_SLOT_RESTORE); + task.id = ctx_server.queue_tasks.get_new_id(); + task.slot_action.slot_id = id_slot; + task.slot_action.filename = filename; + task.slot_action.filepath = filepath; - ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(std::move(task)); + ctx_server.queue_results.add_waiting_task_id(task_id); + ctx_server.queue_tasks.post(std::move(task)); + } - server_task_result_ptr result = ctx_server.queue_results.recv(task.id); - ctx_server.queue_results.remove_waiting_task_id(task.id); + server_task_result_ptr result = ctx_server.queue_results.recv(task_id); + ctx_server.queue_results.remove_waiting_task_id(task_id); if (result->is_error()) { res_error(res, result->to_json()); @@ -3826,15 +3836,18 @@ int main(int argc, char ** argv) { }; const auto handle_slots_erase = [&ctx_server, &res_error, &res_ok](const httplib::Request & /* req */, httplib::Response & res, int id_slot) { - server_task task(SERVER_TASK_TYPE_SLOT_ERASE); - task.id = ctx_server.queue_tasks.get_new_id(); - task.slot_action.slot_id = id_slot; + int task_id = ctx_server.queue_tasks.get_new_id(); + { + server_task task(SERVER_TASK_TYPE_SLOT_ERASE); + task.id = ctx_server.queue_tasks.get_new_id(); + task.slot_action.slot_id = id_slot; - ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(std::move(task)); + ctx_server.queue_results.add_waiting_task_id(task_id); + ctx_server.queue_tasks.post(std::move(task)); + } - server_task_result_ptr result = ctx_server.queue_results.recv(task.id); - ctx_server.queue_results.remove_waiting_task_id(task.id); + server_task_result_ptr result = ctx_server.queue_results.recv(task_id); + ctx_server.queue_results.remove_waiting_task_id(task_id); if (result->is_error()) { res_error(res, result->to_json()); @@ -3938,45 +3951,48 @@ int main(int argc, char ** argv) { } auto completion_id = gen_chatcmplid(); - std::vector tasks; - - try { - const auto & prompt = data.at("prompt"); - // TODO: this log can become very long, put it behind a flag or think about a more compact format - //SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get().c_str() : prompt.dump(2).c_str()); - - std::vector tokenized_prompts = tokenize_input_prompts(ctx_server.vocab, prompt, true, true); - tasks.reserve(tokenized_prompts.size()); - for (size_t i = 0; i < tokenized_prompts.size(); i++) { - server_task task = server_task(type); - - task.id = ctx_server.queue_tasks.get_new_id(); - task.index = i; - - task.prompt_tokens = std::move(tokenized_prompts[i]); - task.params = server_task::params_from_json_cmpl( - ctx_server.ctx, - ctx_server.params_base, - data); - task.id_selected_slot = json_value(data, "id_slot", -1); - - // OAI-compat - task.params.oaicompat = oaicompat; - task.params.oaicompat_cmpl_id = completion_id; - // oaicompat_model is already populated by params_from_json_cmpl + std::unordered_set task_ids; + { + std::vector tasks; - tasks.push_back(std::move(task)); + try { + const auto & prompt = data.at("prompt"); + // TODO: this log can become very long, put it behind a flag or think about a more compact format + //SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get().c_str() : prompt.dump(2).c_str()); + + std::vector tokenized_prompts = tokenize_input_prompts(ctx_server.vocab, prompt, true, true); + tasks.reserve(tokenized_prompts.size()); + for (size_t i = 0; i < tokenized_prompts.size(); i++) { + server_task task = server_task(type); + + task.id = ctx_server.queue_tasks.get_new_id(); + task.index = i; + + task.prompt_tokens = std::move(tokenized_prompts[i]); + task.params = server_task::params_from_json_cmpl( + ctx_server.ctx, + ctx_server.params_base, + data); + task.id_selected_slot = json_value(data, "id_slot", -1); + + // OAI-compat + task.params.oaicompat = oaicompat; + task.params.oaicompat_cmpl_id = completion_id; + // oaicompat_model is already populated by params_from_json_cmpl + + tasks.push_back(std::move(task)); + } + } catch (const std::exception & e) { + res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST)); + return; } - } catch (const std::exception & e) { - res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST)); - return; - } - ctx_server.queue_results.add_waiting_tasks(tasks); - ctx_server.queue_tasks.post(std::move(tasks)); + task_ids = server_task::get_list_id(tasks); + ctx_server.queue_results.add_waiting_tasks(tasks); + ctx_server.queue_tasks.post(std::move(tasks)); + } bool stream = json_value(data, "stream", false); - const auto task_ids = server_task::get_list_id(tasks); if (!stream) { ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { @@ -4268,6 +4284,7 @@ int main(int argc, char ** argv) { // create and queue the task json responses = json::array(); bool error = false; + std::unordered_set task_ids; { std::vector tasks; for (size_t i = 0; i < tokenized_prompts.size(); i++) { @@ -4283,24 +4300,23 @@ int main(int argc, char ** argv) { tasks.push_back(std::move(task)); } + task_ids = server_task::get_list_id(tasks); ctx_server.queue_results.add_waiting_tasks(tasks); ctx_server.queue_tasks.post(std::move(tasks)); + } - // get the result - std::unordered_set task_ids = server_task::get_list_id(tasks); + // get the result + ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { + for (auto & res : results) { + GGML_ASSERT(dynamic_cast(res.get()) != nullptr); + responses.push_back(res->to_json()); + } + }, [&](const json & error_data) { + res_error(res, error_data); + error = true; + }, req.is_connection_closed); - ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { - for (auto & res : results) { - GGML_ASSERT(dynamic_cast(res.get()) != nullptr); - responses.push_back(res->to_json()); - } - }, [&](const json & error_data) { - res_error(res, error_data); - error = true; - }, req.is_connection_closed); - - ctx_server.queue_results.remove_waiting_task_ids(task_ids); - } + ctx_server.queue_results.remove_waiting_task_ids(task_ids); if (error) { return; @@ -4367,6 +4383,7 @@ int main(int argc, char ** argv) { // create and queue the task json responses = json::array(); bool error = false; + std::unordered_set task_ids; { std::vector tasks; std::vector tokenized_docs = tokenize_input_prompts(ctx_server.vocab, documents, /* add_special */ false, true); @@ -4379,23 +4396,21 @@ int main(int argc, char ** argv) { tasks.push_back(std::move(task)); } + task_ids = server_task::get_list_id(tasks); ctx_server.queue_results.add_waiting_tasks(tasks); ctx_server.queue_tasks.post(std::move(tasks)); - - // get the result - std::unordered_set task_ids = server_task::get_list_id(tasks); - - ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { - for (auto & res : results) { - GGML_ASSERT(dynamic_cast(res.get()) != nullptr); - responses.push_back(res->to_json()); - } - }, [&](const json & error_data) { - res_error(res, error_data); - error = true; - }, req.is_connection_closed); } + ctx_server.receive_multi_results(task_ids, [&](std::vector & results) { + for (auto & res : results) { + GGML_ASSERT(dynamic_cast(res.get()) != nullptr); + responses.push_back(res->to_json()); + } + }, [&](const json & error_data) { + res_error(res, error_data); + error = true; + }, req.is_connection_closed); + if (error) { return; } @@ -4431,14 +4446,19 @@ int main(int argc, char ** argv) { res_error(res, format_error_response("Request body must be an array", ERROR_TYPE_INVALID_REQUEST)); return; } - server_task task(SERVER_TASK_TYPE_SET_LORA); - task.id = ctx_server.queue_tasks.get_new_id(); - task.set_lora = parse_lora_request(ctx_server.params_base.lora_adapters, body); - ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(std::move(task)); - server_task_result_ptr result = ctx_server.queue_results.recv(task.id); - ctx_server.queue_results.remove_waiting_task_id(task.id); + int task_id = ctx_server.queue_tasks.get_new_id(); + { + server_task task(SERVER_TASK_TYPE_SET_LORA); + task.id = ctx_server.queue_tasks.get_new_id(); + task.set_lora = parse_lora_request(ctx_server.params_base.lora_adapters, body); + ctx_server.queue_results.add_waiting_task_id(task_id); + ctx_server.queue_tasks.post(std::move(task)); + } + + // get the result + server_task_result_ptr result = ctx_server.queue_results.recv(task_id); + ctx_server.queue_results.remove_waiting_task_id(task_id); if (result->is_error()) { res_error(res, result->to_json()); From 240ea2451cf6986b1268c59e69d6a849701c86c6 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 15 Apr 2025 20:27:36 +0200 Subject: [PATCH 5/7] restore std::move --- examples/server/server.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 83bd0b8ae1871..3aa59e387c7a7 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1565,9 +1565,9 @@ struct server_queue { } QUE_DBG("new task, id = %d, front = %d\n", task.id, front); if (front) { - queue_tasks.push_front(task); + queue_tasks.push_front(std::move(task)); } else { - queue_tasks.push_back(task); + queue_tasks.push_back(std::move(task)); } condition_tasks.notify_one(); return task.id; @@ -1599,7 +1599,7 @@ struct server_queue { void defer(server_task && task) { std::unique_lock lock(mutex_tasks); QUE_DBG("defer task, id = %d\n", task.id); - queue_tasks_deferred.push_back(task); + queue_tasks_deferred.push_back(std::move(task)); condition_tasks.notify_one(); } @@ -4450,7 +4450,7 @@ int main(int argc, char ** argv) { int task_id = ctx_server.queue_tasks.get_new_id(); { server_task task(SERVER_TASK_TYPE_SET_LORA); - task.id = ctx_server.queue_tasks.get_new_id(); + task.id = task_id; task.set_lora = parse_lora_request(ctx_server.params_base.lora_adapters, body); ctx_server.queue_results.add_waiting_task_id(task_id); ctx_server.queue_tasks.post(std::move(task)); From 081d72d168e6acc41a431406801a4ea1260a823e Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 15 Apr 2025 21:19:49 +0200 Subject: [PATCH 6/7] fix task_id not set correctly --- examples/server/server.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 3aa59e387c7a7..528d4e6afb76b 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -3782,7 +3782,7 @@ int main(int argc, char ** argv) { int task_id = ctx_server.queue_tasks.get_new_id(); { server_task task(SERVER_TASK_TYPE_SLOT_SAVE); - task.id = ctx_server.queue_tasks.get_new_id(); + task.id = task_id; task.slot_action.slot_id = id_slot; task.slot_action.filename = filename; task.slot_action.filepath = filepath; @@ -3814,7 +3814,7 @@ int main(int argc, char ** argv) { int task_id = ctx_server.queue_tasks.get_new_id(); { server_task task(SERVER_TASK_TYPE_SLOT_RESTORE); - task.id = ctx_server.queue_tasks.get_new_id(); + task.id = task_id; task.slot_action.slot_id = id_slot; task.slot_action.filename = filename; task.slot_action.filepath = filepath; @@ -3839,7 +3839,7 @@ int main(int argc, char ** argv) { int task_id = ctx_server.queue_tasks.get_new_id(); { server_task task(SERVER_TASK_TYPE_SLOT_ERASE); - task.id = ctx_server.queue_tasks.get_new_id(); + task.id = task_id; task.slot_action.slot_id = id_slot; ctx_server.queue_results.add_waiting_task_id(task_id); From c64d6bc0228ad64dc12befc286c2b2efa3653e4c Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Fri, 18 Apr 2025 17:28:14 +0200 Subject: [PATCH 7/7] apply changes from suggestion Co-authored-by: ggerganov --- examples/server/server.cpp | 71 +++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 528d4e6afb76b..c580ec123299c 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1563,14 +1563,15 @@ struct server_queue { if (task.type == SERVER_TASK_TYPE_CANCEL) { cleanup_pending_task(task.id_target); } - QUE_DBG("new task, id = %d, front = %d\n", task.id, front); + const int task_id = task.id; + QUE_DBG("new task, id = %d, front = %d\n", task_id, front); if (front) { queue_tasks.push_front(std::move(task)); } else { queue_tasks.push_back(std::move(task)); } condition_tasks.notify_one(); - return task.id; + return task_id; } // multi-task version of post() @@ -2105,7 +2106,7 @@ struct server_context { return true; } - bool launch_slot_with_task(server_slot & slot, const server_task && task) { + bool launch_slot_with_task(server_slot & slot, server_task && task) { slot.reset(); slot.id_task = task.id; slot.index = task.index; @@ -2113,10 +2114,10 @@ struct server_context { slot.params = std::move(task.params); slot.prompt_tokens = std::move(task.prompt_tokens); - if (!are_lora_equal(task.params.lora, slot.lora)) { + if (!are_lora_equal(slot.params.lora, slot.lora)) { // if lora is changed, we cannot reuse cached tokens slot.cache_tokens.clear(); - slot.lora = task.params.lora; + slot.lora = slot.params.lora; } bool can_detokenize = can_be_detokenized(ctx, slot.prompt_tokens); @@ -3952,44 +3953,42 @@ int main(int argc, char ** argv) { auto completion_id = gen_chatcmplid(); std::unordered_set task_ids; - { + try { std::vector tasks; - try { - const auto & prompt = data.at("prompt"); - // TODO: this log can become very long, put it behind a flag or think about a more compact format - //SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get().c_str() : prompt.dump(2).c_str()); - - std::vector tokenized_prompts = tokenize_input_prompts(ctx_server.vocab, prompt, true, true); - tasks.reserve(tokenized_prompts.size()); - for (size_t i = 0; i < tokenized_prompts.size(); i++) { - server_task task = server_task(type); - - task.id = ctx_server.queue_tasks.get_new_id(); - task.index = i; - - task.prompt_tokens = std::move(tokenized_prompts[i]); - task.params = server_task::params_from_json_cmpl( - ctx_server.ctx, - ctx_server.params_base, - data); - task.id_selected_slot = json_value(data, "id_slot", -1); - - // OAI-compat - task.params.oaicompat = oaicompat; - task.params.oaicompat_cmpl_id = completion_id; - // oaicompat_model is already populated by params_from_json_cmpl - - tasks.push_back(std::move(task)); - } - } catch (const std::exception & e) { - res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST)); - return; + const auto & prompt = data.at("prompt"); + // TODO: this log can become very long, put it behind a flag or think about a more compact format + //SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get().c_str() : prompt.dump(2).c_str()); + + std::vector tokenized_prompts = tokenize_input_prompts(ctx_server.vocab, prompt, true, true); + tasks.reserve(tokenized_prompts.size()); + for (size_t i = 0; i < tokenized_prompts.size(); i++) { + server_task task = server_task(type); + + task.id = ctx_server.queue_tasks.get_new_id(); + task.index = i; + + task.prompt_tokens = std::move(tokenized_prompts[i]); + task.params = server_task::params_from_json_cmpl( + ctx_server.ctx, + ctx_server.params_base, + data); + task.id_selected_slot = json_value(data, "id_slot", -1); + + // OAI-compat + task.params.oaicompat = oaicompat; + task.params.oaicompat_cmpl_id = completion_id; + // oaicompat_model is already populated by params_from_json_cmpl + + tasks.push_back(std::move(task)); } task_ids = server_task::get_list_id(tasks); ctx_server.queue_results.add_waiting_tasks(tasks); ctx_server.queue_tasks.post(std::move(tasks)); + } catch (const std::exception & e) { + res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST)); + return; } bool stream = json_value(data, "stream", false);