diff --git a/.gitignore b/.gitignore
index 1bf54ca69..aa26514c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,5 @@ config
 configure
 libtool
 .dirstamp
+/dispatch/module.modulemap
+/private/module.modulemap
diff --git a/PATCHES b/PATCHES
index 0c7ff18d1..6b62acb39 100644
--- a/PATCHES
+++ b/PATCHES
@@ -241,3 +241,29 @@ github commits starting with 29bdc2f from
 [4a6ec51] APPLIED rdar://25159995
 [bc16cc9] APPLIED rdar://25159995
 [954ace4] APPLIED rdar://25159995
+[5ea30b5] APPLIED rdar://26822213
+[9f1e778] APPLIED rdar://26822213
+[3339b81] APPLIED rdar://26822213
+[4fa8d8d] APPLIED rdar://26822213
+[e922531] APPLIED rdar://26822213
+[195cbcf] APPLIED rdar://27303844
+[5b893c8] APPLIED rdar://27303844
+[92689ed] APPLIED rdar://27303844
+[ecc14fa] APPLIED rdar://27303844
+[2dbf83c] APPLIED rdar://27303844
+[78b9e82] APPLIED rdar://27303844
+[2c0e5ee] APPLIED rdar://27303844
+[5ee237f] APPLIED rdar://27600964
+[77299ec] APPLIED rdar://27600964
+[57c5c28] APPLIED rdar://27600964
+[f8423ec] APPLIED rdar://27600964
+[325f73d] APPLIED rdar://27600964
+[b84e87e] APPLIED rdar://27600964
+[ae71a91] APPLIED rdar://27600964
+[8669dea] APPLIED rdar://27600964
+[a8d0327] APPLIED rdar://27600964
+[2e4e6af] APPLIED rdar://27600964
+[2457fb2] APPLIED rdar://27600964
+[4d58038] APPLIED rdar://27600964
+[98d0a05] APPLIED rdar://27600964
+[8976101] APPLIED rdar://27600964
diff --git a/config/config.h b/config/config.h
index 3d44641bb..ca3a1dbb8 100644
--- a/config/config.h
+++ b/config/config.h
@@ -13,6 +13,10 @@
    don't. */
 #define HAVE_DECL_FD_COPY 1
 
+/* Define to 1 if you have the declaration of `NOTE_LOWAT', and to 0 if you
+   don't. */
+#define HAVE_DECL_NOTE_LOWAT 1
+
 /* Define to 1 if you have the declaration of `NOTE_NONE', and to 0 if you
    don't. */
 #define HAVE_DECL_NOTE_NONE 1
diff --git a/configure.ac b/configure.ac
index 1998c3cf4..461659ec5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -353,7 +353,7 @@ AC_CHECK_FUNCS([mach_port_construct])
 #
 AC_CHECK_DECLS([CLOCK_UPTIME, CLOCK_MONOTONIC], [], [],
 	[[#include <time.h>]])
-AC_CHECK_DECLS([NOTE_NONE, NOTE_REAP, NOTE_REVOKE, NOTE_SIGNAL], [], [],
+AC_CHECK_DECLS([NOTE_NONE, NOTE_REAP, NOTE_REVOKE, NOTE_SIGNAL, NOTE_LOWAT], [], [],
 	[[#include <sys/event.h>]])
 AC_CHECK_DECLS([FD_COPY], [], [], [[#include <sys/select.h>]])
 AC_CHECK_DECLS([SIGEMT], [], [], [[#include <signal.h>]])
@@ -432,6 +432,20 @@ AS_IF([test "x$have_mach" = "xtrue"], [
 ])
 AM_CONDITIONAL(HAVE_DARWIN_LD, [test "x$dispatch_cv_ld_darwin" == "xyes"])
 
+#
+# symlink platform-specific module.modulemap files
+#
+AS_CASE([$target_os],
+	[darwin*], [ dispatch_module_map_os=darwin ],
+	[ dispatch_module_map_os=generic ]
+)
+AC_CONFIG_COMMANDS([modulemaps], [
+	ln -fs $dispatch_module_map_os/module.modulemap $ac_top_srcdir/dispatch/module.modulemap
+	ln -fs $dispatch_module_map_os/module.modulemap $ac_top_srcdir/private/module.modulemap
+	],
+	[dispatch_module_map_os="$dispatch_module_map_os"]
+)
+
 #
 # Temporary: some versions of clang do not mark __builtin_trap() as
 # __attribute__((__noreturn__)). Detect and add if required.
@@ -449,6 +463,6 @@ AC_CONFIG_FILES([Makefile dispatch/Makefile man/Makefile os/Makefile private/Mak
 #
 # Generate testsuite links
 #
-AC_CONFIG_LINKS([tests/dispatch:$top_srcdir/private tests/leaks-wrapper:tests/leaks-wrapper.sh])
+AC_CONFIG_LINKS([tests/dispatch:$ac_top_srcdir/private tests/leaks-wrapper:tests/leaks-wrapper.sh])
 
 AC_OUTPUT
diff --git a/dispatch/base.h b/dispatch/base.h
index 4b9013197..8adfb0bdb 100644
--- a/dispatch/base.h
+++ b/dispatch/base.h
@@ -64,6 +64,7 @@
 #define DISPATCH_MALLOC __attribute__((__malloc__))
 #define DISPATCH_ALWAYS_INLINE __attribute__((__always_inline__))
 #define DISPATCH_UNAVAILABLE __attribute__((__unavailable__))
+#define DISPATCH_UNAVAILABLE_MSG(msg) __attribute__((__unavailable__(msg)))
 #else
 /*! @parseOnly */
 #define DISPATCH_NORETURN
@@ -99,6 +100,16 @@
 #define DISPATCH_ALWAYS_INLINE
 /*! @parseOnly */
 #define DISPATCH_UNAVAILABLE
+/*! @parseOnly */
+#define DISPATCH_UNAVAILABLE_MSG(msg)
+#endif
+
+#ifdef __linux__
+#define DISPATCH_LINUX_UNAVAILABLE() \
+		DISPATCH_UNAVAILABLE_MSG( \
+		"This interface is unavailable on linux systems")
+#else
+#define DISPATCH_LINUX_UNAVAILABLE()
 #endif
 
 #ifndef DISPATCH_ALIAS_V2
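
[Editorial note — illustrative example, not part of the patch. A minimal sketch of how the new DISPATCH_LINUX_UNAVAILABLE() annotation behaves at a call site; the function name example_mach_only_api is hypothetical. On Linux the macro expands to the unavailable attribute with the quoted message, turning any use into a compile-time error; on Darwin it expands to nothing.]

    #include <dispatch/dispatch.h>

    DISPATCH_LINUX_UNAVAILABLE()
    void example_mach_only_api(void);   // hypothetical Mach-only interface

    void caller(void)
    {
        // On Linux builds this line fails to compile:
        //   error: 'example_mach_only_api' is unavailable:
        //   This interface is unavailable on linux systems
        example_mach_only_api();
    }
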
diff --git a/dispatch/darwin/module.modulemap b/dispatch/darwin/module.modulemap
new file mode 100644
index 000000000..addaae436
--- /dev/null
+++ b/dispatch/darwin/module.modulemap
@@ -0,0 +1,10 @@
+module Dispatch [system] [extern_c] {
+	umbrella header "dispatch.h"
+	module * { export * }
+	export *
+}
+
+module DispatchIntrospection [system] [extern_c] {
+	header "introspection.h"
+	export *
+}
diff --git a/dispatch/dispatch.h b/dispatch/dispatch.h
index 9869bc2ec..a26b95107 100644
--- a/dispatch/dispatch.h
+++ b/dispatch/dispatch.h
@@ -24,71 +24,36 @@
 #ifdef __APPLE__
 #include <Availability.h>
 #include <TargetConditionals.h>
-#endif
-#include <os/object.h>
-#include <sys/types.h>
-#include <stddef.h>
-#include <stdint.h>
-#include <stdbool.h>
-#include <stdarg.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <fcntl.h>
-
-#ifdef __has_attribute
-#if __has_attribute(unavailable)
-#define __DISPATCH_UNAVAILABLE(msg) __attribute__((__unavailable__(msg)))
-#endif
-#endif
-#ifndef __DISPATCH_UNAVAILABLE
-#define __DISPATCH_UNAVAILABLE(msg)
-#endif
-
-#ifdef __linux__
-#if __has_feature(modules)
-#include <unistd.h> // for off_t (to match Glibc.modulemap)
-#endif
-#define DISPATCH_LINUX_UNAVAILABLE() \
-		__DISPATCH_UNAVAILABLE("This interface is unavailable on linux systems")
+#else
-#define DISPATCH_LINUX_UNAVAILABLE()
-#endif
-
-#ifndef __OSX_AVAILABLE_STARTING
 #define __OSX_AVAILABLE_STARTING(x, y)
-#endif
-#ifndef __OSX_AVAILABLE_BUT_DEPRECATED
 #define __OSX_AVAILABLE_BUT_DEPRECATED(...)
-#endif
-#ifndef __OSX_AVAILABLE_BUT_DEPRECATED_MSG
 #define __OSX_AVAILABLE_BUT_DEPRECATED_MSG(...)
-#endif
-
-#ifndef __OSX_AVAILABLE
 #define __OSX_AVAILABLE(...)
-#endif
-#ifndef __IOS_AVAILABLE
 #define __IOS_AVAILABLE(...)
-#endif
-#ifndef __TVOS_AVAILABLE
 #define __TVOS_AVAILABLE(...)
-#endif
-#ifndef __WATCHOS_AVAILABLE
 #define __WATCHOS_AVAILABLE(...)
-#endif
-#ifndef __OSX_DEPRECATED
 #define __OSX_DEPRECATED(...)
-#endif
-#ifndef __IOS_DEPRECATED
 #define __IOS_DEPRECATED(...)
-#endif
-#ifndef __TVOS_DEPRECATED
 #define __TVOS_DEPRECATED(...)
-#endif
-#ifndef __WATCHOS_DEPRECATED
 #define __WATCHOS_DEPRECATED(...)
-#endif
+#endif // __APPLE__
+
+#include <os/object.h>
+#include <sys/types.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#if defined(__linux__) && defined(__has_feature)
+#if __has_feature(modules)
+#include <unistd.h> // for off_t (to match Glibc.modulemap)
+#endif
+#endif
 
-#define DISPATCH_API_VERSION 20160612
+#define DISPATCH_API_VERSION 20160712
 
 #ifndef __DISPATCH_BUILDING_DISPATCH__
diff --git a/dispatch/module.modulemap b/dispatch/generic/module.modulemap
similarity index 100%
rename from dispatch/module.modulemap
rename to dispatch/generic/module.modulemap
diff --git a/dispatch/group.h b/dispatch/group.h
index 5756a401d..c50ad89d1 100644
--- a/dispatch/group.h
+++ b/dispatch/group.h
@@ -134,8 +134,7 @@ dispatch_group_async_f(dispatch_group_t group,
 * @discussion
 * This function waits for the completion of the blocks associated with the
 * given dispatch group, and returns after all blocks have completed or when
- * the specified timeout has elapsed. When a timeout occurs, the group is
- * restored to its original state.
+ * the specified timeout has elapsed.
 *
 * This function will return immediately if there are no blocks associated
 * with the dispatch group (i.e. the group is empty).
@@ -262,7 +261,7 @@ dispatch_group_enter(dispatch_group_t group);
 *
 * @discussion
 * Calling this function indicates block has completed and left the dispatch
- * groupJ by a means other than dispatch_group_async().
+ * group by a means other than dispatch_group_async().
 *
 * @param group
 * The dispatch group to update.
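
[Editorial note — illustrative example, not part of the patch. The dispatch_group_wait() documentation change above removes the claim that the group is "restored to its original state" on timeout. A minimal usage sketch of the enter/leave/wait API documented in this hunk:]

    #include <dispatch/dispatch.h>
    #include <stdio.h>

    int main(void)
    {
        dispatch_group_t group = dispatch_group_create();
        dispatch_queue_t q = dispatch_get_global_queue(
                DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

        // Track work submitted by a means other than dispatch_group_async():
        // every dispatch_group_enter() must be balanced by a
        // dispatch_group_leave().
        dispatch_group_enter(group);
        dispatch_async(q, ^{
            puts("working");
            dispatch_group_leave(group);
        });

        // Wait up to one second; a nonzero return means the timeout elapsed
        // (per the corrected docs, the group is not "restored" on timeout).
        if (dispatch_group_wait(group,
                dispatch_time(DISPATCH_TIME_NOW, 1 * NSEC_PER_SEC))) {
            puts("timed out");
        }
        dispatch_release(group);
        return 0;
    }
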
diff --git a/libdispatch.xcodeproj/project.pbxproj b/libdispatch.xcodeproj/project.pbxproj
index c40d08155..9fe06aa92 100644
--- a/libdispatch.xcodeproj/project.pbxproj
+++ b/libdispatch.xcodeproj/project.pbxproj
@@ -662,8 +662,8 @@
 		C01866BD1C5973210040FC07 /* libdispatch.a */ = {isa = PBXFileReference; explicitFileType = archive.ar; includeInIndex = 0; path = libdispatch.a; sourceTree = BUILT_PRODUCTS_DIR; };
 		C01866BE1C59735B0040FC07 /* libdispatch-mp-static.xcconfig */ = {isa = PBXFileReference; lastKnownFileType = text.xcconfig; path = "libdispatch-mp-static.xcconfig"; sourceTree = "<group>"; };
 		C01866BF1C5976C90040FC07 /* run-on-install.sh */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.script.sh; path = "run-on-install.sh"; sourceTree = "<group>"; };
-		C901445E1C73A7FE002638FC /* module.modulemap */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = "sourcecode.module-map"; path = module.modulemap; sourceTree = "<group>"; };
-		C90144641C73A845002638FC /* module.modulemap */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = "sourcecode.module-map"; path = module.modulemap; sourceTree = "<group>"; };
+		C901445E1C73A7FE002638FC /* module.modulemap */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = "sourcecode.module-map"; name = module.modulemap; path = darwin/module.modulemap; sourceTree = "<group>"; };
+		C90144641C73A845002638FC /* module.modulemap */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = "sourcecode.module-map"; name = module.modulemap; path = darwin/module.modulemap; sourceTree = "<group>"; };
 		C913AC0E143BD34800B78976 /* data_private.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = data_private.h; sourceTree = "<group>"; tabWidth = 8; };
 		C927F35F10FD7F1000C5AB8B /* ddt.xcodeproj */ = {isa = PBXFileReference; lastKnownFileType = "wrapper.pb-project"; name = ddt.xcodeproj; path = tools/ddt/ddt.xcodeproj; sourceTree = "<group>"; };
 		C96CE17A1CEB851600F4B8E6 /* dispatch_objc.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = dispatch_objc.m; sourceTree = "<group>"; };
diff --git a/man/dispatch_source_create.3 b/man/dispatch_source_create.3
index 1c1951b77..4da708cfb 100644
--- a/man/dispatch_source_create.3
+++ b/man/dispatch_source_create.3
@@ -271,8 +271,8 @@ Sources of this type allow applications to manually trigger the source's event
 handler via a call to
 .Fn dispatch_source_merge_data .
 The data will be merged with the source's pending data via an atomic add or
-logic OR (based on the source's type), and the event handler block will be
-submitted to the source's target queue. The
+atomic bitwise OR (based on the source's type), and the event handler block will
+be submitted to the source's target queue. The
 .Fa data
 is application defined. These sources have no
 .Fa handle
@@ -297,7 +297,8 @@ The data returned by
 .Fn dispatch_source_get_data
 indicates which of the events in the
 .Fa mask
-were observed. Note that because this source type will request notifications on the provided port, it should not be mixed with the use of
+were observed. Note that because this source type will request notifications on
+the provided port, it should not be mixed with the use of
 .Fn mach_port_request_notification
 on the same port.
 .Pp
@@ -314,8 +315,8 @@ on the mach port is waiting to be received.
 .Pp
 .Vt DISPATCH_SOURCE_TYPE_MEMORYPRESSURE
 .Pp
-Sources of this type monitor the system memory pressure condition for state changes.
-The
+Sources of this type monitor the system memory pressure condition for state
+changes. The
 .Fa handle
 is unused and should be zero. The
 .Fa mask
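
[Editorial note — illustrative example, not part of the patch. The man page hunk above clarifies that pending data is coalesced with an atomic add (DATA_ADD) or atomic bitwise OR (DATA_OR) before the handler runs. A minimal sketch of the manually-triggered source it describes:]

    #include <dispatch/dispatch.h>
    #include <stdio.h>

    int main(void)
    {
        dispatch_queue_t q = dispatch_queue_create("example", NULL);
        dispatch_source_t src = dispatch_source_create(
                DISPATCH_SOURCE_TYPE_DATA_ADD,
                0 /* handle unused */, 0 /* mask unused */, q);
        dispatch_source_set_event_handler(src, ^{
            // If several merges happened before this block ran, their
            // values arrive summed in a single invocation.
            printf("merged: %lu\n", dispatch_source_get_data(src));
        });
        dispatch_resume(src);

        dispatch_source_merge_data(src, 1);
        dispatch_source_merge_data(src, 41);  // may coalesce with the above

        dispatch_main();  // never returns; sketch only
    }
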
diff --git a/os/firehose_buffer_private.h b/os/firehose_buffer_private.h
index 0c8516421..2c6466f94 100644
--- a/os/firehose_buffer_private.h
+++ b/os/firehose_buffer_private.h
@@ -69,7 +69,7 @@ typedef struct firehose_buffer_chunk_s {
 	uint8_t fbc_data[FIREHOSE_BUFFER_CHUNK_SIZE
 			- sizeof(firehose_buffer_pos_u)
 			- sizeof(uint64_t)];
-} *firehose_buffer_chunk_t;
+} __attribute__((aligned(8))) *firehose_buffer_chunk_t;
 
 typedef struct firehose_buffer_range_s {
 	uint16_t fbr_offset; // offset from the start of the buffer
diff --git a/private/module.modulemap b/private/darwin/module.modulemap
similarity index 100%
rename from private/module.modulemap
rename to private/darwin/module.modulemap
diff --git a/private/generic/module.modulemap b/private/generic/module.modulemap
new file mode 100644
index 000000000..62975a59b
--- /dev/null
+++ b/private/generic/module.modulemap
@@ -0,0 +1,11 @@
+module DispatchPrivate [system] [extern_c] {
+	umbrella header "private.h"
+	exclude header "mach_private.h"
+	module * { export * }
+	export *
+}
+
+module DispatchIntrospectionPrivate [system] [extern_c] {
+	header "introspection_private.h"
+	export *
+}
diff --git a/private/private.h b/private/private.h
index 6f4b08b31..3c37bed0d 100644
--- a/private/private.h
+++ b/private/private.h
@@ -66,7 +66,7 @@
 #endif /* !__DISPATCH_BUILDING_DISPATCH__ */
 
 // Check that public and private dispatch headers match
-#if DISPATCH_API_VERSION != 20160612 // Keep in sync with <dispatch/dispatch.h>
+#if DISPATCH_API_VERSION != 20160712 // Keep in sync with <dispatch/dispatch.h>
 #error "Dispatch header mismatch between /usr/include and /usr/local/include"
 #endif
 
@@ -199,10 +199,17 @@ DISPATCH_EXPORT DISPATCH_NOTHROW
 dispatch_runloop_handle_t
 _dispatch_get_main_queue_handle_4CF(void);
 
+#if TARGET_OS_MAC
+__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_4_0)
+DISPATCH_EXPORT DISPATCH_NOTHROW
+void
+_dispatch_main_queue_callback_4CF(mach_msg_header_t *_Null_unspecified msg);
+#else
 __OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_4_0)
 DISPATCH_EXPORT DISPATCH_NOTHROW
 void
 _dispatch_main_queue_callback_4CF(void *_Null_unspecified msg);
+#endif
 
 __OSX_AVAILABLE_STARTING(__MAC_10_9,__IPHONE_7_0)
 DISPATCH_EXPORT DISPATCH_MALLOC DISPATCH_RETURNS_RETAINED DISPATCH_WARN_RESULT
diff --git a/private/queue_private.h b/private/queue_private.h
index 0acaceb91..33de371c8 100644
--- a/private/queue_private.h
+++ b/private/queue_private.h
@@ -227,6 +227,23 @@ dispatch_pthread_root_queue_flags_pool_size(uint8_t pool_size)
 
 #endif /* __BLOCKS__ */
 
+/*!
+ * @function dispatch_pthread_root_queue_copy_current
+ *
+ * @abstract
+ * Returns a reference to the pthread root queue object that has created the
+ * currently executing thread, or NULL if the current thread is not associated
+ * to a pthread root queue.
+ *
+ * @result
+ * A new reference to a pthread root queue object or NULL.
+ */
+__OSX_AVAILABLE(10.12) __IOS_AVAILABLE(10.0)
+__TVOS_AVAILABLE(10.0) __WATCHOS_AVAILABLE(3.0)
+DISPATCH_EXPORT DISPATCH_RETURNS_RETAINED DISPATCH_WARN_RESULT DISPATCH_NOTHROW
+dispatch_queue_t _Nullable
+dispatch_pthread_root_queue_copy_current(void);
+
 /*!
 * @constant DISPATCH_APPLY_CURRENT_ROOT_QUEUE
 * @discussion Constant to pass to the dispatch_apply() and dispatch_apply_f()
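
[Editorial note — illustrative example, not part of the patch. A minimal sketch of the new dispatch_pthread_root_queue_copy_current() SPI declared above; the include path for the private umbrella header is assumed, and the returned reference (if any) must be released because the function is DISPATCH_RETURNS_RETAINED:]

    #include <stdbool.h>
    #include <dispatch/private.h>  // private SPI umbrella header (path assumed)

    // Returns true when the calling thread was created by a pthread root
    // queue, and false for regular workqueue or user-created threads.
    static bool running_on_pthread_root_queue(void)
    {
        dispatch_queue_t rq = dispatch_pthread_root_queue_copy_current();
        if (rq == NULL) return false;
        dispatch_release(rq);  // balance the retained reference
        return true;
    }
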
diff --git a/src/Makefile.am b/src/Makefile.am
index 39221a4c4..98d36160a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -87,10 +87,11 @@ if BUILD_OWN_BLOCKS_RUNTIME
 libdispatch_la_SOURCES+= BlocksRuntime/data.c BlocksRuntime/runtime.c
 CBLOCKS_FLAGS+= -I$(top_srcdir)/src/BlocksRuntime
 CXXBLOCKS_FLAGS+= -I$(top_srcdir)/src/BlocksRuntime
+BLOCKS_RUNTIME_LIBS=-ldl
 endif
 
 libdispatch_la_LDFLAGS=-avoid-version
-libdispatch_la_LIBADD=$(KQUEUE_LIBS) $(PTHREAD_WORKQUEUE_LIBS) $(BSD_OVERLAY_LIBS)
+libdispatch_la_LIBADD=$(KQUEUE_LIBS) $(PTHREAD_WORKQUEUE_LIBS) $(BSD_OVERLAY_LIBS) $(BLOCKS_RUNTIME_LIBS)
 
 if HAVE_DARWIN_LD
 libdispatch_la_LDFLAGS+=-Wl,-compatibility_version,1 \
diff --git a/src/apply.c b/src/apply.c
index 57021e534..e051a1630 100644
--- a/src/apply.c
+++ b/src/apply.c
@@ -87,6 +87,9 @@ _dispatch_apply_invoke2(void *ctxt, long invoke_flags)
 		_dispatch_thread_event_destroy(&da->da_event);
 	}
 	if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) {
+#if DISPATCH_INTROSPECTION
+		_dispatch_continuation_free(da->da_dc);
+#endif
 		_dispatch_continuation_free((dispatch_continuation_t)da);
 	}
 }
@@ -145,6 +148,9 @@ _dispatch_apply_serial(void *ctxt)
 		});
 	} while (++idx < iter);
 
+#if DISPATCH_INTROSPECTION
+	_dispatch_continuation_free(da->da_dc);
+#endif
 	_dispatch_continuation_free((dispatch_continuation_t)da);
 }
 
@@ -262,7 +268,12 @@ dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
 	da->da_iterations = iterations;
 	da->da_nested = nested;
 	da->da_thr_cnt = thr_cnt;
+#if DISPATCH_INTROSPECTION
+	da->da_dc = _dispatch_continuation_alloc();
+	*da->da_dc = dc;
+#else
 	da->da_dc = &dc;
+#endif
 	da->da_flags = 0;
 
 	if (slowpath(dq->dq_width == 1) || slowpath(thr_cnt <= 1)) {
diff --git a/src/data.c b/src/data.c
index 5f1942fef..644328911 100644
--- a/src/data.c
+++ b/src/data.c
@@ -108,11 +108,12 @@ const dispatch_block_t _dispatch_data_destructor_none = ^{
 	DISPATCH_INTERNAL_CRASH(0, "none destructor called");
 };
 
+#if !HAVE_MACH
 const dispatch_block_t _dispatch_data_destructor_munmap = ^{
 	DISPATCH_INTERNAL_CRASH(0, "munmap destructor called");
 };
-
-#ifndef __linux__
+#else
+// _dispatch_data_destructor_munmap is a linker alias to the following
 const dispatch_block_t _dispatch_data_destructor_vm_deallocate = ^{
 	DISPATCH_INTERNAL_CRASH(0, "vmdeallocate destructor called");
 };
@@ -253,7 +254,7 @@ dispatch_data_create_f(const void *buffer, size_t size, dispatch_queue_t queue,
 	if (destructor != DISPATCH_DATA_DESTRUCTOR_DEFAULT &&
 			destructor != DISPATCH_DATA_DESTRUCTOR_FREE &&
 			destructor != DISPATCH_DATA_DESTRUCTOR_NONE &&
-#ifndef __linux__
+#if HAVE_MACH
 			destructor != DISPATCH_DATA_DESTRUCTOR_VM_DEALLOCATE &&
 #endif
 			destructor != DISPATCH_DATA_DESTRUCTOR_INLINE) {
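
[Editorial note — illustrative example, not part of the patch. The data.c hunks above gate the Mach-only VM_DEALLOCATE destructor behind HAVE_MACH rather than "not Linux". The portable destructors (DEFAULT, FREE, NONE, INLINE) remain available everywhere; a minimal sketch using the FREE destructor:]

    #include <dispatch/dispatch.h>
    #include <stdlib.h>
    #include <string.h>

    int main(void)
    {
        char *buf = strdup("hello");
        // DISPATCH_DATA_DESTRUCTOR_FREE makes the library call free(buf)
        // once the last reference to the data object goes away.
        dispatch_data_t d = dispatch_data_create(buf, 5,
                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
                DISPATCH_DATA_DESTRUCTOR_FREE);
        dispatch_release(d);
        return 0;
    }
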
diff --git a/src/firehose/firehose_buffer.c b/src/firehose/firehose_buffer.c
index 5e9d225ef..1305bdea6 100644
--- a/src/firehose/firehose_buffer.c
+++ b/src/firehose/firehose_buffer.c
@@ -83,6 +83,8 @@ static void _dispatch_gate_wait(dispatch_gate_t l, uint32_t flags);
 #include "firehose_replyServer.h" // MiG
 #endif
 
+#if OS_FIREHOSE_SPI
+
 #if __has_feature(c_static_assert)
 _Static_assert(sizeof(((firehose_stream_state_u *)NULL)->fss_gate) ==
 		sizeof(((firehose_stream_state_u *)NULL)->fss_allocator),
@@ -303,10 +305,10 @@ firehose_buffer_create(mach_port_t logd_port, uint64_t unique_pid,
 				"Invalid values for MADVISE_CHUNK_COUNT / CHUNK_SIZE");
 	}
 
-	kr = mach_vm_map(mach_task_self(), &vm_addr, sizeof(*fb),
-			0, VM_FLAGS_ANYWHERE | VM_MAKE_TAG(VM_MEMORY_GENEALOGY),
-			MEMORY_OBJECT_NULL, 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
-			VM_INHERIT_NONE);
+	kr = mach_vm_map(mach_task_self(), &vm_addr, sizeof(*fb), 0,
+			VM_FLAGS_ANYWHERE | VM_FLAGS_PURGABLE |
+			VM_MAKE_TAG(VM_MEMORY_GENEALOGY), MEMORY_OBJECT_NULL, 0, FALSE,
+			VM_PROT_DEFAULT, VM_PROT_ALL, VM_INHERIT_NONE);
 	if (slowpath(kr)) {
 		if (kr != KERN_NO_SPACE) dispatch_assume_zero(kr);
 		firehose_mach_port_send_release(logd_port);
@@ -1141,3 +1143,5 @@ __firehose_merge_updates(firehose_push_reply_t update)
 	}
 }
 #endif // KERNEL
+
+#endif // OS_FIREHOSE_SPI
diff --git a/src/firehose/firehose_server.c b/src/firehose/firehose_server.c
index 1af06e0d8..a6be2fab7 100644
--- a/src/firehose/firehose_server.c
+++ b/src/firehose/firehose_server.c
@@ -149,10 +149,12 @@ firehose_client_snapshot_mark_done(firehose_client_t fc,
 }
 
 #define DRAIN_BATCH_SIZE 4
+#define FIREHOSE_DRAIN_FOR_IO 0x1
+#define FIREHOSE_DRAIN_POLL 0x2
 
 OS_NOINLINE
 static void
-firehose_client_drain(firehose_client_t fc, mach_port_t port, bool for_io)
+firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 {
 	firehose_buffer_t fb = fc->fc_buffer;
 	firehose_buffer_chunk_t fbc;
@@ -161,6 +163,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, bool for_io)
 	uint16_t flushed, ref, count = 0;
 	uint16_t client_head, client_flushed, sent_flushed;
 	firehose_snapshot_t snapshot = NULL;
+	bool for_io = (flags & FIREHOSE_DRAIN_FOR_IO);
 
 	if (for_io) {
 		evt = FIREHOSE_EVENT_IO_BUFFER_RECEIVED;
@@ -201,9 +204,9 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, bool for_io)
 			}
 		}
 
-		ref = (flushed + count) & FIREHOSE_RING_POS_IDX_MASK;
 		// see firehose_buffer_ring_enqueue
 		do {
+			ref = (flushed + count) & FIREHOSE_RING_POS_IDX_MASK;
 			ref = os_atomic_load(&fbh_ring[ref], relaxed);
 			ref &= FIREHOSE_RING_POS_IDX_MASK;
 		} while (fc->fc_is_kernel && !ref);
@@ -251,13 +254,24 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, bool for_io)
 		firehose_client_notify(fc, port);
 	}
 	if (fc->fc_is_kernel) {
-		// see firehose_client_kernel_source_handle_event
-		dispatch_resume(fc->fc_kernel_source);
-	} else if (fc->fc_use_notifs && count >= DRAIN_BATCH_SIZE) {
-		// if we hit the drain batch size, the client probably logs a lot
-		// and there's more to drain, so optimistically schedule draining again
-		// this is cheap since the queue is hot, and is fair for other clients
-		firehose_client_push_async_merge(fc, 0, for_io);
+		if (!(flags & FIREHOSE_DRAIN_POLL)) {
+			// see firehose_client_kernel_source_handle_event
+			dispatch_resume(fc->fc_kernel_source);
+		}
+	} else {
+		if (fc->fc_use_notifs && count >= DRAIN_BATCH_SIZE) {
+			// if we hit the drain batch size, the client probably logs a lot
+			// and there's more to drain, so optimistically schedule draining
+			// again this is cheap since the queue is hot, and is fair for other
+			// clients
+			firehose_client_push_async_merge(fc, 0, for_io);
+		}
+		if (count && server_config.fs_kernel_client) {
+			// the kernel is special because it can drop messages, so if we're
+			// draining, poll the kernel each time while we're bound to a thread
+			firehose_client_drain(server_config.fs_kernel_client,
+					MACH_PORT_NULL, flags | FIREHOSE_DRAIN_POLL);
+		}
 	}
 	return;
 
@@ -277,13 +291,13 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, bool for_io)
 static void
 firehose_client_drain_io_async(void *ctx)
 {
-	firehose_client_drain(ctx, MACH_PORT_NULL, true);
+	firehose_client_drain(ctx, MACH_PORT_NULL, FIREHOSE_DRAIN_FOR_IO);
 }
 
 static void
 firehose_client_drain_mem_async(void *ctx)
 {
-	firehose_client_drain(ctx, MACH_PORT_NULL, false);
+	firehose_client_drain(ctx, MACH_PORT_NULL, 0);
 }
 
 OS_NOINLINE
@@ -751,8 +765,6 @@ firehose_server_resume(void)
 {
 	struct firehose_server_s *fs = &server_config;
 
-	dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
-			MACH_PORT_NULL, NULL);
 	if (fs->fs_kernel_client) {
 		dispatch_async(fs->fs_io_drain_queue, ^{
 			struct firehose_client_connected_info_s fcci = {
@@ -761,6 +773,8 @@ firehose_server_resume(void)
 			firehose_client_resume(fs->fs_kernel_client, &fcci);
 		});
 	}
+	dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
+			MACH_PORT_NULL, NULL);
 }
 
 #pragma mark -
@@ -1035,7 +1049,7 @@ firehose_server_register(mach_port_t server_port OS_UNUSED,
 	if (extra_info_port && extra_info_size) {
 		mach_vm_address_t addr = 0;
 		kr = mach_vm_map(mach_task_self(), &addr, extra_info_size, 0,
-				VM_FLAGS_ANYWHERE, mem_port, 0, FALSE,
+				VM_FLAGS_ANYWHERE, extra_info_port, 0, FALSE,
 				VM_PROT_READ, VM_PROT_READ, VM_INHERIT_NONE);
 		if (dispatch_assume_zero(kr)) {
 			mach_vm_deallocate(mach_task_self(), base_addr, mem_size);
@@ -1104,7 +1118,8 @@ firehose_server_push(mach_port_t server_port OS_UNUSED,
 	}
 
 	block = dispatch_block_create_with_qos_class(flags, qos, 0, ^{
-		firehose_client_drain(fc, reply_port, for_io);
+		firehose_client_drain(fc, reply_port,
+				for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
 	});
 	dispatch_async(q, block);
 	_Block_release(block);
diff --git a/src/init.c b/src/init.c
index 443f4e6aa..87be596f2 100644
--- a/src/init.c
+++ b/src/init.c
@@ -1197,8 +1197,8 @@ dispatch_source_type_readwrite_init(dispatch_source_t ds,
 	dispatch_queue_t q DISPATCH_UNUSED)
 {
 	ds->ds_is_level = true;
+#if HAVE_DECL_NOTE_LOWAT
 	// bypass kernel check for device kqueue support rdar://19004921
-#ifdef NOTE_LOWAT
 	ds->ds_dkev->dk_kevent.fflags = NOTE_LOWAT;
 #endif
 	ds->ds_dkev->dk_kevent.data = 1;
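
[Editorial note — illustration, not part of the patch. The init.c hunk swaps "#ifdef NOTE_LOWAT" for the configure-generated HAVE_DECL_NOTE_LOWAT guard added earlier in this series. AC_CHECK_DECLS defines HAVE_DECL_NOTE_LOWAT to 1 or 0 at configure time, which (unlike #ifdef) also detects declarations that are not preprocessor macros, such as enumerators. A standalone sketch of the pattern:]

    /* config.h is generated by configure; HAVE_DECL_NOTE_LOWAT is always
     * defined, to 1 or 0, so #if works even where NOTE_LOWAT is an
     * enumerator and "#ifdef NOTE_LOWAT" would be silently false. */
    #include "config.h"
    #if HAVE_DECL_NOTE_LOWAT
    #include <sys/event.h>
    static const unsigned int example_fflags = NOTE_LOWAT;
    #else
    static const unsigned int example_fflags = 0;
    #endif
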
diff --git a/src/inline_internal.h b/src/inline_internal.h
index f4e44a183..d1c73dd4e 100644
--- a/src/inline_internal.h
+++ b/src/inline_internal.h
@@ -861,8 +861,6 @@ static inline pthread_priority_t _dispatch_get_defaultpriority(void);
 static inline void _dispatch_set_defaultpriority_override(void);
 static inline void _dispatch_reset_defaultpriority(pthread_priority_t pp);
 static inline pthread_priority_t _dispatch_get_priority(void);
-static inline void _dispatch_set_priority(pthread_priority_t pp,
-		_dispatch_thread_set_self_t flags);
 static inline pthread_priority_t _dispatch_set_defaultpriority(
 		pthread_priority_t pp, pthread_priority_t *new_pp);
 
@@ -1553,36 +1551,38 @@ _dispatch_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _tail,
 }
 
 struct _dispatch_identity_s {
-	pthread_priority_t old_pri;
 	pthread_priority_t old_pp;
 };
 
 DISPATCH_ALWAYS_INLINE
 static inline void
 _dispatch_root_queue_identity_assume(struct _dispatch_identity_s *di,
-		pthread_priority_t pp, _dispatch_thread_set_self_t flags)
+		pthread_priority_t pp)
 {
 	// assumed_rq was set by the caller, we need to fake the priorities
 	dispatch_queue_t assumed_rq = _dispatch_queue_get_current();
 
 	dispatch_assert(dx_type(assumed_rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
 
-	di->old_pri = _dispatch_get_priority();
-	// _dispatch_root_queue_drain_deferred_item() may turn a manager thread
-	// into a regular root queue, and we must never try to restore the manager
-	// flag once we became a regular work queue thread.
-	di->old_pri &= ~(pthread_priority_t)_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
 	di->old_pp = _dispatch_get_defaultpriority();
 
-	if (!pp) pp = di->old_pri;
-	if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >
-			(assumed_rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
-		_dispatch_wqthread_override_start(_dispatch_tid_self(), pp);
-		// Ensure that the root queue sees that this thread was overridden.
-		_dispatch_set_defaultpriority_override();
+	if (!(assumed_rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG)) {
+		if (!pp) {
+			pp = _dispatch_get_priority();
+			// _dispatch_root_queue_drain_deferred_item() may turn a manager
+			// thread into a regular root queue, and we must never try to
+			// restore the manager flag once we became a regular work queue
+			// thread.
+			pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+		}
+		if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >
+				(assumed_rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
+			_dispatch_wqthread_override_start(_dispatch_tid_self(), pp);
+			// Ensure that the root queue sees that this thread was overridden.
+			_dispatch_set_defaultpriority_override();
+		}
 	}
 	_dispatch_reset_defaultpriority(assumed_rq->dq_priority);
-	_dispatch_set_priority(assumed_rq->dq_priority, flags);
 }
 
 DISPATCH_ALWAYS_INLINE
@@ -1590,7 +1590,6 @@ static inline void
 _dispatch_root_queue_identity_restore(struct _dispatch_identity_s *di)
 {
 	_dispatch_reset_defaultpriority(di->old_pp);
-	_dispatch_set_priority(di->old_pri, 0);
 }
 
 typedef dispatch_queue_t
@@ -1631,7 +1630,7 @@ _dispatch_queue_class_invoke(dispatch_object_t dou,
 		if (overriding) {
 			_dispatch_object_debug(dq, "stolen onto thread 0x%x, 0x%lx",
 					_dispatch_tid_self(), _dispatch_get_defaultpriority());
-			_dispatch_root_queue_identity_assume(&di, 0, 0);
+			_dispatch_root_queue_identity_assume(&di, 0);
 		}
 
 		if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN)) {
@@ -1819,6 +1818,23 @@ _dispatch_get_root_queue_for_priority(pthread_priority_t pp, bool overcommit)
 }
 #endif
 
+DISPATCH_ALWAYS_INLINE DISPATCH_CONST
+static inline dispatch_queue_t
+_dispatch_get_root_queue_with_overcommit(dispatch_queue_t rq, bool overcommit)
+{
+	bool rq_overcommit = (rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
+	// root queues in _dispatch_root_queues are not overcommit for even indices
+	// and overcommit for odd ones, so fixing overcommit is either returning
+	// the same queue, or picking its neighbour in _dispatch_root_queues
+	if (overcommit && !rq_overcommit) {
+		return rq + 1;
+	}
+	if (!overcommit && rq_overcommit) {
+		return rq - 1;
+	}
+	return rq;
+}
+
 DISPATCH_ALWAYS_INLINE
 static inline void
 _dispatch_queue_set_bound_thread(dispatch_queue_t dq)
@@ -1927,10 +1943,16 @@ _dispatch_queue_priority_inherit_from_target(dispatch_queue_t dq,
 #if HAVE_PTHREAD_WORKQUEUE_QOS
 	const dispatch_priority_t rootqueue_flag = _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
 	const dispatch_priority_t inherited_flag = _PTHREAD_PRIORITY_INHERIT_FLAG;
+	const dispatch_priority_t defaultqueue_flag =
+			_PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
 	dispatch_priority_t dqp = dq->dq_priority, tqp = tq->dq_priority;
 	if ((!(dqp & ~_PTHREAD_PRIORITY_FLAGS_MASK) || (dqp & inherited_flag)) &&
 			(tqp & rootqueue_flag)) {
-		dq->dq_priority = (tqp & ~rootqueue_flag) | inherited_flag;
+		if (tqp & defaultqueue_flag) {
+			dq->dq_priority = 0;
+		} else {
+			dq->dq_priority = (tqp & ~rootqueue_flag) | inherited_flag;
+		}
 	}
 #else
 	(void)dq; (void)tq;
@@ -1963,7 +1985,7 @@ _dispatch_set_defaultpriority(pthread_priority_t pp, pthread_priority_t *new_pp)
 	if (new_pp) *new_pp = pp;
 	return old_pp;
 #else
-	(void)pp;
+	(void)pp; (void)new_pp;
 	return 0;
 #endif
 }
@@ -2007,10 +2029,11 @@ _dispatch_priority_inherit_from_root_queue(pthread_priority_t pp,
 #if HAVE_PTHREAD_WORKQUEUE_QOS
 	pthread_priority_t p = pp & ~_PTHREAD_PRIORITY_FLAGS_MASK;
 	pthread_priority_t rqp = rq->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
-	bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
+	pthread_priority_t defaultqueue =
+			rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
 
 	if (!p || (!defaultqueue && p < rqp)) {
-		p = rqp;
+		p = rqp | defaultqueue;
 	}
 	return p | (rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
 #else
@@ -2032,12 +2055,11 @@ _dispatch_get_priority(void)
 #endif
 }
 
+#if HAVE_PTHREAD_WORKQUEUE_QOS
 DISPATCH_ALWAYS_INLINE
 static inline pthread_priority_t
-_dispatch_priority_compute_update(pthread_priority_t pp,
-		_dispatch_thread_set_self_t flags)
+_dispatch_priority_compute_update(pthread_priority_t pp)
 {
-#if HAVE_PTHREAD_WORKQUEUE_QOS
 	dispatch_assert(pp != DISPATCH_NO_PRIORITY);
 	if (!_dispatch_set_qos_class_enabled) return 0;
 	// the priority in _dispatch_get_priority() only tracks manager-ness
@@ -2051,11 +2073,7 @@ _dispatch_priority_compute_update(pthread_priority_t pp,
 	pthread_priority_t cur_priority = _dispatch_get_priority();
 	pthread_priority_t unbind = _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
 	pthread_priority_t overcommit = _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
-	if (flags & DISPATCH_IGNORE_UNBIND) {
-		// if DISPATCH_IGNORE_UNBIND is passed, we want to ignore the
-		// difference if it is limited to the NEEDS_UNBIND flag
-		cur_priority &= ~(unbind | overcommit);
-	} else if (unlikely(cur_priority & unbind)) {
+	if (unlikely(cur_priority & unbind)) {
 		// else we always need an update if the NEEDS_UNBIND flag is set
 		// the slowpath in _dispatch_set_priority_and_voucher_slow() will
 		// adjust the priority further with the proper overcommitness
@@ -2064,11 +2082,9 @@ _dispatch_priority_compute_update(pthread_priority_t pp,
 		cur_priority &= ~overcommit;
 	}
 	if (unlikely(pp != cur_priority)) return pp;
-#else
-	(void)pp; (void)flags;
-#endif
 	return 0;
 }
+#endif
 
 DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
 static inline voucher_t
@@ -2076,7 +2092,7 @@ _dispatch_set_priority_and_voucher(pthread_priority_t pp,
 		voucher_t v, _dispatch_thread_set_self_t flags)
 {
 #if HAVE_PTHREAD_WORKQUEUE_QOS
-	pp = _dispatch_priority_compute_update(pp, flags);
+	pp = _dispatch_priority_compute_update(pp);
 	if (likely(!pp)) {
 		if (v == DISPATCH_NO_VOUCHER) {
 			return DISPATCH_NO_VOUCHER;
@@ -2128,21 +2144,6 @@ _dispatch_reset_voucher(voucher_t v, _dispatch_thread_set_self_t flags)
 	(void)_dispatch_set_priority_and_voucher(0, v, flags);
 }
 
-DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_set_priority(pthread_priority_t pp,
-		_dispatch_thread_set_self_t flags)
-{
-	dispatch_assert(pp != DISPATCH_NO_PRIORITY);
-	pp = _dispatch_priority_compute_update(pp, flags);
-	if (likely(!pp)) {
-		return;
-	}
-#if HAVE_PTHREAD_WORKQUEUE_QOS
-	_dispatch_set_priority_and_mach_voucher_slow(pp, VOUCHER_NO_MACH_VOUCHER);
-#endif
-}
-
 DISPATCH_ALWAYS_INLINE
 static inline bool
 _dispatch_queue_need_override(dispatch_queue_class_t dqu, pthread_priority_t pp)
diff --git a/src/internal.h b/src/internal.h
index d299fe99f..4408d9672 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -270,7 +270,6 @@ DISPATCH_EXPORT DISPATCH_NOTHROW void dispatch_atfork_child(void);
 #if USE_POSIX_SEM
 #include <semaphore.h>
 #endif
-
 #include
 #include
 #include
@@ -413,8 +412,9 @@ DISPATCH_EXPORT DISPATCH_NOINLINE __attribute__((__format__(__printf__,1,2)))
 void _dispatch_log(const char *msg, ...);
 #endif // DISPATCH_USE_OS_DEBUG_LOG
 
-#define dsnprintf(...) \
-		({ int _r = snprintf(__VA_ARGS__); _r < 0 ? 0u : (size_t)_r; })
+#define dsnprintf(buf, siz, ...) \
+		({ size_t _siz = siz; int _r = snprintf(buf, _siz, __VA_ARGS__); \
+		_r < 0 ? 0u : ((size_t)_r > _siz ? _siz : (size_t)_r); })
 
 #if __GNUC__
 #define dispatch_static_assert(e) ({ \
@@ -869,12 +869,16 @@ typedef struct kevent64_s _dispatch_kevent_qos_s;
 #define DISPATCH_TRACE_SUBCLASS_DEFAULT 0
 #define DISPATCH_TRACE_SUBCLASS_VOUCHER 1
 #define DISPATCH_TRACE_SUBCLASS_PERF 2
+#define DISPATCH_TRACE_SUBCLASS_MACH_MSG 3
+
 #define DISPATCH_PERF_non_leaf_retarget DISPATCH_CODE(PERF, 1)
 #define DISPATCH_PERF_post_activate_retarget DISPATCH_CODE(PERF, 2)
 #define DISPATCH_PERF_post_activate_mutation DISPATCH_CODE(PERF, 3)
 #define DISPATCH_PERF_delayed_registration DISPATCH_CODE(PERF, 4)
 #define DISPATCH_PERF_mutable_target DISPATCH_CODE(PERF, 5)
 
+#define DISPATCH_MACH_MSG_hdr_move DISPATCH_CODE(MACH_MSG, 1)
+
 DISPATCH_ALWAYS_INLINE
 static inline void
 _dispatch_ktrace_impl(uint32_t code, uint64_t a, uint64_t b,
@@ -950,16 +954,6 @@ _dispatch_ktrace_impl(uint32_t code, uint64_t a, uint64_t b,
 #define VOUCHER_USE_BANK_AUTOREDEEM 1
 #endif
 
-#if OS_FIREHOSE_SPI
-#include
-#else
-typedef uint64_t firehose_activity_id_t;
-typedef uint64_t firehose_tracepoint_id_t;
-typedef unsigned long firehose_activity_flags_t;
-typedef uint8_t firehose_stream_t;
-typedef void * voucher_activity_hooks_t;
-#endif
-
 #if !VOUCHER_USE_MACH_VOUCHER || \
 		!__has_include() || \
 		!DISPATCH_HOST_SUPPORTS_OSX(101200)
@@ -1049,8 +1043,7 @@ DISPATCH_ENUM(_dispatch_thread_set_self, unsigned long,
 	DISPATCH_PRIORITY_ENFORCE = 0x1,
 	DISPATCH_VOUCHER_REPLACE = 0x2,
 	DISPATCH_VOUCHER_CONSUME = 0x4,
-	DISPATCH_IGNORE_UNBIND = 0x8,
-	DISPATCH_THREAD_PARK = 0x10,
+	DISPATCH_THREAD_PARK = 0x8,
 );
 DISPATCH_WARN_RESULT static inline voucher_t _dispatch_adopt_priority_and_set_voucher(
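
[Editorial note — illustration, not part of the patch. The dsnprintf change above matters because snprintf returns the length the output *would* have had, which can exceed the buffer size on truncation; the new macro clamps the result so callers can safely use it as an offset. A standalone restatement of the same logic as a function:]

    #include <stdio.h>

    static size_t checked_snprintf(char *buf, size_t siz, const char *arg)
    {
        int r = snprintf(buf, siz, "%s", arg);
        if (r < 0) return 0;                        // encoding error
        return ((size_t)r > siz) ? siz : (size_t)r; // clamp on truncation
    }

    int main(void)
    {
        char buf[8];
        // "truncated!" is 10 bytes; plain snprintf returns 10, which would
        // overrun buf if used as an offset. The clamped result is 8.
        size_t used = checked_snprintf(buf, sizeof(buf), "truncated!");
        printf("%zu '%s'\n", used, buf);  // prints: 8 'truncat'
        return 0;
    }
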
diff --git a/src/introspection.c b/src/introspection.c
index cdd631ccd..d847cb91a 100644
--- a/src/introspection.c
+++ b/src/introspection.c
@@ -189,6 +189,7 @@ _dispatch_introspection_continuation_get_info(dispatch_queue_t dq,
 	if (_dispatch_object_has_vtable(dc)) {
 		flags = 0;
 		switch (dc_type(dc)) {
+#if HAVE_PTHREAD_WORKQUEUE_QOS
 		case DC_OVERRIDE_STEALING_TYPE:
 		case DC_OVERRIDE_OWNING_TYPE:
 			dc = dc->dc_data;
@@ -200,6 +201,7 @@ _dispatch_introspection_continuation_get_info(dispatch_queue_t dq,
 				return;
 			}
 			return _dispatch_introspection_continuation_get_info(dq, dc, diqi);
+#endif
 		case DC_ASYNC_REDIRECT_TYPE:
 			DISPATCH_INTERNAL_CRASH(0, "Handled by the caller");
 		case DC_MACH_SEND_BARRRIER_DRAIN_TYPE:
diff --git a/src/io.c b/src/io.c
index e2a123217..e4f05aec9 100644
--- a/src/io.c
+++ b/src/io.c
@@ -24,13 +24,6 @@
 #define DISPATCH_IO_DEBUG DISPATCH_DEBUG
 #endif
 
-#if DISPATCH_IO_DEBUG
-#define _dispatch_fd_debug(msg, fd, args...) \
-	_dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
-#else
-#define _dispatch_fd_debug(msg, fd, args...)
-#endif
-
 #if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
 #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
 #define _dispatch_io_data_release(x) _dispatch_objc_release(x)
@@ -75,7 +68,7 @@ static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
 		dispatch_operation_t operation, dispatch_data_t data);
 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
 		dispatch_io_t channel);
-static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
+static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
 		dispatch_io_t channel);
 static void _dispatch_stream_source_handler(void *ctx);
 static void _dispatch_stream_queue_handler(void *ctx);
@@ -119,6 +112,38 @@ enum {
 #define _dispatch_io_Block_copy(x) \
 		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
 
+#pragma mark -
+#pragma mark dispatch_io_debug
+
+#if DISPATCH_IO_DEBUG
+#if !DISPATCH_DEBUG
+#define _dispatch_io_log(x, ...) do { \
+		_dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
+		(void *)_dispatch_thread_self(), ##__VA_ARGS__); \
+	} while (0)
+#ifdef _dispatch_object_debug
+#undef _dispatch_object_debug
+#define _dispatch_object_debug dispatch_debug
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+#endif
+#else
+#define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
+#endif // DISPATCH_DEBUG
+#else
+#define _dispatch_io_log(x, ...)
+#endif // DISPATCH_IO_DEBUG
+
+#define _dispatch_fd_debug(msg, fd, ...) \
+		_dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
+#define _dispatch_op_debug(msg, op, ...) \
+		_dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
+#define _dispatch_channel_debug(msg, channel, ...) \
+		_dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
+#define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
+		_dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
+#define _dispatch_disk_debug(msg, disk, ...) \
+		_dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
+
 #pragma mark -
 #pragma mark dispatch_io_hashtables
 
@@ -227,7 +252,8 @@ _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
 		_dispatch_retain(queue);
 		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
 			dispatch_async(queue, ^{
-				_dispatch_fd_debug("cleanup handler invoke", -1);
+				_dispatch_channel_debug("cleanup handler invoke: err %d",
+						channel, err);
 				cleanup_handler(err);
 			});
 			_dispatch_release(queue);
@@ -318,9 +344,9 @@ dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
 	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
 		return DISPATCH_BAD_INPUT;
 	}
-	_dispatch_fd_debug("io create", fd);
 	dispatch_io_t channel = _dispatch_io_create(type);
 	channel->fd = fd;
+	_dispatch_channel_debug("create", channel);
 	channel->fd_actual = fd;
 	dispatch_suspend(channel->queue);
 	_dispatch_retain(queue);
@@ -374,9 +400,9 @@ dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
 	if (!path_data) {
 		return DISPATCH_OUT_OF_MEMORY;
 	}
-	_dispatch_fd_debug("io create with path %s", -1, path);
 	dispatch_io_t channel = _dispatch_io_create(type);
 	channel->fd = -1;
+	_dispatch_channel_debug("create with path %s", channel, path);
 	channel->fd_actual = -1;
 	path_data->channel = channel;
 	path_data->oflag = oflag;
@@ -461,8 +487,8 @@ dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
 	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
 		return DISPATCH_BAD_INPUT;
 	}
-	_dispatch_fd_debug("io create with io %p", -1, in_channel);
 	dispatch_io_t channel = _dispatch_io_create(type);
+	_dispatch_channel_debug("create with channel %p", channel, in_channel);
 	dispatch_suspend(channel->queue);
 	_dispatch_retain(queue);
 	_dispatch_retain(channel);
@@ -569,7 +595,7 @@ dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
 {
 	_dispatch_retain(channel);
 	dispatch_async(channel->queue, ^{
-		_dispatch_fd_debug("io set high water", channel->fd);
+		_dispatch_channel_debug("set high water: %zu", channel, high_water);
 		if (channel->params.low > high_water) {
 			channel->params.low = high_water;
 		}
@@ -583,7 +609,7 @@ dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
 {
 	_dispatch_retain(channel);
 	dispatch_async(channel->queue, ^{
-		_dispatch_fd_debug("io set low water", channel->fd);
+		_dispatch_channel_debug("set low water: %zu", channel, low_water);
 		if (channel->params.high < low_water) {
 			channel->params.high = low_water ? low_water : 1;
 		}
@@ -598,7 +624,7 @@ dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
 {
 	_dispatch_retain(channel);
 	dispatch_async(channel->queue, ^{
-		_dispatch_fd_debug("io set interval", channel->fd);
+		_dispatch_channel_debug("set interval: %llu", channel, interval);
 		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
 		channel->params.interval_flags = flags;
 		_dispatch_release(channel);
@@ -642,7 +668,7 @@ dispatch_io_get_descriptor(dispatch_io_t channel)
 static void
 _dispatch_io_stop(dispatch_io_t channel)
 {
-	_dispatch_fd_debug("io stop", channel->fd);
+	_dispatch_channel_debug("stop", channel);
 	(void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
 	_dispatch_retain(channel);
 	dispatch_async(channel->queue, ^{
@@ -650,7 +676,7 @@ _dispatch_io_stop(dispatch_io_t channel)
 		_dispatch_object_debug(channel, "%s", __func__);
 		dispatch_fd_entry_t fd_entry = channel->fd_entry;
 		if (fd_entry) {
-			_dispatch_fd_debug("io stop cleanup", channel->fd);
+			_dispatch_channel_debug("stop cleanup", channel);
 			_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
 			if (!(channel->atomic_flags & DIO_CLOSED)) {
 				channel->fd_entry = NULL;
@@ -661,8 +687,8 @@ _dispatch_io_stop(dispatch_io_t channel)
 			_dispatch_retain(channel);
 			dispatch_async(_dispatch_io_fds_lockq, ^{
 				_dispatch_object_debug(channel, "%s", __func__);
-				_dispatch_fd_debug("io stop after close cleanup",
-						channel->fd);
+				_dispatch_channel_debug("stop cleanup after close",
+						channel);
 				dispatch_fd_entry_t fdi;
 				uintptr_t hash = DIO_HASH(channel->fd);
 				TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
@@ -697,7 +723,7 @@ dispatch_io_close(dispatch_io_t channel, unsigned long flags)
 	dispatch_async(channel->queue, ^{
 		dispatch_async(channel->barrier_queue, ^{
 			_dispatch_object_debug(channel, "%s", __func__);
-			_dispatch_fd_debug("io close", channel->fd);
+			_dispatch_channel_debug("close", channel);
 			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
 				(void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
 						relaxed);
@@ -967,10 +993,6 @@ _dispatch_operation_create(dispatch_op_direction_t direction,
 {
 	// On channel queue
 	dispatch_assert(direction < DOP_DIR_MAX);
-	_dispatch_fd_debug("operation create", channel->fd);
-#if DISPATCH_IO_DEBUG
-	int fd = channel->fd;
-#endif
 	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
 	// that can only be NULL if atomic_flags are set rdar://problem/8362514
 	int err = _dispatch_io_get_error(NULL, channel, false);
@@ -985,7 +1007,8 @@ _dispatch_operation_create(dispatch_op_direction_t direction,
 			} else if (direction == DOP_DIR_WRITE && !err) {
 				d = NULL;
 			}
-			_dispatch_fd_debug("IO handler invoke", fd);
+			_dispatch_channel_debug("IO handler invoke: err %d", channel,
+					err);
 			handler(true, d, err);
 			_dispatch_io_data_release(data);
 		});
@@ -995,6 +1018,7 @@ _dispatch_operation_create(dispatch_op_direction_t direction,
 	}
 	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
 			sizeof(struct dispatch_operation_s));
+	_dispatch_channel_debug("operation create: %p", channel, op);
 	op->do_next = DISPATCH_OBJECT_LISTLESS;
 	op->do_xref_cnt = -1; // operation object is not exposed externally
 	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
@@ -1023,6 +1047,7 @@ void
 _dispatch_operation_dispose(dispatch_operation_t op)
 {
 	_dispatch_object_debug(op, "%s", __func__);
+	_dispatch_op_debug("dispose", op);
 	// Deliver the data if there's any
 	if (op->fd_entry) {
 		_dispatch_operation_deliver_data(op, DOP_DONE);
@@ -1049,6 +1074,7 @@ _dispatch_operation_dispose(dispatch_operation_t op)
 		dispatch_release(op->op_q);
 	}
 	Block_release(op->handler);
+	_dispatch_op_debug("disposed", op);
 }
 
 static void
@@ -1071,6 +1097,7 @@ _dispatch_operation_enqueue(dispatch_operation_t op,
 			handler(true, d, err);
 			_dispatch_io_data_release(data);
 		});
+		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
 		_dispatch_release(op);
 		return;
 	}
@@ -1098,13 +1125,14 @@ _dispatch_operation_should_enqueue(dispatch_operation_t op,
 		dispatch_queue_t tq, dispatch_data_t data)
 {
 	// On stream queue or disk queue
-	_dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
+	_dispatch_op_debug("enqueue", op);
 	_dispatch_io_data_retain(data);
 	op->data = data;
 	int err = _dispatch_io_get_error(op, NULL, true);
 	if (err) {
 		op->err = err;
 		// Final release
+		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
 		_dispatch_release(op);
 		return false;
 	}
@@ -1241,7 +1269,6 @@ _dispatch_fd_entry_init_async(dispatch_fd_t fd,
 	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
 			_dispatch_io_fds_lockq_init);
 	dispatch_async(_dispatch_io_fds_lockq, ^{
-		_dispatch_fd_debug("fd entry init", fd);
 		dispatch_fd_entry_t fd_entry = NULL;
 		// Check to see if there is an existing entry for the given fd
 		uintptr_t hash = DIO_HASH(fd);
@@ -1257,8 +1284,9 @@ _dispatch_fd_entry_init_async(dispatch_fd_t fd,
 			// If we did not find an existing entry, create one
 			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
 		}
+		_dispatch_fd_entry_debug("init", fd_entry);
 		dispatch_async(fd_entry->barrier_queue, ^{
-			_dispatch_fd_debug("fd entry init completion", fd);
+			_dispatch_fd_entry_debug("init completion", fd_entry);
 			completion_callback(fd_entry);
 			// stat() is complete, release reference to fd_entry
 			_dispatch_fd_entry_release(fd_entry);
@@ -1286,16 +1314,16 @@ static dispatch_fd_entry_t
 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
 {
 	// On fds lock queue
-	_dispatch_fd_debug("fd entry create", fd);
 	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
 			_dispatch_io_fds_lockq);
+	_dispatch_fd_entry_debug("create: fd %d", fd_entry, fd);
 	fd_entry->fd = fd;
 	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
 	fd_entry->barrier_queue = dispatch_queue_create(
 			"com.apple.libdispatch-io.barrierq", NULL);
 	fd_entry->barrier_group = dispatch_group_create();
 	dispatch_async(fd_entry->barrier_queue, ^{
-		_dispatch_fd_debug("fd entry stat", fd);
+		_dispatch_fd_entry_debug("stat", fd_entry);
 		int err, orig_flags, orig_nosigpipe = -1;
 		struct stat st;
 		_dispatch_io_syscall_switch(err,
@@ -1367,7 +1395,7 @@ _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
 	// all operations associated with this entry have been freed
 	dispatch_async(fd_entry->close_queue, ^{
 		if (!fd_entry->disk) {
-			_dispatch_fd_debug("close queue fd_entry cleanup", fd);
+			_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
 			dispatch_op_direction_t dir;
 			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
 				_dispatch_stream_dispose(fd_entry, dir);
@@ -1385,11 +1413,11 @@ _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
 	// source cancels it and suspends the close queue. Freeing the fd_entry
 	// structure must happen after the source cancel handler has finished
 	dispatch_async(fd_entry->close_queue, ^{
-		_dispatch_fd_debug("close queue release", fd);
+		_dispatch_fd_entry_debug("close queue release", fd_entry);
 		dispatch_release(fd_entry->close_queue);
-		_dispatch_fd_debug("barrier queue release", fd);
+		_dispatch_fd_entry_debug("barrier queue release", fd_entry);
 		dispatch_release(fd_entry->barrier_queue);
-		_dispatch_fd_debug("barrier group release", fd);
+		_dispatch_fd_entry_debug("barrier group release", fd_entry);
 		dispatch_release(fd_entry->barrier_group);
 		if (fd_entry->orig_flags != -1) {
 			_dispatch_io_syscall(
@@ -1418,9 +1446,9 @@ _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
 		dev_t dev, mode_t mode)
 {
 	// On devs lock queue
-	_dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
 	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
 			path_data->channel->queue);
+	_dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
 	if (S_ISREG(mode)) {
 		_dispatch_disk_init(fd_entry, major(dev));
 	} else {
@@ -1439,7 +1467,7 @@ _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
 	// that the channel associated with this entry has been closed and that
 	// all operations associated with this entry have been freed
 	dispatch_async(fd_entry->close_queue, ^{
-		_dispatch_fd_debug("close queue fd_entry cleanup", -1);
+		_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
 		if (!fd_entry->disk) {
 			dispatch_op_direction_t dir;
 			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
@@ -1458,7 +1486,7 @@ _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
 		}
 	});
 	dispatch_async(fd_entry->close_queue, ^{
-		_dispatch_fd_debug("close queue release", -1);
+		_dispatch_fd_entry_debug("close queue release", fd_entry);
 		dispatch_release(fd_entry->close_queue);
 		dispatch_release(fd_entry->barrier_queue);
 		dispatch_release(fd_entry->barrier_group);
@@ -1511,7 +1539,7 @@ _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
 	}
 	_dispatch_fd_entry_retain(fd_entry);
 	dispatch_async(fd_entry->disk->pick_queue, ^{
-		_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
+		_dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
 		_dispatch_fd_entry_release(fd_entry);
 		if (channel) {
 			_dispatch_release(channel);
@@ -1683,7 +1711,7 @@ _dispatch_stream_complete_operation(dispatch_stream_t stream,
 {
 	// On stream queue
 	_dispatch_object_debug(op, "%s", __func__);
-	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
+	_dispatch_op_debug("complete: stream %p", op, stream);
 	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
 	if (op == stream->op) {
 		stream->op = NULL;
@@ -1692,6 +1720,7 @@ _dispatch_stream_complete_operation(dispatch_stream_t stream,
 		dispatch_source_cancel(op->timer);
 	}
 	// Final release will deliver any pending data
+	_dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
 	_dispatch_release(op);
 }
 
@@ -1700,7 +1729,7 @@ _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
 {
 	// On pick queue
 	_dispatch_object_debug(op, "%s", __func__);
-	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
+	_dispatch_op_debug("complete: disk %p", op, disk);
 	// Current request is always the last op returned
 	if (disk->cur_rq == op) {
 		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
@@ -1719,6 +1748,7 @@ _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
 		dispatch_source_cancel(op->timer);
 	}
 	// Final release will deliver any pending data
+	_dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
 	_dispatch_release(op);
 }
 
@@ -1806,18 +1836,34 @@ _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
 	}
 }
 
-static void
-_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
+static inline void
+_dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
+		dispatch_io_t channel, bool inactive_only)
 {
 	// On pick queue
 	dispatch_operation_t op, tmp;
 	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
+		if (inactive_only && op->active) continue;
 		if (!channel || op->channel == channel) {
+			_dispatch_op_debug("cleanup: disk %p", op, disk);
 			_dispatch_disk_complete_operation(disk, op);
 		}
 	}
 }
 
+static void
+_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
+{
+	_dispatch_disk_cleanup_specified_operations(disk, channel, false);
+}
+
+static void
+_dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
+		dispatch_io_t channel)
+{
+	_dispatch_disk_cleanup_specified_operations(disk, channel, true);
+}
+
 #pragma mark -
 #pragma mark dispatch_stream_handler/dispatch_disk_handler
 
@@ -1829,7 +1875,7 @@ _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
 		return stream->source;
 	}
 	dispatch_fd_t fd = op->fd_entry->fd;
-	_dispatch_fd_debug("stream source create", fd);
+	_dispatch_op_debug("stream source create", op);
 	dispatch_source_t source = NULL;
 	if (op->direction == DOP_DIR_READ) {
 		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
@@ -1848,7 +1894,7 @@ _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
 	// unregistered
 	dispatch_queue_t close_queue = op->fd_entry->close_queue;
 	dispatch_source_set_cancel_handler(source, ^{
-		_dispatch_fd_debug("stream source cancel", fd);
+		_dispatch_op_debug("stream source cancel", op);
 		dispatch_resume(close_queue);
 	});
 	stream->source = source;
@@ -1896,13 +1942,13 @@ _dispatch_stream_handler(void *ctx)
 		goto pick;
 	}
 	stream->op = op;
-	_dispatch_fd_debug("stream handler", op->fd_entry->fd);
+	_dispatch_op_debug("stream handler", op);
 	dispatch_fd_entry_t fd_entry = op->fd_entry;
 	_dispatch_fd_entry_retain(fd_entry);
 	// For performance analysis
 	if (!op->total && dispatch_io_defaults.initial_delivery) {
 		// Empty delivery to signal the start of the operation
-		_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
+		_dispatch_op_debug("initial delivery", op);
 		_dispatch_operation_deliver_data(op, DOP_DELIVER);
 	}
 	// TODO: perform on the operation target queue to get correct priority
@@ -1960,7 +2006,7 @@ _dispatch_disk_handler(void *ctx)
 	if (disk->io_active) {
 		return;
 	}
-	_dispatch_fd_debug("disk handler", -1);
+	_dispatch_disk_debug("disk handler", disk);
 	dispatch_operation_t op;
 	size_t i = disk->free_idx, j = disk->req_idx;
 	if (j <= i) {
@@ -1976,8 +2022,10 @@ _dispatch_disk_handler(void *ctx)
 				continue;
 			}
 			_dispatch_retain(op);
+			_dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
 			disk->advise_list[i%disk->advise_list_depth] = op;
 			op->active = true;
+			_dispatch_op_debug("activate: disk %p", op, disk);
 			_dispatch_object_debug(op, "%s", __func__);
 		} else {
 			// No more operations to get
@@ -1989,6 +2037,7 @@ _dispatch_disk_handler(void *ctx)
 	op = disk->advise_list[disk->req_idx];
 	if (op) {
 		disk->io_active = true;
+		_dispatch_op_debug("async perform: disk %p", op, disk);
 		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
 	}
 }
@@ -1997,8 +2046,8 @@ static void
 _dispatch_disk_perform(void *ctxt)
 {
 	dispatch_disk_t disk = ctxt;
+	_dispatch_disk_debug("disk perform", disk);
 	size_t chunk_size = dispatch_io_defaults.chunk_size;
-	_dispatch_fd_debug("disk perform", -1);
 	dispatch_operation_t op;
 	size_t i = disk->advise_idx, j = disk->free_idx;
 	if (j <= i) {
@@ -2022,7 +2071,7 @@ _dispatch_disk_perform(void *ctxt)
 		// For performance analysis
 		if (!op->total && dispatch_io_defaults.initial_delivery) {
 			// Empty delivery to signal the start of the operation
-			_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
+			_dispatch_op_debug("initial delivery", op);
 			_dispatch_operation_deliver_data(op, DOP_DELIVER);
 		}
 		// Advise two chunks if the list only has one element and this is the
@@ -2038,7 +2087,9 @@ _dispatch_disk_perform(void *ctxt)
 	int result = _dispatch_operation_perform(op);
 	disk->advise_list[disk->req_idx] = NULL;
 	disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
+	_dispatch_op_debug("async perform completion: disk %p", op, disk);
 	dispatch_async(disk->pick_queue, ^{
+		_dispatch_op_debug("perform completion", op);
 		switch (result) {
 		case DISPATCH_OP_DELIVER:
 			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
@@ -2060,12 +2111,15 @@ _dispatch_disk_perform(void *ctxt)
 			dispatch_assert(result);
 			break;
 		}
+		_dispatch_op_debug("deactivate: disk %p", op, disk);
 		op->active = false;
 		disk->io_active = false;
 		_dispatch_disk_handler(disk);
 		// Balancing the retain in _dispatch_disk_handler. Note that op must be
 		// released at the very end, since it might hold the last reference to
 		// the disk
+		_dispatch_op_debug("release -> %d (disk perform complete)", op,
+				op->do_ref_cnt);
 		_dispatch_release(op);
 	});
 }
@@ -2076,6 +2130,8 @@ _dispatch_disk_perform(void *ctxt)
 static void
 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
 {
+	_dispatch_op_debug("advise", op);
+	if (_dispatch_io_get_error(op, NULL, true)) return;
 #ifdef __linux__
 	// linux does not support fcntl (F_RDAVISE)
 	// define necessary datastructure and use readahead
@@ -2123,6 +2179,7 @@ _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
 static int
 _dispatch_operation_perform(dispatch_operation_t op)
 {
+	_dispatch_op_debug("perform", op);
 	int err = _dispatch_io_get_error(op, NULL, true);
 	if (err) {
 		goto error;
@@ -2151,7 +2208,7 @@ _dispatch_operation_perform(dispatch_operation_t op)
 			op->buf_siz = max_buf_siz;
 		}
 		op->buf = valloc(op->buf_siz);
-		_dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
+		_dispatch_op_debug("buffer allocated", op);
 	} else if (op->direction == DOP_DIR_WRITE) {
 		// Always write the first data piece, if that is smaller than a
 		// chunk, accumulate further data pieces until chunk size is reached
@@ -2177,7 +2234,7 @@ _dispatch_operation_perform(dispatch_operation_t op)
 		op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
 				NULL);
 		_dispatch_io_data_release(d);
-		_dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
+		_dispatch_op_debug("buffer mapped", op);
 	}
 	}
 	if (op->fd_entry->fd == -1) {
@@ -2214,7 +2271,7 @@ _dispatch_operation_perform(dispatch_operation_t op)
 	}
 	// EOF is indicated by two handler invocations
 	if (processed == 0) {
-		_dispatch_fd_debug("EOF", op->fd_entry->fd);
+		_dispatch_op_debug("performed: EOF", op);
 		return DISPATCH_OP_DELIVER_AND_COMPLETE;
 	}
 	op->buf_len += (size_t)processed;
@@ -2230,7 +2287,7 @@ _dispatch_operation_perform(dispatch_operation_t op)
 	if (err == EAGAIN) {
 		// For disk based files with blocking I/O we should never get EAGAIN
 		dispatch_assert(!op->fd_entry->disk);
-		_dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
+		_dispatch_op_debug("performed: EAGAIN", op);
 		if (op->direction == DOP_DIR_READ && op->total &&
 				op->channel == op->fd_entry->convenience_channel) {
 			// Convenience read with available data completes on EAGAIN
@@ -2238,6 +2295,7 @@ _dispatch_operation_perform(dispatch_operation_t op)
 		}
 		return DISPATCH_OP_RESUME;
 	}
+	_dispatch_op_debug("performed: err %d", op, err);
 	op->err = err;
 	switch (err) {
 	case ECANCELED:
@@ -2267,7 +2325,7 @@ _dispatch_operation_deliver_data(dispatch_operation_t op,
 			deliver = true;
 		} else if (op->buf_len < op->buf_siz) {
 			// Request buffer is not yet used up
-			_dispatch_fd_debug("buffer data", op->fd_entry->fd);
+			_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
 			return;
 		}
 	} else {
@@ -2321,17 +2379,14 @@ _dispatch_operation_deliver_data(dispatch_operation_t op,
 	}
 	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
 		op->undelivered = undelivered;
-		_dispatch_fd_debug("buffer data", op->fd_entry->fd);
+		_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
 		return;
 	}
 	op->undelivered = 0;
 	_dispatch_object_debug(op, "%s", __func__);
-	_dispatch_fd_debug("deliver data", op->fd_entry->fd);
+	_dispatch_op_debug("deliver data", op);
 	dispatch_op_direction_t direction = op->direction;
 	dispatch_io_handler_t handler = op->handler;
-#if DISPATCH_IO_DEBUG
-	int fd = op->fd_entry->fd;
-#endif
 	dispatch_fd_entry_t fd_entry = op->fd_entry;
 	_dispatch_fd_entry_retain(fd_entry);
 	dispatch_io_t channel = op->channel;
@@ -2343,7 +2398,7 @@ _dispatch_operation_deliver_data(dispatch_operation_t op,
 		if (done) {
 			if (direction == DOP_DIR_READ && err) {
 				if (dispatch_data_get_size(d)) {
-					_dispatch_fd_debug("IO handler invoke", fd);
+					_dispatch_op_debug("IO handler invoke", op);
 					handler(false, d, 0);
 				}
 				d = NULL;
@@ -2351,7 +2406,7 @@ _dispatch_operation_deliver_data(dispatch_operation_t op,
 				d = NULL;
 			}
 		}
-		_dispatch_fd_debug("IO handler invoke", fd);
+		_dispatch_op_debug("IO handler invoke: err %d", op, err);
 		handler(done, d, err);
 		_dispatch_release(channel);
 		_dispatch_fd_entry_release(fd_entry);
diff --git a/src/once.c b/src/once.c
index 82885ccd9..d7d6a8e64 100644
--- a/src/once.c
+++ b/src/once.c
@@ -60,7 +60,7 @@ dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
 	dispatch_thread_event_t event;
 
 	if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
-		dow.dow_thread = _dispatch_thread_port();
+		dow.dow_thread = _dispatch_tid_self();
 		_dispatch_client_callout(ctxt, func);
 
 		// The next barrier must be long and strong.
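
[Editorial note — illustrative example, not part of the patch. The once.c change above swaps the internal thread identifier used to record the initializing thread (dow.dow_thread) from _dispatch_thread_port() to the portable _dispatch_tid_self(). The public API this supports is unchanged; a minimal usage sketch:]

    #include <dispatch/dispatch.h>
    #include <stdio.h>

    static dispatch_once_t pred;

    static void init_once(void *ctxt)
    {
        (void)ctxt;
        puts("initialized exactly once");
    }

    int main(void)
    {
        // The first caller runs init_once(); concurrent callers block until
        // it completes (the implementation records the initializing thread's
        // ID for that handoff).
        dispatch_once_f(&pred, NULL, init_once);
        dispatch_once_f(&pred, NULL, init_once);  // no-op on second call
        return 0;
    }
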
* * dispatch$target:libdispatch*.dylib::timer-configure diff --git a/src/queue.c b/src/queue.c index 465fe31aa..0c058be55 100644 --- a/src/queue.c +++ b/src/queue.c @@ -1235,56 +1235,38 @@ _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa, DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute"); } - _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit; - qos_class_t qos = dqa->dqa_qos_class; - dispatch_queue_flags_t dqf = 0; - const void *vtable; + // + // Step 1: Normalize arguments (qos, overcommit, tq) + // - if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) { - if (tq->do_targetq) { - DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and " - "a non global target queue"); - } + qos_class_t qos = dqa->dqa_qos_class; +#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK + if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE && + !_dispatch_root_queues[ + DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) { + qos = _DISPATCH_QOS_CLASS_USER_INITIATED; } - - if (legacy) { - // if any of these attributes is specified, use non legacy classes - if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency -#if 0 // - || overcommit != _dispatch_queue_attr_overcommit_unspecified #endif - ) { - legacy = false; + bool maintenance_fallback = false; +#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK + maintenance_fallback = true; +#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK + if (maintenance_fallback) { + if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE && + !_dispatch_root_queues[ + DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) { + qos = _DISPATCH_QOS_CLASS_BACKGROUND; } } - if (legacy) { - vtable = DISPATCH_VTABLE(queue); - } else if (dqa->dqa_concurrent) { - vtable = DISPATCH_VTABLE(queue_concurrent); - } else { - vtable = DISPATCH_VTABLE(queue_serial); - } - switch (dqa->dqa_autorelease_frequency) { - case DISPATCH_AUTORELEASE_FREQUENCY_NEVER: - dqf |= DQF_AUTORELEASE_NEVER; - break; - case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM: - dqf |= DQF_AUTORELEASE_ALWAYS; - break; - } - if (label) { - const char *tmp = _dispatch_strdup_if_mutable(label); - if (tmp != label) { - dqf |= DQF_LABEL_NEEDS_FREE; - label = tmp; + + _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit; + if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) { + if (tq->do_targetq) { + DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and " + "a non-global target queue"); } } - dispatch_queue_t dq = _dispatch_alloc(vtable, - sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD); - _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ? 
- DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive); - dq->dq_label = label; if (tq && !tq->do_targetq && tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) { // Handle discrepancies between attr and target queue, attributes win @@ -1296,15 +1278,8 @@ _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa, } } if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) { - if (overcommit == _dispatch_queue_attr_overcommit_enabled) { - if (!(tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG)) { - tq++; - } - } else { - if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) { - tq--; - } - } + tq = _dispatch_get_root_queue_with_overcommit(tq, + overcommit == _dispatch_queue_attr_overcommit_enabled); } else { tq = NULL; } @@ -1327,43 +1302,68 @@ _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa, _dispatch_queue_attr_overcommit_enabled; } } -#if HAVE_PTHREAD_WORKQUEUE_QOS - dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos, - dqa->dqa_relative_priority, - overcommit == _dispatch_queue_attr_overcommit_enabled ? - _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0); -#endif if (!tq) { - if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) { - qos = _DISPATCH_QOS_CLASS_DEFAULT; - } -#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK - if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE && - !_dispatch_root_queues[ - DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) { - qos = _DISPATCH_QOS_CLASS_USER_INITIATED; - } -#endif - bool maintenance_fallback = false; -#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK - maintenance_fallback = true; -#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK - if (maintenance_fallback) { - if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE && - !_dispatch_root_queues[ - DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) { - qos = _DISPATCH_QOS_CLASS_BACKGROUND; - } - } - - tq = _dispatch_get_root_queue(qos, overcommit == + qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ? + _DISPATCH_QOS_CLASS_DEFAULT : qos; + tq = _dispatch_get_root_queue(tq_qos, overcommit == _dispatch_queue_attr_overcommit_enabled); if (slowpath(!tq)) { DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute"); } + } + + // + // Step 2: Initialize the queue + // + + if (legacy) { + // if any of these attributes is specified, use non legacy classes + if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) { + legacy = false; + } + } + + const void *vtable; + dispatch_queue_flags_t dqf = 0; + if (legacy) { + vtable = DISPATCH_VTABLE(queue); + } else if (dqa->dqa_concurrent) { + vtable = DISPATCH_VTABLE(queue_concurrent); } else { + vtable = DISPATCH_VTABLE(queue_serial); + } + switch (dqa->dqa_autorelease_frequency) { + case DISPATCH_AUTORELEASE_FREQUENCY_NEVER: + dqf |= DQF_AUTORELEASE_NEVER; + break; + case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM: + dqf |= DQF_AUTORELEASE_ALWAYS; + break; + } + if (label) { + const char *tmp = _dispatch_strdup_if_mutable(label); + if (tmp != label) { + dqf |= DQF_LABEL_NEEDS_FREE; + label = tmp; + } + } + + dispatch_queue_t dq = _dispatch_alloc(vtable, + sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD); + _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ? + DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive); + + dq->dq_label = label; + +#if HAVE_PTHREAD_WORKQUEUE_QOS + dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos, + dqa->dqa_relative_priority, + overcommit == _dispatch_queue_attr_overcommit_enabled ? 
+ _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0); +#endif + _dispatch_retain(tq); + if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) { // legacy way of inheriting the QoS from the target - _dispatch_retain(tq); _dispatch_queue_priority_inherit_from_target(dq, tq); } if (!dqa->dqa_inactive) { @@ -2194,6 +2194,21 @@ _dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label } #endif +dispatch_queue_t +dispatch_pthread_root_queue_copy_current(void) +{ + dispatch_queue_t dq = _dispatch_queue_get_current(); + if (!dq) return NULL; + while (slowpath(dq->do_targetq)) { + dq = dq->do_targetq; + } + if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE || + dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) { + return NULL; + } + return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj); +} + #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES void @@ -2550,7 +2565,7 @@ _dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp, if (!pflags) return; int r = _pthread_set_properties_self(pflags, pp, kv); if (r == EINVAL) { - DISPATCH_INTERNAL_CRASH(0, "_pthread_set_properties_self failed"); + DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed"); } (void)dispatch_assume_zero(r); } @@ -2716,21 +2731,23 @@ _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags, voucher_t voucher, pthread_priority_t pri, dispatch_block_t block) { flags = _dispatch_block_normalize_flags(flags); -#if HAVE_PTHREAD_WORKQUEUE_QOS bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT); if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) { +#if OS_VOUCHER_ACTIVITY_SPI voucher = VOUCHER_CURRENT; +#endif flags |= DISPATCH_BLOCK_HAS_VOUCHER; } +#if OS_VOUCHER_ACTIVITY_SPI if (voucher == VOUCHER_CURRENT) { voucher = _voucher_get(); } +#endif if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) { pri = _dispatch_priority_propagate(); flags |= DISPATCH_BLOCK_HAS_PRIORITY; } -#endif dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block); #if DISPATCH_DEBUG dispatch_assert(_dispatch_block_get_data(db)); @@ -3176,10 +3193,24 @@ _dispatch_async_redirect_invoke(dispatch_continuation_t dc, dispatch_thread_frame_s dtf; struct dispatch_continuation_s *other_dc = dc->dc_other; dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt; - dispatch_queue_t dq = dc->dc_data, rq, old_dq, old_rq = NULL; + // if we went through _dispatch_root_queue_push_override, + // the "right" root queue was stuffed into dc_func + dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func; + dispatch_queue_t dq = dc->dc_data, rq, old_dq; + struct _dispatch_identity_s di; pthread_priority_t op, dp, old_dp; + if (ctxt_flags) { + flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK; + flags |= ctxt_flags; + } + old_dq = _dispatch_get_current_queue(); + if (assumed_rq) { + _dispatch_queue_set_current(assumed_rq); + _dispatch_root_queue_identity_assume(&di, 0); + } + old_dp = _dispatch_set_defaultpriority(dq->dq_priority, &dp); op = dq->dq_override; if (op > (dp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) { @@ -3188,28 +3219,19 @@ _dispatch_async_redirect_invoke(dispatch_continuation_t dc, _dispatch_set_defaultpriority_override(); } - if (ctxt_flags) { - flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK; - flags |= ctxt_flags; - } - if (dc->dc_func) { - // if we went through _dispatch_root_queue_push_override, - // the "right" root queue was stuffed into dc_func - old_rq = _dispatch_get_current_queue(); - _dispatch_queue_set_current((dispatch_queue_t)dc->dc_func); - }
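The queue.c hunk above also introduces dispatch_pthread_root_queue_copy_current(), which walks the current queue's target-queue chain and returns a retained reference to the pthread root queue being drained, or NULL when the caller is on a global root queue or not draining a root queue at all. A usage sketch, assuming the declaration is exported through this project's private header:

#include <dispatch/dispatch.h>
#include <dispatch/private.h> // assumed location of the SPI declaration

// Re-target follow-up work at the pthread root queue currently being
// drained, falling back to a global queue when called from elsewhere.
static void
enqueue_followup(dispatch_block_t work)
{
	dispatch_queue_t rq = dispatch_pthread_root_queue_copy_current();
	if (rq) {
		dispatch_async(rq, work);
		dispatch_release(rq); // copy_current returned a retained reference
	} else {
		dispatch_async(dispatch_get_global_queue(
				DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), work);
	}
}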
_dispatch_thread_frame_push(&dtf, dq); _dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER, DISPATCH_OBJ_CONSUME_BIT, { _dispatch_continuation_pop(other_dc, dq, flags); }); - _dispatch_reset_defaultpriority(old_dp); _dispatch_thread_frame_pop(&dtf); - if (old_rq) { - _dispatch_queue_set_current(old_rq); + if (assumed_rq) { + _dispatch_root_queue_identity_restore(&di); + _dispatch_queue_set_current(old_dq); } + _dispatch_reset_defaultpriority(old_dp); + rq = dq->do_targetq; - old_dq = _dispatch_get_current_queue(); while (slowpath(rq->do_targetq) && rq != old_dq) { _dispatch_non_barrier_complete(rq); rq = rq->do_targetq; @@ -4053,17 +4075,14 @@ _dispatch_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp, } if (target) { return _dispatch_queue_class_wakeup(dq, pp, flags, target); -#if HAVE_PTHREAD_WORKQUEUE_QOS } else if (pp) { return _dispatch_queue_class_override_drainer(dq, pp, flags); -#endif } else if (flags & DISPATCH_WAKEUP_CONSUME) { return _dispatch_release_tailcall(dq); } } #if DISPATCH_COCOA_COMPAT - DISPATCH_ALWAYS_INLINE static inline bool _dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle) @@ -4104,11 +4123,13 @@ _dispatch_runloop_queue_set_handle(dispatch_queue_t dq, dispatch_runloop_handle_ #error "runloop support not implemented on this platform" #endif } +#endif // DISPATCH_COCOA_COMPAT void _dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp, dispatch_wakeup_flags_t flags) { +#if DISPATCH_COCOA_COMPAT if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) { // return _dispatch_queue_wakeup(dq, pp, flags); @@ -4127,12 +4148,13 @@ _dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp, _dispatch_thread_override_end(owner, dq); return; } - if (flags & DISPATCH_WAKEUP_CONSUME) { return _dispatch_release_tailcall(dq); } -} +#else + return _dispatch_queue_wakeup(dq, pp, flags); #endif +} void _dispatch_main_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp, @@ -4232,9 +4254,6 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n) int r; _dispatch_debug_root_queue(dq, __func__); - dispatch_once_f(&_dispatch_root_queues_pred, NULL, - _dispatch_root_queues_init_once); - #if HAVE_PTHREAD_WORKQUEUES #if DISPATCH_USE_PTHREAD_POOL if (qc->dgq_kworkqueue != (void*)(~0ul)) @@ -4865,13 +4884,30 @@ _dispatch_queue_override_invoke(dispatch_continuation_t dc, DISPATCH_ALWAYS_INLINE static inline bool -_dispatch_root_queue_push_needs_override(dispatch_queue_t rq, +_dispatch_need_global_root_queue_push_override(dispatch_queue_t rq, pthread_priority_t pp) { - if (dx_type(rq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE || !rq->dq_priority) { - return false; - } - return pp > (rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK); + pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK; + bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG; + + if (unlikely(!rqp)) return false; + + pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK; + return defaultqueue ? 
pp && pp != rqp : pp > rqp; +} + +DISPATCH_ALWAYS_INLINE +static inline bool +_dispatch_need_global_root_queue_push_override_stealer(dispatch_queue_t rq, + pthread_priority_t pp) +{ + pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK; + bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG; + + if (unlikely(!rqp)) return false; + + pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK; + return defaultqueue || pp > rqp; } DISPATCH_NOINLINE @@ -5008,8 +5044,10 @@ _dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq, } apply_again: - if (_dispatch_root_queue_push_needs_override(tq, pp)) { - _dispatch_root_queue_push_override_stealer(tq, dq, pp); + if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) { + if (_dispatch_need_global_root_queue_push_override_stealer(tq, pp)) { + _dispatch_root_queue_push_override_stealer(tq, dq, pp); + } } else if (_dispatch_queue_need_override(tq, pp)) { dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING); } @@ -5035,12 +5073,14 @@ _dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq, return _dispatch_release_tailcall(dq); } } +#endif // HAVE_PTHREAD_WORKQUEUE_QOS DISPATCH_NOINLINE void _dispatch_queue_class_override_drainer(dispatch_queue_t dq, pthread_priority_t pp, dispatch_wakeup_flags_t flags) { +#if HAVE_PTHREAD_WORKQUEUE_QOS uint64_t dq_state, value; // @@ -5062,10 +5102,14 @@ _dispatch_queue_class_override_drainer(dispatch_queue_t dq, return _dispatch_queue_class_wakeup_with_override(dq, pp, flags, dq_state); } +#else + (void)pp; +#endif // HAVE_PTHREAD_WORKQUEUE_QOS if (flags & DISPATCH_WAKEUP_CONSUME) { return _dispatch_release_tailcall(dq); } } + #if DISPATCH_USE_KEVENT_WORKQUEUE DISPATCH_NOINLINE static void @@ -5091,8 +5135,7 @@ _dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou, dq = old_dq; dou._do = old_dou; } - if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) > - (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) { + if (_dispatch_need_global_root_queue_push_override(dq, pp)) { return _dispatch_root_queue_push_override(dq, dou, pp); } // bit of cheating: we should really pass `pp` but we know that we are @@ -5102,7 +5145,16 @@ _dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou, _dispatch_queue_push_inline(dq, dou, 0, 0); } #endif -#endif // HAVE_PTHREAD_WORKQUEUE_QOS + +DISPATCH_NOINLINE +static void +_dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou, + pthread_priority_t pp) +{ + dispatch_once_f(&_dispatch_root_queues_pred, NULL, + _dispatch_root_queues_init_once); + _dispatch_queue_push(dq, dou, pp); +} DISPATCH_NOINLINE void @@ -5110,17 +5162,21 @@ _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou, pthread_priority_t pp) { _dispatch_assert_is_valid_qos_override(pp); - if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE && dq->dq_priority) { + if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) { #if DISPATCH_USE_KEVENT_WORKQUEUE dispatch_deferred_items_t ddi = _dispatch_deferred_items_get(); if (unlikely(ddi && !(ddi->ddi_stashed_pp & (dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) { + dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE); return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi); } #endif #if HAVE_PTHREAD_WORKQUEUE_QOS - if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) > - (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) { + // can't use dispatch_once_f() as it would create a frame + if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) { + return 
_dispatch_queue_push_slow(dq, dou, pp); + } + if (_dispatch_need_global_root_queue_push_override(dq, pp)) { return _dispatch_root_queue_push_override(dq, dou, pp); } #endif @@ -5210,7 +5266,7 @@ _dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp, flags ^= DISPATCH_WAKEUP_SLOW_WAITER; dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME)); - os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{ + os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, { new_state = old_state | bits; if (_dq_state_drain_pended(old_state)) { // same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT @@ -5218,21 +5274,22 @@ _dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp, new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK; new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED; } - if (likely(_dq_state_is_runnable(new_state) && - !_dq_state_drain_locked(new_state))) { - if (_dq_state_has_pending_barrier(old_state) || - new_state + pending_barrier_width < - DISPATCH_QUEUE_WIDTH_FULL_BIT) { - // see _dispatch_queue_drain_try_lock - new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK; - new_state ^= xor_owner_and_set_full_width_and_in_barrier; - } else { - new_state |= DISPATCH_QUEUE_ENQUEUED; - } - } else if (_dq_state_drain_locked(new_state)) { + if (unlikely(_dq_state_drain_locked(new_state))) { #ifdef DLOCK_NOWAITERS_BIT new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT; #endif + } else if (unlikely(!_dq_state_is_runnable(new_state) || + !(flags & DISPATCH_WAKEUP_FLUSH))) { + // either not runnable, or was not for the first item (26700358) + // so we should not try to lock and handle overrides instead + } else if (_dq_state_has_pending_barrier(old_state) || + new_state + pending_barrier_width < + DISPATCH_QUEUE_WIDTH_FULL_BIT) { + // see _dispatch_queue_drain_try_lock + new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK; + new_state ^= xor_owner_and_set_full_width_and_in_barrier; + } else { + new_state |= DISPATCH_QUEUE_ENQUEUED; } }); if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) { @@ -5397,7 +5454,7 @@ _dispatch_root_queue_drain_deferred_item(dispatch_queue_t dq, pp = _dispatch_priority_inherit_from_root_queue(pp, dq); _dispatch_queue_set_current(dq); - _dispatch_root_queue_identity_assume(&di, pp, DISPATCH_IGNORE_UNBIND); + _dispatch_root_queue_identity_assume(&di, pp); #if DISPATCH_COCOA_COMPAT void *pool = _dispatch_last_resort_autorelease_pool_push(); #endif // DISPATCH_COCOA_COMPAT @@ -5704,7 +5761,7 @@ _dispatch_runloop_queue_handle_init(void *ctxt) handle = fd; #else #error "runloop support not implemented on this platform" -#endif +#endif _dispatch_runloop_queue_set_handle(dq, handle); _dispatch_program_is_probably_callback_driven = true; @@ -5766,7 +5823,13 @@ _dispatch_queue_set_mainq_drain_state(bool arg) } void -_dispatch_main_queue_callback_4CF(void *ignored DISPATCH_UNUSED) +_dispatch_main_queue_callback_4CF( +#if TARGET_OS_MAC + mach_msg_header_t *_Null_unspecified msg +#else + void *ignored +#endif + DISPATCH_UNUSED) { if (main_q_is_draining) { return; @@ -5781,6 +5844,9 @@ _dispatch_main_queue_callback_4CF(void *ignored DISPATCH_UNUSED) void dispatch_main(void) { + dispatch_once_f(&_dispatch_root_queues_pred, NULL, + _dispatch_root_queues_init_once); + #if HAVE_PTHREAD_MAIN_NP if (pthread_main_np()) { #endif diff --git a/src/semaphore.c b/src/semaphore.c index 09d68108f..4d232b7eb 100644 --- a/src/semaphore.c +++ b/src/semaphore.c @@ -260,7 +260,7 @@ _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, #if USE_MACH_SEM 
mach_timespec_t _timeout; kern_return_t kr; -#elif USE_POSIX_SEM +#elif USE_POSIX_SEM struct timespec _timeout; int ret; #elif USE_WIN32_SEM diff --git a/src/shims/linux_stubs.c b/src/shims/linux_stubs.c index c0f7e5932..07ee8bc06 100644 --- a/src/shims/linux_stubs.c +++ b/src/shims/linux_stubs.c @@ -51,4 +51,3 @@ unsigned short dispatch_callout__return_semaphore; unsigned short dispatch_queue__push_semaphore; void (*_dispatch_block_special_invoke)(void*); struct dispatch_queue_attr_s _dispatch_queue_attr_concurrent; - diff --git a/src/shims/linux_stubs.h b/src/shims/linux_stubs.h index e13304906..0c12e8272 100644 --- a/src/shims/linux_stubs.h +++ b/src/shims/linux_stubs.h @@ -47,8 +47,6 @@ typedef uint32_t voucher_activity_trace_id_t; typedef uint32_t voucher_activity_id_t; -typedef uint32_t _voucher_activity_buffer_hook_t;; - typedef uint32_t voucher_activity_flag_t; typedef struct { } mach_msg_header_t; diff --git a/src/shims/lock.h b/src/shims/lock.h index 5c2dfc502..246c80738 100644 --- a/src/shims/lock.h +++ b/src/shims/lock.h @@ -497,12 +497,8 @@ _dispatch_gate_tryenter(dispatch_gate_t l) DLOCK_GATE_UNLOCKED, tid_self, acquire)); } -DISPATCH_ALWAYS_INLINE -static inline void -_dispatch_gate_wait(dispatch_gate_t l, dispatch_lock_options_t flags) -{ - _dispatch_gate_wait_slow(l, DLOCK_GATE_UNLOCKED, flags); -} +#define _dispatch_gate_wait(l, flags) \ + _dispatch_gate_wait_slow(l, DLOCK_GATE_UNLOCKED, flags) DISPATCH_ALWAYS_INLINE static inline void @@ -523,13 +519,9 @@ _dispatch_once_gate_tryenter(dispatch_once_gate_t l) DLOCK_ONCE_UNLOCKED, tid_self, acquire)); } -DISPATCH_ALWAYS_INLINE -static inline void -_dispatch_once_gate_wait(dispatch_once_gate_t l) -{ - _dispatch_gate_wait_slow(&l->dgo_gate, (dispatch_lock)DLOCK_ONCE_DONE, - DLOCK_LOCK_NONE); -} +#define _dispatch_once_gate_wait(l) \ + _dispatch_gate_wait_slow(&(l)->dgo_gate, (dispatch_lock)DLOCK_ONCE_DONE, \ + DLOCK_LOCK_NONE) DISPATCH_ALWAYS_INLINE static inline void diff --git a/src/shims/tsd.h b/src/shims/tsd.h index 2edba3959..2e3ece8b0 100644 --- a/src/shims/tsd.h +++ b/src/shims/tsd.h @@ -278,7 +278,7 @@ _dispatch_thread_setspecific_packed_pair(pthread_key_t k1, pthread_key_t k2, _dispatch_thread_setspecific(k1, p[0]); _dispatch_thread_setspecific(k2, p[1]); } -#endif +#endif #if TARGET_OS_WIN32 #define _dispatch_thread_self() ((uintptr_t)GetCurrentThreadId()) @@ -293,12 +293,10 @@ _dispatch_thread_setspecific_packed_pair(pthread_key_t k1, pthread_key_t k2, #if TARGET_OS_WIN32 #define _dispatch_thread_port() ((mach_port_t)0) -#else +#elif !DISPATCH_USE_THREAD_LOCAL_STORAGE #if DISPATCH_USE_DIRECT_TSD #define _dispatch_thread_port() ((mach_port_t)(uintptr_t)\ _dispatch_thread_getspecific(_PTHREAD_TSD_SLOT_MACH_THREAD_SELF)) -#elif DISPATCH_USE_THREAD_LOCAL_STORAGE -#define _dispatch_thread_port() ((mach_port_t)(_dispatch_get_tsd_base()->tid)) #else #define _dispatch_thread_port() pthread_mach_thread_np(_dispatch_thread_self()) #endif diff --git a/src/source.c b/src/source.c index debd2b1ec..afb811c40 100644 --- a/src/source.c +++ b/src/source.c @@ -1031,10 +1031,8 @@ _dispatch_source_wakeup(dispatch_source_t ds, pthread_priority_t pp, if (tq) { return _dispatch_queue_class_wakeup(ds->_as_dq, pp, flags, tq); -#if HAVE_PTHREAD_WORKQUEUE_QOS } else if (pp) { return _dispatch_queue_class_override_drainer(ds->_as_dq, pp, flags); -#endif } else if (flags & DISPATCH_WAKEUP_CONSUME) { return _dispatch_release_tailcall(ds); } @@ -2439,7 +2437,7 @@ _dispatch_timers_calendar_change(void) _dispatch_timer_expired = true; for (qos = 
0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) { _dispatch_timers_mask |= - 1 << DISPATCH_TIMER_INDEX(DISPATCH_TIMER_WALL_CLOCK, qos); + 1 << DISPATCH_TIMER_INDEX(DISPATCH_TIMER_KIND_WALL, qos); } } #endif @@ -3196,19 +3194,23 @@ _dispatch_memorypressure_handler(void *context DISPATCH_UNUSED) if (memorypressure & DISPATCH_MEMORYPRESSURE_NORMAL) { _dispatch_memory_warn = false; _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT; +#if VOUCHER_USE_MACH_VOUCHER if (_firehose_task_buffer) { firehose_buffer_clear_bank_flags(_firehose_task_buffer, FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY); } +#endif } if (memorypressure & DISPATCH_MEMORYPRESSURE_WARN) { _dispatch_memory_warn = true; _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYPRESSURE_PRESSURE_WARN; +#if VOUCHER_USE_MACH_VOUCHER if (_firehose_task_buffer) { firehose_buffer_set_bank_flags(_firehose_task_buffer, FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY); } +#endif } if (memorypressure & DISPATCH_MEMORYPRESSURE_MALLOC_MASK) { malloc_memory_event_handler(memorypressure & DISPATCH_MEMORYPRESSURE_MALLOC_MASK); @@ -4568,6 +4570,9 @@ _dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr, DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT : DISPATCH_MACH_MSG_DESTRUCTOR_FREE; dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL); + if (hdr == _dispatch_kevent_mach_msg_buf(ke)) { + _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr, (uint64_t)dmsg->dmsg_buf); + } dmsg->dmsg_voucher = voucher; dmsg->dmsg_priority = priority; dmsg->do_ctxt = ctxt; @@ -4587,6 +4592,7 @@ _dispatch_mach_msg_reply_recv(dispatch_mach_t dm, } void *ctxt = dmr->dmr_ctxt; mach_msg_header_t *hdr, *hdr2 = NULL; + void *hdr_copyout_addr; mach_msg_size_t siz, msgsiz = 0; mach_msg_return_t kr; mach_msg_option_t options; @@ -4604,6 +4610,7 @@ _dispatch_mach_msg_reply_recv(dispatch_mach_t dm, (options & MACH_RCV_TIMEOUT) ? "poll" : "wait"); kr = mach_msg(hdr, options, 0, siz, reply_port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); + hdr_copyout_addr = hdr; _dispatch_debug_machport(reply_port); _dispatch_debug("machport[0x%08x]: MACH_RCV_MSG (size %u, opts 0x%x) " "returned: %s - 0x%x", reply_port, siz, options, @@ -4654,18 +4661,20 @@ _dispatch_mach_msg_reply_recv(dispatch_mach_t dm, dispatch_assume_zero(kr); break; } - if (slowpath(dm->dq_atomic_flags & DSF_CANCELED)) { + _dispatch_mach_msg_reply_received(dm, dmr, hdr->msgh_local_port); + hdr->msgh_local_port = MACH_PORT_NULL; + if (slowpath((dm->dq_atomic_flags & DSF_CANCELED) || kr)) { if (!kr) mach_msg_destroy(hdr); goto out; } - _dispatch_mach_msg_reply_received(dm, dmr, hdr->msgh_local_port); - hdr->msgh_local_port = MACH_PORT_NULL; - if (kr) goto out; dispatch_mach_msg_t dmsg; dispatch_mach_msg_destructor_t destructor = (!hdr2) ? 
DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT : DISPATCH_MACH_MSG_DESTRUCTOR_FREE; dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL); + if (!hdr2 || hdr != hdr_copyout_addr) { + _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr_copyout_addr, (uint64_t)_dispatch_mach_msg_get_msg(dmsg)); + } dmsg->do_ctxt = ctxt; return dmsg; out: @@ -5904,8 +5913,12 @@ _dispatch_mach_install(dispatch_mach_t dm, pthread_priority_t pp) } if (dm->ds_is_direct_kevent) { pp &= (~_PTHREAD_PRIORITY_FLAGS_MASK | + _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG | _PTHREAD_PRIORITY_OVERCOMMIT_FLAG); // _dispatch_mach_reply_kevent_register assumes this has been done + // (unlike regular sources or queues); the DEFAULTQUEUE flag + // is used so that the priority of that channel doesn't act as a floor + // QoS for incoming messages (26761457) dm->dq_priority = (dispatch_priority_t)pp; } dm->ds_is_installed = true; @@ -6090,10 +6103,8 @@ _dispatch_mach_wakeup(dispatch_mach_t dm, pthread_priority_t pp, done: if (tq) { return _dispatch_queue_class_wakeup(dm->_as_dq, pp, flags, tq); -#if HAVE_PTHREAD_WORKQUEUE_QOS } else if (pp) { return _dispatch_queue_class_override_drainer(dm->_as_dq, pp, flags); -#endif } else if (flags & DISPATCH_WAKEUP_CONSUME) { return _dispatch_release_tailcall(dm); } diff --git a/src/swift/Queue.swift b/src/swift/Queue.swift index ef9e70f91..c8498420e 100644 --- a/src/swift/Queue.swift +++ b/src/swift/Queue.swift @@ -249,7 +249,7 @@ public extension DispatchQueue { private func _syncHelper( fn: (@noescape () -> ()) -> (), execute work: @noescape () throws -> T, - rescue: ((Swift.Error) throws -> (T))) rethrows -> T + rescue: ((Swift.Error) throws -> (T))) rethrows -> T { var result: T? var error: Swift.Error? @@ -272,7 +272,7 @@ public extension DispatchQueue { fn: (DispatchWorkItem) -> (), flags: DispatchWorkItemFlags, execute work: @noescape () throws -> T, - rescue: ((Swift.Error) throws -> (T))) rethrows -> T + rescue: ((Swift.Error) throws -> (T))) rethrows -> T { var result: T? var error: Swift.Error? diff --git a/src/swift/Wrapper.swift b/src/swift/Wrapper.swift index e476769a2..a9a7cd549 100644 --- a/src/swift/Wrapper.swift +++ b/src/swift/Wrapper.swift @@ -334,4 +334,4 @@ internal enum _OSQoSClass : UInt32 { } @_silgen_name("_swift_dispatch_release") -internal func _swift_dispatch_release(_ obj: dispatch_object_t) -> Void +internal func _swift_dispatch_release(_ obj: dispatch_object_t) -> Void diff --git a/src/voucher.c b/src/voucher.c index 94a293427..ee04e3b19 100644 --- a/src/voucher.c +++ b/src/voucher.c @@ -1560,6 +1560,7 @@ _voucher_init(void) { } +#if OS_VOUCHER_ACTIVITY_SPI void* voucher_activity_get_metadata_buffer(size_t *length) { @@ -1631,6 +1632,7 @@ voucher_activity_initialize_4libtrace(voucher_activity_hooks_t hooks) { (void)hooks; } +#endif // OS_VOUCHER_ACTIVITY_SPI size_t _voucher_debug(voucher_t v, char* buf, size_t bufsiz) diff --git a/src/voucher_internal.h b/src/voucher_internal.h index 3aa1a6579..b34ad4643 100644 --- a/src/voucher_internal.h +++ b/src/voucher_internal.h @@ -90,8 +90,10 @@ voucher_get_mach_voucher(voucher_t voucher); void _voucher_init(void); void _voucher_atfork_child(void); void _voucher_activity_debug_channel_init(void); +#if OS_VOUCHER_ACTIVITY_SPI void _voucher_activity_swap(firehose_activity_id_t old_id, firehose_activity_id_t new_id); +#endif void _voucher_xref_dispose(voucher_t voucher); void _voucher_dispose(voucher_t voucher); size_t _voucher_debug(voucher_t v, char* buf, size_t bufsiz);
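A recurring pattern in the queue.c hunks earlier in this patch is the lazy root-queue initialization: _dispatch_queue_push() tests _dispatch_root_queues_pred inline and diverts to the noinline _dispatch_queue_push_slow(), which runs dispatch_once_f() and retries, keeping the once machinery (and the stack frame it would force) out of the hot path. Below is a reduced sketch of that pattern under assumed names; note that comparing the predicate against ~0l mirrors the internal DLOCK_ONCE_DONE convention of this codebase and is not something portable code should do with the public dispatch_once_t API:

#include <dispatch/dispatch.h>

static dispatch_once_t push_pred; // stand-in for _dispatch_root_queues_pred

static void
init_once(void *ctxt)
{
	(void)ctxt;
	// one-time setup, e.g. root queue initialization
}

static void hot_push(long item);

// Out-of-line slow path: pay for dispatch_once_f() here, then retry.
__attribute__((noinline))
static void
hot_push_slow(long item)
{
	dispatch_once_f(&push_pred, NULL, init_once);
	hot_push(item);
}

// Hot path: a single predicate load, mirroring the DLOCK_ONCE_DONE test
// in the patch; everything heavier lives in hot_push_slow().
static void
hot_push(long item)
{
	if (push_pred != (dispatch_once_t)~0l) {
		return hot_push_slow(item);
	}
	// ... fast-path push using the initialized state ...
	(void)item;
}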