Skip to content

Convert dispatch_workq from legacy priorities to qos #253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits on Jun 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 21 additions & 43 deletions src/event/workqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ typedef struct dispatch_workq_monitor_s {
int num_registered_tids;
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;

static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];

#pragma mark Implementation of the monitoring subsystem.

Expand All @@ -80,12 +80,13 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
static dispatch_once_t _dispatch_workq_init_once_pred;

void
_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
{
dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);

#if HAVE_DISPATCH_WORKQ_MONITORING
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
dispatch_assert(mon->dq == root_q);
dispatch_tid tid = _dispatch_thread_getspecific(tid);
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
Expand All @@ -97,10 +98,11 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
}

void
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
dispatch_tid tid = _dispatch_thread_getspecific(tid);
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
for (int i = 0; i < mon->num_registered_tids; i++) {
Expand Down Expand Up @@ -177,16 +179,10 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
static void
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
{
// TODO: Once we switch away from the legacy priorities to
// newer QoS, we can loop in order of decreasing QoS
// and track the total number of runnable threads seen
// across pools. We can then use that number to
// implement a global policy where low QoS queues
// are not eligible for over-subscription if the higher
// QoS queues have already consumed the target
// number of threads.
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
int global_runnable = 0;
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
dispatch_queue_t dq = mon->dq;

if (!_dispatch_queue_class_probe(dq)) {
Expand All @@ -198,16 +194,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
_dispatch_debug("workq: %s has %d runnable workers (target is %d)",
dq->dq_label, mon->num_runnable, mon->target_runnable);

global_runnable += mon->num_runnable;

if (mon->num_runnable == 0) {
// We are below target, and no worker is runnable.
// We have work, but no worker is runnable.
// It is likely the program is stalled. Therefore treat
// this as if dq were an overcommit queue and call poke
// with the limit being the maximum number of workers for dq.
int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
dq->dq_label, floor);
_dispatch_global_queue_poke(dq, 1, floor);
} else if (mon->num_runnable < mon->target_runnable) {
global_runnable += 1; // account for poke in global estimate
} else if (mon->num_runnable < mon->target_runnable &&
global_runnable < global_soft_max) {
// We are below target, but some workers are still runnable.
// We want to oversubscribe to hit the desired load target.
// However, this under-utilization may be transitory so set the
Expand All @@ -218,42 +218,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
dq->dq_label, floor);
_dispatch_global_queue_poke(dq, 1, floor);
global_runnable += 1; // account for poke in global estimate
}
}
}
#endif // HAVE_DISPATCH_WORKQ_MONITORING


// Temporary shim until we switch over to the QoS based interface:
// maps a legacy pthread-workqueue priority constant (WORKQ_*_PRIOQUEUE)
// to the corresponding global root queue object.
//
// Returns NULL when `priority` is not one of the known legacy values,
// so callers must handle a missing mapping.
static dispatch_queue_t
get_root_queue_from_legacy_priority(int priority)
{
	switch (priority) {
	case WORKQ_HIGH_PRIOQUEUE:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
	case WORKQ_DEFAULT_PRIOQUEUE:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
	case WORKQ_LOW_PRIOQUEUE:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
	case WORKQ_BG_PRIOQUEUE:
		// NOTE(review): plain BG maps to the MAINTENANCE root queue while
		// BG_CONDITIONAL maps to BACKGROUND — this mirrors the legacy
		// priority table; confirm against the root-queue index definitions.
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
	case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
	case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
	default:
		// Unknown legacy priority: no root queue to return.
		return NULL;
	}
}

static void
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
int target_runnable = dispatch_hw_config(active_cpus);
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
mon->dq = get_root_queue_from_legacy_priority(i);
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
mon->dq = _dispatch_get_root_queue(i, false);
void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
mon->registered_tids = buf;
mon->target_runnable = target_runnable;
Expand Down
14 changes: 2 additions & 12 deletions src/event/workqueue_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,12 @@
#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
#define __DISPATCH_WORKQUEUE_INTERNAL__

/* Work queue priority attributes. */
#define WORKQ_HIGH_PRIOQUEUE 0
#define WORKQ_DEFAULT_PRIOQUEUE 1
#define WORKQ_LOW_PRIOQUEUE 2
#define WORKQ_BG_PRIOQUEUE 3
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5

#define WORKQ_NUM_PRIORITIES 6

#define WORKQ_ADDTHREADS_OPTION_OVERCOMMIT 0x1

#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255

void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);

#if defined(__linux__)
#define HAVE_DISPATCH_WORKQ_MONITORING 1
Expand Down
36 changes: 33 additions & 3 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define DISPATCH_USE_WORKQ_PRIORITY 1
#endif
#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
Expand Down Expand Up @@ -158,7 +161,10 @@ struct dispatch_root_queue_context_s {
int volatile dgq_pending;
#if DISPATCH_USE_WORKQUEUES
qos_class_t dgq_qos;
int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_WORKQ_PRIORITY
int dgq_wq_priority;
#endif
int dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
pthread_workqueue_t dgq_kworkqueue;
#endif
Expand Down Expand Up @@ -186,7 +192,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_MAINTENANCE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -197,7 +205,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_MAINTENANCE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -208,7 +218,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_BACKGROUND,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -219,7 +231,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_BACKGROUND,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -230,7 +244,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_UTILITY,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -241,7 +257,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_UTILITY,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -252,7 +270,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_DEFAULT,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -263,7 +283,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_DEFAULT,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -274,7 +296,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INITIATED,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -285,7 +309,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INITIATED,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -296,7 +322,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand All @@ -307,7 +335,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
Expand Down Expand Up @@ -5809,7 +5839,7 @@ _dispatch_worker_thread(void *context)
bool manager = (dq == &_dispatch_mgr_root_queue);
bool monitored = !(overcommit || manager);
if (monitored) {
_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
_dispatch_workq_worker_register(dq, qc->dgq_qos);
}
#endif

Expand All @@ -5823,7 +5853,7 @@ _dispatch_worker_thread(void *context)

#if DISPATCH_USE_INTERNAL_WORKQUEUE
if (monitored) {
_dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
}
#endif
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
Expand Down