@@ -1552,11 +1552,11 @@ struct server_queue {
1552
1552
std::condition_variable condition_tasks;
1553
1553
1554
1554
// callback functions
1555
- std::function<void (server_task&)> callback_new_task;
1555
+ std::function<void (server_task&& )> callback_new_task;
1556
1556
std::function<void (void )> callback_update_slots;
1557
1557
1558
1558
// Add a new task to the end of the queue
1559
- int post (server_task & task, bool front = false ) {
1559
+ int post (server_task && task, bool front = false ) {
1560
1560
std::unique_lock<std::mutex> lock (mutex_tasks);
1561
1561
GGML_ASSERT (task.id != -1 );
1562
1562
// if this is cancel task make sure to clean up pending tasks
@@ -1565,16 +1565,16 @@ struct server_queue {
1565
1565
}
1566
1566
QUE_DBG (" new task, id = %d, front = %d\n " , task.id , front);
1567
1567
if (front) {
1568
- queue_tasks.push_front (std::move ( task) );
1568
+ queue_tasks.push_front (task);
1569
1569
} else {
1570
- queue_tasks.push_back (std::move ( task) );
1570
+ queue_tasks.push_back (task);
1571
1571
}
1572
1572
condition_tasks.notify_one ();
1573
1573
return task.id ;
1574
1574
}
1575
1575
1576
1576
// multi-task version of post()
1577
- int post (std::vector<server_task> & tasks, bool front = false ) {
1577
+ int post (std::vector<server_task> && tasks, bool front = false ) {
1578
1578
std::unique_lock<std::mutex> lock (mutex_tasks);
1579
1579
for (auto & task : tasks) {
1580
1580
if (task.id == -1 ) {
@@ -1596,10 +1596,10 @@ struct server_queue {
1596
1596
}
1597
1597
1598
1598
// Add a new task, but defer until one slot is available
1599
- void defer (server_task & task) {
1599
+ void defer (server_task && task) {
1600
1600
std::unique_lock<std::mutex> lock (mutex_tasks);
1601
1601
QUE_DBG (" defer task, id = %d\n " , task.id );
1602
- queue_tasks_deferred.push_back (std::move ( task) );
1602
+ queue_tasks_deferred.push_back (task);
1603
1603
condition_tasks.notify_one ();
1604
1604
}
1605
1605
@@ -1611,7 +1611,7 @@ struct server_queue {
1611
1611
}
1612
1612
1613
1613
// Register function to process a new task.
// The callback is moved into `callback_new_task`, replacing whatever was
// stored there before. After this change the callback takes the task as an
// rvalue reference (`server_task &&`) instead of an lvalue reference.
1614
- void on_new_task (std::function<void (server_task&)> callback) {
1614
+ void on_new_task (std::function<void (server_task&& )> callback) {
1615
1615
callback_new_task = std::move (callback);
1616
1616
}
1617
1617
@@ -1665,7 +1665,7 @@ struct server_queue {
1665
1665
lock.unlock ();
1666
1666
1667
1667
QUE_DBG (" processing task, id = %d\n " , task.id );
1668
- callback_new_task (task);
1668
+ callback_new_task (std::move ( task) );
1669
1669
}
1670
1670
1671
1671
// all tasks in the current loop is processed, slots data is now ready
@@ -2105,7 +2105,7 @@ struct server_context {
2105
2105
return true ;
2106
2106
}
2107
2107
2108
- bool launch_slot_with_task (server_slot & slot, const server_task & task) {
2108
+ bool launch_slot_with_task (server_slot & slot, const server_task && task) {
2109
2109
slot.reset ();
2110
2110
slot.id_task = task.id ;
2111
2111
slot.index = task.index ;
@@ -2550,7 +2550,7 @@ struct server_context {
2550
2550
cancel_tasks.push_back (std::move (task));
2551
2551
}
2552
2552
// push to beginning of the queue, so it has highest priority
2553
- queue_tasks.post (cancel_tasks, true );
2553
+ queue_tasks.post (std::move ( cancel_tasks) , true );
2554
2554
}
2555
2555
2556
2556
// receive the results from task(s)
@@ -2637,7 +2637,7 @@ struct server_context {
2637
2637
// Functions to process the task
2638
2638
//
2639
2639
2640
- void process_single_task (server_task & task) {
2640
+ void process_single_task (server_task && task) {
2641
2641
switch (task.type ) {
2642
2642
case SERVER_TASK_TYPE_COMPLETION:
2643
2643
case SERVER_TASK_TYPE_INFILL:
@@ -2651,17 +2651,17 @@ struct server_context {
2651
2651
if (slot == nullptr ) {
2652
2652
// if no slot is available, we defer this task for processing later
2653
2653
SRV_DBG (" no slot is available, defer task, id_task = %d\n " , task.id );
2654
- queue_tasks.defer (task);
2654
+ queue_tasks.defer (std::move ( task) );
2655
2655
break ;
2656
2656
}
2657
2657
if (slot->is_processing ()) {
2658
2658
// if requested slot is unavailable, we defer this task for processing later
2659
2659
SRV_DBG (" requested slot is unavailable, defer task, id_task = %d\n " , task.id );
2660
- queue_tasks.defer (task);
2660
+ queue_tasks.defer (std::move ( task) );
2661
2661
break ;
2662
2662
}
2663
2663
2664
- if (!launch_slot_with_task (*slot, task)) {
2664
+ if (!launch_slot_with_task (*slot, std::move ( task) )) {
2665
2665
SRV_ERR (" failed to launch slot with task, id_task = %d\n " , task.id );
2666
2666
break ;
2667
2667
}
@@ -2740,7 +2740,7 @@ struct server_context {
2740
2740
if (slot->is_processing ()) {
2741
2741
// if requested slot is unavailable, we defer this task for processing later
2742
2742
SRV_DBG (" requested slot is unavailable, defer task, id_task = %d\n " , task.id );
2743
- queue_tasks.defer (task);
2743
+ queue_tasks.defer (std::move ( task) );
2744
2744
break ;
2745
2745
}
2746
2746
@@ -2776,7 +2776,7 @@ struct server_context {
2776
2776
if (slot->is_processing ()) {
2777
2777
// if requested slot is unavailable, we defer this task for processing later
2778
2778
SRV_DBG (" requested slot is unavailable, defer task, id_task = %d\n " , task.id );
2779
- queue_tasks.defer (task);
2779
+ queue_tasks.defer (std::move ( task) );
2780
2780
break ;
2781
2781
}
2782
2782
@@ -2819,7 +2819,7 @@ struct server_context {
2819
2819
if (slot->is_processing ()) {
2820
2820
// if requested slot is unavailable, we defer this task for processing later
2821
2821
SRV_DBG (" requested slot is unavailable, defer task, id_task = %d\n " , task.id );
2822
- queue_tasks.defer (task);
2822
+ queue_tasks.defer (std::move ( task) );
2823
2823
break ;
2824
2824
}
2825
2825
@@ -2871,7 +2871,7 @@ struct server_context {
2871
2871
2872
2872
server_task task (SERVER_TASK_TYPE_NEXT_RESPONSE);
2873
2873
task.id = queue_tasks.get_new_id ();
2874
- queue_tasks.post (task);
2874
+ queue_tasks.post (std::move ( task) );
2875
2875
}
2876
2876
2877
2877
// apply context-shift if needed
@@ -3636,7 +3636,7 @@ int main(int argc, char ** argv) {
3636
3636
server_task task (SERVER_TASK_TYPE_METRICS);
3637
3637
task.id = ctx_server.queue_tasks .get_new_id ();
3638
3638
ctx_server.queue_results .add_waiting_task_id (task.id );
3639
- ctx_server.queue_tasks .post (task, true ); // high-priority task
3639
+ ctx_server.queue_tasks .post (std::move ( task) , true ); // high-priority task
3640
3640
3641
3641
// get the result
3642
3642
server_task_result_ptr result = ctx_server.queue_results .recv (task.id );
@@ -3674,7 +3674,7 @@ int main(int argc, char ** argv) {
3674
3674
task.metrics_reset_bucket = true ;
3675
3675
3676
3676
ctx_server.queue_results .add_waiting_task_id (task.id );
3677
- ctx_server.queue_tasks .post (task, true ); // high-priority task
3677
+ ctx_server.queue_tasks .post (std::move ( task) , true ); // high-priority task
3678
3678
3679
3679
// get the result
3680
3680
server_task_result_ptr result = ctx_server.queue_results .recv (task.id );
@@ -3782,7 +3782,7 @@ int main(int argc, char ** argv) {
3782
3782
task.slot_action .filepath = filepath;
3783
3783
3784
3784
ctx_server.queue_results .add_waiting_task_id (task.id );
3785
- ctx_server.queue_tasks .post (task);
3785
+ ctx_server.queue_tasks .post (std::move ( task) );
3786
3786
3787
3787
server_task_result_ptr result = ctx_server.queue_results .recv (task.id );
3788
3788
ctx_server.queue_results .remove_waiting_task_id (task.id );
@@ -3811,7 +3811,7 @@ int main(int argc, char ** argv) {
3811
3811
task.slot_action .filepath = filepath;
3812
3812
3813
3813
ctx_server.queue_results .add_waiting_task_id (task.id );
3814
- ctx_server.queue_tasks .post (task);
3814
+ ctx_server.queue_tasks .post (std::move ( task) );
3815
3815
3816
3816
server_task_result_ptr result = ctx_server.queue_results .recv (task.id );
3817
3817
ctx_server.queue_results .remove_waiting_task_id (task.id );
@@ -3831,7 +3831,7 @@ int main(int argc, char ** argv) {
3831
3831
task.slot_action .slot_id = id_slot;
3832
3832
3833
3833
ctx_server.queue_results .add_waiting_task_id (task.id );
3834
- ctx_server.queue_tasks .post (task);
3834
+ ctx_server.queue_tasks .post (std::move ( task) );
3835
3835
3836
3836
server_task_result_ptr result = ctx_server.queue_results .recv (task.id );
3837
3837
ctx_server.queue_results .remove_waiting_task_id (task.id );
@@ -3973,7 +3973,7 @@ int main(int argc, char ** argv) {
3973
3973
}
3974
3974
3975
3975
ctx_server.queue_results .add_waiting_tasks (tasks);
3976
- ctx_server.queue_tasks .post (tasks);
3976
+ ctx_server.queue_tasks .post (std::move ( tasks) );
3977
3977
3978
3978
bool stream = json_value (data, " stream" , false );
3979
3979
const auto task_ids = server_task::get_list_id (tasks);
@@ -4284,7 +4284,7 @@ int main(int argc, char ** argv) {
4284
4284
}
4285
4285
4286
4286
ctx_server.queue_results .add_waiting_tasks (tasks);
4287
- ctx_server.queue_tasks .post (tasks);
4287
+ ctx_server.queue_tasks .post (std::move ( tasks) );
4288
4288
4289
4289
// get the result
4290
4290
std::unordered_set<int > task_ids = server_task::get_list_id (tasks);
@@ -4380,7 +4380,7 @@ int main(int argc, char ** argv) {
4380
4380
}
4381
4381
4382
4382
ctx_server.queue_results .add_waiting_tasks (tasks);
4383
- ctx_server.queue_tasks .post (tasks);
4383
+ ctx_server.queue_tasks .post (std::move ( tasks) );
4384
4384
4385
4385
// get the result
4386
4386
std::unordered_set<int > task_ids = server_task::get_list_id (tasks);
@@ -4435,7 +4435,7 @@ int main(int argc, char ** argv) {
4435
4435
task.id = ctx_server.queue_tasks .get_new_id ();
4436
4436
task.set_lora = parse_lora_request (ctx_server.params_base .lora_adapters , body);
4437
4437
ctx_server.queue_results .add_waiting_task_id (task.id );
4438
- ctx_server.queue_tasks .post (task);
4438
+ ctx_server.queue_tasks .post (std::move ( task) );
4439
4439
4440
4440
server_task_result_ptr result = ctx_server.queue_results .recv (task.id );
4441
4441
ctx_server.queue_results .remove_waiting_task_id (task.id );
@@ -4582,8 +4582,8 @@ int main(int argc, char ** argv) {
4582
4582
common_chat_templates_source (ctx_server.chat_templates .get ()),
4583
4583
common_chat_format_example (ctx_server.chat_templates .get (), ctx_server.params_base .use_jinja ).c_str ());
4584
4584
4585
- ctx_server.queue_tasks .on_new_task ([&ctx_server](server_task & task) {
4586
- ctx_server.process_single_task (task);
4585
+ ctx_server.queue_tasks .on_new_task ([&ctx_server](server_task && task) {
4586
+ ctx_server.process_single_task (std::move ( task) );
4587
4587
});
4588
4588
4589
4589
ctx_server.queue_tasks .on_update_slots ([&ctx_server]() {
0 commit comments