From 0ef3446a8c1df6a4d10397260cfe2e7e32496b2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 20 Jan 2023 11:50:37 +0000 Subject: [PATCH 1/5] A failing unittest --- tests/test_asyncio/test_connection.py | 66 ++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index bf59dbe6b0..712d632aeb 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -1,7 +1,7 @@ import asyncio import socket import types -from unittest.mock import patch +from unittest.mock import AsyncMock, Mock, patch import pytest @@ -9,12 +9,14 @@ from redis.asyncio.connection import ( BaseParser, Connection, + HiredisParser, PythonParser, UnixDomainSocketConnection, ) from redis.asyncio.retry import Retry from redis.backoff import NoBackoff from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError +from redis.utils import HIREDIS_AVAILABLE from tests.conftest import skip_if_server_version_lt from .compat import mock @@ -146,3 +148,65 @@ async def test_connection_parse_response_resume(r: redis.Redis): pytest.fail("didn't receive a response") assert response assert i > 0 + + +@pytest.mark.xfail +@pytest.mark.onlynoncluster +async def test_connection_hiredis_disconect_race(): + """ + This test reproduces the case in issue #2349 + where a connection is closed while the parser is reading to feed the internal buffer. + The stremam read() will succeed, but when it returns, another task has already called + `disconnect()` and is waiting for close to finish. When it attempts to feed the + buffer, it will fail, since the buffer is no longer there. + """ + if not HIREDIS_AVAILABLE: + pytest.skip("Hiredis not available)") + parser_class = HiredisParser + + args = {} + args["parser_class"] = parser_class + conn = Connection(**args) + + cond = asyncio.Condition() + # 0 == initial + # 1 == reader is reading + # 2 == closer has closed and is waiting for close to finish + state = 0 + + # mock read function, which wait for a close to happen before returning + async def read(_): + nonlocal state + async with cond: + state = 1 # we are reading + cond.notify() + # wait until the closing task has done + await cond.wait_for(lambda: state == 2) + return b" " + + # function closes the connection while reader is still blocked reading + async def do_close(): + nonlocal state + async with cond: + await cond.wait_for(lambda: state == 1) + state = 2 + cond.notify() + await conn.disconnect() + + async def do_read(): + await conn.read_response() + + reader = AsyncMock() + writer = AsyncMock() + writer.transport = Mock() + writer.transport.get_extra_info.side_effect = None + + reader.read.side_effect = read + + async def open_connection(*args, **kwargs): + return reader, writer + + with patch.object(asyncio, "open_connection", open_connection): + await conn.connect() + + await asyncio.gather(do_read(), do_close()) From a9661a4d658253c0d1a2740fc2954b410688904b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 20 Jan 2023 12:36:33 +0000 Subject: [PATCH 2/5] Do not clear the redis-reader's state when we disconnect so that it can finish reading the final message --- redis/asyncio/connection.py | 15 +++++++++------ tests/test_asyncio/test_connection.py | 16 ++++++++++------ 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 2c75d4fcf1..862f6f096b 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -350,13 +350,14 @@ async def _readline(self) -> bytes: class HiredisParser(BaseParser): """Parser class for connections using Hiredis""" - __slots__ = BaseParser.__slots__ + ("_reader",) + __slots__ = BaseParser.__slots__ + ("_reader", "_connected") def __init__(self, socket_read_size: int): if not HIREDIS_AVAILABLE: raise RedisError("Hiredis is not available.") super().__init__(socket_read_size=socket_read_size) self._reader: Optional[hiredis.Reader] = None + self._connected: bool = False def on_connect(self, connection: "Connection"): self._stream = connection._reader @@ -369,13 +370,13 @@ def on_connect(self, connection: "Connection"): kwargs["errors"] = connection.encoder.encoding_errors self._reader = hiredis.Reader(**kwargs) + self._connected = True def on_disconnect(self): - self._stream = None - self._reader = None + self._connected = False async def can_read_destructive(self): - if not self._stream or not self._reader: + if not self._connected: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) if self._reader.gets(): return True @@ -397,8 +398,10 @@ async def read_from_socket(self): async def read_response( self, disable_decoding: bool = False ) -> Union[EncodableT, List[EncodableT]]: - if not self._stream or not self._reader: - self.on_disconnect() + # If `on_disconnect()` has been called, prohibit any more reads + # even if they could happen because data might be present. + # We still allow reads in progress to finish + if not self._connected: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None response = self._reader.gets() diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 712d632aeb..3a611206b7 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -150,15 +150,18 @@ async def test_connection_parse_response_resume(r: redis.Redis): assert i > 0 -@pytest.mark.xfail @pytest.mark.onlynoncluster async def test_connection_hiredis_disconect_race(): """ This test reproduces the case in issue #2349 - where a connection is closed while the parser is reading to feed the internal buffer. - The stremam read() will succeed, but when it returns, another task has already called - `disconnect()` and is waiting for close to finish. When it attempts to feed the - buffer, it will fail, since the buffer is no longer there. + where a connection is closed while the parser is reading to feed the + internal buffer.The stremam read() will succeed, but when it returns, + another task hasalready called `disconnect()` and is waiting for + close to finish. When it attempts to feed the buffer, it will fail + since the buffer is no longer there. + + This test verifies that a read in progress can finish even + if the `disconnect()` method is called. """ if not HIREDIS_AVAILABLE: pytest.skip("Hiredis not available)") @@ -194,7 +197,8 @@ async def do_close(): await conn.disconnect() async def do_read(): - await conn.read_response() + with pytest.raises(InvalidResponse): + await conn.read_response() reader = AsyncMock() writer = AsyncMock() From fd61dc337de41c9abf90ee8b82ab73803526f5fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 20 Jan 2023 13:26:32 +0000 Subject: [PATCH 3/5] Test that reading a message of two chunks after a disconnect() works. --- tests/test_asyncio/test_connection.py | 50 +++++++++++++++++---------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 3a611206b7..6d6b3b9d2c 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -1,7 +1,7 @@ import asyncio import socket import types -from unittest.mock import AsyncMock, Mock, patch +from unittest.mock import patch import pytest @@ -151,7 +151,10 @@ async def test_connection_parse_response_resume(r: redis.Redis): @pytest.mark.onlynoncluster -async def test_connection_hiredis_disconect_race(): +@pytest.mark.parametrize( + "parser_class", [PythonParser, HiredisParser], ids=["PythonParser", "HiredisParser"] +) +async def test_connection_disconect_race(parser_class): """ This test reproduces the case in issue #2349 where a connection is closed while the parser is reading to feed the @@ -163,12 +166,14 @@ async def test_connection_hiredis_disconect_race(): This test verifies that a read in progress can finish even if the `disconnect()` method is called. """ - if not HIREDIS_AVAILABLE: - pytest.skip("Hiredis not available)") - parser_class = HiredisParser + if parser_class == PythonParser: + pytest.xfail("doesn't work yet with PythonParser") + if parser_class == HiredisParser and not HIREDIS_AVAILABLE: + pytest.skip("Hiredis not available") args = {} args["parser_class"] = parser_class + conn = Connection(**args) cond = asyncio.Condition() @@ -177,15 +182,20 @@ async def test_connection_hiredis_disconect_race(): # 2 == closer has closed and is waiting for close to finish state = 0 - # mock read function, which wait for a close to happen before returning - async def read(_): + # Mock read function, which wait for a close to happen before returning + # Can either be invoked as two `read()` calls (HiredisParser) + # or as a `readline()` followed by `readexact()` (PythonParser) + chunks = [b"$13\r\n", b"Hello, World!\r\n"] + + async def read(_=None): nonlocal state async with cond: - state = 1 # we are reading - cond.notify() - # wait until the closing task has done - await cond.wait_for(lambda: state == 2) - return b" " + if state == 0: + state = 1 # we are reading + cond.notify() + # wait until the closing task has done + await cond.wait_for(lambda: state == 2) + return chunks.pop(0) # function closes the connection while reader is still blocked reading async def do_close(): @@ -197,15 +207,18 @@ async def do_close(): await conn.disconnect() async def do_read(): - with pytest.raises(InvalidResponse): - await conn.read_response() + return await conn.read_response() - reader = AsyncMock() - writer = AsyncMock() - writer.transport = Mock() + reader = mock.AsyncMock() + writer = mock.AsyncMock() + writer.transport = mock.Mock() writer.transport.get_extra_info.side_effect = None + # for HiredisParser reader.read.side_effect = read + # for PythonParser + reader.readline.side_effect = read + reader.readexactly.side_effect = read async def open_connection(*args, **kwargs): return reader, writer @@ -213,4 +226,5 @@ async def open_connection(*args, **kwargs): with patch.object(asyncio, "open_connection", open_connection): await conn.connect() - await asyncio.gather(do_read(), do_close()) + vals = await asyncio.gather(do_read(), do_close()) + assert vals == [b"Hello, World!", None] From efc74e8bb37429bde39668e3274994627e4e642c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 20 Jan 2023 15:12:54 +0000 Subject: [PATCH 4/5] Add Changes --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index d89079ba6f..e1ce947394 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Add test and fix async HiredisParser when reading during a disconnect() (#2349) * Simplify synchronous SocketBuffer state management * Fix string cleanse in Redis Graph * Make PythonParser resumable in case of error (#2510) From ed572089c0b4e5523037c60656edbfaa42de4b04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 20 Jan 2023 15:18:18 +0000 Subject: [PATCH 5/5] fix typos --- tests/test_asyncio/test_connection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 6d6b3b9d2c..e32d34414d 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -158,9 +158,9 @@ async def test_connection_disconect_race(parser_class): """ This test reproduces the case in issue #2349 where a connection is closed while the parser is reading to feed the - internal buffer.The stremam read() will succeed, but when it returns, - another task hasalready called `disconnect()` and is waiting for - close to finish. When it attempts to feed the buffer, it will fail + internal buffer.The stream `read()` will succeed, but when it returns, + another task has already called `disconnect()` and is waiting for + close to finish. When we attempts to feed the buffer, we will fail since the buffer is no longer there. This test verifies that a read in progress can finish even