Skip to content

Commit 930ff62

Browse files
committed
Convert dispatch_workq from legacy priorities to qos
Update dispatch_workq (DISPATCH_USE_INTERNAL_WORKQUEUE) to use QoS-based constants instead of legacy priorities. Enhance monitoring code to count runnable threads from highest QoS to lowest and to suppress voluntary oversubscription for lower QoS queues if the total count of runnable worker threads is already over the desired threshold.
1 parent acb1b1e commit 930ff62

File tree

3 files changed

+56
-58
lines changed

3 files changed

+56
-58
lines changed

src/event/workqueue.c

Lines changed: 21 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ typedef struct dispatch_workq_monitor_s {
6969
int num_registered_tids;
7070
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
7171

72-
static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
72+
static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];
7373

7474
#pragma mark Implementation of the monitoring subsystem.
7575

@@ -80,12 +80,13 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
8080
static dispatch_once_t _dispatch_workq_init_once_pred;
8181

8282
void
83-
_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
83+
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
8484
{
8585
dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
8686

8787
#if HAVE_DISPATCH_WORKQ_MONITORING
88-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
88+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
89+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
8990
dispatch_assert(mon->dq == root_q);
9091
dispatch_tid tid = _dispatch_thread_getspecific(tid);
9192
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
@@ -97,10 +98,11 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
9798
}
9899

99100
void
100-
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
101+
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
101102
{
102103
#if HAVE_DISPATCH_WORKQ_MONITORING
103-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
104+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
105+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
104106
dispatch_tid tid = _dispatch_thread_getspecific(tid);
105107
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
106108
for (int i = 0; i < mon->num_registered_tids; i++) {
@@ -177,16 +179,10 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
177179
static void
178180
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
179181
{
180-
// TODO: Once we switch away from the legacy priorities to
181-
// newer QoS, we can loop in order of decreasing QoS
182-
// and track the total number of runnable threads seen
183-
// across pools. We can then use that number to
184-
// implement a global policy where low QoS queues
185-
// are not eligible for over-subscription if the higher
186-
// QoS queues have already consumed the target
187-
// number of threads.
188-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
189-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
182+
int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
183+
int global_runnable = 0;
184+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
185+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
190186
dispatch_queue_t dq = mon->dq;
191187

192188
if (!_dispatch_queue_class_probe(dq)) {
@@ -198,16 +194,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
198194
_dispatch_debug("workq: %s has %d runnable wokers (target is %d)",
199195
dq->dq_label, mon->num_runnable, mon->target_runnable);
200196

197+
global_runnable += mon->num_runnable;
198+
201199
if (mon->num_runnable == 0) {
202-
// We are below target, and no worker is runnable.
200+
// We have work, but no worker is runnable.
203201
// It is likely the program is stalled. Therefore treat
204202
// this as if dq were an overcommit queue and call poke
205203
// with the limit being the maximum number of workers for dq.
206204
int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
207205
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
208206
dq->dq_label, floor);
209207
_dispatch_global_queue_poke(dq, 1, floor);
210-
} else if (mon->num_runnable < mon->target_runnable) {
208+
global_runnable += 1; // account for poke in global estimate
209+
} else if (mon->num_runnable < mon->target_runnable &&
210+
global_runnable < global_soft_max) {
211211
// We are below target, but some workers are still runnable.
212212
// We want to oversubscribe to hit the desired load target.
213213
// However, this under-utilization may be transitory so set the
@@ -218,42 +218,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
218218
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
219219
dq->dq_label, floor);
220220
_dispatch_global_queue_poke(dq, 1, floor);
221+
global_runnable += 1; // account for poke in global estimate
221222
}
222223
}
223224
}
224225
#endif // HAVE_DISPATCH_WORKQ_MONITORING
225226

