Skip to content

Commit 201f3c7

Browse files
dgrove-ossdas
authored andcommitted
Convert dispatch_workq from legacy priorities to qos
Update dispatch_workq (DISPATCH_USE_INTERNAL_WORKQUEUE) to use QoS-based constants instead of legacy priorities. Enhance monitoring code to count runnable threads from highest QoS to lowest and to suppress voluntary oversubscription for lower QoS queues if the total count of runnable worker threads is already over the desired threshold. Signed-off-by: Daniel A. Steffen <dsteffen@apple.com>
1 parent 5b1738b commit 201f3c7

File tree

3 files changed

+56
-58
lines changed

3 files changed

+56
-58
lines changed

src/event/workqueue.c

Lines changed: 21 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ typedef struct dispatch_workq_monitor_s {
6969
int num_registered_tids;
7070
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
7171

72-
static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
72+
static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];
7373

7474
#pragma mark Implementation of the monitoring subsystem.
7575

@@ -80,12 +80,13 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
8080
static dispatch_once_t _dispatch_workq_init_once_pred;
8181

8282
void
83-
_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
83+
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
8484
{
8585
dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
8686

8787
#if HAVE_DISPATCH_WORKQ_MONITORING
88-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
88+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
89+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
8990
dispatch_assert(mon->dq == root_q);
9091
dispatch_tid tid = _dispatch_thread_getspecific(tid);
9192
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
@@ -97,10 +98,11 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
9798
}
9899

99100
void
100-
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
101+
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
101102
{
102103
#if HAVE_DISPATCH_WORKQ_MONITORING
103-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
104+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
105+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
104106
dispatch_tid tid = _dispatch_thread_getspecific(tid);
105107
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
106108
for (int i = 0; i < mon->num_registered_tids; i++) {
@@ -177,16 +179,10 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
177179
static void
178180
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
179181
{
180-
// TODO: Once we switch away from the legacy priorities to
181-
// newer QoS, we can loop in order of decreasing QoS
182-
// and track the total number of runnable threads seen
183-
// across pools. We can then use that number to
184-
// implement a global policy where low QoS queues
185-
// are not eligible for over-subscription if the higher
186-
// QoS queues have already consumed the target
187-
// number of threads.
188-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
189-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
182+
int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
183+
int global_runnable = 0;
184+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
185+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
190186
dispatch_queue_t dq = mon->dq;
191187

192188
if (!_dispatch_queue_class_probe(dq)) {
@@ -198,16 +194,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
198194
_dispatch_debug("workq: %s has %d runnable wokers (target is %d)",
199195
dq->dq_label, mon->num_runnable, mon->target_runnable);
200196

197+
global_runnable += mon->num_runnable;
198+
201199
if (mon->num_runnable == 0) {
202-
// We are below target, and no worker is runnable.
200+
// We have work, but no worker is runnable.
203201
// It is likely the program is stalled. Therefore treat
204202
// this as if dq were an overcommit queue and call poke
205203
// with the limit being the maximum number of workers for dq.
206204
int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
207205
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
208206
dq->dq_label, floor);
209207
_dispatch_global_queue_poke(dq, 1, floor);
210-
} else if (mon->num_runnable < mon->target_runnable) {
208+
global_runnable += 1; // account for poke in global estimate
209+
} else if (mon->num_runnable < mon->target_runnable &&
210+
global_runnable < global_soft_max) {
211211
// We are below target, but some workers are still runnable.
212212
// We want to oversubscribe to hit the desired load target.
213213
// However, this under-utilization may be transitory so set the
@@ -218,42 +218,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
218218
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
219219
dq->dq_label, floor);
220220
_dispatch_global_queue_poke(dq, 1, floor);
221+
global_runnable += 1; // account for poke in global estimate
221222
}
222223
}
223224
}
224225
#endif // HAVE_DISPATCH_WORKQ_MONITORING
225226

