|
1 | 1 | import asyncio
|
2 | 2 | import socket
|
3 | 3 | import types
|
4 |
| -from unittest.mock import patch |
| 4 | +from unittest.mock import AsyncMock, Mock, patch |
5 | 5 |
|
6 | 6 | import pytest
|
7 | 7 |
|
8 | 8 | import redis
|
9 | 9 | from redis.asyncio.connection import (
|
10 | 10 | BaseParser,
|
11 | 11 | Connection,
|
| 12 | + HiredisParser, |
12 | 13 | PythonParser,
|
13 | 14 | UnixDomainSocketConnection,
|
14 | 15 | )
|
15 | 16 | from redis.asyncio.retry import Retry
|
16 | 17 | from redis.backoff import NoBackoff
|
17 | 18 | from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError
|
| 19 | +from redis.utils import HIREDIS_AVAILABLE |
18 | 20 | from tests.conftest import skip_if_server_version_lt
|
19 | 21 |
|
20 | 22 | from .compat import mock
|
@@ -146,3 +148,65 @@ async def test_connection_parse_response_resume(r: redis.Redis):
|
146 | 148 | pytest.fail("didn't receive a response")
|
147 | 149 | assert response
|
148 | 150 | assert i > 0
|
| 151 | + |
| 152 | + |
| 153 | +@pytest.mark.xfail |
| 154 | +@pytest.mark.onlynoncluster |
| 155 | +async def test_connection_hiredis_disconect_race(): |
| 156 | + """ |
| 157 | + This test reproduces the case in issue #2349 |
| 158 | + where a connection is closed while the parser is reading to feed the internal buffer. |
| 159 | + The stremam read() will succeed, but when it returns, another task has already called |
| 160 | + `disconnect()` and is waiting for close to finish. When it attempts to feed the |
| 161 | + buffer, it will fail, since the buffer is no longer there. |
| 162 | + """ |
| 163 | + if not HIREDIS_AVAILABLE: |
| 164 | + pytest.skip("Hiredis not available)") |
| 165 | + parser_class = HiredisParser |
| 166 | + |
| 167 | + args = {} |
| 168 | + args["parser_class"] = parser_class |
| 169 | + conn = Connection(**args) |
| 170 | + |
| 171 | + cond = asyncio.Condition() |
| 172 | + # 0 == initial |
| 173 | + # 1 == reader is reading |
| 174 | + # 2 == closer has closed and is waiting for close to finish |
| 175 | + state = 0 |
| 176 | + |
| 177 | + # mock read function, which wait for a close to happen before returning |
| 178 | + async def read(_): |
| 179 | + nonlocal state |
| 180 | + async with cond: |
| 181 | + state = 1 # we are reading |
| 182 | + cond.notify() |
| 183 | + # wait until the closing task has done |
| 184 | + await cond.wait_for(lambda: state == 2) |
| 185 | + return b" " |
| 186 | + |
| 187 | + # function closes the connection while reader is still blocked reading |
| 188 | + async def do_close(): |
| 189 | + nonlocal state |
| 190 | + async with cond: |
| 191 | + await cond.wait_for(lambda: state == 1) |
| 192 | + state = 2 |
| 193 | + cond.notify() |
| 194 | + await conn.disconnect() |
| 195 | + |
| 196 | + async def do_read(): |
| 197 | + await conn.read_response() |
| 198 | + |
| 199 | + reader = AsyncMock() |
| 200 | + writer = AsyncMock() |
| 201 | + writer.transport = Mock() |
| 202 | + writer.transport.get_extra_info.side_effect = None |
| 203 | + |
| 204 | + reader.read.side_effect = read |
| 205 | + |
| 206 | + async def open_connection(*args, **kwargs): |
| 207 | + return reader, writer |
| 208 | + |
| 209 | + with patch.object(asyncio, "open_connection", open_connection): |
| 210 | + await conn.connect() |
| 211 | + |
| 212 | + await asyncio.gather(do_read(), do_close()) |
0 commit comments