Skip to content

Commit 11eccaf

Browse files
committed
Convert dispatch_workq from legacy priorities to qos
Update dispatch_workq (DISPATCH_USE_INTERNAL_WORKQUEUE) to use QoS-based constants instead of legacy priorities. Enhance monitoring code to count runnable threads from highest QoS to lowest and to suppress voluntary oversubscription for lower QoS queues if the total count of runnable worker threads is already over the desired threshold.
1 parent acb1b1e commit 11eccaf

File tree

3 files changed

+62
-58
lines changed

3 files changed

+62
-58
lines changed

src/event/workqueue.c

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,15 @@ typedef struct dispatch_workq_monitor_s {
6969
int num_registered_tids;
7070
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
7171

72-
static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
72+
static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];
73+
74+
DISPATCH_ALWAYS_INLINE
75+
static inline dispatch_workq_monitor_t
76+
_dispatch_workq_get_monitor(qos_class_t cls)
77+
{
78+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
79+
return &_dispatch_workq_monitors[qos-1];
80+
}
7381

7482
#pragma mark Implementation of the monitoring subsystem.
7583

@@ -80,12 +88,12 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
8088
static dispatch_once_t _dispatch_workq_init_once_pred;
8189

8290
void
83-
_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
91+
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
8492
{
8593
dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
8694

8795
#if HAVE_DISPATCH_WORKQ_MONITORING
88-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
96+
dispatch_workq_monitor_t mon = _dispatch_workq_get_monitor(cls);
8997
dispatch_assert(mon->dq == root_q);
9098
dispatch_tid tid = _dispatch_thread_getspecific(tid);
9199
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
@@ -97,10 +105,10 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
97105
}
98106

99107
void
100-
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
108+
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
101109
{
102110
#if HAVE_DISPATCH_WORKQ_MONITORING
103-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
111+
dispatch_workq_monitor_t mon = _dispatch_workq_get_monitor(cls);
104112
dispatch_tid tid = _dispatch_thread_getspecific(tid);
105113
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
106114
for (int i = 0; i < mon->num_registered_tids; i++) {
@@ -177,16 +185,10 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
177185
static void
178186
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
179187
{
180-
// TODO: Once we switch away from the legacy priorities to
181-
// newer QoS, we can loop in order of decreasing QoS
182-
// and track the total number of runnable threads seen
183-
// across pools. We can then use that number to
184-
// implement a global policy where low QoS queues
185-
// are not eligible for over-subscription if the higher
186-
// QoS queues have already consumed the target
187-
// number of threads.
188-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
189-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
188+
int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
189+
int global_runnable = 0;
190+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
191+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
190192
dispatch_queue_t dq = mon->dq;
191193

192194
if (!_dispatch_queue_class_probe(dq)) {
@@ -198,16 +200,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
198200
_dispatch_debug("workq: %s has %d runnable wokers (target is %d)",
199201
dq->dq_label, mon->num_runnable, mon->target_runnable);
200202

203+
global_runnable += mon->num_runnable;
204+
201205
if (mon->num_runnable == 0) {
202-
// We are below target, and no worker is runnable.
206+
// We have work, but no worker is runnable.
203207
// It is likely the program is stalled. Therefore treat
204208
// this as if dq were an overcommit queue and call poke
205209
// with the limit being the maximum number of workers for dq.
206210
int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
207211
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
208212
dq->dq_label, floor);
209213
_dispatch_global_queue_poke(dq, 1, floor);
210-
} else if (mon->num_runnable < mon->target_runnable) {
214+
global_runnable += 1; // account for poke in global estimate
215+
} else if (mon->num_runnable < mon->target_runnable &&
216+
global_runnable < global_soft_max) {
211217
// We are below target, but some workers are still runnable.
212218
// We want to oversubscribe to hit the desired load target.
213219
// However, this under-utilization may be transitory so set the
@@ -218,42 +224,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
218224
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
219225
dq->dq_label, floor);
220226
_dispatch_global_queue_poke(dq, 1, floor);
227+
global_runnable += 1; // account for poke in global estimate
221228
}
222229
}
223230
}
224231
#endif // HAVE_DISPATCH_WORKQ_MONITORING
225232