226-
227-
// temporary until we switch over to QoS based interface.
228-
static dispatch_queue_t
229-
get_root_queue_from_legacy_priority(int priority)
230-
{
231-
switch (priority) {
232-
case WORKQ_HIGH_PRIOQUEUE:
233-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
234-
case WORKQ_DEFAULT_PRIOQUEUE:
235-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
236-
case WORKQ_LOW_PRIOQUEUE:
237-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
238-
case WORKQ_BG_PRIOQUEUE:
239-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
240-
case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
241-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
242-
case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
243-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
244-
default:
245-
return NULL;
246-
}
247-
}
248-
249227
static void
250228
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
251229
{
252230
#if HAVE_DISPATCH_WORKQ_MONITORING
253231
int target_runnable = dispatch_hw_config(active_cpus);
254-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
255-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
256-
mon->dq = get_root_queue_from_legacy_priority(i);
232+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
233+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
234+
mon->dq = _dispatch_get_root_queue(i, false);
257235
void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
258236
mon->registered_tids = buf;
259237
mon->target_runnable = target_runnable;

src/event/workqueue_internal.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,12 @@
2727
#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
2828
#define __DISPATCH_WORKQUEUE_INTERNAL__
2929

30-
/* Work queue priority attributes. */
31-
#define WORKQ_HIGH_PRIOQUEUE 0
32-
#define WORKQ_DEFAULT_PRIOQUEUE 1
33-
#define WORKQ_LOW_PRIOQUEUE 2
34-
#define WORKQ_BG_PRIOQUEUE 3
35-
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
36-
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
37-
38-
#define WORKQ_NUM_PRIORITIES 6
39-
4030
#define WORKQ_ADDTHREADS_OPTION_OVERCOMMIT 0x1
4131

4232
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
4333

44-
void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
45-
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
34+
void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
35+
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);
4636

4737
#if defined(__linux__)
4838
#define HAVE_DISPATCH_WORKQ_MONITORING 1

