diff --git a/.gitmodules b/.gitmodules
index e6068b432..e69de29bb 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +0,0 @@
-[submodule "libpwq"]
-    path = libpwq
-    url = https://github.com/mheily/libpwq.git
diff --git a/Makefile.am b/Makefile.am
index ffc82df29..f1be02951 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -4,17 +4,12 @@ ACLOCAL_AMFLAGS = -I m4
-if BUILD_OWN_PTHREAD_WORKQUEUES
-  MAYBE_PTHREAD_WORKQUEUES = libpwq
-endif
-
 if BUILD_TESTS
   MAYBE_TESTS = tests
 endif
 
 SUBDIRS= \
     dispatch \
-    $(MAYBE_PTHREAD_WORKQUEUES) \
     man \
     os \
     private \
diff --git a/configure.ac b/configure.ac
index 00e546bf1..7db3d60f9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -320,22 +320,31 @@ AS_IF([test -n "$apple_libpthread_source_path" -a -n "$apple_xnu_source_osfmk_pa
 AC_CHECK_HEADERS([pthread_machdep.h pthread/qos.h])
 
 # pthread_workqueues.
-# Look for own version first, then system version.
-AS_IF([test -f $srcdir/libpwq/configure.ac],
-    [AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
-    ac_configure_args="--disable-libpwq-install $ac_configure_args"
-    AC_CONFIG_SUBDIRS([libpwq])
-    build_own_pthread_workqueues=true
+# Look for own version first, then see if there is a system version.
+AC_ARG_ENABLE([internal-libpwq],
+    [AS_HELP_STRING([--enable-internal-libpwq],
+        [Use libdispatch's own implementation of pthread workqueues.])],,
+    [case $target_os in
+      linux*)
+        enable_internal_libpwq=yes
+        ;;
+      *)
+        enable_internal_libpwq=no
+    esac]
+)
+AS_IF([test "x$enable_internal_libpwq" = "xyes"],
+    [AC_DEFINE(DISPATCH_USE_INTERNAL_WORKQUEUE, 1, [Use libdispatch's own implementation of pthread_workqueue API])
     AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
+    dispatch_use_internal_workqueue=true
     have_pthread_workqueues=true],
-    [build_own_pthread_workqueues=false
+    [dispatch_use_internal_workqueue=false
     AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
         [AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
         have_pthread_workqueues=true],
         [have_pthread_workqueues=false]
-    )]
+    )]
 )
-AM_CONDITIONAL(BUILD_OWN_PTHREAD_WORKQUEUES, $build_own_pthread_workqueues)
+AM_CONDITIONAL(DISPATCH_USE_INTERNAL_WORKQUEUE, $dispatch_use_internal_workqueue)
 AM_CONDITIONAL(HAVE_PTHREAD_WORKQUEUES, $have_pthread_workqueues)
 
 AC_CHECK_HEADERS([libproc_internal.h], [], [], [#include ])
diff --git a/libpwq b/libpwq
deleted file mode 160000
index 18437d2be..000000000
--- a/libpwq
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 18437d2be372f4422b207ec6442c8caf7974025d
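With the libpwq submodule gone, the build outcome is carried entirely by the two macros configure now defines: DISPATCH_USE_INTERNAL_WORKQUEUE (which, per the hunk above, also implies HAVE_PTHREAD_WORKQUEUES) and HAVE_PTHREAD_WORKQUEUES alone for a system or external library. A minimal sketch of what downstream code can assume; this is illustrative only, the real consumers are src/shims.h and src/queue.c later in this patch:

/* Illustrative only: the compile-time dispatch enabled by the new
 * configure logic above. The variable is mine, not libdispatch's. */
#if DISPATCH_USE_INTERNAL_WORKQUEUE
/* in-tree implementation: pthread pool plus /proc monitor (src/event/workqueue.c) */
static const char *workqueue_flavor = "internal";
#elif HAVE_PTHREAD_WORKQUEUES
/* system pthread/workqueue_private.h or external -lpthread_workqueue */
static const char *workqueue_flavor = "system";
#else
/* no workqueue support at all: plain pthread thread pool fallback */
static const char *workqueue_flavor = "thread-pool";
#endif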
diff --git a/src/Makefile.am b/src/Makefile.am
index a0ddbc98b..ac2f74cfd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -9,6 +9,12 @@ else
 lib_LTLIBRARIES=libdispatch.la
 endif
 
+if DISPATCH_USE_INTERNAL_WORKQUEUE
+INTERNAL_WORKQUEUE_SOURCES= \
+    event/workqueue.c \
+    event/workqueue_internal.h
+endif
+
 libdispatch_la_SOURCES= \
     allocator.c \
     apply.c \
@@ -60,7 +66,8 @@ libdispatch_la_SOURCES= \
     shims/perfmon.h \
     shims/time.h \
     shims/tsd.h \
-    shims/yield.h
+    shims/yield.h \
+    $(INTERNAL_WORKQUEUE_SOURCES)
 
 EXTRA_libdispatch_la_SOURCES=
 EXTRA_libdispatch_la_DEPENDENCIES=
@@ -77,12 +84,13 @@ AM_OBJCFLAGS=$(DISPATCH_CFLAGS) $(CBLOCKS_FLAGS)
 AM_CXXFLAGS=$(PTHREAD_WORKQUEUE_CFLAGS) $(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
 AM_OBJCXXFLAGS=$(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
 
-if BUILD_OWN_PTHREAD_WORKQUEUES
-  PTHREAD_WORKQUEUE_LIBS=$(top_builddir)/libpwq/libpthread_workqueue.la
-  PTHREAD_WORKQUEUE_CFLAGS=-I$(top_srcdir)/libpwq/include
+if DISPATCH_USE_INTERNAL_WORKQUEUE
+  PTHREAD_WORKQUEUE_LIBS=
+  PTHREAD_WORKQUEUE_CFLAGS=
 else
 if HAVE_PTHREAD_WORKQUEUES
   PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue
+  PTHREAD_WORKQUEUE_CFLAGS=
 endif
 endif
diff --git a/src/apply.c b/src/apply.c
index 79c4e9594..0f1d85d77 100644
--- a/src/apply.c
+++ b/src/apply.c
@@ -159,11 +159,11 @@ static inline void
 _dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
         dispatch_function_t func)
 {
-    uint32_t i = 0;
+    int32_t i = 0;
     dispatch_continuation_t head = NULL, tail = NULL;
 
     // The current thread does not need a continuation
-    uint32_t continuation_cnt = da->da_thr_cnt - 1;
+    int32_t continuation_cnt = da->da_thr_cnt - 1;
 
     dispatch_assert(continuation_cnt);
 
@@ -192,14 +192,14 @@ static void
 _dispatch_apply_redirect(void *ctxt)
 {
     dispatch_apply_t da = (dispatch_apply_t)ctxt;
-    uint32_t da_width = da->da_thr_cnt - 1;
+    int32_t da_width = da->da_thr_cnt - 1;
     dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;
 
     do {
-        uint32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);
+        int32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);
 
         if (slowpath(da_width > width)) {
-            uint32_t excess = da_width - width;
+            int32_t excess = da_width - width;
             for (tq = dq; tq != rq; tq = tq->do_targetq) {
                 _dispatch_queue_relinquish_width(tq, excess);
             }
@@ -234,7 +234,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
     if (slowpath(iterations == 0)) {
         return;
     }
-    uint32_t thr_cnt = dispatch_hw_config(active_cpus);
+    int32_t thr_cnt = dispatch_hw_config(active_cpus);
     dispatch_thread_context_t dtctxt =
             _dispatch_thread_context_find(_dispatch_apply_key);
     size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0;
     dispatch_queue_t old_dq = _dispatch_queue_get_current();
@@ -247,7 +247,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
             ? nested * iterations : DISPATCH_APPLY_MAX;
     }
     if (iterations < thr_cnt) {
-        thr_cnt = (uint32_t)iterations;
+        thr_cnt = (int32_t)iterations;
     }
     if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) {
         dq = old_dq ? old_dq : _dispatch_get_root_queue(
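The apply.c hunk narrows the thread-count bookkeeping from uint32_t to int32_t so that it composes with the new signed floor arithmetic in queue.c below. For context, the public entry point whose plumbing changes here is dispatch_apply_f. A minimal, self-contained use of it; the file, array size, and work function are mine, not part of the patch:

#include <dispatch/dispatch.h>
#include <stdio.h>

#define COUNT 8

static void
square_one(void *ctxt, size_t i)
{
    int *v = (int *)ctxt;
    v[i] = (int)(i * i); // iterations may run concurrently on different workers
}

int
main(void)
{
    int v[COUNT];
    dispatch_queue_t q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    // Blocks until all COUNT iterations have run; internal parallelism is
    // capped by the thr_cnt logic patched above (at most active CPUs).
    dispatch_apply_f(COUNT, q, v, square_one);
    for (size_t i = 0; i < COUNT; i++) {
        printf("%d ", v[i]);
    }
    printf("\n");
    return 0;
}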
diff --git a/src/event/workqueue.c b/src/event/workqueue.c
new file mode 100644
index 000000000..0b9bc0ac5
--- /dev/null
+++ b/src/event/workqueue.c
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2017-2017 Apple Inc. All rights reserved.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_START@
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_END@
+ */
+
+#include "internal.h"
+
+#if DISPATCH_USE_INTERNAL_WORKQUEUE
+
+// forward looking typedef; not yet defined in dispatch
+typedef pid_t dispatch_tid;
+
+/*
+ * dispatch_workq monitors the thread pool that is
+ * executing the work enqueued on libdispatch's pthread
+ * root queues and dynamically adjusts its size.
+ *
+ * The dynamic monitoring could be implemented using either
+ *   (a) low-frequency user-level approximation of the number of runnable
+ *       worker threads via reading the /proc file system
+ *   (b) a Linux kernel extension that hooks the process change handler
+ *       to accurately track the number of runnable normal worker threads
+ * This file provides an implementation of option (a).
+ *
+ * Using either form of monitoring, if (i) there appears to be
+ * work available in the monitored pthread root queue, (ii) the
+ * number of runnable workers is below the target size for the pool,
+ * and (iii) the total number of worker threads is below an upper limit,
+ * then an additional worker thread will be added to the pool.
+ */
+
+#pragma mark static data for monitoring subsystem
+
+/*
+ * State for the user-level monitoring of a workqueue.
+ */
+typedef struct dispatch_workq_monitor_s {
+    /* The dispatch_queue we are monitoring */
+    dispatch_queue_t dq;
+
+    /* The observed number of runnable worker threads */
+    int32_t num_runnable;
+
+    /* The desired number of runnable worker threads */
+    int32_t target_runnable;
+
+    /*
+     * Tracking of registered workers; all accesses must hold lock.
+     * Invariant: registered_tids[0]...registered_tids[num_registered_tids-1]
+     * contain the dispatch_tids of the worker threads we are monitoring.
+     */
+    dispatch_unfair_lock_s registered_tid_lock;
+    dispatch_tid *registered_tids;
+    int num_registered_tids;
+} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
+
+static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
+
+#pragma mark Implementation of the monitoring subsystem.
+
+#define WORKQ_MAX_TRACKED_TIDS DISPATCH_WORKQ_MAX_PTHREAD_COUNT
+#define WORKQ_OVERSUBSCRIBE_FACTOR 2
+
+static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
+static dispatch_once_t _dispatch_workq_init_once_pred;
+
+void
+_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
+{
+    dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
+
+#if HAVE_DISPATCH_WORKQ_MONITORING
+    dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
+    dispatch_assert(mon->dq == root_q);
+    dispatch_tid tid = _dispatch_thread_getspecific(tid);
+    _dispatch_unfair_lock_lock(&mon->registered_tid_lock);
+    dispatch_assert(mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS-1);
+    int worker_id = mon->num_registered_tids++;
+    mon->registered_tids[worker_id] = tid;
+    _dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+}
+
+void
+_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
+{
+#if HAVE_DISPATCH_WORKQ_MONITORING
+    dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
+    dispatch_tid tid = _dispatch_thread_getspecific(tid);
+    _dispatch_unfair_lock_lock(&mon->registered_tid_lock);
+    for (int i = 0; i < mon->num_registered_tids; i++) {
+        if (mon->registered_tids[i] == tid) {
+            int last = mon->num_registered_tids - 1;
+            mon->registered_tids[i] = mon->registered_tids[last];
+            mon->registered_tids[last] = 0;
+            mon->num_registered_tids--;
+            break;
+        }
+    }
+    _dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+}
+
+
+#if HAVE_DISPATCH_WORKQ_MONITORING
+#if defined(__linux__)
+/*
+ * For each pid that is a registered worker, read /proc/[pid]/stat
+ * to get a count of the number of them that are actually runnable.
+ * See the proc(5) man page for the format of the contents of /proc/[pid]/stat
+ */
+static void
+_dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
+{
+    char path[128];
+    char buf[4096];
+    int running_count = 0;
+
+    _dispatch_unfair_lock_lock(&mon->registered_tid_lock);
+
+    for (int i = 0; i < mon->num_registered_tids; i++) {
+        dispatch_tid tid = mon->registered_tids[i];
+        int fd;
+        ssize_t bytes_read = -1;
+
+        int r = snprintf(path, sizeof(path), "/proc/%d/stat", tid);
+        dispatch_assert(r > 0 && r < sizeof(path));
+
+        fd = open(path, O_RDONLY | O_NONBLOCK);
+        if (unlikely(fd == -1)) {
+            DISPATCH_CLIENT_CRASH(tid,
+                    "workq: registered worker exited prematurely");
+        } else {
+            bytes_read = read(fd, buf, sizeof(buf)-1);
+            (void)close(fd);
+        }
+
+        if (bytes_read > 0) {
+            buf[bytes_read] = '\0';
+            char state;
+            if (sscanf(buf, "%*d %*s %c", &state) == 1) {
+                // _dispatch_debug("workq: Worker %d, state %c\n", tid, state);
+                if (state == 'R') {
+                    running_count++;
+                }
+            } else {
+                _dispatch_debug("workq: sscanf of state failed for %d", tid);
+            }
+        } else {
+            _dispatch_debug("workq: Failed to read %s", path);
+        }
+    }
+
+    mon->num_runnable = running_count;
+
+    _dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
+}
+#else
+#error must define _dispatch_workq_count_runnable_workers
+#endif
+
+static void
+_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
+{
+    // TODO: Once we switch away from the legacy priorities to
+    //   newer QoS, we can loop in order of decreasing QoS
+    //   and track the total number of runnable threads seen
+    //   across pools. We can then use that number to
+    //   implement a global policy where low QoS queues
+    //   are not eligible for over-subscription if the higher
+    //   QoS queues have already consumed the target
+    //   number of threads.
+    for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
+        dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
+        dispatch_queue_t dq = mon->dq;
+
+        if (!_dispatch_queue_class_probe(dq)) {
+            _dispatch_debug("workq: %s is empty.", dq->dq_label);
+            continue;
+        }
+
+        _dispatch_workq_count_runnable_workers(mon);
+        _dispatch_debug("workq: %s has %d runnable workers (target is %d)",
+                dq->dq_label, mon->num_runnable, mon->target_runnable);
+
+        if (mon->num_runnable == 0) {
+            // We are below target, and no worker is runnable.
+            // It is likely the program is stalled. Therefore treat
+            // this as if dq were an overcommit queue and call poke
+            // with the limit being the maximum number of workers for dq.
+            int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
+            _dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
+                    dq->dq_label, floor);
+            _dispatch_global_queue_poke(dq, 1, floor);
+        } else if (mon->num_runnable < mon->target_runnable) {
+            // We are below target, but some workers are still runnable.
+            // We want to oversubscribe to hit the desired load target.
+            // However, this under-utilization may be transitory so set the
+            // floor as a small multiple of threads per core.
+            int32_t floor = (1 - WORKQ_OVERSUBSCRIBE_FACTOR) * mon->target_runnable;
+            int32_t floor2 = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
+            floor = MAX(floor, floor2);
+            _dispatch_debug("workq: %s under utilization target; poking with floor %d",
+                    dq->dq_label, floor);
+            _dispatch_global_queue_poke(dq, 1, floor);
+        }
+    }
+}
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+
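+// Illustrative arithmetic for the two poke cases above, assuming a
+// 4-core machine (target_runnable == 4) and WORKQ_MAX_TRACKED_TIDS == 255
+// (DISPATCH_WORKQ_MAX_PTHREAD_COUNT):
+//   - no runnable worker:    floor == 4 - 255 == -251, i.e. the poke is
+//     effectively unconstrained up to the maximum pool size;
+//   - some runnable workers: floor == (1 - 2) * 4 == -4, i.e. allow at
+//     most 4 threads of transient oversubscription beyond the remaining
+//     pool capacity.
+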
+// temporary until we switch over to QoS based interface.
+static dispatch_queue_t
+get_root_queue_from_legacy_priority(int priority)
+{
+    switch (priority) {
+    case WORKQ_HIGH_PRIOQUEUE:
+        return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
+    case WORKQ_DEFAULT_PRIOQUEUE:
+        return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
+    case WORKQ_LOW_PRIOQUEUE:
+        return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
+    case WORKQ_BG_PRIOQUEUE:
+        return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
+    case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
+        return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
+    case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
+        return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
+    default:
+        return NULL;
+    }
+}
+
+static void
+_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
+{
+#if HAVE_DISPATCH_WORKQ_MONITORING
+    int target_runnable = dispatch_hw_config(active_cpus);
+    for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
+        dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
+        mon->dq = get_root_queue_from_legacy_priority(i);
+        void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
+        mon->registered_tids = buf;
+        mon->target_runnable = target_runnable;
+    }
+
+    // Create monitoring timer that will periodically run on dispatch_mgr_q
+    dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
+            0, 0, &_dispatch_mgr_q);
+    dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0),
+            NSEC_PER_SEC, 0);
+    dispatch_source_set_event_handler_f(ds, _dispatch_workq_monitor_pools);
+    dispatch_set_context(ds, ds); // avoid appearing as leaked
+    dispatch_activate(ds);
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+}
+
+#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
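Outside the patch, the /proc probe at the heart of _dispatch_workq_count_runnable_workers can be exercised standalone. A sketch under the same assumptions the patch makes; the helper name is mine, and, as in the patch, the %*s skip assumes the thread's comm field contains no whitespace:

#include <fcntl.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

// Returns 1 if the given thread is in state 'R' (runnable/running),
// 0 for any other state, and -1 if /proc could not be read or parsed.
static int
thread_is_runnable(pid_t tid)
{
    char path[128], buf[4096];
    char state;

    snprintf(path, sizeof(path), "/proc/%d/stat", tid);
    int fd = open(path, O_RDONLY);
    if (fd == -1) return -1;
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    close(fd);
    if (n <= 0) return -1;
    buf[n] = '\0';
    // proc(5): fields are "pid (comm) state ..."; skip pid and comm.
    if (sscanf(buf, "%*d %*s %c", &state) != 1) return -1;
    return state == 'R';
}

int
main(void)
{
    // The main thread's tid equals the pid, and it is running right now.
    printf("runnable: %d\n", thread_is_runnable(getpid()));
    return 0;
}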
diff --git a/src/event/workqueue_internal.h b/src/event/workqueue_internal.h
new file mode 100644
index 000000000..012e554fb
--- /dev/null
+++ b/src/event/workqueue_internal.h
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2017-2017 Apple Inc. All rights reserved.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_START@
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_END@
+ */
+
+/*
+ * IMPORTANT: This header file describes INTERNAL interfaces to libdispatch
+ * which are subject to change in future releases of Mac OS X. Any applications
+ * relying on these interfaces WILL break.
+ */
+
+#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
+#define __DISPATCH_WORKQUEUE_INTERNAL__
+
+/* Work queue priority attributes. */
+#define WORKQ_HIGH_PRIOQUEUE 0
+#define WORKQ_DEFAULT_PRIOQUEUE 1
+#define WORKQ_LOW_PRIOQUEUE 2
+#define WORKQ_BG_PRIOQUEUE 3
+#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
+#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
+
+#define WORKQ_NUM_PRIORITIES 6
+
+#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
+
+void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
+void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
+
+#if defined(__linux__)
+#define HAVE_DISPATCH_WORKQ_MONITORING 1
+#else
+#define HAVE_DISPATCH_WORKQ_MONITORING 0
+#endif
+
+#endif /* __DISPATCH_WORKQUEUE_INTERNAL__ */
+
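A note on _dispatch_workq_worker_unregister, declared here and implemented in workqueue.c above: it removes a tid by swapping in the last array element rather than shifting, which keeps removal to an O(n) scan plus an O(1) splice and preserves the invariant that live tids occupy a dense prefix of the array. A standalone illustration of the idiom; all names are mine:

#include <stdio.h>

// Remove the first occurrence of value from a[0..n-1] by swapping in the
// last element; returns the new element count. Order is not preserved,
// which is fine because the monitor never relies on registration order.
static int
swap_remove(int a[], int n, int value)
{
    for (int i = 0; i < n; i++) {
        if (a[i] == value) {
            a[i] = a[n - 1];
            a[n - 1] = 0;
            return n - 1;
        }
    }
    return n;
}

int
main(void)
{
    int tids[5] = {101, 102, 103, 104, 105};
    int n = swap_remove(tids, 5, 102);
    for (int i = 0; i < n; i++) printf("%d ", tids[i]); // 101 105 103 104
    printf("\n");
    return 0;
}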
diff --git a/src/inline_internal.h b/src/inline_internal.h
index a7ccb8260..d76b77aff 100644
--- a/src/inline_internal.h
+++ b/src/inline_internal.h
@@ -1479,7 +1479,7 @@ _dispatch_root_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _head,
     struct dispatch_object_s *head = _head._do, *tail = _tail._do;
     if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) {
         _dispatch_queue_push_update_head(dq, head);
-        return _dispatch_global_queue_poke(dq, n);
+        return _dispatch_global_queue_poke(dq, n, 0);
     }
 }
 
diff --git a/src/queue.c b/src/queue.c
index 088c5cfd2..80a38c62a 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -23,7 +23,7 @@
 #include "protocol.h" // _dispatch_send_wakeup_runloop_thread
 #endif
 
-#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
+#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG || DISPATCH_USE_INTERNAL_WORKQUEUE) && \
         !defined(DISPATCH_ENABLE_THREAD_POOL)
 #define DISPATCH_ENABLE_THREAD_POOL 1
 #endif
@@ -32,6 +32,7 @@
 #endif
 #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) && \
         !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
+        !DISPATCH_USE_INTERNAL_WORKQUEUE && \
         !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
 #endif
@@ -143,12 +144,14 @@ static struct dispatch_pthread_root_queue_context_s
 };
 #endif
 
-#define MAX_PTHREAD_COUNT 255
+#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
+#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
+#endif
 
 struct dispatch_root_queue_context_s {
     union {
         struct {
-            unsigned int volatile dgq_pending;
+            int volatile dgq_pending;
 #if HAVE_PTHREAD_WORKQUEUES
             qos_class_t dgq_qos;
             int dgq_wq_priority, dgq_wq_options;
@@ -158,7 +161,7 @@ struct dispatch_root_queue_context_s {
 #endif // HAVE_PTHREAD_WORKQUEUES
 #if DISPATCH_USE_PTHREAD_POOL
             void *dgq_ctxt;
-            uint32_t volatile dgq_thread_pool_size;
+            int32_t volatile dgq_thread_pool_size;
 #endif
         };
         char _dgq_pad[DISPATCH_CACHELINE_SIZE];
@@ -663,7 +666,15 @@ _dispatch_root_queues_init_workq(int *wq_supported)
             result = result || dispatch_assume(pwq);
         }
 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
-        qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
+        if (pwq) {
+            qc->dgq_kworkqueue = pwq;
+        } else {
+            qc->dgq_kworkqueue = (void*)(~0ul);
+            // because the fast path of _dispatch_global_queue_poke didn't
+            // yet know that we're using the internal pool implementation,
+            // we have to undo its setting of dgq_pending
+            qc->dgq_pending = 0;
+        }
     }
 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
     if (!disable_wq) {
@@ -680,10 +691,10 @@ _dispatch_root_queues_init_workq(int *wq_supported)
 #if DISPATCH_USE_PTHREAD_POOL
 static inline void
 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
-        uint8_t pool_size, bool overcommit)
+        int32_t pool_size, bool overcommit)
 {
     dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
-    uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
+    int32_t thread_pool_size = overcommit ? DISPATCH_WORKQ_MAX_PTHREAD_COUNT :
             dispatch_hw_config(active_cpus);
     if (slowpath(pool_size) && pool_size < thread_pool_size) {
         thread_pool_size = pool_size;
@@ -716,7 +727,7 @@ _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
     size_t i;
     for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
         bool overcommit = true;
-#if TARGET_OS_EMBEDDED
+#if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING)
         // some software hangs if the non-overcommitting queues do not
         // overcommit when threads block. Someday, this behavior should
         // apply to all platforms
@@ -1979,8 +1990,8 @@ _dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
     dispatch_pthread_root_queue_context_t pqc;
     dispatch_queue_flags_t dqf = 0;
     size_t dqs;
-    uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
-            (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
+    int32_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
+            (int8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
 
     dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
     dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
@@ -3945,10 +3956,10 @@ _dispatch_runloop_queue_poke(dispatch_queue_t dq, dispatch_qos_t qos,
 
 DISPATCH_NOINLINE
 static void
-_dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
+_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
 {
     dispatch_root_queue_context_t qc = dq->do_ctxt;
-    uint32_t i = n;
+    int remaining = n;
     int r = ENOSYS;
 
     _dispatch_root_queues_init();
@@ -3968,16 +3979,16 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
             r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
                     _dispatch_worker_thread4, dq, &wh, &gen_cnt);
             (void)dispatch_assume_zero(r);
-        } while (--i);
+        } while (--remaining);
         return;
     }
 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
 #if HAVE_PTHREAD_WORKQUEUE_QOS
-    r = _pthread_workqueue_addthreads((int)i,
+    r = _pthread_workqueue_addthreads(remaining,
             _dispatch_priority_to_pp(dq->dq_priority));
 #elif HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
     r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
-            qc->dgq_wq_options, (int)i);
+            qc->dgq_wq_options, remaining);
 #endif
     (void)dispatch_assume_zero(r);
     return;
@@ -3987,23 +3998,43 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
     dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
     if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
         while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
-            if (!--i) {
+            _dispatch_root_queue_debug("signaled sleeping worker for "
+                    "global queue: %p", dq);
+            if (!--remaining) {
                 return;
             }
         }
     }
-    uint32_t j, t_count;
+
+    bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
+    if (overcommit) {
+        os_atomic_add2o(qc, dgq_pending, remaining, relaxed);
+    } else {
+        if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, remaining, relaxed)) {
+            _dispatch_root_queue_debug("worker thread request still pending for "
+                    "global queue: %p", dq);
+            return;
+        }
+    }
+
+    int32_t can_request, t_count;
     // seq_cst with atomic store to tail
     t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
     do {
-        if (!t_count) {
+        can_request = t_count < floor ? 0 : t_count - floor;
+        if (remaining > can_request) {
+            _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
+                    remaining, can_request);
+            os_atomic_sub2o(qc, dgq_pending, remaining - can_request, relaxed);
+            remaining = can_request;
+        }
+        if (remaining == 0) {
             _dispatch_root_queue_debug("pthread pool is full for root queue: "
                    "%p", dq);
             return;
         }
-        j = i > t_count ? t_count : i;
     } while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
-            t_count - j, &t_count, acquire));
+            t_count - remaining, &t_count, acquire));
 
     pthread_attr_t *attr = &pqc->dpq_thread_attr;
     pthread_t tid, *pthr = &tid;
@@ -4020,13 +4051,13 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
             }
             _dispatch_temporary_resource_shortage();
         }
-    } while (--j);
+    } while (--remaining);
 #endif // DISPATCH_USE_PTHREAD_POOL
 }
 
 DISPATCH_NOINLINE
 void
-_dispatch_global_queue_poke(dispatch_queue_t dq, unsigned int n)
+_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
 {
     if (!_dispatch_queue_class_probe(dq)) {
         return;
     }
@@ -4043,7 +4074,7 @@ _dispatch_global_queue_poke(dispatch_queue_t dq, unsigned int n)
         return;
     }
 #endif // HAVE_PTHREAD_WORKQUEUES
-    return _dispatch_global_queue_poke_slow(dq, n);
+    return _dispatch_global_queue_poke_slow(dq, n, floor);
 }
 
 #pragma mark -
@@ -5248,7 +5279,7 @@ _dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
         (void)os_atomic_dec2o(qc, dgq_pending, relaxed);
     }
     if (!available) {
-        _dispatch_global_queue_poke(dq, 1);
+        _dispatch_global_queue_poke(dq, 1, 0);
     }
     return available;
 }
@@ -5315,7 +5346,7 @@ _dispatch_root_queue_drain_one(dispatch_queue_t dq)
     }
     os_atomic_store2o(dq, dq_items_head, next, relaxed);
-    _dispatch_global_queue_poke(dq, 1);
+    _dispatch_global_queue_poke(dq, 1, 0);
 out:
     return head;
 }
@@ -5412,7 +5443,7 @@ _dispatch_worker_thread4(void *context)
     dispatch_root_queue_context_t qc = dq->do_ctxt;
 
     _dispatch_introspection_thread_add();
-    int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
+    int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
     dispatch_assert(pending >= 0);
     _dispatch_root_queue_drain(dq, _dispatch_get_priority());
     _dispatch_voucher_debug("root queue clear", NULL);
@@ -5458,6 +5489,11 @@ _dispatch_worker_thread(void *context)
     dispatch_root_queue_context_t qc = dq->do_ctxt;
     dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
 
+    int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
+    if (unlikely(pending < 0)) {
+        DISPATCH_INTERNAL_CRASH(pending, "Pending thread request underflow");
+    }
+
     if (pqc->dpq_observer_hooks.queue_will_execute) {
         _dispatch_set_pthread_root_queue_observer_hooks(
                 &pqc->dpq_observer_hooks);
@@ -5475,6 +5511,15 @@ _dispatch_worker_thread(void *context)
     (void)dispatch_assume_zero(r);
     _dispatch_introspection_thread_add();
 
+#if DISPATCH_USE_INTERNAL_WORKQUEUE
+    bool overcommit = (qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
+    bool manager = (dq == &_dispatch_mgr_root_queue);
+    bool monitored = !(overcommit || manager);
+    if (monitored) {
+        _dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
+    }
+#endif
+
     const int64_t timeout = 5ull * NSEC_PER_SEC;
     pthread_priority_t old_pri = _dispatch_get_priority();
     do {
@@ -5483,8 +5528,13 @@ _dispatch_worker_thread(void *context)
     } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
             dispatch_time(0, timeout)) == 0);
 
+#if DISPATCH_USE_INTERNAL_WORKQUEUE
+    if (monitored) {
+        _dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
+    }
+#endif
     (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
-    _dispatch_global_queue_poke(dq, 1);
+    _dispatch_global_queue_poke(dq, 1, 0);
     _dispatch_release(dq);
 
     return NULL;
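The signature change from _dispatch_global_queue_poke(dq, n) to (dq, n, floor) is the heart of the queue.c diff: floor is the lowest value the pool-size counter may be drawn down to, so a negative floor handed in by the monitor licenses temporary oversubscription. The clamp from _dispatch_global_queue_poke_slow rendered standalone; the function name is mine:

#include <stdio.h>

// Mirrors the clamp in _dispatch_global_queue_poke_slow: t_count is the
// remaining pool capacity, floor the lowest value the pool may be drawn
// down to (a negative floor means permitted oversubscription).
static int
threads_to_request(int remaining, int t_count, int floor)
{
    int can_request = t_count < floor ? 0 : t_count - floor;
    return remaining > can_request ? can_request : remaining;
}

int
main(void)
{
    printf("%d\n", threads_to_request(1, 3, 0));  // 1: capacity available
    printf("%d\n", threads_to_request(1, 0, 0));  // 0: pool is full
    printf("%d\n", threads_to_request(1, 0, -4)); // 1: monitor allows 4 extra
    printf("%d\n", threads_to_request(6, 2, -4)); // 6: 2 free + 4 oversubscribed
    return 0;
}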
diff --git a/src/queue_internal.h b/src/queue_internal.h
index 29e83cc59..c468eb105 100644
--- a/src/queue_internal.h
+++ b/src/queue_internal.h
@@ -563,7 +563,7 @@ void _dispatch_queue_resume(dispatch_queue_t dq, bool activate);
 void _dispatch_queue_finalize_activation(dispatch_queue_t dq);
 void _dispatch_queue_invoke(dispatch_queue_t dq,
         dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags);
-void _dispatch_global_queue_poke(dispatch_queue_t dq, unsigned int n);
+void _dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor);
 void _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
         dispatch_qos_t qos);
 void _dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq);
@@ -948,7 +948,7 @@ struct dispatch_apply_s {
     dispatch_continuation_t da_dc;
     dispatch_thread_event_s da_event;
     dispatch_invoke_flags_t da_flags;
-    uint32_t da_thr_cnt;
+    int32_t da_thr_cnt;
 };
 typedef struct dispatch_apply_s *dispatch_apply_t;
diff --git a/src/shims.h b/src/shims.h
index 8434341ec..c9d727592 100644
--- a/src/shims.h
+++ b/src/shims.h
@@ -41,6 +41,8 @@
 #if HAVE_PTHREAD_WORKQUEUES
 #if __has_include(<pthread/workqueue_private.h>)
 #include <pthread/workqueue_private.h>
+#elif DISPATCH_USE_INTERNAL_WORKQUEUE
+#include "event/workqueue_internal.h"
 #else
 #include <pthread_workqueue.h>
 #endif
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a47613097..a6526fca1 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -88,7 +88,7 @@ AM_OBJCFLAGS=$(DISPATCH_TESTS_CFLAGS) $(CBLOCKS_FLAGS)
 AM_CXXFLAGS=$(DISPATCH_TESTS_CFLAGS) $(CXXBLOCKS_FLAGS) $(BSD_OVERLAY_CFLAGS)
 AM_OBJCXXFLAGS=$(DISPATCH_TESTS_CFLAGS) $(CXXBLOCKS_FLAGS)
 
-if !BUILD_OWN_PTHREAD_WORKQUEUES
+if !DISPATCH_USE_INTERNAL_WORKQUEUE
 if HAVE_PTHREAD_WORKQUEUES
   PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue
 endif
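Finally, the reservation itself is a compare-and-swap loop over dgq_thread_pool_size (os_atomic_cmpxchgvw2o with acquire ordering). The same shape in portable C11 atomics, for readers without the libdispatch macros at hand; all names are mine:

#include <stdatomic.h>
#include <stdio.h>

// Atomically reserve up to `want` units from *pool without letting it
// drop below `floor`; returns how many units were actually reserved.
// Same structure as the dgq_thread_pool_size loop in queue.c above.
static int
reserve(_Atomic int *pool, int want, int floor)
{
    int t_count = atomic_load(pool);
    int take;
    do {
        int can = t_count < floor ? 0 : t_count - floor;
        take = want > can ? can : want;
        if (take == 0) return 0;
        // on failure, t_count is reloaded and the clamp is recomputed
    } while (!atomic_compare_exchange_weak(pool, &t_count, t_count - take));
    return take;
}

int
main(void)
{
    _Atomic int pool = 2;
    printf("%d\n", reserve(&pool, 6, -4)); // 6: 2 free + 4 oversubscribed
    printf("%d\n", reserve(&pool, 1, -4)); // 0: already drawn down to the floor
    return 0;
}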