226-
227-
// temporary until we switch over to QoS based interface.
228-
static dispatch_queue_t
229-
get_root_queue_from_legacy_priority(int priority)
230-
{
231-
switch (priority) {
232-
case WORKQ_HIGH_PRIOQUEUE:
233-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
234-
case WORKQ_DEFAULT_PRIOQUEUE:
235-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
236-
case WORKQ_LOW_PRIOQUEUE:
237-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
238-
case WORKQ_BG_PRIOQUEUE:
239-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
240-
case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
241-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
242-
case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
243-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
244-
default:
245-
return NULL;
246-
}
247-
}
248-
249227
static void
250228
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
251229
{
252230
#if HAVE_DISPATCH_WORKQ_MONITORING
253231
int target_runnable = dispatch_hw_config(active_cpus);
254-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
255-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
256-
mon->dq = get_root_queue_from_legacy_priority(i);
232+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
233+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
234+
mon->dq = _dispatch_get_root_queue(i, false);
257235
void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
258236
mon->registered_tids = buf;
259237
mon->target_runnable = target_runnable;

src/event/workqueue_internal.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,10 @@
2727
#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
2828
#define __DISPATCH_WORKQUEUE_INTERNAL__
2929

30-
/* Work queue priority attributes. */
31-
#define WORKQ_HIGH_PRIOQUEUE 0
32-
#define WORKQ_DEFAULT_PRIOQUEUE 1
33-
#define WORKQ_LOW_PRIOQUEUE 2
34-
#define WORKQ_BG_PRIOQUEUE 3
35-
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
36-
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
37-
38-
#define WORKQ_NUM_PRIORITIES 6
39-
4030
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
4131

42-
void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
43-
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
32+
void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
33+
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);
4434

4535
#if defined(__linux__)
4636
#define HAVE_DISPATCH_WORKQ_MONITORING 1

