Skip to content

Commit 3da29dd

Browse files
Merge pull request #512 from adierking/cross-the-streams
event: implement source muxing on Windows
2 parents 3cf1bf3 + 96184ef commit 3da29dd

File tree

3 files changed

+359
-26
lines changed

3 files changed

+359
-26
lines changed

src/event/event_windows.c

Lines changed: 82 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -174,27 +174,62 @@ _dispatch_muxnote_create(dispatch_unote_t du,
174174
}
175175

176176
static void
177-
_dispatch_muxnote_stop(dispatch_muxnote_t dmn)
177+
_dispatch_muxnote_disarm_events(dispatch_muxnote_t dmn,
178+
enum _dispatch_muxnote_events events)
178179
{
179-
if (dmn->dmn_thread) {
180-
// Keep trying to cancel ReadFile() until the thread exits
181-
os_atomic_store(&dmn->dmn_stop, true, relaxed);
182-
SetEvent(dmn->dmn_event);
183-
do {
184-
CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL);
185-
} while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT);
186-
CloseHandle(dmn->dmn_thread);
187-
dmn->dmn_thread = NULL;
188-
}
189-
if (dmn->dmn_threadpool_wait) {
190-
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
191-
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
192-
/* fCancelPendingCallbacks */ FALSE);
193-
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
194-
dmn->dmn_threadpool_wait = NULL;
195-
}
196-
if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET) {
197-
WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
180+
long lNetworkEvents;
181+
dmn->dmn_events &= ~events;
182+
switch (dmn->dmn_handle_type) {
183+
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
184+
DISPATCH_INTERNAL_CRASH(0, "invalid handle");
185+
186+
case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE:
187+
break;
188+
189+
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
190+
if ((events & DISPATCH_MUXNOTE_EVENT_READ) && dmn->dmn_thread) {
191+
// Keep trying to cancel ReadFile() until the thread exits
192+
os_atomic_store(&dmn->dmn_stop, true, relaxed);
193+
SetEvent(dmn->dmn_event);
194+
do {
195+
CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL);
196+
} while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT);
197+
CloseHandle(dmn->dmn_thread);
198+
dmn->dmn_thread = NULL;
199+
}
200+
break;
201+
202+
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
203+
lNetworkEvents = dmn->dmn_network_events;
204+
if (events & DISPATCH_MUXNOTE_EVENT_READ) {
205+
lNetworkEvents &= ~FD_READ;
206+
}
207+
if (events & DISPATCH_MUXNOTE_EVENT_WRITE) {
208+
lNetworkEvents &= ~FD_WRITE;
209+
}
210+
if (lNetworkEvents == dmn->dmn_network_events) {
211+
break;
212+
}
213+
int iResult;
214+
if (lNetworkEvents & (FD_READ | FD_WRITE)) {
215+
iResult = WSAEventSelect((SOCKET)dmn->dmn_ident,
216+
(WSAEVENT)dmn->dmn_event, lNetworkEvents);
217+
} else {
218+
lNetworkEvents = 0;
219+
iResult = WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
220+
}
221+
if (iResult != 0) {
222+
DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect");
223+
}
224+
dmn->dmn_network_events = lNetworkEvents;
225+
if (!lNetworkEvents && dmn->dmn_threadpool_wait) {
226+
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
227+
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
228+
/* fCancelPendingCallbacks */ FALSE);
229+
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
230+
dmn->dmn_threadpool_wait = NULL;
231+
}
232+
break;
198233
}
199234
}
200235

@@ -389,8 +424,16 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
389424
}
390425
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
391426
_dispatch_muxnote_retain(dmn);
392-
DWORD available =
427+
DWORD available;
428+
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) {
429+
// We can't query a pipe which has a read source open on it
430+
// because the ReadFile() in the background thread might cause
431+
// NtQueryInformationFile() to block
432+
available = 1;
433+
} else {
434+
available =
393435
_dispatch_pipe_write_availability((HANDLE)dmn->dmn_ident);
436+
}
394437
bSuccess = PostQueuedCompletionStatus(hPort, available,
395438
(ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_WRITE,
396439
(LPOVERLAPPED)dmn);
@@ -487,8 +530,12 @@ _dispatch_unote_register_muxed(dispatch_unote_t du)
487530
dmn = _dispatch_unote_muxnote_find(dmb, du._du->du_ident,
488531
du._du->du_filter);
489532
if (dmn) {
490-
WIN_PORT_ERROR();
491-
DISPATCH_INTERNAL_CRASH(0, "muxnote updating is not supported");
533+
if (events & ~dmn->dmn_events) {
534+
dmn->dmn_events |= events;
535+
if (_dispatch_io_trigger(dmn) == FALSE) {
536+
return false;
537+
}
538+
}
492539
} else {
493540
dmn = _dispatch_muxnote_create(du, events);
494541
if (!dmn) {
@@ -551,9 +598,18 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du)
551598
}
552599
dul->du_muxnote = NULL;
553600

554-
LIST_REMOVE(dmn, dmn_list);
555-
_dispatch_muxnote_stop(dmn);
556-
_dispatch_muxnote_release(dmn);
601+
enum _dispatch_muxnote_events disarmed = 0;
602+
if (LIST_EMPTY(&dmn->dmn_readers_head)) {
603+
disarmed |= DISPATCH_MUXNOTE_EVENT_READ;
604+
}
605+
if (LIST_EMPTY(&dmn->dmn_writers_head)) {
606+
disarmed |= DISPATCH_MUXNOTE_EVENT_WRITE;
607+
}
608+
_dispatch_muxnote_disarm_events(dmn, disarmed);
609+
if (!dmn->dmn_events) {
610+
LIST_REMOVE(dmn, dmn_list);
611+
_dispatch_muxnote_release(dmn);
612+
}
557613

558614
_dispatch_unote_state_set(du, DU_STATE_UNREGISTERED);
559615
return true;

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ set(DISPATCH_C_TESTS
127127
timer_bit63
128128
timer_set_time
129129
data
130+
io_muxed
130131
io_net
131132
io_pipe
132133
io_pipe_close
@@ -178,6 +179,7 @@ add_unit_test(dispatch_plusplus SOURCES dispatch_plusplus.cpp)
178179

179180
# test-specific link options
180181
if(WIN32)
182+
target_link_libraries(dispatch_io_muxed PRIVATE WS2_32)
181183
target_link_libraries(dispatch_io_net PRIVATE WS2_32)
182184
else()
183185
# When dispatch_group is reenabled above, remove this

0 commit comments

Comments
 (0)