src/queue.c

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
3838
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
3939
#endif
40+
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
41+
#define DISPATCH_USE_WORKQ_PRIORITY 1
42+
#endif
4043
#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
4144
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4245
#define pthread_workqueue_t void*
@@ -158,7 +161,10 @@ struct dispatch_root_queue_context_s {
158161
int volatile dgq_pending;
159162
#if DISPATCH_USE_WORKQUEUES
160163
qos_class_t dgq_qos;
161-
int dgq_wq_priority, dgq_wq_options;
164+
#if DISPATCH_USE_WORKQ_PRIORITY
165+
int dgq_wq_priority;
166+
#endif
167+
int dgq_wq_options;
162168
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
163169
pthread_workqueue_t dgq_kworkqueue;
164170
#endif
@@ -186,7 +192,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
186192
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
187193
#if DISPATCH_USE_WORKQUEUES
188194
.dgq_qos = QOS_CLASS_MAINTENANCE,
195+
#if DISPATCH_USE_WORKQ_PRIORITY
189196
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
197+
#endif
190198
.dgq_wq_options = 0,
191199
#endif
192200
#if DISPATCH_ENABLE_THREAD_POOL
@@ -197,7 +205,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
197205
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
198206
#if DISPATCH_USE_WORKQUEUES
199207
.dgq_qos = QOS_CLASS_MAINTENANCE,
208+
#if DISPATCH_USE_WORKQ_PRIORITY
200209
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
210+
#endif
201211
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
202212
#endif
203213
#if DISPATCH_ENABLE_THREAD_POOL
@@ -208,7 +218,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
208218
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
209219
#if DISPATCH_USE_WORKQUEUES
210220
.dgq_qos = QOS_CLASS_BACKGROUND,
221+
#if DISPATCH_USE_WORKQ_PRIORITY
211222
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
223+
#endif
212224
.dgq_wq_options = 0,
213225
#endif
214226
#if DISPATCH_ENABLE_THREAD_POOL
@@ -219,7 +231,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
219231
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
220232
#if DISPATCH_USE_WORKQUEUES
221233
.dgq_qos = QOS_CLASS_BACKGROUND,
234+
#if DISPATCH_USE_WORKQ_PRIORITY
222235
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
236+
#endif
223237
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
224238
#endif
225239
#if DISPATCH_ENABLE_THREAD_POOL
@@ -230,7 +244,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
230244
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
231245
#if DISPATCH_USE_WORKQUEUES
232246
.dgq_qos = QOS_CLASS_UTILITY,
247+
#if DISPATCH_USE_WORKQ_PRIORITY
233248
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
249+
#endif
234250
.dgq_wq_options = 0,
235251
#endif
236252
#if DISPATCH_ENABLE_THREAD_POOL
@@ -241,7 +257,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
241257
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
242258
#if DISPATCH_USE_WORKQUEUES
243259
.dgq_qos = QOS_CLASS_UTILITY,
260+
#if DISPATCH_USE_WORKQ_PRIORITY
244261
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
262+
#endif
245263
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
246264
#endif
247265
#if DISPATCH_ENABLE_THREAD_POOL
@@ -252,7 +270,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
252270
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
253271
#if DISPATCH_USE_WORKQUEUES
254272
.dgq_qos = QOS_CLASS_DEFAULT,
273+
#if DISPATCH_USE_WORKQ_PRIORITY
255274
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
275+
#endif
256276
.dgq_wq_options = 0,
257277
#endif
258278
#if DISPATCH_ENABLE_THREAD_POOL
@@ -263,7 +283,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
263283
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
264284
#if DISPATCH_USE_WORKQUEUES
265285
.dgq_qos = QOS_CLASS_DEFAULT,
286+
#if DISPATCH_USE_WORKQ_PRIORITY
266287
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
288+
#endif
267289
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
268290
#endif
269291
#if DISPATCH_ENABLE_THREAD_POOL
@@ -274,7 +296,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
274296
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
275297
#if DISPATCH_USE_WORKQUEUES
276298
.dgq_qos = QOS_CLASS_USER_INITIATED,
299+
#if DISPATCH_USE_WORKQ_PRIORITY
277300
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
301+
#endif
278302
.dgq_wq_options = 0,
279303
#endif
280304
#if DISPATCH_ENABLE_THREAD_POOL
@@ -285,7 +309,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
285309
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
286310
#if DISPATCH_USE_WORKQUEUES
287311
.dgq_qos = QOS_CLASS_USER_INITIATED,
312+
#if DISPATCH_USE_WORKQ_PRIORITY
288313
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
314+
#endif
289315
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
290316
#endif
291317
#if DISPATCH_ENABLE_THREAD_POOL
@@ -296,7 +322,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
296322
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
297323
#if DISPATCH_USE_WORKQUEUES
298324
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
325+
#if DISPATCH_USE_WORKQ_PRIORITY
299326
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
327+
#endif
300328
.dgq_wq_options = 0,
301329
#endif
302330
#if DISPATCH_ENABLE_THREAD_POOL
@@ -307,7 +335,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
307335
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
308336
#if DISPATCH_USE_WORKQUEUES
309337
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
338+
#if DISPATCH_USE_WORKQ_PRIORITY
310339
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
340+
#endif
311341
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
312342
#endif
313343
#if DISPATCH_ENABLE_THREAD_POOL
@@ -5809,7 +5839,7 @@ _dispatch_worker_thread(void *context)
58095839
bool manager = (dq == &_dispatch_mgr_root_queue);
58105840
bool monitored = !(overcommit || manager);
58115841
if (monitored) {
5812-
_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
5842+
_dispatch_workq_worker_register(dq, qc->dgq_qos);
58135843
}
58145844
#endif
58155845

@@ -5823,7 +5853,7 @@ _dispatch_worker_thread(void *context)
58235853

58245854
#if DISPATCH_USE_INTERNAL_WORKQUEUE
58255855
if (monitored) {
5826-
_dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
5856+
_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
58275857
}
58285858
#endif
58295859
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);

0 commit comments

Comments
 (0)