Skip to content

PYTHON-2334: Fix gevent race condition #472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 29, 2020

Conversation

TylerWilley
Copy link
Contributor

Gevent patches the Lock class such that it can raise Timeout exceptions while waiting in .acquire()

When these raise, it can cause the pymongo thread_util.py:Semaphore to exit the acquire() function without releasing self._cond's lock. Changing to with self._cond guarantees release even if an exception is raised by gevent.

Also, in pool.py, when waiting for the lock for active sockets, this same condition could occur and cause self._socket_semaphore's lock to be lost.

timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
Copy link
Contributor Author

@TylerWilley TylerWilley Jul 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will release, and re-lock self._cond

Condition has 2 acquires in it, If _waiter.acquire() raises, Condition will re-acquire in the finally clause.

If _acquire_restore raises, then Condition won't re-acquire and a double-release is possible. I can't think of a good way to solve that. One possible solution would be to update gevent to monkey patch Condition with a version that guarantees self._lock re-acquisition prior to raising. A double release here is somewhat less concerning though, as it would only cause one waiter to hang until the next notify, instead of leaking a lock.

Reference:
https://github.com/python/cpython/blob/v3.6.9/Lib/threading.py#L295-L304

self.active_sockets += 1

try:
with self.lock:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If gevent raises on the acquire here, self._socket_semaphore's lock will be permanently decremented.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be simpler to move the active_sockets modification down below inside the existing try/except:

# We've now acquired the semaphore and must release it on error.
sock_info = None
try:
    with self.lock:
        self.active_sockets += 1
    while sock_info is None:...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of that, however the second try/except block will decrement the active sockets if an exception is raised.

If you raise within the first with self.lock, you will not have first incremented active_sockets, so you will end up permanently decreasing active_sockets.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. What about adding a flag:

# We've now acquired the semaphore and must release it on error.
sock_info = None
incremented = False
try:
    with self.lock:
        self.active_sockets += 1
        incremented = True
    while sock_info is None:...
except Exception:
    if sock_info:
        # We checked out a socket but authentication failed.	                
        sock_info.close_socket(ConnectionClosedReason.ERROR)
    self._socket_semaphore.release()
    if incremented:
        with self.lock:
            self.active_sockets -= 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work... Should I add that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, seems to still work

@ShaneHarvey
Copy link
Member

Hi @TylerWilley, can you provide some sample code that reproduces this issue?

@TylerWilley
Copy link
Contributor Author

TylerWilley commented Jul 27, 2020

import time
from gevent import monkey, Greenlet, Timeout
monkey.patch_all()

import threading
from pymongo.thread_util import Semaphore

x = Semaphore()

def BrokenThread():
    with Timeout(1):
        try:
            x.acquire()
            print("Greenlet acquired lock")
        except Timeout:
            print("Greenlet timedout")

def ContentiousThread():
    x.acquire()
    time.sleep(2)
    x.release()

ct = Greenlet.spawn(ContentiousThread)
bt = Greenlet.spawn(BrokenThread)

ct.join()
bt.join()

print("Condition Lock State:", x._cond._lock.locked())

Expected:

Greenlet timedout
Condition Lock State: False

Actual:

Greenlet timedout
Traceback (most recent call last):
  File "test_thread_utils.py", line 27, in <module>
    ct.join()
  File "/Users/twilley56/.pyenv/versions/3.6.9/lib/python3.6/site-packages/gevent/greenlet.py", line 812, in join
    result = get_my_hub(self).switch()
  File "/Users/twilley56/.pyenv/versions/3.6.9/lib/python3.6/site-packages/gevent/_greenlet_primitives.py", line 65, in switch
    return _greenlet_switch(self) # pylint:disable=undefined-variable
gevent.exceptions.LoopExit: This operation would block forever
	Hub: <Hub '' at 0x108a31d58 select default pending=0 ref=0 thread_ident=0x10a4555c0>
	Handles:
[]

(This is because gevent runs coroutines, and no coroutine is able to run, given that only 2 coroutines are running. One waiting for ct.join, and another waiting for x.release, since neither can run and no other coroutine is running to unlock either lock, gevent halts)

Copy link
Member

@ShaneHarvey ShaneHarvey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the repro code. It gives me an idea for how to write a high level test for this change using MongoClient.

Edit: You can see the test here: #475

self.active_sockets += 1

try:
with self.lock:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. What about adding a flag:

# We've now acquired the semaphore and must release it on error.
sock_info = None
incremented = False
try:
    with self.lock:
        self.active_sockets += 1
        incremented = True
    while sock_info is None:...
except Exception:
    if sock_info:
        # We checked out a socket but authentication failed.	                
        sock_info.close_socket(ConnectionClosedReason.ERROR)
    self._socket_semaphore.release()
    if incremented:
        with self.lock:
            self.active_sockets -= 1

@TylerWilley TylerWilley changed the title Fix gevent race condition PYTHON-2334: Fix gevent race condition Jul 28, 2020
@ShaneHarvey ShaneHarvey marked this pull request as ready for review July 29, 2020 23:02
@ShaneHarvey ShaneHarvey merged commit 83578dc into mongodb:master Jul 29, 2020
@ShaneHarvey
Copy link
Member

Thanks @TylerWilley! I've opened #475 to add a regression test for gevent.Timeout and fix Semaphore.release.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants