Skip to content

Commit ced739b

Browse files
committed
make can_read() destructive for simplicity, and rename the method.
Remove timeout argument, always timeout immediately.
1 parent 9fe8366 commit ced739b

File tree

4 files changed

+22
-37
lines changed

4 files changed

+22
-37
lines changed

redis/asyncio/connection.py

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def on_disconnect(self):
208208
def on_connect(self, connection: "Connection"):
209209
raise NotImplementedError()
210210

211-
async def can_read(self, timeout: float) -> bool:
211+
async def can_read_destructive(self) -> bool:
212212
raise NotImplementedError()
213213

214214
async def read_response(
@@ -286,9 +286,9 @@ async def _read_from_socket(
286286
return False
287287
raise ConnectionError(f"Error while reading from socket: {ex.args}")
288288

289-
async def can_read(self, timeout: float) -> bool:
289+
async def can_read_destructive(self) -> bool:
290290
return bool(self.length) or await self._read_from_socket(
291-
timeout=timeout, raise_on_timeout=False
291+
timeout=0, raise_on_timeout=False
292292
)
293293

294294
async def read(self, length: int) -> bytes:
@@ -386,8 +386,8 @@ def on_disconnect(self):
386386
self._buffer = None
387387
self.encoder = None
388388

389-
async def can_read(self, timeout: float):
390-
return self._buffer and bool(await self._buffer.can_read(timeout))
389+
async def can_read_destructive(self):
390+
return self._buffer and bool(await self._buffer.can_read_destructive())
391391

392392
async def read_response(
393393
self, disable_decoding: bool = False
@@ -444,9 +444,7 @@ async def read_response(
444444
class HiredisParser(BaseParser):
445445
"""Parser class for connections using Hiredis"""
446446

447-
__slots__ = BaseParser.__slots__ + ("_next_response", "_reader", "_socket_timeout")
448-
449-
_next_response: bool
447+
__slots__ = BaseParser.__slots__ + ("_reader", "_socket_timeout")
450448

451449
def __init__(self, socket_read_size: int):
452450
if not HIREDIS_AVAILABLE:
@@ -466,23 +464,18 @@ def on_connect(self, connection: "Connection"):
466464
kwargs["errors"] = connection.encoder.encoding_errors
467465

468466
self._reader = hiredis.Reader(**kwargs)
469-
self._next_response = False
470467
self._socket_timeout = connection.socket_timeout
471468

472469
def on_disconnect(self):
473470
self._stream = None
474471
self._reader = None
475-
self._next_response = False
476472

477-
async def can_read(self, timeout: float):
473+
async def can_read_destructive(self):
478474
if not self._stream or not self._reader:
479475
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
480-
481-
if self._next_response is False:
482-
self._next_response = self._reader.gets()
483-
if self._next_response is False:
484-
return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
485-
return True
476+
if self._reader.gets():
477+
return True
478+
return await self.read_from_socket(timeout=0, raise_on_timeout=False)
486479

487480
async def read_from_socket(
488481
self,
@@ -523,12 +516,6 @@ async def read_response(
523516
self.on_disconnect()
524517
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
525518

526-
# _next_response might be cached from a can_read() call
527-
if self._next_response is not False:
528-
response = self._next_response
529-
self._next_response = False
530-
return response
531-
532519
response = self._reader.gets()
533520
while response is False:
534521
await self.read_from_socket()
@@ -924,12 +911,10 @@ async def send_command(self, *args: Any, **kwargs: Any) -> None:
924911
self.pack_command(*args), check_health=kwargs.get("check_health", True)
925912
)
926913

927-
async def can_read(self, timeout: float = 0):
914+
async def can_read_destructive(self):
928915
"""Poll the socket to see if there's data that can be read."""
929-
if not self.is_connected:
930-
await self.connect()
931916
try:
932-
return await self._parser.can_read(timeout)
917+
return await self._parser.can_read_destructive()
933918
except OSError as e:
934919
await self.disconnect()
935920
raise ConnectionError(
@@ -1497,12 +1482,12 @@ async def get_connection(self, command_name, *keys, **options):
14971482
# pool before all data has been read or the socket has been
14981483
# closed. either way, reconnect and verify everything is good.
14991484
try:
1500-
if await connection.can_read():
1485+
if await connection.can_read_destructive():
15011486
raise ConnectionError("Connection has data") from None
15021487
except ConnectionError:
15031488
await connection.disconnect()
15041489
await connection.connect()
1505-
if await connection.can_read():
1490+
if await connection.can_read_destructive():
15061491
raise ConnectionError("Connection not ready") from None
15071492
except BaseException:
15081493
# release the connection back to the pool so that we don't
@@ -1698,12 +1683,12 @@ async def get_connection(self, command_name, *keys, **options):
16981683
# pool before all data has been read or the socket has been
16991684
# closed. either way, reconnect and verify everything is good.
17001685
try:
1701-
if await connection.can_read():
1686+
if await connection.can_read_destructive():
17021687
raise ConnectionError("Connection has data") from None
17031688
except ConnectionError:
17041689
await connection.disconnect()
17051690
await connection.connect()
1706-
if await connection.can_read():
1691+
if await connection.can_read_destructive():
17071692
raise ConnectionError("Connection not ready") from None
17081693
except BaseException:
17091694
# release the connection back to the pool so that we don't leak it

tests/test_asyncio/test_cluster.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ async def test_refresh_using_specific_nodes(
433433
Connection,
434434
send_packed_command=mock.DEFAULT,
435435
connect=mock.DEFAULT,
436-
can_read=mock.DEFAULT,
436+
can_read_destructive=mock.DEFAULT,
437437
) as mocks:
438438
# simulate 7006 as a failed node
439439
def execute_command_mock(self, *args, **options):
@@ -473,7 +473,7 @@ def map_7007(self):
473473
execute_command.successful_calls = 0
474474
execute_command.failed_calls = 0
475475
initialize.side_effect = initialize_mock
476-
mocks["can_read"].return_value = False
476+
mocks["can_read_destructive"].return_value = False
477477
mocks["send_packed_command"].return_value = "MOCK_OK"
478478
mocks["connect"].return_value = None
479479
with mock.patch.object(
@@ -514,7 +514,7 @@ async def test_reading_from_replicas_in_round_robin(self) -> None:
514514
send_command=mock.DEFAULT,
515515
read_response=mock.DEFAULT,
516516
_connect=mock.DEFAULT,
517-
can_read=mock.DEFAULT,
517+
can_read_destructive=mock.DEFAULT,
518518
on_connect=mock.DEFAULT,
519519
) as mocks:
520520
with mock.patch.object(
@@ -546,7 +546,7 @@ def execute_command_mock_third(self, *args, **options):
546546
mocks["send_command"].return_value = True
547547
mocks["read_response"].return_value = "OK"
548548
mocks["_connect"].return_value = True
549-
mocks["can_read"].return_value = False
549+
mocks["can_read_destructive"].return_value = False
550550
mocks["on_connect"].return_value = True
551551

552552
# Create a cluster with reading from replications

tests/test_asyncio/test_connection_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async def connect(self):
103103
async def disconnect(self):
104104
pass
105105

106-
async def can_read(self, timeout: float = 0):
106+
async def can_read_destructive(self, timeout: float = 0):
107107
return False
108108

109109

tests/test_asyncio/test_pubsub.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,7 @@ async def test_reconnect_socket_error(self, r: redis.Redis, method):
844844
self.state = 1
845845
with mock.patch.object(self.pubsub.connection, "_parser") as mockobj:
846846
mockobj.read_response.side_effect = socket.error
847-
mockobj.can_read.side_effect = socket.error
847+
mockobj.can_read_destructive.side_effect = socket.error
848848
# wait until task noticies the disconnect until we undo the patch
849849
await self.cond.wait_for(lambda: self.state >= 2)
850850
assert not self.pubsub.connection.is_connected

0 commit comments

Comments
 (0)