From 930ff62792f5ea3be91270b2ac650d646dbd459b Mon Sep 17 00:00:00 2001 From: David Grove Date: Mon, 29 May 2017 16:45:44 -0400 Subject: [PATCH] Convert dispatch_workq from legacy priorities to qos Update dispatch_workq (DISPATCH_USE_INTERNAL_WORKQUEUE) to use QoS-based constants instead of legacy priorities. Enhance monitoring code to count runnable threads from highest QoS to lowest and to suppress voluntary oversubscription for lower QoS queues if the total count of runnable worker threads is already over the desired threshold. --- src/event/workqueue.c | 64 +++++++++++----------------------- src/event/workqueue_internal.h | 14 ++------ src/queue.c | 36 +++++++++++++++++-- 3 files changed, 56 insertions(+), 58 deletions(-) diff --git a/src/event/workqueue.c b/src/event/workqueue.c index 0b9bc0ac5..dbc65938c 100644 --- a/src/event/workqueue.c +++ b/src/event/workqueue.c @@ -69,7 +69,7 @@ typedef struct dispatch_workq_monitor_s { int num_registered_tids; } dispatch_workq_monitor_s, *dispatch_workq_monitor_t; -static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES]; +static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX]; #pragma mark Implementation of the monitoring subsystem. @@ -80,12 +80,13 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED); static dispatch_once_t _dispatch_workq_init_once_pred; void -_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority) +_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls) { dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once); #if HAVE_DISPATCH_WORKQ_MONITORING - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority]; + dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls); + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1]; dispatch_assert(mon->dq == root_q); dispatch_tid tid = _dispatch_thread_getspecific(tid); _dispatch_unfair_lock_lock(&mon->registered_tid_lock); @@ -97,10 +98,11 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority) } void -_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority) +_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls) { #if HAVE_DISPATCH_WORKQ_MONITORING - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority]; + dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls); + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1]; dispatch_tid tid = _dispatch_thread_getspecific(tid); _dispatch_unfair_lock_lock(&mon->registered_tid_lock); for (int i = 0; i < mon->num_registered_tids; i++) { @@ -177,16 +179,10 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon) static void _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) { - // TODO: Once we switch away from the legacy priorities to - // newer QoS, we can loop in order of decreasing QoS - // and track the total number of runnable threads seen - // across pools. We can then use that number to - // implement a global policy where low QoS queues - // are not eligible for over-subscription if the higher - // QoS queues have already consumed the target - // number of threads. - for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) { - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i]; + int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus); + int global_runnable = 0; + for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) { + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1]; dispatch_queue_t dq = mon->dq; if (!_dispatch_queue_class_probe(dq)) { @@ -198,8 +194,10 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) _dispatch_debug("workq: %s has %d runnable wokers (target is %d)", dq->dq_label, mon->num_runnable, mon->target_runnable); + global_runnable += mon->num_runnable; + if (mon->num_runnable == 0) { - // We are below target, and no worker is runnable. + // We have work, but no worker is runnable. // It is likely the program is stalled. Therefore treat // this as if dq were an overcommit queue and call poke // with the limit being the maximum number of workers for dq. @@ -207,7 +205,9 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) _dispatch_debug("workq: %s has no runnable workers; poking with floor %d", dq->dq_label, floor); _dispatch_global_queue_poke(dq, 1, floor); - } else if (mon->num_runnable < mon->target_runnable) { + global_runnable += 1; // account for poke in global estimate + } else if (mon->num_runnable < mon->target_runnable && + global_runnable < global_soft_max) { // We are below target, but some workers are still runnable. // We want to oversubscribe to hit the desired load target. // However, this under-utilization may be transitory so set the @@ -218,42 +218,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) _dispatch_debug("workq: %s under utilization target; poking with floor %d", dq->dq_label, floor); _dispatch_global_queue_poke(dq, 1, floor); + global_runnable += 1; // account for poke in global estimate } } } #endif // HAVE_DISPATCH_WORKQ_MONITORING - -// temporary until we switch over to QoS based interface. -static dispatch_queue_t -get_root_queue_from_legacy_priority(int priority) -{ - switch (priority) { - case WORKQ_HIGH_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS]; - case WORKQ_DEFAULT_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS]; - case WORKQ_LOW_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS]; - case WORKQ_BG_PRIOQUEUE: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]; - case WORKQ_BG_PRIOQUEUE_CONDITIONAL: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS]; - case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL: - return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]; - default: - return NULL; - } -} - static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED) { #if HAVE_DISPATCH_WORKQ_MONITORING int target_runnable = dispatch_hw_config(active_cpus); - for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) { - dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i]; - mon->dq = get_root_queue_from_legacy_priority(i); + for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) { + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1]; + mon->dq = _dispatch_get_root_queue(i, false); void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid)); mon->registered_tids = buf; mon->target_runnable = target_runnable; diff --git a/src/event/workqueue_internal.h b/src/event/workqueue_internal.h index 012e554fb..02b6903e9 100644 --- a/src/event/workqueue_internal.h +++ b/src/event/workqueue_internal.h @@ -27,20 +27,10 @@ #ifndef __DISPATCH_WORKQUEUE_INTERNAL__ #define __DISPATCH_WORKQUEUE_INTERNAL__ -/* Work queue priority attributes. */ -#define WORKQ_HIGH_PRIOQUEUE 0 -#define WORKQ_DEFAULT_PRIOQUEUE 1 -#define WORKQ_LOW_PRIOQUEUE 2 -#define WORKQ_BG_PRIOQUEUE 3 -#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4 -#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5 - -#define WORKQ_NUM_PRIORITIES 6 - #define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255 -void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority); -void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority); +void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls); +void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls); #if defined(__linux__) #define HAVE_DISPATCH_WORKQ_MONITORING 1 diff --git a/src/queue.c b/src/queue.c index 6d74b7972..10291856e 100644 --- a/src/queue.c +++ b/src/queue.c @@ -37,6 +37,9 @@ !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK) #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1 #endif +#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK +#define DISPATCH_USE_WORKQ_PRIORITY 1 +#endif #if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \ !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK #define pthread_workqueue_t void* @@ -154,7 +157,10 @@ struct dispatch_root_queue_context_s { int volatile dgq_pending; #if DISPATCH_USE_WORKQUEUES qos_class_t dgq_qos; - int dgq_wq_priority, dgq_wq_options; +#if DISPATCH_USE_WORKQ_PRIORITY + int dgq_wq_priority; +#endif + int dgq_wq_options; #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL pthread_workqueue_t dgq_kworkqueue; #endif @@ -182,7 +188,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_MAINTENANCE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -193,7 +201,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_MAINTENANCE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -204,7 +214,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_BACKGROUND, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -215,7 +227,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_BACKGROUND, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -226,7 +240,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_UTILITY, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -237,7 +253,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_UTILITY, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -248,7 +266,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_DEFAULT, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -259,7 +279,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_DEFAULT, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -270,7 +292,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INITIATED, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -281,7 +305,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INITIATED, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -292,7 +318,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INTERACTIVE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = 0, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -303,7 +331,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = { [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{ #if DISPATCH_USE_WORKQUEUES .dgq_qos = QOS_CLASS_USER_INTERACTIVE, +#if DISPATCH_USE_WORKQ_PRIORITY .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL, +#endif .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT, #endif #if DISPATCH_ENABLE_THREAD_POOL @@ -5511,7 +5541,7 @@ _dispatch_worker_thread(void *context) bool manager = (dq == &_dispatch_mgr_root_queue); bool monitored = !(overcommit || manager); if (monitored) { - _dispatch_workq_worker_register(dq, qc->dgq_wq_priority); + _dispatch_workq_worker_register(dq, qc->dgq_qos); } #endif @@ -5525,7 +5555,7 @@ _dispatch_worker_thread(void *context) #if DISPATCH_USE_INTERNAL_WORKQUEUE if (monitored) { - _dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority); + _dispatch_workq_worker_unregister(dq, qc->dgq_qos); } #endif (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);