From 703b1d5bbc663a219207bef7aeaa770933344d54 Mon Sep 17 00:00:00 2001 From: David Grove Date: Wed, 25 Jan 2017 17:29:01 -0500 Subject: [PATCH] implement pthread_workqueue within libdispatch Provide an implementation of the pthread_workqueue functionality (libpwq) as a fully integrated component of libdispatch. Integration of the workqueue implementation into the libdispatch code base simplifies the process of evolving the APIs between the two layers and thus prepares for future optimization and enhancements. The overall design is to augment libdispatch's existing pthread_root_queue option with periodic user-space monitoring of the status of a root queue's worker threads and to oversubscribe the workqueue when not enough workers are observed to be runnable when the queue has available work. Initially, the integrated pthread_workqueue is only enabled by default on Linux. The current monitoring implementation relies on Linux-specific code to dynamically estimate the number of runnable worker threads by reading /proc. Porting the monitoring code to non-Linux platforms would entail providing similar functionality for those platforms. Remove libpwq git submodule and autotools support for building libpwq from source as part of building libdispatch. --- .gitmodules | 3 - Makefile.am | 5 - configure.ac | 27 ++-- libpwq | 1 - src/Makefile.am | 16 +- src/apply.c | 14 +- src/event/workqueue.c | 273 +++++++++++++++++++++++++++++++++ src/event/workqueue_internal.h | 52 +++++++ src/inline_internal.h | 2 +- src/queue.c | 104 +++++++++---- src/queue_internal.h | 4 +- src/shims.h | 2 + tests/Makefile.am | 2 +- 13 files changed, 445 insertions(+), 60 deletions(-) delete mode 160000 libpwq create mode 100644 src/event/workqueue.c create mode 100644 src/event/workqueue_internal.h diff --git a/.gitmodules b/.gitmodules index e6068b432..e69de29bb 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +0,0 @@ -[submodule "libpwq"] - path = libpwq - url = https://github.com/mheily/libpwq.git diff --git a/Makefile.am b/Makefile.am index ffc82df29..f1be02951 100644 --- a/Makefile.am +++ b/Makefile.am @@ -4,17 +4,12 @@ ACLOCAL_AMFLAGS = -I m4 -if BUILD_OWN_PTHREAD_WORKQUEUES - MAYBE_PTHREAD_WORKQUEUES = libpwq -endif - if BUILD_TESTS MAYBE_TESTS = tests endif SUBDIRS= \ dispatch \ - $(MAYBE_PTHREAD_WORKQUEUES) \ man \ os \ private \ diff --git a/configure.ac b/configure.ac index 00e546bf1..7db3d60f9 100644 --- a/configure.ac +++ b/configure.ac @@ -320,22 +320,31 @@ AS_IF([test -n "$apple_libpthread_source_path" -a -n "$apple_xnu_source_osfmk_pa AC_CHECK_HEADERS([pthread_machdep.h pthread/qos.h]) # pthread_workqueues. -# Look for own version first, then system version. -AS_IF([test -f $srcdir/libpwq/configure.ac], - [AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source]) - ac_configure_args="--disable-libpwq-install $ac_configure_args" - AC_CONFIG_SUBDIRS([libpwq]) - build_own_pthread_workqueues=true +# Look for own version first, than see if there is a system version. +AC_ARG_ENABLE([internal-libpwq], + [AS_HELP_STRING([--enable-internal-libpwq], + [Use libdispatch's own implementation of pthread workqueues.])],, + [case $target_os in + linux*) + enable_internal_libpwq=yes + ;; + *) + enable_internal_libpwq=no + esac] +) +AS_IF([test "x$enable_internal_libpwq" = "xyes"], + [AC_DEFINE(DISPATCH_USE_INTERNAL_WORKQUEUE, 1, [Use libdispatch's own implementation of pthread_workqueue API]) AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present]) + dispatch_use_internal_workqueue=true have_pthread_workqueues=true], - [build_own_pthread_workqueues=false + [dispatch_use_internal_workqueue=false AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h], [AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present]) have_pthread_workqueues=true], [have_pthread_workqueues=false] - )] + )] ) -AM_CONDITIONAL(BUILD_OWN_PTHREAD_WORKQUEUES, $build_own_pthread_workqueues) +AM_CONDITIONAL(DISPATCH_USE_INTERNAL_WORKQUEUE, $dispatch_use_internal_workqueue) AM_CONDITIONAL(HAVE_PTHREAD_WORKQUEUES, $have_pthread_workqueues) AC_CHECK_HEADERS([libproc_internal.h], [], [], [#include ]) diff --git a/libpwq b/libpwq deleted file mode 160000 index 18437d2be..000000000 --- a/libpwq +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 18437d2be372f4422b207ec6442c8caf7974025d diff --git a/src/Makefile.am b/src/Makefile.am index a0ddbc98b..ac2f74cfd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -9,6 +9,12 @@ else lib_LTLIBRARIES=libdispatch.la endif +if DISPATCH_USE_INTERNAL_WORKQUEUE +INTERNAL_WORKQUEUE_SOURCES= \ + event/workqueue.c \ + event/workqueue_internal.h +endif + libdispatch_la_SOURCES= \ allocator.c \ apply.c \ @@ -60,7 +66,8 @@ libdispatch_la_SOURCES= \ shims/perfmon.h \ shims/time.h \ shims/tsd.h \ - shims/yield.h + shims/yield.h \ + $(INTERNAL_WORKQUEUE_SOURCES) EXTRA_libdispatch_la_SOURCES= EXTRA_libdispatch_la_DEPENDENCIES= @@ -77,12 +84,13 @@ AM_OBJCFLAGS=$(DISPATCH_CFLAGS) $(CBLOCKS_FLAGS) AM_CXXFLAGS=$(PTHREAD_WORKQUEUE_CFLAGS) $(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS) AM_OBJCXXFLAGS=$(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS) -if BUILD_OWN_PTHREAD_WORKQUEUES - PTHREAD_WORKQUEUE_LIBS=$(top_builddir)/libpwq/libpthread_workqueue.la - PTHREAD_WORKQUEUE_CFLAGS=-I$(top_srcdir)/libpwq/include +if DISPATCH_USE_INTERNAL_WORKQUEUE + PTHREAD_WORKQUEUE_LIBS= + PTHREAD_WORKQUEUE_CFLAGS= else if HAVE_PTHREAD_WORKQUEUES PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue + PTHREAD_WORKQUEUE_CFLAGS= endif endif diff --git a/src/apply.c b/src/apply.c index 79c4e9594..0f1d85d77 100644 --- a/src/apply.c +++ b/src/apply.c @@ -159,11 +159,11 @@ static inline void _dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da, dispatch_function_t func) { - uint32_t i = 0; + int32_t i = 0; dispatch_continuation_t head = NULL, tail = NULL; // The current thread does not need a continuation - uint32_t continuation_cnt = da->da_thr_cnt - 1; + int32_t continuation_cnt = da->da_thr_cnt - 1; dispatch_assert(continuation_cnt); @@ -192,14 +192,14 @@ static void _dispatch_apply_redirect(void *ctxt) { dispatch_apply_t da = (dispatch_apply_t)ctxt; - uint32_t da_width = da->da_thr_cnt - 1; + int32_t da_width = da->da_thr_cnt - 1; dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq; do { - uint32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width); + int32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width); if (slowpath(da_width > width)) { - uint32_t excess = da_width - width; + int32_t excess = da_width - width; for (tq = dq; tq != rq; tq = tq->do_targetq) { _dispatch_queue_relinquish_width(tq, excess); } @@ -234,7 +234,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt, if (slowpath(iterations == 0)) { return; } - uint32_t thr_cnt = dispatch_hw_config(active_cpus); + int32_t thr_cnt = dispatch_hw_config(active_cpus); dispatch_thread_context_t dtctxt = _dispatch_thread_context_find(_dispatch_apply_key); size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0; dispatch_queue_t old_dq = _dispatch_queue_get_current(); @@ -247,7 +247,7 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt, ? nested * iterations : DISPATCH_APPLY_MAX; } if (iterations < thr_cnt) { - thr_cnt = (uint32_t)iterations; + thr_cnt = iterations; } if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) { dq = old_dq ? old_dq : _dispatch_get_root_queue( diff --git a/src/event/workqueue.c b/src/event/workqueue.c new file mode 100644 index 000000000..0b9bc0ac5 --- /dev/null +++ b/src/event/workqueue.c @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2017-2017 Apple Inc. All rights reserved. + * + * @APPLE_APACHE_LICENSE_HEADER_START@ + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @APPLE_APACHE_LICENSE_HEADER_END@ + */ + +#include "internal.h" + +#if DISPATCH_USE_INTERNAL_WORKQUEUE + +// forward looking typedef; not yet defined in dispatch +typedef pid_t dispatch_tid; + +/* + * dispatch_workq monitors the thread pool that is + * executing the work enqueued on libdispatch's pthread + * root queues and dynamically adjusts its size. + * + * The dynamic monitoring could be implemented using either + * (a) low-frequency user-level approximation of the number of runnable + * worker threads via reading the /proc file system + * (b) a Linux kernel extension that hooks the process change handler + * to accurately track the number of runnable normal worker threads + * This file provides an implementation of option (a). + * + * Using either form of monitoring, if (i) there appears to be + * work available in the monitored pthread root queue, (ii) the + * number of runnable workers is below the target size for the pool, + * and (iii) the total number of worker threads is below an upper limit, + * then an additional worker thread will be added to the pool. + */ + +#pragma mark static data for monitoring subsystem + +/* + * State for the user-level monitoring of a workqueue. + */ +typedef struct dispatch_workq_monitor_s { + /* The dispatch_queue we are monitoring */ + dispatch_queue_t dq; + + /* The observed number of runnable worker threads */ + int32_t num_runnable; + + /* The desired number of runnable worker threads */ + int32_t target_runnable; + + /* + * Tracking of registered workers; all accesses must hold lock. + * Invariant: registered_tids[0]...registered_tids[num_registered_tids-1] + * contain the dispatch_tids of the worker threads we are monitoring. + */ + dispatch_unfair_lock_s registered_tid_lock; + dispatch_tid *registered_tids; + int num_registered_tids; +} dispatch_workq_monitor_s, *dispatch_workq_monitor_t; + +static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES]; + +#pragma mark Implementation of the monitoring subsystem. + +#define WORKQ_MAX_TRACKED_TIDS DISPATCH_WORKQ_MAX_PTHREAD_COUNT +#define WORKQ_OVERSUBSCRIBE_FACTOR 2 + +static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED); +static dispatch_once_t _dispatch_workq_init_once_pred; + +void +_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority) +{ + dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once); + +#if HAVE_DISPATCH_WORKQ_MONITORING + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority]; + dispatch_assert(mon->dq == root_q); + dispatch_tid tid = _dispatch_thread_getspecific(tid); + _dispatch_unfair_lock_lock(&mon->registered_tid_lock); + dispatch_assert(mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS-1); + int worker_id = mon->num_registered_tids++; + mon->registered_tids[worker_id] = tid; + _dispatch_unfair_lock_unlock(&mon->registered_tid_lock); +#endif // HAVE_DISPATCH_WORKQ_MONITORING +} + +void +_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority) +{ +#if HAVE_DISPATCH_WORKQ_MONITORING + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority]; + dispatch_tid tid = _dispatch_thread_getspecific(tid); + _dispatch_unfair_lock_lock(&mon->registered_tid_lock); + for (int i = 0; i < mon->num_registered_tids; i++) { + if (mon->registered_tids[i] == tid) { + int last = mon->num_registered_tids - 1; + mon->registered_tids[i] = mon->registered_tids[last]; + mon->registered_tids[last] = 0; + mon->num_registered_tids--; + break; + } + } + _dispatch_unfair_lock_unlock(&mon->registered_tid_lock); +#endif // HAVE_DISPATCH_WORKQ_MONITORING +} + + +#if HAVE_DISPATCH_WORKQ_MONITORING +#if defined(__linux__) +/* + * For each pid that is a registered worker, read /proc/[pid]/stat + * to get a count of the number of them that are actually runnable. + * See the proc(5) man page for the format of the contents of /proc/[pid]/stat + */ +static void +_dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon) +{ + char path[128]; + char buf[4096]; + int running_count = 0; + + _dispatch_unfair_lock_lock(&mon->registered_tid_lock); + + for (int i = 0; i < mon->num_registered_tids; i++) { + dispatch_tid tid = mon->registered_tids[i]; + int fd; + size_t bytes_read = -1; + + int r = snprintf(path, sizeof(path), "/proc/%d/stat", tid); + dispatch_assert(r > 0 && r < sizeof(path)); + + fd = open(path, O_RDONLY | O_NONBLOCK); + if (unlikely(fd == -1)) { + DISPATCH_CLIENT_CRASH(tid, + "workq: registered worker exited prematurely"); + } else { + bytes_read = read(fd, buf, sizeof(buf)-1); + (void)close(fd); + } + + if (bytes_read > 0) { + buf[bytes_read] = '\0'; + char state; + if (sscanf(buf, "%*d %*s %c", &state) == 1) { + // _dispatch_debug("workq: Worker %d, state %c\n", tid, state); + if (state == 'R') { + running_count++; + } + } else { + _dispatch_debug("workq: sscanf of state failed for %d", tid); + } + } else { + _dispatch_debug("workq: Failed to read %s", path); + } + } + + mon->num_runnable = running_count; + + _dispatch_unfair_lock_unlock(&mon->registered_tid_lock); +} +#else +#error must define _dispatch_workq_count_runnable_workers +#endif + +static void +_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED) +{ + // TODO: Once we switch away from the legacy priorities to + // newer QoS, we can loop in order of decreasing QoS + // and track the total number of runnable threads seen + // across pools. We can then use that number to + // implement a global policy where low QoS queues + // are not eligible for over-subscription if the higher + // QoS queues have already consumed the target + // number of threads. + for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) { + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i]; + dispatch_queue_t dq = mon->dq; + + if (!_dispatch_queue_class_probe(dq)) { + _dispatch_debug("workq: %s is empty.", dq->dq_label); + continue; + } + + _dispatch_workq_count_runnable_workers(mon); + _dispatch_debug("workq: %s has %d runnable wokers (target is %d)", + dq->dq_label, mon->num_runnable, mon->target_runnable); + + if (mon->num_runnable == 0) { + // We are below target, and no worker is runnable. + // It is likely the program is stalled. Therefore treat + // this as if dq were an overcommit queue and call poke + // with the limit being the maximum number of workers for dq. + int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS; + _dispatch_debug("workq: %s has no runnable workers; poking with floor %d", + dq->dq_label, floor); + _dispatch_global_queue_poke(dq, 1, floor); + } else if (mon->num_runnable < mon->target_runnable) { + // We are below target, but some workers are still runnable. + // We want to oversubscribe to hit the desired load target. + // However, this under-utilization may be transitory so set the + // floor as a small multiple of threads per core. + int32_t floor = (1 - WORKQ_OVERSUBSCRIBE_FACTOR) * mon->target_runnable; + int32_t floor2 = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS; + floor = MAX(floor, floor2); + _dispatch_debug("workq: %s under utilization target; poking with floor %d", + dq->dq_label, floor); + _dispatch_global_queue_poke(dq, 1, floor); + } + } +} +#endif // HAVE_DISPATCH_WORKQ_MONITORING + + +// temporary until we switch over to QoS based interface. +static dispatch_queue_t +get_root_queue_from_legacy_priority(int priority) +{ + switch (priority) { + case WORKQ_HIGH_PRIOQUEUE: + return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS]; + case WORKQ_DEFAULT_PRIOQUEUE: + return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS]; + case WORKQ_LOW_PRIOQUEUE: + return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS]; + case WORKQ_BG_PRIOQUEUE: + return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]; + case WORKQ_BG_PRIOQUEUE_CONDITIONAL: + return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS]; + case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL: + return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]; + default: + return NULL; + } +} + +static void +_dispatch_workq_init_once(void *context DISPATCH_UNUSED) +{ +#if HAVE_DISPATCH_WORKQ_MONITORING + int target_runnable = dispatch_hw_config(active_cpus); + for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) { + dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i]; + mon->dq = get_root_queue_from_legacy_priority(i); + void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid)); + mon->registered_tids = buf; + mon->target_runnable = target_runnable; + } + + // Create monitoring timer that will periodically run on dispatch_mgr_q + dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, + 0, 0, &_dispatch_mgr_q); + dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0), + NSEC_PER_SEC, 0); + dispatch_source_set_event_handler_f(ds, _dispatch_workq_monitor_pools); + dispatch_set_context(ds, ds); // avoid appearing as leaked + dispatch_activate(ds); +#endif // HAVE_DISPATCH_WORKQ_MONITORING +} + +#endif // DISPATCH_USE_INTERNAL_WORKQUEUE diff --git a/src/event/workqueue_internal.h b/src/event/workqueue_internal.h new file mode 100644 index 000000000..012e554fb --- /dev/null +++ b/src/event/workqueue_internal.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017-2017 Apple Inc. All rights reserved. + * + * @APPLE_APACHE_LICENSE_HEADER_START@ + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @APPLE_APACHE_LICENSE_HEADER_END@ + */ + +/* + * IMPORTANT: This header file describes INTERNAL interfaces to libdispatch + * which are subject to change in future releases of Mac OS X. Any applications + * relying on these interfaces WILL break. + */ + +#ifndef __DISPATCH_WORKQUEUE_INTERNAL__ +#define __DISPATCH_WORKQUEUE_INTERNAL__ + +/* Work queue priority attributes. */ +#define WORKQ_HIGH_PRIOQUEUE 0 +#define WORKQ_DEFAULT_PRIOQUEUE 1 +#define WORKQ_LOW_PRIOQUEUE 2 +#define WORKQ_BG_PRIOQUEUE 3 +#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4 +#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5 + +#define WORKQ_NUM_PRIORITIES 6 + +#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255 + +void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority); +void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority); + +#if defined(__linux__) +#define HAVE_DISPATCH_WORKQ_MONITORING 1 +#else +#define HAVE_DISPATCH_WORKQ_MONITORING 0 +#endif + +#endif /* __DISPATCH_WORKQUEUE_INTERNAL__ */ + diff --git a/src/inline_internal.h b/src/inline_internal.h index a7ccb8260..d76b77aff 100644 --- a/src/inline_internal.h +++ b/src/inline_internal.h @@ -1479,7 +1479,7 @@ _dispatch_root_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _head, struct dispatch_object_s *head = _head._do, *tail = _tail._do; if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) { _dispatch_queue_push_update_head(dq, head); - return _dispatch_global_queue_poke(dq, n); + return _dispatch_global_queue_poke(dq, n, 0); } } diff --git a/src/queue.c b/src/queue.c index 088c5cfd2..80a38c62a 100644 --- a/src/queue.c +++ b/src/queue.c @@ -23,7 +23,7 @@ #include "protocol.h" // _dispatch_send_wakeup_runloop_thread #endif -#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \ +#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG || DISPATCH_USE_INTERNAL_WORKQUEUE) && \ !defined(DISPATCH_ENABLE_THREAD_POOL) #define DISPATCH_ENABLE_THREAD_POOL 1 #endif @@ -32,6 +32,7 @@ #endif #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) && \ !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \ + !DISPATCH_USE_INTERNAL_WORKQUEUE && \ !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK) #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1 #endif @@ -143,12 +144,14 @@ static struct dispatch_pthread_root_queue_context_s }; #endif -#define MAX_PTHREAD_COUNT 255 +#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT +#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255 +#endif struct dispatch_root_queue_context_s { union { struct { - unsigned int volatile dgq_pending; + int volatile dgq_pending; #if HAVE_PTHREAD_WORKQUEUES qos_class_t dgq_qos; int dgq_wq_priority, dgq_wq_options; @@ -158,7 +161,7 @@ struct dispatch_root_queue_context_s { #endif // HAVE_PTHREAD_WORKQUEUES #if DISPATCH_USE_PTHREAD_POOL void *dgq_ctxt; - uint32_t volatile dgq_thread_pool_size; + int32_t volatile dgq_thread_pool_size; #endif }; char _dgq_pad[DISPATCH_CACHELINE_SIZE]; @@ -663,7 +666,15 @@ _dispatch_root_queues_init_workq(int *wq_supported) result = result || dispatch_assume(pwq); } #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK - qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul); + if (pwq) { + qc->dgq_kworkqueue = pwq; + } else { + qc->dgq_kworkqueue = (void*)(~0ul); + // because the fastpath of _dispatch_global_queue_poke didn't + // know yet that we're using the internal pool implementation + // we have to undo its setting of dgq_pending + qc->dgq_pending = 0; + } } #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK if (!disable_wq) { @@ -680,10 +691,10 @@ _dispatch_root_queues_init_workq(int *wq_supported) #if DISPATCH_USE_PTHREAD_POOL static inline void _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc, - uint8_t pool_size, bool overcommit) + int32_t pool_size, bool overcommit) { dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt; - uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT : + int32_t thread_pool_size = overcommit ? DISPATCH_WORKQ_MAX_PTHREAD_COUNT : dispatch_hw_config(active_cpus); if (slowpath(pool_size) && pool_size < thread_pool_size) { thread_pool_size = pool_size; @@ -716,7 +727,7 @@ _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED) size_t i; for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) { bool overcommit = true; -#if TARGET_OS_EMBEDDED +#if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING) // some software hangs if the non-overcommitting queues do not // overcommit when threads block. Someday, this behavior should // apply to all platforms @@ -1979,8 +1990,8 @@ _dispatch_pthread_root_queue_create(const char *label, unsigned long flags, dispatch_pthread_root_queue_context_t pqc; dispatch_queue_flags_t dqf = 0; size_t dqs; - uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ? - (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0; + int32_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ? + (int8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0; dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD; dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s)); @@ -3945,10 +3956,10 @@ _dispatch_runloop_queue_poke(dispatch_queue_t dq, dispatch_qos_t qos, DISPATCH_NOINLINE static void -_dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n) +_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor) { dispatch_root_queue_context_t qc = dq->do_ctxt; - uint32_t i = n; + int remaining = n; int r = ENOSYS; _dispatch_root_queues_init(); @@ -3968,16 +3979,16 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n) r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread4, dq, &wh, &gen_cnt); (void)dispatch_assume_zero(r); - } while (--i); + } while (--remaining); return; } #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK #if HAVE_PTHREAD_WORKQUEUE_QOS - r = _pthread_workqueue_addthreads((int)i, + r = _pthread_workqueue_addthreads(remaining, _dispatch_priority_to_pp(dq->dq_priority)); #elif HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority, - qc->dgq_wq_options, (int)i); + qc->dgq_wq_options, remaining); #endif (void)dispatch_assume_zero(r); return; @@ -3987,23 +3998,43 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n) dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt; if (fastpath(pqc->dpq_thread_mediator.do_vtable)) { while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) { - if (!--i) { + _dispatch_root_queue_debug("signaled sleeping worker for " + "global queue: %p", dq); + if (!--remaining) { return; } } } - uint32_t j, t_count; + + bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT; + if (overcommit) { + os_atomic_add2o(qc, dgq_pending, remaining, relaxed); + } else { + if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, remaining, relaxed)) { + _dispatch_root_queue_debug("worker thread request still pending for " + "global queue: %p", dq); + return; + } + } + + int32_t can_request, t_count; // seq_cst with atomic store to tail t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered); do { - if (!t_count) { + can_request = t_count < floor ? 0 : t_count - floor; + if (remaining > can_request) { + _dispatch_root_queue_debug("pthread pool reducing request from %d to %d", + remaining, can_request); + os_atomic_sub2o(qc, dgq_pending, remaining - can_request, relaxed); + remaining = can_request; + } + if (remaining == 0) { _dispatch_root_queue_debug("pthread pool is full for root queue: " "%p", dq); return; } - j = i > t_count ? t_count : i; } while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count, - t_count - j, &t_count, acquire)); + t_count - remaining, &t_count, acquire)); pthread_attr_t *attr = &pqc->dpq_thread_attr; pthread_t tid, *pthr = &tid; @@ -4020,13 +4051,13 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n) } _dispatch_temporary_resource_shortage(); } - } while (--j); + } while (--remaining); #endif // DISPATCH_USE_PTHREAD_POOL } DISPATCH_NOINLINE void -_dispatch_global_queue_poke(dispatch_queue_t dq, unsigned int n) +_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor) { if (!_dispatch_queue_class_probe(dq)) { return; @@ -4043,7 +4074,7 @@ _dispatch_global_queue_poke(dispatch_queue_t dq, unsigned int n) return; } #endif // HAVE_PTHREAD_WORKQUEUES - return _dispatch_global_queue_poke_slow(dq, n); + return _dispatch_global_queue_poke_slow(dq, n, floor); } #pragma mark - @@ -5248,7 +5279,7 @@ _dispatch_root_queue_drain_one_slow(dispatch_queue_t dq) (void)os_atomic_dec2o(qc, dgq_pending, relaxed); } if (!available) { - _dispatch_global_queue_poke(dq, 1); + _dispatch_global_queue_poke(dq, 1, 0); } return available; } @@ -5315,7 +5346,7 @@ _dispatch_root_queue_drain_one(dispatch_queue_t dq) } os_atomic_store2o(dq, dq_items_head, next, relaxed); - _dispatch_global_queue_poke(dq, 1); + _dispatch_global_queue_poke(dq, 1, 0); out: return head; } @@ -5412,7 +5443,7 @@ _dispatch_worker_thread4(void *context) dispatch_root_queue_context_t qc = dq->do_ctxt; _dispatch_introspection_thread_add(); - int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed); + int pending = os_atomic_dec2o(qc, dgq_pending, relaxed); dispatch_assert(pending >= 0); _dispatch_root_queue_drain(dq, _dispatch_get_priority()); _dispatch_voucher_debug("root queue clear", NULL); @@ -5458,6 +5489,11 @@ _dispatch_worker_thread(void *context) dispatch_root_queue_context_t qc = dq->do_ctxt; dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt; + int pending = os_atomic_dec2o(qc, dgq_pending, relaxed); + if (unlikely(pending < 0)) { + DISPATCH_INTERNAL_CRASH(pending, "Pending thread request underflow"); + } + if (pqc->dpq_observer_hooks.queue_will_execute) { _dispatch_set_pthread_root_queue_observer_hooks( &pqc->dpq_observer_hooks); @@ -5475,6 +5511,15 @@ _dispatch_worker_thread(void *context) (void)dispatch_assume_zero(r); _dispatch_introspection_thread_add(); +#if DISPATCH_USE_INTERNAL_WORKQUEUE + bool overcommit = (qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT); + bool manager = (dq == &_dispatch_mgr_root_queue); + bool monitored = !(overcommit || manager); + if (monitored) { + _dispatch_workq_worker_register(dq, qc->dgq_wq_priority); + } +#endif + const int64_t timeout = 5ull * NSEC_PER_SEC; pthread_priority_t old_pri = _dispatch_get_priority(); do { @@ -5483,8 +5528,13 @@ _dispatch_worker_thread(void *context) } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator, dispatch_time(0, timeout)) == 0); +#if DISPATCH_USE_INTERNAL_WORKQUEUE + if (monitored) { + _dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority); + } +#endif (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release); - _dispatch_global_queue_poke(dq, 1); + _dispatch_global_queue_poke(dq, 1, 0); _dispatch_release(dq); return NULL; diff --git a/src/queue_internal.h b/src/queue_internal.h index 29e83cc59..c468eb105 100644 --- a/src/queue_internal.h +++ b/src/queue_internal.h @@ -563,7 +563,7 @@ void _dispatch_queue_resume(dispatch_queue_t dq, bool activate); void _dispatch_queue_finalize_activation(dispatch_queue_t dq); void _dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags); -void _dispatch_global_queue_poke(dispatch_queue_t dq, unsigned int n); +void _dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor); void _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou, dispatch_qos_t qos); void _dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq); @@ -948,7 +948,7 @@ struct dispatch_apply_s { dispatch_continuation_t da_dc; dispatch_thread_event_s da_event; dispatch_invoke_flags_t da_flags; - uint32_t da_thr_cnt; + int32_t da_thr_cnt; }; typedef struct dispatch_apply_s *dispatch_apply_t; diff --git a/src/shims.h b/src/shims.h index 8434341ec..c9d727592 100644 --- a/src/shims.h +++ b/src/shims.h @@ -41,6 +41,8 @@ #if HAVE_PTHREAD_WORKQUEUES #if __has_include() #include +#elif DISPATCH_USE_INTERNAL_WORKQUEUE +#include #else #include #endif diff --git a/tests/Makefile.am b/tests/Makefile.am index a47613097..a6526fca1 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -88,7 +88,7 @@ AM_OBJCFLAGS=$(DISPATCH_TESTS_CFLAGS) $(CBLOCKS_FLAGS) AM_CXXFLAGS=$(DISPATCH_TESTS_CFLAGS) $(CXXBLOCKS_FLAGS) $(BSD_OVERLAY_CFLAGS) AM_OBJCXXFLAGS=$(DISPATCH_TESTS_CFLAGS) $(CXXBLOCKS_FLAGS) -if !BUILD_OWN_PTHREAD_WORKQUEUES +if !DISPATCH_USE_INTERNAL_WORKQUEUE if HAVE_PTHREAD_WORKQUEUES PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue endif