Skip to content

Commit c175b6a

Browse files
committed
Convert dispatch_workq from legacy priorities to qos
Update dispatch_workq (DISPATCH_USE_INTERNAL_WORKQUEUE) to use QoS-based constants instead of legacy priorities. Enhance monitoring code to count runnable threads from highest QoS to lowest and to suppress voluntary oversubscription for lower QoS queues if the total count of runnable worker threads is already over the desired threshold.
1 parent acb1b1e commit c175b6a

File tree

3 files changed

+62
-46
lines changed

3 files changed

+62
-46
lines changed

src/event/workqueue.c

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ typedef struct dispatch_workq_monitor_s {
6969
int num_registered_tids;
7070
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
7171

72-
static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
72+
static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX+1];
7373

7474
#pragma mark Implementation of the monitoring subsystem.
7575

@@ -80,12 +80,13 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
8080
static dispatch_once_t _dispatch_workq_init_once_pred;
8181

8282
void
83-
_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
83+
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
8484
{
8585
dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
8686

8787
#if HAVE_DISPATCH_WORKQ_MONITORING
88-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
88+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
89+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos];
8990
dispatch_assert(mon->dq == root_q);
9091
dispatch_tid tid = _dispatch_thread_getspecific(tid);
9192
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
@@ -97,10 +98,11 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
9798
}
9899

99100
void
100-
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
101+
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
101102
{
102103
#if HAVE_DISPATCH_WORKQ_MONITORING
103-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
104+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
105+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos];
104106
dispatch_tid tid = _dispatch_thread_getspecific(tid);
105107
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
106108
for (int i = 0; i < mon->num_registered_tids; i++) {
@@ -177,15 +179,9 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
177179
static void
178180
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
179181
{
180-
// TODO: Once we switch away from the legacy priorities to
181-
// newer QoS, we can loop in order of decreasing QoS
182-
// and track the total number of runnable threads seen
183-
// across pools. We can then use that number to
184-
// implement a global policy where low QoS queues
185-
// are not eligible for over-subscription if the higher
186-
// QoS queues have already consumed the target
187-
// number of threads.
188-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
182+
int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
183+
int global_runnable = 0;
184+
for (int i = DISPATCH_QOS_MAX; i > 0; i--) {
189185
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
190186
dispatch_queue_t dq = mon->dq;
191187

@@ -198,16 +194,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
198194
_dispatch_debug("workq: %s has %d runnable wokers (target is %d)",
199195
dq->dq_label, mon->num_runnable, mon->target_runnable);
200196

197+
global_runnable += mon->num_runnable;
198+
201199
if (mon->num_runnable == 0) {
202-
// We are below target, and no worker is runnable.
200+
// We have work, but no worker is runnable.
203201
// It is likely the program is stalled. Therefore treat
204202
// this as if dq were an overcommit queue and call poke
205203
// with the limit being the maximum number of workers for dq.
206204
int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
207205
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
208206
dq->dq_label, floor);
209207
_dispatch_global_queue_poke(dq, 1, floor);
210-
} else if (mon->num_runnable < mon->target_runnable) {
208+
global_runnable += 1; // account for poke in global estimate
209+
} else if (mon->num_runnable < mon->target_runnable &&
210+
global_runnable < global_soft_max) {
211211
// We are below target, but some workers are still runnable.
212212
// We want to oversubscribe to hit the desired load target.
213213
// However, this under-utilization may be transitory so set the
@@ -218,28 +218,27 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
218218
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
219219
dq->dq_label, floor);
220220
_dispatch_global_queue_poke(dq, 1, floor);
221+
global_runnable += 1; // account for poke in global estimate
221222
}
222223
}
223224
}
224225
#endif // HAVE_DISPATCH_WORKQ_MONITORING
225226

