From 44f70fd007536a10f766351af8e8db7204400e4e Mon Sep 17 00:00:00 2001
From: tomMoral
Date: Fri, 20 Dec 2019 06:58:48 +0100
Subject: [PATCH 1/6] FIX hang on executor shutdown no wait with failure

---
 Lib/concurrent/futures/process.py | 39 +++++++++++++++++--------------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9e2ab9db64f664..7b7d4fb99e944a 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -87,7 +87,12 @@ def close(self):
         self._reader.close()
 
     def wakeup(self):
-        self._writer.send_bytes(b"")
+        try:
+            self._writer.send_bytes(b"")
+        except OSError:
+            # This can happen if the QueueManagerThread has been shutdown by
+            # another thread before this wakeup call.
+            pass
 
     def clear(self):
         while self._reader.poll():
@@ -160,8 +165,9 @@ def __init__(self, work_id, fn, args, kwargs):
 
 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
+        self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)
 
     def _on_queue_feeder_error(self, e, obj):
@@ -169,6 +175,7 @@ def _on_queue_feeder_error(self, e, obj):
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this case,
             # the queue_manager_thread fails all work_items with BrokenProcessPool
             if work_item is not None:
@@ -339,6 +346,8 @@ def shutdown_worker():
 
         # Release the queue's resources as soon as possible.
         call_queue.close()
+        call_queue.join_thread()
+        thread_wakeup.close()
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
@@ -547,6 +556,14 @@ def __init__(self, max_workers=None, mp_context=None,
         self._queue_count = 0
         self._pending_work_items = {}
 
+        # _ThreadWakeup is a communication channel used to interrupt the wait
+        # of the main loop of queue_manager_thread from another thread (e.g.
+        # when calling executor.submit or executor.shutdown). We do not use the
+        # _result_queue to send the wakeup signal to the queue_manager_thread
+        # as it could result in a deadlock if a worker process dies with the
+        # _result_queue write lock still acquired.
+        self._queue_management_thread_wakeup = _ThreadWakeup()
+
         # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
@@ -554,7 +571,8 @@ def __init__(self, max_workers=None, mp_context=None,
         queue_size = self._max_workers + EXTRA_QUEUED_CALLS
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
-            pending_work_items=self._pending_work_items)
+            pending_work_items=self._pending_work_items,
+            thread_wakeup=self._queue_management_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
         # processes anyway, so silence the tracebacks.
@@ -562,14 +580,6 @@ def __init__(self, max_workers=None, mp_context=None,
         self._result_queue = mp_context.SimpleQueue()
         self._work_ids = queue.Queue()
 
-        # _ThreadWakeup is a communication channel used to interrupt the wait
-        # of the main loop of queue_manager_thread from another thread (e.g.
-        # when calling executor.submit or executor.shutdown). We do not use the
-        # _result_queue to send the wakeup signal to the queue_manager_thread
-        # as it could result in a deadlock if a worker process dies with the
-        # _result_queue write lock still acquired.
-        self._queue_management_thread_wakeup = _ThreadWakeup()
-
     def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             # When the executor gets garbarge collected, the weakref callback
@@ -671,16 +681,11 @@ def shutdown(self, wait=True):
         # To reduce the risk of opening too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
-        if self._call_queue is not None:
-            self._call_queue.close()
-            if wait:
-                self._call_queue.join_thread()
-            self._call_queue = None
+        self._call_queue = None
         self._result_queue = None
         self._processes = None
 
         if self._queue_management_thread_wakeup:
-            self._queue_management_thread_wakeup.close()
             self._queue_management_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__

From 38d8c985123bc6fec7f3b37c3f81b0efe2da43c8 Mon Sep 17 00:00:00 2001
From: tomMoral
Date: Fri, 20 Dec 2019 07:44:24 +0100
Subject: [PATCH 2/6] TST non regression test for #6084

---
 Lib/test/test_concurrent_futures.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index c97351636e8692..4d0f39f0d0fbb1 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -1027,6 +1027,32 @@ def test_shutdown_deadlock(self):
         with self.assertRaises(BrokenProcessPool):
             f.result()
 
+    def test_shutdown_deadlock_pickle(self):
+        # Test that the pool calling shutdown with wait=False does not cause
+        # a deadlock if a task fails at pickle after the shutdown call.
+        # Reported in GH#6034.
+        self.executor.shutdown(wait=True)
+        with self.executor_type(max_workers=2,
+                                mp_context=get_context(self.ctx)) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+
+            # Start the executor and get the queue_management_thread to collect
+            # the threads and avoid dangling thread that should be cleaned up
+            # asynchronously.
+            executor.submit(id, 42).result()
+            queue_manager = executor._queue_management_thread
+
+            # Submit a task that fails at pickle and shutdown the executor
+            # without waiting
+            f = executor.submit(id, ErrorAtPickle())
+            executor.shutdown(wait=False)
+            with self.assertRaises(PicklingError):
+                f.result()
+
+            # Make sure the executor is eventually shutdown and do not leave
+            # dangling threads
+            queue_manager.join()
+
 
 create_executor_tests(ExecutorDeadlockTest,
                       executor_mixins=(ProcessPoolForkMixin,

From b4ec893d2293a8344e8e105a7b8052ade8359eef Mon Sep 17 00:00:00 2001
From: Pat Buxton
Date: Thu, 19 Dec 2019 06:20:21 -0800
Subject: [PATCH 3/6] bpo-39098 Fix OSError: handle is closed in ProcessPoolExecutor on shutdown(wait=False)

When a ProcessPoolExecutor was created, a job was added, and the executor
was then shut down with wait=False, an "OSError: handle is closed" error
was raised by the _ThreadWakeup class. The _ThreadWakeup should not be
closed by shutdown() when wait is False; it should instead be closed in
the shutdown_worker method.
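A minimal sketch of the scenario described above (illustrative only, not
part of the patch):

    from concurrent.futures import ProcessPoolExecutor

    if __name__ == "__main__":
        executor = ProcessPoolExecutor(max_workers=2)
        future = executor.submit(abs, -1)
        # Before this change, shutdown(wait=False) could close the wakeup
        # pipe while it was still in use, which surfaced as
        # "OSError: handle is closed".
        executor.shutdown(wait=False)
        print(future.result())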
---
 Lib/concurrent/futures/process.py   | 19 ++++++++++---------
 Lib/test/test_concurrent_futures.py | 23 +++++++++++++++++++++++
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 7b7d4fb99e944a..142cc639706388 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -80,23 +80,23 @@ class _ThreadWakeup:
 
     def __init__(self):
+        self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)
 
     def close(self):
-        self._writer.close()
-        self._reader.close()
+        if not self._closed:
+            self._closed = True
+            self._writer.close()
+            self._reader.close()
 
     def wakeup(self):
-        try:
+        if not self._closed:
             self._writer.send_bytes(b"")
-        except OSError:
-            # This can happen if the QueueManagerThread has been shutdown by
-            # another thread before this wakeup call.
-            pass
 
     def clear(self):
-        while self._reader.poll():
-            self._reader.recv_bytes()
+        if not self._closed:
+            while self._reader.poll():
+                self._reader.recv_bytes()
 
 
 def _python_exit():
@@ -448,6 +448,7 @@ def shutdown_worker():
                 # this thread if there are no pending work items.
                 if not pending_work_items:
                     shutdown_worker()
+                    thread_wakeup = None
                     return
             except Full:
                 # This is not a problem: we will eventually be woken up (in
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 4d0f39f0d0fbb1..a83433b037867b 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -379,6 +379,14 @@ def test_del_shutdown(self):
         for t in threads:
             t.join()
 
+    def test_shutdown_no_wait(self):
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        executor.shutdown(wait=False)
+        for t in threads:
+            t.join()
+
     def test_thread_names_assigned(self):
         executor = futures.ThreadPoolExecutor(
             max_workers=5, thread_name_prefix='SpecialPool')
@@ -443,6 +451,21 @@ def test_del_shutdown(self):
             p.join()
         call_queue.join_thread()
 
+    def test_shutdown_no_wait(self):
+        executor = futures.ProcessPoolExecutor(max_workers=5)
+        list(executor.map(abs, range(-5, 5)))
+        processes = executor._processes
+        call_queue = executor._call_queue
+        queue_management_thread = executor._queue_management_thread
+        executor.shutdown(wait=False)
+
+        # Make sure that all the executor resources were properly cleaned by
+        # the shutdown process
+        queue_management_thread.join()
+        for p in processes.values():
+            p.join()
+        call_queue.join_thread()
+
 
 create_executor_tests(ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,

From de5ef88787a477f0023637ddce78f4a3af3c234d Mon Sep 17 00:00:00 2001
From: tomMoral
Date: Fri, 14 Feb 2020 15:00:12 +0100
Subject: [PATCH 4/6] CLN improve test and remove unnecessary code

---
 Lib/concurrent/futures/process.py   |  1 -
 Lib/test/test_concurrent_futures.py | 31 ++++++++++++++++++++++------
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 142cc639706388..f47c5fda5c22bd 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -448,7 +448,6 @@ def shutdown_worker():
                 # this thread if there are no pending work items.
                 if not pending_work_items:
                     shutdown_worker()
-                    thread_wakeup = None
                     return
             except Full:
                 # This is not a problem: we will eventually be woken up (in
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index a83433b037867b..736f77171365ec 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -372,21 +372,32 @@ def test_context_manager_shutdown(self):
 
     def test_del_shutdown(self):
         executor = futures.ThreadPoolExecutor(max_workers=5)
-        executor.map(abs, range(-5, 5))
+        res = executor.map(abs, range(-5, 5))
         threads = executor._threads
         del executor
 
         for t in threads:
             t.join()
 
+        # Make sure the results where all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
     def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the threads when calling
+        # shutdown with wait=False
         executor = futures.ThreadPoolExecutor(max_workers=5)
-        executor.map(abs, range(-5, 5))
+        res = executor.map(abs, range(-5, 5))
         threads = executor._threads
         executor.shutdown(wait=False)
         for t in threads:
             t.join()
 
+        # Make sure the results where all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
     def test_thread_names_assigned(self):
         executor = futures.ThreadPoolExecutor(
             max_workers=5, thread_name_prefix='SpecialPool')
@@ -437,7 +448,7 @@ def test_context_manager_shutdown(self):
 
     def test_del_shutdown(self):
         executor = futures.ProcessPoolExecutor(max_workers=5)
-        list(executor.map(abs, range(-5, 5)))
+        res = executor.map(abs, range(-5, 5))
         queue_management_thread = executor._queue_management_thread
         processes = executor._processes
         call_queue = executor._call_queue
@@ -451,9 +462,15 @@ def test_del_shutdown(self):
             p.join()
         call_queue.join_thread()
 
+        # Make sure the results where all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
     def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the processes when calling
+        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(max_workers=5)
-        list(executor.map(abs, range(-5, 5)))
+        res = executor.map(abs, range(-5, 5))
         processes = executor._processes
         call_queue = executor._call_queue
         queue_management_thread = executor._queue_management_thread
@@ -466,6 +483,10 @@ def test_shutdown_no_wait(self):
             p.join()
         call_queue.join_thread()
 
+        # Make sure the results where all computed before the executor got
+        # shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
 
 create_executor_tests(ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
@@ -1053,7 +1074,7 @@ def test_shutdown_deadlock(self):
     def test_shutdown_deadlock_pickle(self):
         # Test that the pool calling shutdown with wait=False does not cause
         # a deadlock if a task fails at pickle after the shutdown call.
-        # Reported in GH#6034.
+        # Reported in bpo-39104.
         self.executor.shutdown(wait=True)
         with self.executor_type(max_workers=2,
                                 mp_context=get_context(self.ctx)) as executor:

From 8d6b285596477b389e52b645c149d94ab2edde4e Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Sun, 16 Feb 2020 18:45:19 +0100
Subject: [PATCH 5/6] Apply suggestions from code review

Fix typos

---
 Lib/test/test_concurrent_futures.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 736f77171365ec..743d57172e8682 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -379,7 +379,7 @@ def test_del_shutdown(self):
         for t in threads:
             t.join()
 
-        # Make sure the results where all computed before the
+        # Make sure the results were all computed before the
         # executor got shutdown.
         assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
 
@@ -393,7 +393,7 @@ def test_shutdown_no_wait(self):
         for t in threads:
             t.join()
 
-        # Make sure the results where all computed before the
+        # Make sure the results were all computed before the
         # executor got shutdown.
         assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
 
@@ -462,7 +462,7 @@ def test_del_shutdown(self):
             p.join()
         call_queue.join_thread()
 
-        # Make sure the results where all computed before the
+        # Make sure the results were all computed before the
         # executor got shutdown.
         assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
 
@@ -483,7 +483,7 @@ def test_shutdown_no_wait(self):
             p.join()
         call_queue.join_thread()
 
-        # Make sure the results where all computed before the executor got
+        # Make sure the results were all computed before the executor got
         # shutdown.
         assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
 

From 9d47cf9daf7c5cf11fb87ae13ecc5ba2d3f1e811 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Sun, 16 Feb 2020 18:49:28 +0100
Subject: [PATCH 6/6] Add NEWS

---
 .../next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst

diff --git a/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst b/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst
new file mode 100644
index 00000000000000..52779bf098232c
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst
@@ -0,0 +1,2 @@
+Fix hanging ProcessPoolExecutor on ``shutdown(wait=False)`` when a task has
+failed pickling.
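For reference, a minimal sketch of the scenario the NEWS entry describes
(an assumed reproduction modelled on the test_shutdown_deadlock_pickle test
added above; the ErrorAtPickle class here is an illustrative stand-in for
the test helper of the same name). Before this patch series, the executor
could hang; afterwards the future fails with PicklingError instead:

    from concurrent.futures import ProcessPoolExecutor
    from pickle import PicklingError

    class ErrorAtPickle:
        def __reduce__(self):
            # Fail in the queue feeder thread when the call item is pickled
            # for the worker process.
            raise PicklingError("cannot pickle this object")

    if __name__ == "__main__":
        executor = ProcessPoolExecutor(max_workers=2)
        future = executor.submit(id, ErrorAtPickle())
        executor.shutdown(wait=False)
        try:
            future.result()
        except PicklingError as exc:
            print("task failed at pickle:", exc)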