diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 3fc7fad83e..5bccda24f4 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -517,6 +517,12 @@ async def execute_command(self, *args, **options): ), lambda error: self._disconnect_raise(conn, error), ) + except asyncio.CancelledError: + # If we're cancelled and cancel happened after we sent the command + # but before we read the response, we need to read the response + # to avoid reading responses out of order. + await conn.read_response(timeout=0.001) + raise finally: if self.single_connection_client: self._single_conn_lock.release() diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 1851ca9a76..4f1a694096 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -273,3 +273,48 @@ async def open_connection(*args, **kwargs): vals = await asyncio.gather(do_read(), do_close()) assert vals == [b"Hello, World!", None] + + +@pytest.mark.onlynoncluster +async def test_client_handle_canceled_error(create_redis): + """ + This test reproduces the case in issue #2539 + where asyncio.CancelledError is raised when the parser is reading to feed the + internal buffer. The stream `readline()` will be interrupted by the CancelledError, + which will result in not reading the response after executing the command. This will + cause responses to be mixed up between commands. In this test, we execute a command + after the CancelledError is raised, and verify that the response is correct. + """ + r = await create_redis(single_connection_client=True) + + async def do_pings(): + while True: + await r.ping() + + future = asyncio.ensure_future(do_pings()) + await asyncio.sleep(0.01) + future.cancel() + with pytest.raises(asyncio.CancelledError): + await future + # To reproduce the issue, we need to execute a command which returns a different + # response type than PING. In this case, we use EXISTS because it should return an + # integer. + assert await r.exists("foo") == 0 + + await r.sadd("set", "one") + await r.sadd("set", "two") + await r.sadd("set", "three") + + async def do_smembers(): + while True: + await r.smembers("set") + + future = asyncio.ensure_future(do_smembers()) + await asyncio.sleep(0.01) + future.cancel() + with pytest.raises(asyncio.CancelledError): + await future + + assert await r.exists("foo") == 0 + + await r.connection.disconnect()