bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure #17670

Merged
47 changes: 26 additions & 21 deletions Lib/concurrent/futures/process.py
@@ -80,18 +80,23 @@

 class _ThreadWakeup:
     def __init__(self):
+        self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)

     def close(self):
-        self._writer.close()
-        self._reader.close()
+        if not self._closed:
+            self._closed = True
+            self._writer.close()
+            self._reader.close()

     def wakeup(self):
-        self._writer.send_bytes(b"")
+        if not self._closed:
+            self._writer.send_bytes(b"")

     def clear(self):
-        while self._reader.poll():
-            self._reader.recv_bytes()
+        if not self._closed:
+            while self._reader.poll():
+                self._reader.recv_bytes()


 def _python_exit():
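
For context on the _closed guard added above, here is a minimal standalone sketch (ours, not part of the PR) of the wakeup-pipe pattern that _ThreadWakeup implements. The point of the guard is that close() becomes idempotent and a late wakeup() after close() is a silent no-op instead of an OSError on a closed pipe:

import multiprocessing as mp

class WakeupDemo:
    """Simplified stand-in for _ThreadWakeup (illustrative only)."""

    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        # Idempotent: a second close() must not touch already-closed pipe ends.
        if not self._closed:
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        # Without the guard, sending after close() raises OSError.
        if not self._closed:
            self._writer.send_bytes(b"")

if __name__ == "__main__":
    w = WakeupDemo()
    w.wakeup()   # unblocks a thread polling w._reader
    w.close()
    w.close()    # safe: no-op
    w.wakeup()   # safe: no-op instead of OSError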
@@ -160,15 +165,17 @@ def __init__(self, work_id, fn, args, kwargs):

 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
+        self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)

     def _on_queue_feeder_error(self, e, obj):
         if isinstance(obj, _CallItem):
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this case,
             # the queue_manager_thread fails all work_items with BrokenProcessPool
             if work_item is not None:
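
The _on_queue_feeder_error override above is the heart of the fix: when pickling a _CallItem fails in the queue's feeder thread, the queue now wakes the queue management thread instead of failing silently. A minimal demonstration (ours, not from the PR) of the underlying hook, which multiprocessing.queues.Queue calls from its feeder thread when an item cannot be serialized:

import multiprocessing as mp
import pickle
import time
from multiprocessing.queues import Queue

class Unpicklable:
    def __reduce__(self):
        raise pickle.PicklingError("cannot pickle this")

class LoudQueue(Queue):
    def _on_queue_feeder_error(self, e, obj):
        # Runs in the feeder thread; the default implementation only prints
        # a traceback, which is why such failures used to go unnoticed.
        print(f"feeder failed on {type(obj).__name__}: {e!r}")

if __name__ == "__main__":
    q = LoudQueue(ctx=mp.get_context())
    q.put(Unpicklable())   # put() succeeds; pickling fails later in the feeder
    time.sleep(0.5)        # give the feeder thread time to hit the error
    q.close()
    q.join_thread()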
@@ -339,6 +346,8 @@ def shutdown_worker():

         # Release the queue's resources as soon as possible.
Review thread on this hunk:

Member: Not a requirement for this PR, but it seems to me that the _queue_management_worker function has grown quite long and complicated. Do you think it could be turned into an object with a bunch of short and readable helper methods?

tomMoral (Contributor, author): Yes, this would indeed be much easier to read. I will try to make a new PR with this change when I get a chance.

Member: Looking forward to your PR then @tomMoral :-)
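
Purely as an illustration of the refactor discussed in this thread (it is not part of this PR, and every name below is hypothetical), the loop of _queue_management_worker could be hoisted into an object that delegates to short helpers; CPython 3.9 later adopted a similar structure with _ExecutorManagerThread:

class _QueueManager:
    """Hypothetical sketch: the management loop as an object."""

    def __init__(self, executor_reference, processes, pending_work_items,
                 work_ids_queue, call_queue, result_queue, thread_wakeup):
        self.executor_reference = executor_reference
        self.processes = processes
        self.pending_work_items = pending_work_items
        self.work_ids_queue = work_ids_queue
        self.call_queue = call_queue
        self.result_queue = result_queue
        self.thread_wakeup = thread_wakeup

    def run(self):
        # Same control flow as the current while-loop, one helper per block.
        while True:
            self.add_call_item_to_queue()
            result_item, is_broken = self.wait_result_or_wakeup()
            if is_broken:
                self.terminate_broken()
                return
            if result_item is not None:
                self.process_result_item(result_item)
            if self.is_shutting_down() and not self.pending_work_items:
                self.join_executor_internals()
                return

    # Each helper would absorb one block of the current function body.
    def add_call_item_to_queue(self): raise NotImplementedError
    def wait_result_or_wakeup(self): raise NotImplementedError
    def terminate_broken(self): raise NotImplementedError
    def process_result_item(self, result_item): raise NotImplementedError
    def is_shutting_down(self): raise NotImplementedError
    def join_executor_internals(self): raise NotImplementedError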

         call_queue.close()
+        call_queue.join_thread()
+        thread_wakeup.close()
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
@@ -547,29 +556,30 @@ def __init__(self, max_workers=None, mp_context=None,
         self._queue_count = 0
         self._pending_work_items = {}

+        # _ThreadWakeup is a communication channel used to interrupt the wait
+        # of the main loop of queue_manager_thread from another thread (e.g.
+        # when calling executor.submit or executor.shutdown). We do not use the
+        # _result_queue to send the wakeup signal to the queue_manager_thread
+        # as it could result in a deadlock if a worker process dies with the
+        # _result_queue write lock still acquired.
+        self._queue_management_thread_wakeup = _ThreadWakeup()
+
         # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
         # because futures in the call queue cannot be cancelled.
         queue_size = self._max_workers + EXTRA_QUEUED_CALLS
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
-            pending_work_items=self._pending_work_items)
+            pending_work_items=self._pending_work_items,
+            thread_wakeup=self._queue_management_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
         # processes anyway, so silence the tracebacks.
         self._call_queue._ignore_epipe = True
         self._result_queue = mp_context.SimpleQueue()
         self._work_ids = queue.Queue()

-        # _ThreadWakeup is a communication channel used to interrupt the wait
-        # of the main loop of queue_manager_thread from another thread (e.g.
-        # when calling executor.submit or executor.shutdown). We do not use the
-        # _result_queue to send the wakeup signal to the queue_manager_thread
-        # as it could result in a deadlock if a worker process dies with the
-        # _result_queue write lock still acquired.
-        self._queue_management_thread_wakeup = _ThreadWakeup()
-
     def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             # When the executor gets garbage collected, the weakref callback
@@ -671,16 +681,11 @@ def shutdown(self, wait=True):
         # To reduce the risk of opening too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
-        if self._call_queue is not None:
-            self._call_queue.close()
-            if wait:
-                self._call_queue.join_thread()
-            self._call_queue = None
+        self._call_queue = None
         self._result_queue = None
         self._processes = None

         if self._queue_management_thread_wakeup:
             self._queue_management_thread_wakeup.close()
             self._queue_management_thread_wakeup = None

     shutdown.__doc__ = _base.Executor.shutdown.__doc__
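
Taken together, the process.py changes ensure that a pickling failure occurring after shutdown(wait=False) still wakes the queue management thread so it can exit. A minimal reproducer for the original hang, adapted from the bpo-39104 report and the regression test below (ErrorAtPickle is re-created here since the test suite's helper of the same name is not shown in this diff):

import pickle
from concurrent.futures import ProcessPoolExecutor

class ErrorAtPickle:
    """An argument whose pickling always fails in the feeder thread."""
    def __reduce__(self):
        raise pickle.PicklingError("error at pickle")

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=2)
    executor.submit(id, 42).result()          # make sure the pool is running
    f = executor.submit(id, ErrorAtPickle())  # pickling fails asynchronously
    executor.shutdown(wait=False)
    # Per the bpo report, unpatched interpreters hang at exit here because
    # nothing wakes the queue management thread after the pickling failure;
    # with this fix, f raises PicklingError and the interpreter exits cleanly.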
74 changes: 72 additions & 2 deletions Lib/test/test_concurrent_futures.py
@@ -372,13 +372,32 @@ def test_context_manager_shutdown(self):

     def test_del_shutdown(self):
         executor = futures.ThreadPoolExecutor(max_workers=5)
-        executor.map(abs, range(-5, 5))
+        res = executor.map(abs, range(-5, 5))
         threads = executor._threads
         del executor

         for t in threads:
             t.join()

+        # Make sure the results were all computed before the
+        # executor was shut down.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the threads when calling
+        # shutdown with wait=False
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        executor.shutdown(wait=False)
+        for t in threads:
+            t.join()
+
+        # Make sure the results were all computed before the
+        # executor was shut down.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
     def test_thread_names_assigned(self):
         executor = futures.ThreadPoolExecutor(
             max_workers=5, thread_name_prefix='SpecialPool')
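
A side note on these test changes: Executor.map returns a lazy iterator, so keeping res around and consuming it only after shutdown verifies both that the results were computed and that shutdown did not discard pending work. A small illustration (ours) of why the late consumption works:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as ex:
    it = ex.map(abs, range(-3, 3))   # lazy: nothing consumed yet
# __exit__ ran shutdown(wait=True), so every future has completed;
# the iterator still yields the stored results afterwards.
print(list(it))  # [3, 2, 1, 0, 1, 2]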
@@ -429,7 +448,7 @@ def test_context_manager_shutdown(self):

     def test_del_shutdown(self):
         executor = futures.ProcessPoolExecutor(max_workers=5)
-        list(executor.map(abs, range(-5, 5)))
+        res = executor.map(abs, range(-5, 5))
         queue_management_thread = executor._queue_management_thread
         processes = executor._processes
         call_queue = executor._call_queue
@@ -443,6 +462,31 @@ def test_del_shutdown(self):
             p.join()
         call_queue.join_thread()

+        # Make sure the results were all computed before the
+        # executor was shut down.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the processes when calling
+        # shutdown with wait=False
+        executor = futures.ProcessPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        processes = executor._processes
+        call_queue = executor._call_queue
+        queue_management_thread = executor._queue_management_thread
+        executor.shutdown(wait=False)
+
+        # Make sure that all the executor resources were properly cleaned by
+        # the shutdown process
+        queue_management_thread.join()
+        for p in processes.values():
+            p.join()
+        call_queue.join_thread()
+
+        # Make sure the results were all computed before the executor was
+        # shut down.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
 create_executor_tests(ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
@@ -1027,6 +1071,32 @@ def test_shutdown_deadlock(self):
         with self.assertRaises(BrokenProcessPool):
             f.result()

+    def test_shutdown_deadlock_pickle(self):
+        # Test that the pool calling shutdown with wait=False does not cause
+        # a deadlock if a task fails at pickle after the shutdown call.
+        # Reported in bpo-39104.
+        self.executor.shutdown(wait=True)
+        with self.executor_type(max_workers=2,
+                                mp_context=get_context(self.ctx)) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+
+            # Start the executor and grab its queue_management_thread so we
+            # can join it later, instead of leaving a dangling thread to be
+            # cleaned up asynchronously.
+            executor.submit(id, 42).result()
+            queue_manager = executor._queue_management_thread
+
+            # Submit a task that fails at pickle and shutdown the executor
+            # without waiting
+            f = executor.submit(id, ErrorAtPickle())
+            executor.shutdown(wait=False)
+            with self.assertRaises(PicklingError):
+                f.result()
+
+        # Make sure the executor eventually shuts down and does not leave
+        # dangling threads
+        queue_manager.join()
+
+
 create_executor_tests(ExecutorDeadlockTest,
                       executor_mixins=(ProcessPoolForkMixin,
New file (NEWS entry):
@@ -0,0 +1,2 @@
+Fix hanging ProcessPoolExecutor on ``shutdown(wait=False)`` when a task has
+failed pickling.