diff --git a/src/event/workqueue.c b/src/event/workqueue.c index 0b9bc0ac5..dbc65938c 100644 --- a/src/event/workqueue.c +++ b/src/event/workqueue.c @@ -69,7 +69,7 @@ typedef struct dispatch_workq_monitor_s { int num_registered_tids; } dispatch_workq_monitor_s, *dispatch_workq_monitor_t; -static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES]; +static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX]; #pragma mark Implementation of the monitoring subsystem. @@ -80,12 +80,13 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED); static dispatch_once_t _dispatch_workq_init_once_pred; void -_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority) +_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls) { dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once); #if HAVE_DISPATCH_WORKQ_MONITORING - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority]; + dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls); + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1]; dispatch_assert(mon->dq == root_q); dispatch_tid tid = _dispatch_thread_getspecific(tid); _dispatch_unfair_lock_lock(&mon->registered_tid_lock); @@ -97,10 +98,11 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority) } void -_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority) +_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls) { #if HAVE_DISPATCH_WORKQ_MONITORING - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority]; + dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls); + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1]; dispatch_tid tid = _dispatch_thread_getspecific(tid); _dispatch_unfair_lock_lock(&mon->registered_tid_lock); for (int i = 0; i < mon->num_registered_tids; i++) { @@ -177,16 +179,10 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon) static void _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) { - // TODO: Once we switch away from the legacy priorities to - // newer QoS, we can loop in order of decreasing QoS - // and track the total number of runnable threads seen - // across pools. We can then use that number to - // implement a global policy where low QoS queues - // are not eligible for over-subscription if the higher - // QoS queues have already consumed the target - // number of threads. - for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) { - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i]; + int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus); + int global_runnable = 0; + for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) { + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1]; dispatch_queue_t dq = mon->dq; if (!_dispatch_queue_class_probe(dq)) { @@ -198,8 +194,10 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) _dispatch_debug("workq: %s has %d runnable wokers (target is %d)", dq->dq_label, mon->num_runnable, mon->target_runnable); + global_runnable += mon->num_runnable; + if (mon->num_runnable == 0) { - // We are below target, and no worker is runnable. + // We have work, but no worker is runnable. // It is likely the program is stalled. Therefore treat // this as if dq were an overcommit queue and call poke // with the limit being the maximum number of workers for dq. @@ -207,7 +205,9 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) _dispatch_debug("workq: %s has no runnable workers; poking with floor %d", dq->dq_label, floor); _dispatch_global_queue_poke(dq, 1, floor); - } else if (mon->num_runnable < mon->target_runnable) { + global_runnable += 1; // account for poke in global estimate + } else if (mon->num_runnable < mon->target_runnable && + global_runnable < global_soft_max) { // We are below target, but some workers are still runnable. // We want to oversubscribe to hit the desired load target. // However, this under-utilization may be transitory so set the @@ -218,42 +218,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) _dispatch_debug("workq: %s under utilization target; poking with floor %d", dq->dq_label, floor); _dispatch_global_queue_poke(dq, 1, floor); + global_runnable += 1; // account for poke in global estimate } } } #endif // HAVE_DISPATCH_WORKQ_MONITORING - -// temporary until we switch over to QoS based interface. -static dispatch_queue_t -get_root_queue_from_legacy_priority(int priority) -{ - switch (priority) { - case WORKQ_HIGH_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS]; - case WORKQ_DEFAULT_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS]; - case WORKQ_LOW_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS]; - case WORKQ_BG_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]; - case WORKQ_BG_PRIOQUEUE_CONDITIONAL: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS]; - case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]; - default: - return NULL; - } -} - static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED) { #if HAVE_DISPATCH_WORKQ_MONITORING int target_runnable = dispatch_hw_config(active_cpus); - for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) { - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i]; - mon->dq = get_root_queue_from_legacy_priority(i); + for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) { + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1]; + mon->dq = _dispatch_get_root_queue(i, false); void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid)); mon->registered_tids = buf; mon->target_runnable = target_runnable; diff --git a/src/event/workqueue_internal.h b/src/event/workqueue_internal.h index 9f8fc3adb..94dfe4e36 100644 --- a/src/event/workqueue_internal.h +++ b/src/event/workqueue_internal.h @@ -27,22 +27,12 @@ #ifndef __DISPATCH_WORKQUEUE_INTERNAL__ #define __DISPATCH_WORKQUEUE_INTERNAL__ -/* Work queue priority attributes. */ -#define WORKQ_HIGH_PRIOQUEUE 0 -#define WORKQ_DEFAULT_PRIOQUEUE 1 -#define WORKQ_LOW_PRIOQUEUE 2 -#define WORKQ_BG_PRIOQUEUE 3 -#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4 -#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5 - -#define WORKQ_NUM_PRIORITIES 6 - #define WORKQ_ADDTHREADS_OPTION_OVERCOMMIT 0x1 #define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255 -void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority); -void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority); +void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls); +void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls); #if defined(__linux__) #define HAVE_DISPATCH_WORKQ_MONITORING 1 diff --git a/src/queue.c b/src/queue.c index 435ac96ea..4d506ef05 100644 --- a/src/queue.c +++ b/src/queue.c @@ -37,6 +37,9 @@ !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK) #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1 #endif +#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK +#define DISPATCH_USE_WORKQ_PRIORITY 1 +#endif #if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \ !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK #define pthread_workqueue_t void* @@ -158,7 +161,10 @@ struct dispatch_root_queue_context_s { int volatile dgq_pending; #if DISPATCH_USE_WORKQUEUES qos_class_t dgq_qos; - int dgq_wq_priority, dgq_wq_options; +#if DISPATCH_USE_WORKQ_PRIORITY + int dgq_wq_priority; +#endif + int dgq_wq_options; #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL pthread_workqueue_t dgq_kworkqueue; #endif @@ -186,7 +192,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_MAINTENANCE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -197,7 +205,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_MAINTENANCE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -208,7 +218,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_BACKGROUND, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -219,7 +231,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_BACKGROUND, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -230,7 +244,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_UTILITY, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -241,7 +257,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_UTILITY, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -252,7 +270,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_DEFAULT, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -263,7 +283,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_DEFAULT, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -274,7 +296,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INITIATED, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -285,7 +309,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INITIATED, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -296,7 +322,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INTERACTIVE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -307,7 +335,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INTERACTIVE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -5809,7 +5839,7 @@ _dispatch_worker_thread(void *context) bool manager = (dq == &_dispatch_mgr_root_queue); bool monitored = !(overcommit || manager); if (monitored) { - _dispatch_workq_worker_register(dq, qc->dgq_wq_priority); + _dispatch_workq_worker_register(dq, qc->dgq_qos); } #endif @@ -5823,7 +5853,7 @@ _dispatch_worker_thread(void *context) #if DISPATCH_USE_INTERNAL_WORKQUEUE if (monitored) { - _dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority); + _dispatch_workq_worker_unregister(dq, qc->dgq_qos); } #endif (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);