226-
227-
// temporary until we switch over to QoS based interface.
228-
static dispatch_queue_t
229-
get_root_queue_from_legacy_priority(int priority)
230-
{
231-
switch (priority) {
232-
case WORKQ_HIGH_PRIOQUEUE:
233-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
234-
case WORKQ_DEFAULT_PRIOQUEUE:
235-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
236-
case WORKQ_LOW_PRIOQUEUE:
237-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
238-
case WORKQ_BG_PRIOQUEUE:
239-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
240-
case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
241-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
242-
case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
243-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
244-
default:
245-
return NULL;
246-
}
247-
}
248-
249233
static void
250234
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
251235
{
252236
#if HAVE_DISPATCH_WORKQ_MONITORING
253237
int target_runnable = dispatch_hw_config(active_cpus);
254-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
255-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
256-
mon->dq = get_root_queue_from_legacy_priority(i);
238+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
239+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
240+
mon->dq = _dispatch_get_root_queue(i, false);
257241
void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
258242
mon->registered_tids = buf;
259243
mon->target_runnable = target_runnable;

src/event/workqueue_internal.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,10 @@
2727
#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
2828
#define __DISPATCH_WORKQUEUE_INTERNAL__
2929

30-
/* Work queue priority attributes. */
31-
#define WORKQ_HIGH_PRIOQUEUE 0
32-
#define WORKQ_DEFAULT_PRIOQUEUE 1
33-
#define WORKQ_LOW_PRIOQUEUE 2
34-
#define WORKQ_BG_PRIOQUEUE 3
35-
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
36-
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
37-
38-
#define WORKQ_NUM_PRIORITIES 6
39-
4030
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
4131

42-
void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
43-
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
32+
void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
33+
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);
4434

4535
#if defined(__linux__)
4636
#define HAVE_DISPATCH_WORKQ_MONITORING 1