src/queue.c

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
3838
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
3939
#endif
40+
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
41+
#define DISPATCH_USE_WORKQ_PRIORITY 1
42+
#endif
4043
#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
4144
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4245
#define pthread_workqueue_t void*
@@ -154,7 +157,10 @@ struct dispatch_root_queue_context_s {
154157
int volatile dgq_pending;
155158
#if DISPATCH_USE_WORKQUEUES
156159
qos_class_t dgq_qos;
157-
int dgq_wq_priority, dgq_wq_options;
160+
#if DISPATCH_USE_WORKQ_PRIORITY
161+
int dgq_wq_priority;
162+
#endif
163+
int dgq_wq_options;
158164
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
159165
pthread_workqueue_t dgq_kworkqueue;
160166
#endif
@@ -182,7 +188,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
182188
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
183189
#if DISPATCH_USE_WORKQUEUES
184190
.dgq_qos = QOS_CLASS_MAINTENANCE,
191+
#if DISPATCH_USE_WORKQ_PRIORITY
185192
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
193+
#endif
186194
.dgq_wq_options = 0,
187195
#endif
188196
#if DISPATCH_ENABLE_THREAD_POOL
@@ -193,7 +201,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
193201
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
194202
#if DISPATCH_USE_WORKQUEUES
195203
.dgq_qos = QOS_CLASS_MAINTENANCE,
204+
#if DISPATCH_USE_WORKQ_PRIORITY
196205
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
206+
#endif
197207
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
198208
#endif
199209
#if DISPATCH_ENABLE_THREAD_POOL
@@ -204,7 +214,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
204214
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
205215
#if DISPATCH_USE_WORKQUEUES
206216
.dgq_qos = QOS_CLASS_BACKGROUND,
217+
#if DISPATCH_USE_WORKQ_PRIORITY
207218
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
219+
#endif
208220
.dgq_wq_options = 0,
209221
#endif
210222
#if DISPATCH_ENABLE_THREAD_POOL
@@ -215,7 +227,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
215227
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
216228
#if DISPATCH_USE_WORKQUEUES
217229
.dgq_qos = QOS_CLASS_BACKGROUND,
230+
#if DISPATCH_USE_WORKQ_PRIORITY
218231
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
232+
#endif
219233
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
220234
#endif
221235
#if DISPATCH_ENABLE_THREAD_POOL
@@ -226,7 +240,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
226240
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
227241
#if DISPATCH_USE_WORKQUEUES
228242
.dgq_qos = QOS_CLASS_UTILITY,
243+
#if DISPATCH_USE_WORKQ_PRIORITY
229244
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
245+
#endif
230246
.dgq_wq_options = 0,
231247
#endif
232248
#if DISPATCH_ENABLE_THREAD_POOL
@@ -237,7 +253,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
237253
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
238254
#if DISPATCH_USE_WORKQUEUES
239255
.dgq_qos = QOS_CLASS_UTILITY,
256+
#if DISPATCH_USE_WORKQ_PRIORITY
240257
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
258+
#endif
241259
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
242260
#endif
243261
#if DISPATCH_ENABLE_THREAD_POOL
@@ -248,7 +266,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
248266
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
249267
#if DISPATCH_USE_WORKQUEUES
250268
.dgq_qos = QOS_CLASS_DEFAULT,
269+
#if DISPATCH_USE_WORKQ_PRIORITY
251270
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
271+
#endif
252272
.dgq_wq_options = 0,
253273
#endif
254274
#if DISPATCH_ENABLE_THREAD_POOL
@@ -259,7 +279,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
259279
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
260280
#if DISPATCH_USE_WORKQUEUES
261281
.dgq_qos = QOS_CLASS_DEFAULT,
282+
#if DISPATCH_USE_WORKQ_PRIORITY
262283
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
284+
#endif
263285
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
264286
#endif
265287
#if DISPATCH_ENABLE_THREAD_POOL
@@ -270,7 +292,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
270292
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
271293
#if DISPATCH_USE_WORKQUEUES
272294
.dgq_qos = QOS_CLASS_USER_INITIATED,
295+
#if DISPATCH_USE_WORKQ_PRIORITY
273296
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
297+
#endif
274298
.dgq_wq_options = 0,
275299
#endif
276300
#if DISPATCH_ENABLE_THREAD_POOL
@@ -281,7 +305,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
281305
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
282306
#if DISPATCH_USE_WORKQUEUES
283307
.dgq_qos = QOS_CLASS_USER_INITIATED,
308+
#if DISPATCH_USE_WORKQ_PRIORITY
284309
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
310+
#endif
285311
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
286312
#endif
287313
#if DISPATCH_ENABLE_THREAD_POOL
@@ -292,7 +318,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
292318
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
293319
#if DISPATCH_USE_WORKQUEUES
294320
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
321+
#if DISPATCH_USE_WORKQ_PRIORITY
295322
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
323+
#endif
296324
.dgq_wq_options = 0,
297325
#endif
298326
#if DISPATCH_ENABLE_THREAD_POOL
@@ -303,7 +331,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
303331
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
304332
#if DISPATCH_USE_WORKQUEUES
305333
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
334+
#if DISPATCH_USE_WORKQ_PRIORITY
306335
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
336+
#endif
307337
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
308338
#endif
309339
#if DISPATCH_ENABLE_THREAD_POOL
@@ -5511,7 +5541,7 @@ _dispatch_worker_thread(void *context)
55115541
bool manager = (dq == &_dispatch_mgr_root_queue);
55125542
bool monitored = !(overcommit || manager);
55135543
if (monitored) {
5514-
_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
5544+
_dispatch_workq_worker_register(dq, qc->dgq_qos);
55155545
}
55165546
#endif
55175547

@@ -5525,7 +5555,7 @@ _dispatch_worker_thread(void *context)
55255555

55265556
#if DISPATCH_USE_INTERNAL_WORKQUEUE
55275557
if (monitored) {
5528-
_dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
5558+
_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
55295559
}
55305560
#endif
55315561
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);

0 commit comments

Comments
 (0)