Skip to content

Commit d1e4417

Browse files
committed
implement pthread_workqueue within libdispatch
Provide an implementation of the pthread_workqueue functionality as a fully integrated component of libdispatch. Integration of the workqueue implementation into the dispatch code base simplifies the process of evolving the APIs between the two layers and thus prepares for future optimization and enhancements. Initially, the integrated pthread_workqueue is only enabled by default on Linux. Most of the internals are built on pthread APIs and thus should be fairly portable. However, Linux-specific code is used to dynamically estimate the number of runnable worker threads by reading /proc. Porting the thread pool management code to non-Linux platforms would entail providing similar functionality for those platforms (or otherwise restructuring the manager).
1 parent c95febb commit d1e4417

File tree

7 files changed

+978
-18
lines changed

7 files changed

+978
-18
lines changed

configure.ac

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -333,21 +333,41 @@ AS_IF([test -n "$apple_libpthread_source_path" -a -n "$apple_xnu_source_osfmk_pa
333333
AC_CHECK_HEADERS([pthread_machdep.h pthread/qos.h])
334334

335335
# pthread_workqueues.
336-
# Look for own version first, then system version.
337-
AS_IF([test -f $srcdir/libpwq/configure.ac],
338-
[AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
339-
ac_configure_args="--disable-libpwq-install $ac_configure_args"
340-
AC_CONFIG_SUBDIRS([libpwq])
341-
build_own_pthread_workqueues=true
336+
# Look for own version first, then for libpwq in our source tree, finally system version.
337+
AC_ARG_ENABLE([internal-libpwq],
338+
[AS_HELP_STRING([--enable-internal-libpwq],
339+
[Use libdispatch's own implementation of pthread_workqueue API.])],,
340+
[case $target_os in
341+
linux*)
342+
enable_internal_libpwq=yes
343+
;;
344+
*)
345+
enable_internal_libpwq=no
346+
esac]
347+
)
348+
AS_IF([test "x$enable_internal_libpwq" = "xyes"],
349+
[AC_DEFINE(HAVE_INTERNAL_PTHREAD_WORKQUEUE, 1, [Use libdispatch's own implementation of pthread_workqueue API])
342350
AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
343-
have_pthread_workqueues=true],
344-
[build_own_pthread_workqueues=false
345-
AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
346-
[AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
351+
have_internal_pthread_workqueues=true
352+
have_pthread_workqueues=true
353+
build_own_pthread_workqueues=false],
354+
[have_internal_pthread_workqueues=false
355+
AS_IF([test -f $srcdir/libpwq/configure.ac],
356+
[AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
357+
ac_configure_args="--disable-libpwq-install $ac_configure_args"
358+
AC_CONFIG_SUBDIRS([libpwq])
359+
build_own_pthread_workqueues=true
360+
AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
347361
have_pthread_workqueues=true],
348-
[have_pthread_workqueues=false]
349-
)]
362+
[build_own_pthread_workqueues=false
363+
AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
364+
[AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
365+
have_pthread_workqueues=true],
366+
[have_pthread_workqueues=false]
367+
)]
368+
)]
350369
)
370+
AM_CONDITIONAL(HAVE_INTERNAL_PTHREAD_WORKQUEUES, $have_internal_pthread_workqueues)
351371
AM_CONDITIONAL(BUILD_OWN_PTHREAD_WORKQUEUES, $build_own_pthread_workqueues)
352372
AM_CONDITIONAL(HAVE_PTHREAD_WORKQUEUES, $have_pthread_workqueues)
353373

src/Makefile.am

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ else
99
lib_LTLIBRARIES=libdispatch.la
1010
endif
1111

12+
if HAVE_INTERNAL_PTHREAD_WORKQUEUES
13+
INTERNAL_WORKQUEUE_SOURCES= \
14+
workqueue/workqueue.c \
15+
workqueue/workqueue_internal.h
16+
endif
17+
1218
libdispatch_la_SOURCES= \
1319
allocator.c \
1420
apply.c \
@@ -53,7 +59,8 @@ libdispatch_la_SOURCES= \
5359
shims/perfmon.h \
5460
shims/time.h \
5561
shims/tsd.h \
56-
shims/yield.h
62+
shims/yield.h \
63+
$(INTERNAL_WORKQUEUE_SOURCES)
5764

5865
EXTRA_libdispatch_la_SOURCES=
5966
EXTRA_libdispatch_la_DEPENDENCIES=
@@ -75,6 +82,7 @@ if BUILD_OWN_KQUEUES
7582
KQUEUE_CFLAGS+=-I$(top_srcdir)/libkqueue/include
7683
endif
7784

85+
if !HAVE_INTERNAL_PTHREAD_WORKQUEUES
7886
if BUILD_OWN_PTHREAD_WORKQUEUES
7987
PTHREAD_WORKQUEUE_LIBS=$(top_builddir)/libpwq/libpthread_workqueue.la
8088
PTHREAD_WORKQUEUE_CFLAGS=-I$(top_srcdir)/libpwq/include
@@ -83,6 +91,7 @@ if HAVE_PTHREAD_WORKQUEUES
8391
PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue
8492
endif
8593
endif
94+
endif
8695

8796
if BUILD_OWN_BLOCKS_RUNTIME
8897
libdispatch_la_SOURCES+= BlocksRuntime/data.c BlocksRuntime/runtime.c

src/queue.c

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#endif
3737
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
3838
!HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
39+
!HAVE_INTERNAL_PTHREAD_WORKQUEUE && \
3940
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
4041
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
4142
#endif
@@ -44,7 +45,7 @@
4445
#define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
4546
#endif
4647
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
47-
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
48+
!(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || HAVE_INTERNAL_PTHREAD_WORKQUEUE)
4849
#define pthread_workqueue_t void*
4950
#endif
5051

@@ -70,7 +71,7 @@ static void _dispatch_worker_thread2(int priority, int options, void *context);
7071
#endif
7172
#if DISPATCH_USE_PTHREAD_POOL
7273
static void *_dispatch_worker_thread(void *context);
73-
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
74+
int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
7475
#endif
7576

7677
#if DISPATCH_COCOA_COMPAT
@@ -159,7 +160,7 @@ struct dispatch_root_queue_context_s {
159160
#if HAVE_PTHREAD_WORKQUEUES
160161
qos_class_t dgq_qos;
161162
int dgq_wq_priority, dgq_wq_options;
162-
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
163+
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL || HAVE_INTERNAL_PTHREAD_WORKQUEUE
163164
pthread_workqueue_t dgq_kworkqueue;
164165
#endif
165166
#endif // HAVE_PTHREAD_WORKQUEUES
@@ -719,7 +720,7 @@ _dispatch_root_queues_init_workq(int *wq_supported)
719720
result = !r;
720721
}
721722
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
722-
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
723+
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL || HAVE_INTERNAL_PTHREAD_WORKQUEUE
723724
if (!result) {
724725
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
725726
pthread_workqueue_attr_t pwq_attr;
@@ -747,6 +748,15 @@ _dispatch_root_queues_init_workq(int *wq_supported)
747748
result = result || dispatch_assume(pwq);
748749
}
749750
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
751+
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
752+
if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
753+
int priority = qc->dgq_wq_priority;
754+
int overcommit = qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT;
755+
r = dispatch_workqueue_get_wq(&pwq, priority, overcommit);
756+
(void)dispatch_assume_zero(r);
757+
result = result || dispatch_assume(pwq);
758+
}
759+
#endif // HAVE_INTERNAL_PTHREAD_WORKQUEUE
750760
qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
751761
}
752762
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
@@ -756,7 +766,7 @@ _dispatch_root_queues_init_workq(int *wq_supported)
756766
}
757767
#endif
758768
}
759-
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
769+
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL || HAVE_INTERNAL_PTHREAD_WORKQUEUE
760770
#endif // HAVE_PTHREAD_WORKQUEUES
761771
return result;
762772
}
@@ -4275,6 +4285,16 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
42754285
return;
42764286
}
42774287
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4288+
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
4289+
if (qc->dgq_kworkqueue) {
4290+
do {
4291+
r = dispatch_workqueue_additem_np(qc->dgq_kworkqueue,
4292+
_dispatch_worker_thread4, dq);
4293+
(void)dispatch_assume_zero(r);
4294+
} while (--i);
4295+
return;
4296+
}
4297+
#endif // HAVE_INTERNAL_PTHREAD_WORKQUEUE
42784298
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
42794299
if (!dq->dq_priority) {
42804300
r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,

src/shims.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,12 @@ enum {
119119
#if __has_include(<pthread/workqueue_private.h>)
120120
#include <pthread/workqueue_private.h>
121121
#else
122+
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
123+
#include <workqueue/workqueue_internal.h>
124+
#else
122125
#include <pthread_workqueue.h>
123126
#endif
127+
#endif
124128
#ifndef WORKQ_FEATURE_MAINTENANCE
125129
#define WORKQ_FEATURE_MAINTENANCE 0x10
126130
#endif

0 commit comments

Comments
 (0)