Skip to content

Commit 8ddcbbd

Browse files
committed
implement pthread_workqueue within libdispatch
Provide an implementation of the pthread_workqueue functionality as a fully integrated component of libdispatch. Integration of the workqueue implementation into the dispatch code base simplifies the process of evolving the APIs between the two layers and thus prepares for future optimization and enhancements. Initially, the integrated pthread_workqueue is only enabled by default on Linux. Most of the internals are built on pthread APIs and thus should be fairly portable. However, Linux-specific code is used to dynamically estimate the number of runnable worker threads by reading /proc. Porting the thread pool management code to non-Linux platforms would entail providing similar functionality for those platforms (or otherwise restructuring the manager).
1 parent 272e818 commit 8ddcbbd

File tree

7 files changed

+978
-18
lines changed

7 files changed

+978
-18
lines changed

configure.ac

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -320,21 +320,41 @@ AS_IF([test -n "$apple_libpthread_source_path" -a -n "$apple_xnu_source_osfmk_pa
320320
AC_CHECK_HEADERS([pthread_machdep.h pthread/qos.h])
321321

322322
# pthread_workqueues.
323-
# Look for own version first, then system version.
324-
AS_IF([test -f $srcdir/libpwq/configure.ac],
325-
[AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
326-
ac_configure_args="--disable-libpwq-install $ac_configure_args"
327-
AC_CONFIG_SUBDIRS([libpwq])
328-
build_own_pthread_workqueues=true
323+
# Look for own version first, then for libpwq in our source tree, finally system version.
324+
AC_ARG_ENABLE([internal-libpwq],
325+
[AS_HELP_STRING([--enable-internal-libpwq],
326+
[Use libdispatch's own implementation of pthread_workqueue API.])],,
327+
[case $target_os in
328+
linux*)
329+
enable_internal_libpwq=yes
330+
;;
331+
*)
332+
enable_internal_libpwq=no
333+
esac]
334+
)
335+
AS_IF([test "x$enable_internal_libpwq" = "xyes"],
336+
[AC_DEFINE(HAVE_INTERNAL_PTHREAD_WORKQUEUE, 1, [Use libdispatch's own implementation of pthread_workqueue API])
329337
AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
330-
have_pthread_workqueues=true],
331-
[build_own_pthread_workqueues=false
332-
AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
333-
[AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
338+
have_internal_pthread_workqueues=true
339+
have_pthread_workqueues=true
340+
build_own_pthread_workqueues=false],
341+
[have_internal_pthread_workqueues=false
342+
AS_IF([test -f $srcdir/libpwq/configure.ac],
343+
[AC_DEFINE(BUILD_OWN_PTHREAD_WORKQUEUES, 1, [Define if building pthread work queues from source])
344+
ac_configure_args="--disable-libpwq-install $ac_configure_args"
345+
AC_CONFIG_SUBDIRS([libpwq])
346+
build_own_pthread_workqueues=true
347+
AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
334348
have_pthread_workqueues=true],
335-
[have_pthread_workqueues=false]
336-
)]
349+
[build_own_pthread_workqueues=false
350+
AC_CHECK_HEADERS([pthread/workqueue_private.h pthread_workqueue.h],
351+
[AC_DEFINE(HAVE_PTHREAD_WORKQUEUES, 1, [Define if pthread work queues are present])
352+
have_pthread_workqueues=true],
353+
[have_pthread_workqueues=false]
354+
)]
355+
)]
337356
)
357+
AM_CONDITIONAL(HAVE_INTERNAL_PTHREAD_WORKQUEUES, $have_internal_pthread_workqueues)
338358
AM_CONDITIONAL(BUILD_OWN_PTHREAD_WORKQUEUES, $build_own_pthread_workqueues)
339359
AM_CONDITIONAL(HAVE_PTHREAD_WORKQUEUES, $have_pthread_workqueues)
340360

src/Makefile.am

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ else
99
lib_LTLIBRARIES=libdispatch.la
1010
endif
1111

12+
if HAVE_INTERNAL_PTHREAD_WORKQUEUES
13+
INTERNAL_WORKQUEUE_SOURCES= \
14+
workqueue/workqueue.c \
15+
workqueue/workqueue_internal.h
16+
endif
17+
1218
libdispatch_la_SOURCES= \
1319
allocator.c \
1420
apply.c \
@@ -60,7 +66,8 @@ libdispatch_la_SOURCES= \
6066
shims/perfmon.h \
6167
shims/time.h \
6268
shims/tsd.h \
63-
shims/yield.h
69+
shims/yield.h \
70+
$(INTERNAL_WORKQUEUE_SOURCES)
6471

6572
EXTRA_libdispatch_la_SOURCES=
6673
EXTRA_libdispatch_la_DEPENDENCIES=
@@ -77,6 +84,7 @@ AM_OBJCFLAGS=$(DISPATCH_CFLAGS) $(CBLOCKS_FLAGS)
7784
AM_CXXFLAGS=$(PTHREAD_WORKQUEUE_CFLAGS) $(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
7885
AM_OBJCXXFLAGS=$(DISPATCH_CFLAGS) $(CXXBLOCKS_FLAGS)
7986

87+
if !HAVE_INTERNAL_PTHREAD_WORKQUEUES
8088
if BUILD_OWN_PTHREAD_WORKQUEUES
8189
PTHREAD_WORKQUEUE_LIBS=$(top_builddir)/libpwq/libpthread_workqueue.la
8290
PTHREAD_WORKQUEUE_CFLAGS=-I$(top_srcdir)/libpwq/include
@@ -85,6 +93,7 @@ if HAVE_PTHREAD_WORKQUEUES
8593
PTHREAD_WORKQUEUE_LIBS=-lpthread_workqueue
8694
endif
8795
endif
96+
endif
8897

8998
if BUILD_OWN_BLOCKS_RUNTIME
9099
libdispatch_la_SOURCES+= BlocksRuntime/data.c BlocksRuntime/runtime.c

src/queue.c

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@
3232
#endif
3333
#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) && \
3434
!HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
35+
!HAVE_INTERNAL_PTHREAD_WORKQUEUE && \
3536
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
3637
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
3738
#endif
3839
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
39-
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
40+
!(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || HAVE_INTERNAL_PTHREAD_WORKQUEUE)
4041
#define pthread_workqueue_t void*
4142
#endif
4243

@@ -66,7 +67,7 @@ static void _dispatch_worker_thread2(int priority, int options, void *context);
6667
#endif
6768
#if DISPATCH_USE_PTHREAD_POOL
6869
static void *_dispatch_worker_thread(void *context);
69-
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
70+
int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
7071
#endif
7172

7273
#if DISPATCH_COCOA_COMPAT
@@ -152,7 +153,7 @@ struct dispatch_root_queue_context_s {
152153
#if HAVE_PTHREAD_WORKQUEUES
153154
qos_class_t dgq_qos;
154155
int dgq_wq_priority, dgq_wq_options;
155-
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
156+
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL || HAVE_INTERNAL_PTHREAD_WORKQUEUE
156157
pthread_workqueue_t dgq_kworkqueue;
157158
#endif
158159
#endif // HAVE_PTHREAD_WORKQUEUES
@@ -635,7 +636,7 @@ _dispatch_root_queues_init_workq(int *wq_supported)
635636
result = !r;
636637
}
637638
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
638-
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
639+
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL || HAVE_INTERNAL_PTHREAD_WORKQUEUE
639640
if (!result) {
640641
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
641642
pthread_workqueue_attr_t pwq_attr;
@@ -663,6 +664,15 @@ _dispatch_root_queues_init_workq(int *wq_supported)
663664
result = result || dispatch_assume(pwq);
664665
}
665666
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
667+
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
668+
if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
669+
int priority = qc->dgq_wq_priority;
670+
int overcommit = qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT;
671+
r = dispatch_workqueue_get_wq(&pwq, priority, overcommit);
672+
(void)dispatch_assume_zero(r);
673+
result = result || dispatch_assume(pwq);
674+
}
675+
#endif // HAVE_INTERNAL_PTHREAD_WORKQUEUE
666676
qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
667677
}
668678
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
@@ -672,7 +682,7 @@ _dispatch_root_queues_init_workq(int *wq_supported)
672682
}
673683
#endif
674684
}
675-
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
685+
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL || HAVE_INTERNAL_PTHREAD_WORKQUEUE
676686
#endif // HAVE_PTHREAD_WORKQUEUES
677687
return result;
678688
}
@@ -3972,6 +3982,16 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
39723982
return;
39733983
}
39743984
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
3985+
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
3986+
if (qc->dgq_kworkqueue) {
3987+
do {
3988+
r = dispatch_workqueue_additem_np(qc->dgq_kworkqueue,
3989+
_dispatch_worker_thread4, dq);
3990+
(void)dispatch_assume_zero(r);
3991+
} while (--i);
3992+
return;
3993+
}
3994+
#endif // HAVE_INTERNAL_PTHREAD_WORKQUEUE
39753995
#if HAVE_PTHREAD_WORKQUEUE_QOS
39763996
r = _pthread_workqueue_addthreads((int)i,
39773997
_dispatch_priority_to_pp(dq->dq_priority));

src/shims.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,12 @@
4242
#if __has_include(<pthread/workqueue_private.h>)
4343
#include <pthread/workqueue_private.h>
4444
#else
45+
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
46+
#include <workqueue/workqueue_internal.h>
47+
#else
4548
#include <pthread_workqueue.h>
4649
#endif
50+
#endif
4751
#ifndef WORKQ_FEATURE_MAINTENANCE
4852
#define WORKQ_FEATURE_MAINTENANCE 0x10
4953
#endif

0 commit comments

Comments
 (0)