226-
227-
// temporary until we switch over to QoS based interface.
228227
static dispatch_queue_t
229-
get_root_queue_from_legacy_priority(int priority)
228+
get_root_queue_from_dispatch_qos(dispatch_qos_t qos)
230229
{
231-
switch (priority) {
232-
case WORKQ_HIGH_PRIOQUEUE:
233-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
234-
case WORKQ_DEFAULT_PRIOQUEUE:
235-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
236-
case WORKQ_LOW_PRIOQUEUE:
237-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
238-
case WORKQ_BG_PRIOQUEUE:
230+
switch (qos) {
231+
case DISPATCH_QOS_MAINTENANCE:
239232
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
240-
case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
233+
case DISPATCH_QOS_BACKGROUND:
241234
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
242-
case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
235+
case DISPATCH_QOS_UTILITY:
236+
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
237+
case DISPATCH_QOS_DEFAULT:
238+
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
239+
case DISPATCH_QOS_USER_INITIATED:
240+
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
241+
case DISPATCH_QOS_USER_INTERACTIVE:
243242
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
244243
default:
245244
return NULL;
@@ -251,9 +250,9 @@ _dispatch_workq_init_once(void *context DISPATCH_UNUSED)
251250
{
252251
#if HAVE_DISPATCH_WORKQ_MONITORING
253252
int target_runnable = dispatch_hw_config(active_cpus);
254-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
253+
for (int i = DISPATCH_QOS_MAX; i > 0; i--) {
255254
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
256-
mon->dq = get_root_queue_from_legacy_priority(i);
255+
mon->dq = get_root_queue_from_dispatch_qos(i);
257256
void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
258257
mon->registered_tids = buf;
259258
mon->target_runnable = target_runnable;

src/event/workqueue_internal.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,10 @@
2727
#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
2828
#define __DISPATCH_WORKQUEUE_INTERNAL__
2929

30-
/* Work queue priority attributes. */
31-
#define WORKQ_HIGH_PRIOQUEUE 0
32-
#define WORKQ_DEFAULT_PRIOQUEUE 1
33-
#define WORKQ_LOW_PRIOQUEUE 2
34-
#define WORKQ_BG_PRIOQUEUE 3
35-
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
36-
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
37-
38-
#define WORKQ_NUM_PRIORITIES 6
39-
4030
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
4131

42-
void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
43-
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
32+
void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
33+
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);
4434

4535
#if defined(__linux__)
4636
#define HAVE_DISPATCH_WORKQ_MONITORING 1

src/queue.c

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,10 @@ struct dispatch_root_queue_context_s {
154154
int volatile dgq_pending;
155155
#if DISPATCH_USE_WORKQUEUES
156156
qos_class_t dgq_qos;
157-
int dgq_wq_priority, dgq_wq_options;
157+
#if HAVE_PTHREAD_WORKQUEUES
158+
int dgq_wq_priority;
159+
#endif
160+
int dgq_wq_options;
158161
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
159162
pthread_workqueue_t dgq_kworkqueue;
160163
#endif
@@ -182,7 +185,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
182185
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
183186
#if DISPATCH_USE_WORKQUEUES
184187
.dgq_qos = QOS_CLASS_MAINTENANCE,
188+
#if HAVE_PTHREAD_WORKQUEUES
185189
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
190+
#endif
186191
.dgq_wq_options = 0,
187192
#endif
188193
#if DISPATCH_ENABLE_THREAD_POOL
@@ -193,7 +198,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
193198
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
194199
#if DISPATCH_USE_WORKQUEUES
195200
.dgq_qos = QOS_CLASS_MAINTENANCE,
201+
#if HAVE_PTHREAD_WORKQUEUES
196202
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
203+
#endif
197204
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
198205
#endif
199206
#if DISPATCH_ENABLE_THREAD_POOL
@@ -204,7 +211,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
204211
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
205212
#if DISPATCH_USE_WORKQUEUES
206213
.dgq_qos = QOS_CLASS_BACKGROUND,
214+
#if HAVE_PTHREAD_WORKQUEUES
207215
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
216+
#endif
208217
.dgq_wq_options = 0,
209218
#endif
210219
#if DISPATCH_ENABLE_THREAD_POOL
@@ -215,7 +224,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
215224
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
216225
#if DISPATCH_USE_WORKQUEUES
217226
.dgq_qos = QOS_CLASS_BACKGROUND,
227+
#if HAVE_PTHREAD_WORKQUEUES
218228
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
229+
#endif
219230
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
220231
#endif
221232
#if DISPATCH_ENABLE_THREAD_POOL
@@ -226,7 +237,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
226237
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
227238
#if DISPATCH_USE_WORKQUEUES
228239
.dgq_qos = QOS_CLASS_UTILITY,
240+
#if HAVE_PTHREAD_WORKQUEUES
229241
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
242+
#endif
230243
.dgq_wq_options = 0,
231244
#endif
232245
#if DISPATCH_ENABLE_THREAD_POOL
@@ -237,7 +250,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
237250
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
238251
#if DISPATCH_USE_WORKQUEUES
239252
.dgq_qos = QOS_CLASS_UTILITY,
253+
#if HAVE_PTHREAD_WORKQUEUES
240254
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
255+
#endif
241256
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
242257
#endif
243258
#if DISPATCH_ENABLE_THREAD_POOL
@@ -248,7 +263,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
248263
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
249264
#if DISPATCH_USE_WORKQUEUES
250265
.dgq_qos = QOS_CLASS_DEFAULT,
266+
#if HAVE_PTHREAD_WORKQUEUES
251267
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
268+
#endif
252269
.dgq_wq_options = 0,
253270
#endif
254271
#if DISPATCH_ENABLE_THREAD_POOL
@@ -259,7 +276,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
259276
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
260277
#if DISPATCH_USE_WORKQUEUES
261278
.dgq_qos = QOS_CLASS_DEFAULT,
279+
#if HAVE_PTHREAD_WORKQUEUES
262280
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
281+
#endif
263282
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
264283
#endif
265284
#if DISPATCH_ENABLE_THREAD_POOL
@@ -270,7 +289,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
270289
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
271290
#if DISPATCH_USE_WORKQUEUES
272291
.dgq_qos = QOS_CLASS_USER_INITIATED,
292+
#if HAVE_PTHREAD_WORKQUEUES
273293
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
294+
#endif
274295
.dgq_wq_options = 0,
275296
#endif
276297
#if DISPATCH_ENABLE_THREAD_POOL
@@ -281,7 +302,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
281302
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
282303
#if DISPATCH_USE_WORKQUEUES
283304
.dgq_qos = QOS_CLASS_USER_INITIATED,
305+
#if HAVE_PTHREAD_WORKQUEUES
284306
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
307+
#endif
285308
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
286309
#endif
287310
#if DISPATCH_ENABLE_THREAD_POOL
@@ -292,7 +315,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
292315
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
293316
#if DISPATCH_USE_WORKQUEUES
294317
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
318+
#if HAVE_PTHREAD_WORKQUEUES
295319
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
320+
#endif
296321
.dgq_wq_options = 0,
297322
#endif
298323
#if DISPATCH_ENABLE_THREAD_POOL
@@ -303,7 +328,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
303328
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
304329
#if DISPATCH_USE_WORKQUEUES
305330
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
331+
#if HAVE_PTHREAD_WORKQUEUES
306332
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
333+
#endif
307334
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
308335
#endif
309336
#if DISPATCH_ENABLE_THREAD_POOL
@@ -5511,7 +5538,7 @@ _dispatch_worker_thread(void *context)
55115538
bool manager = (dq == &_dispatch_mgr_root_queue);
55125539
bool monitored = !(overcommit || manager);
55135540
if (monitored) {
5514-
_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
5541+
_dispatch_workq_worker_register(dq, qc->dgq_qos);
55155542
}
55165543
#endif
55175544

@@ -5525,7 +5552,7 @@ _dispatch_worker_thread(void *context)
55255552

55265553
#if DISPATCH_USE_INTERNAL_WORKQUEUE
55275554
if (monitored) {
5528-
_dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
5555+
_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
55295556
}
55305557
#endif
55315558
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);

0 commit comments

Comments
 (0)