src/queue.c

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
3838
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
3939
#endif
40+
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
41+
#define DISPATCH_USE_WORKQ_PRIORITY 1
42+
#endif
4043
#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
4144
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4245
#define pthread_workqueue_t void*
@@ -154,7 +157,10 @@ struct dispatch_root_queue_context_s {
154157
int volatile dgq_pending;
155158
#if DISPATCH_USE_WORKQUEUES
156159
qos_class_t dgq_qos;
157-
int dgq_wq_priority, dgq_wq_options;
160+
#if DISPATCH_USE_WORKQ_PRIORITY
161+
int dgq_wq_priority;
162+
#endif
163+
int dgq_wq_options;
158164
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
159165
pthread_workqueue_t dgq_kworkqueue;
160166
#endif
@@ -182,7 +188,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
182188
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
183189
#if DISPATCH_USE_WORKQUEUES
184190
.dgq_qos = QOS_CLASS_MAINTENANCE,
191+
#if DISPATCH_USE_WORKQ_PRIORITY
185192
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
193+
#endif
186194
.dgq_wq_options = 0,
187195
#endif
188196
#if DISPATCH_ENABLE_THREAD_POOL
@@ -193,7 +201,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
193201
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
194202
#if DISPATCH_USE_WORKQUEUES
195203
.dgq_qos = QOS_CLASS_MAINTENANCE,
204+
#if DISPATCH_USE_WORKQ_PRIORITY
196205
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
206+
#endif
197207
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
198208
#endif
199209
#if DISPATCH_ENABLE_THREAD_POOL
@@ -204,7 +214,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
204214
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
205215
#if DISPATCH_USE_WORKQUEUES
206216
.dgq_qos = QOS_CLASS_BACKGROUND,
217+
#if DISPATCH_USE_WORKQ_PRIORITY
207218
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
219+
#endif
208220
.dgq_wq_options = 0,
209221
#endif
210222
#if DISPATCH_ENABLE_THREAD_POOL
@@ -215,7 +227,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
215227
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
216228
#if DISPATCH_USE_WORKQUEUES
217229
.dgq_qos = QOS_CLASS_BACKGROUND,
230+
#if DISPATCH_USE_WORKQ_PRIORITY
218231
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
232+
#endif
219233
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
220234
#endif
221235
#if DISPATCH_ENABLE_THREAD_POOL
@@ -226,7 +240,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
226240
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
227241
#if DISPATCH_USE_WORKQUEUES
228242
.dgq_qos = QOS_CLASS_UTILITY,
243+
#if DISPATCH_USE_WORKQ_PRIORITY
229244
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
245+
#endif
230246
.dgq_wq_options = 0,
231247
#endif
232248
#if DISPATCH_ENABLE_THREAD_POOL
@@ -237,7 +253,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
237253
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
238254
#if DISPATCH_USE_WORKQUEUES
239255
.dgq_qos = QOS_CLASS_UTILITY,
256+
#if DISPATCH_USE_WORKQ_PRIORITY
240257
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
258+
#endif
241259
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
242260
#endif
243261
#if DISPATCH_ENABLE_THREAD_POOL
@@ -248,7 +266,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
248266
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
249267
#if DISPATCH_USE_WORKQUEUES
250268
.dgq_qos = QOS_CLASS_DEFAULT,
269+
#if DISPATCH_USE_WORKQ_PRIORITY
251270
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
271+
#endif
252272
.dgq_wq_options = 0,
253273
#endif
254274
#if DISPATCH_ENABLE_THREAD_POOL
@@ -259,7 +279,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
259279
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
260280
#if DISPATCH_USE_WORKQUEUES
261281
.dgq_qos = QOS_CLASS_DEFAULT,
282+
#if DISPATCH_USE_WORKQ_PRIORITY
262283
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
284+
#endif
263285
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
264286
#endif
265287
#if DISPATCH_ENABLE_THREAD_POOL
@@ -270,7 +292,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
270292
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
271293
#if DISPATCH_USE_WORKQUEUES
272294
.dgq_qos = QOS_CLASS_USER_INITIATED,
295+
#if DISPATCH_USE_WORKQ_PRIORITY
273296
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
297+
#endif
274298
.dgq_wq_options = 0,
275299
#endif
276300
#if DISPATCH_ENABLE_THREAD_POOL
@@ -281,7 +305,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
281305
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
282306
#if DISPATCH_USE_WORKQUEUES
283307
.dgq_qos = QOS_CLASS_USER_INITIATED,
308+
#if DISPATCH_USE_WORKQ_PRIORITY
284309
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
310+
#endif
285311
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
286312
#endif
287313
#if DISPATCH_ENABLE_THREAD_POOL
@@ -292,7 +318,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
292318
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
293319
#if DISPATCH_USE_WORKQUEUES
294320
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
321+
#if DISPATCH_USE_WORKQ_PRIORITY
295322
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
323+
#endif
296324
.dgq_wq_options = 0,
297325
#endif
298326
#if DISPATCH_ENABLE_THREAD_POOL
@@ -303,7 +331,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
303331
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
304332
#if DISPATCH_USE_WORKQUEUES
305333
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
334+
#if DISPATCH_USE_WORKQ_PRIORITY
306335
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
336+
#endif
307337
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
308338
#endif
309339
#if DISPATCH_ENABLE_THREAD_POOL
@@ -5511,7 +5541,7 @@ _dispatch_worker_thread(void *context)
55115541
bool manager = (dq == &_dispatch_mgr_root_queue);
55125542
bool monitored = !(overcommit || manager);
55135543
if (monitored) {
5514-
_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
5544+
_dispatch_workq_worker_register(dq, qc->dgq_qos);
55155545
}
55165546
#endif
55175547

@@ -5525,7 +5555,7 @@ _dispatch_worker_thread(void *context)
55255555

55265556
#if DISPATCH_USE_INTERNAL_WORKQUEUE
55275557
if (monitored) {
5528-
_dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
5558+
_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
55295559
}
55305560
#endif
55315561
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);

0 commit comments

Comments
 (0)