Skip to content

Commit 087d994

Browse files
committed
undo prevision fix attempts in async client and cluster
1 parent 7c2e955 commit 087d994

File tree

2 files changed

+33
-89
lines changed

2 files changed

+33
-89
lines changed

redis/asyncio/client.py

Lines changed: 24 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -500,23 +500,6 @@ async def _disconnect_raise(self, conn: Connection, error: Exception):
500500
):
501501
raise error
502502

503-
async def _try_send_command_parse_response(self, conn, *args, **options):
504-
try:
505-
return await conn.retry.call_with_retry(
506-
lambda: self._send_command_parse_response(
507-
conn, args[0], *args, **options
508-
),
509-
lambda error: self._disconnect_raise(conn, error),
510-
)
511-
except asyncio.CancelledError:
512-
await conn.disconnect(nowait=True)
513-
raise
514-
finally:
515-
if self.single_connection_client:
516-
self._single_conn_lock.release()
517-
if not self.connection:
518-
await self.connection_pool.release(conn)
519-
520503
# COMMAND EXECUTION AND PROTOCOL PARSING
521504
async def execute_command(self, *args, **options):
522505
"""Execute a command and return a parsed response"""
@@ -527,10 +510,18 @@ async def execute_command(self, *args, **options):
527510

528511
if self.single_connection_client:
529512
await self._single_conn_lock.acquire()
530-
531-
return await asyncio.shield(
532-
self._try_send_command_parse_response(conn, *args, **options)
533-
)
513+
try:
514+
return await conn.retry.call_with_retry(
515+
lambda: self._send_command_parse_response(
516+
conn, command_name, *args, **options
517+
),
518+
lambda error: self._disconnect_raise(conn, error),
519+
)
520+
finally:
521+
if self.single_connection_client:
522+
self._single_conn_lock.release()
523+
if not self.connection:
524+
await pool.release(conn)
534525

535526
async def parse_response(
536527
self, connection: Connection, command_name: Union[str, bytes], **options
@@ -774,18 +765,10 @@ async def _disconnect_raise_connect(self, conn, error):
774765
is not a TimeoutError. Otherwise, try to reconnect
775766
"""
776767
await conn.disconnect()
777-
778768
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
779769
raise error
780770
await conn.connect()
781771

782-
async def _try_execute(self, conn, command, *arg, **kwargs):
783-
try:
784-
return await command(*arg, **kwargs)
785-
except asyncio.CancelledError:
786-
await conn.disconnect()
787-
raise
788-
789772
async def _execute(self, conn, command, *args, **kwargs):
790773
"""
791774
Connect manually upon disconnection. If the Redis server is down,
@@ -794,11 +777,9 @@ async def _execute(self, conn, command, *args, **kwargs):
794777
called by the # connection to resubscribe us to any channels and
795778
patterns we were previously listening to
796779
"""
797-
return await asyncio.shield(
798-
conn.retry.call_with_retry(
799-
lambda: self._try_execute(conn, command, *args, **kwargs),
800-
lambda error: self._disconnect_raise_connect(conn, error),
801-
)
780+
return await conn.retry.call_with_retry(
781+
lambda: command(*args, **kwargs),
782+
lambda error: self._disconnect_raise_connect(conn, error),
802783
)
803784

804785
async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -1202,18 +1183,6 @@ async def _disconnect_reset_raise(self, conn, error):
12021183
await self.reset()
12031184
raise
12041185

1205-
async def _try_send_command_parse_response(self, conn, *args, **options):
1206-
try:
1207-
return await conn.retry.call_with_retry(
1208-
lambda: self._send_command_parse_response(
1209-
conn, args[0], *args, **options
1210-
),
1211-
lambda error: self._disconnect_reset_raise(conn, error),
1212-
)
1213-
except asyncio.CancelledError:
1214-
await conn.disconnect()
1215-
raise
1216-
12171186
async def immediate_execute_command(self, *args, **options):
12181187
"""
12191188
Execute a command immediately, but don't auto-retry on a
@@ -1229,8 +1198,12 @@ async def immediate_execute_command(self, *args, **options):
12291198
command_name, self.shard_hint
12301199
)
12311200
self.connection = conn
1232-
return await asyncio.shield(
1233-
self._try_send_command_parse_response(conn, *args, **options)
1201+
1202+
return await conn.retry.call_with_retry(
1203+
lambda: self._send_command_parse_response(
1204+
conn, command_name, *args, **options
1205+
),
1206+
lambda error: self._disconnect_reset_raise(conn, error),
12341207
)
12351208

12361209
def pipeline_execute_command(self, *args, **options):
@@ -1398,19 +1371,6 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
13981371
await self.reset()
13991372
raise
14001373

1401-
async def _try_execute(self, conn, execute, stack, raise_on_error):
1402-
try:
1403-
return await conn.retry.call_with_retry(
1404-
lambda: execute(conn, stack, raise_on_error),
1405-
lambda error: self._disconnect_raise_reset(conn, error),
1406-
)
1407-
except asyncio.CancelledError:
1408-
# not supposed to be possible, yet here we are
1409-
await conn.disconnect(nowait=True)
1410-
raise
1411-
finally:
1412-
await self.reset()
1413-
14141374
async def execute(self, raise_on_error: bool = True):
14151375
"""Execute all the commands in the current pipeline"""
14161376
stack = self.command_stack
@@ -1432,11 +1392,10 @@ async def execute(self, raise_on_error: bool = True):
14321392
conn = cast(Connection, conn)
14331393

14341394
try:
1435-
return await asyncio.shield(
1436-
self._try_execute(conn, execute, stack, raise_on_error)
1395+
return await conn.retry.call_with_retry(
1396+
lambda: execute(conn, stack, raise_on_error),
1397+
lambda error: self._disconnect_raise_reset(conn, error),
14371398
)
1438-
except RuntimeError:
1439-
await self.reset()
14401399
finally:
14411400
await self.reset()
14421401

redis/asyncio/cluster.py

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,33 +1016,12 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
10161016
await connection.send_packed_command(connection.pack_command(*args), False)
10171017

10181018
# Read response
1019-
return await asyncio.shield(
1020-
self._parse_and_release(connection, args[0], **kwargs)
1021-
)
1022-
1023-
async def _parse_and_release(self, connection, *args, **kwargs):
10241019
try:
1025-
return await self.parse_response(connection, *args, **kwargs)
1026-
except asyncio.CancelledError:
1027-
# should not be possible
1028-
await connection.disconnect(nowait=True)
1029-
raise
1020+
return await self.parse_response(connection, args[0], **kwargs)
10301021
finally:
1022+
# Release connection
10311023
self._free.append(connection)
10321024

1033-
async def _try_parse_response(self, cmd, connection, ret):
1034-
try:
1035-
cmd.result = await asyncio.shield(
1036-
self.parse_response(connection, cmd.args[0], **cmd.kwargs)
1037-
)
1038-
except asyncio.CancelledError:
1039-
await connection.disconnect(nowait=True)
1040-
raise
1041-
except Exception as e:
1042-
cmd.result = e
1043-
ret = True
1044-
return ret
1045-
10461025
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10471026
# Acquire connection
10481027
connection = self.acquire_connection()
@@ -1055,7 +1034,13 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10551034
# Read responses
10561035
ret = False
10571036
for cmd in commands:
1058-
ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
1037+
try:
1038+
cmd.result = await self.parse_response(
1039+
connection, cmd.args[0], **cmd.kwargs
1040+
)
1041+
except Exception as e:
1042+
cmd.result = e
1043+
ret = True
10591044

10601045
# Release connection
10611046
self._free.append(connection)

0 commit comments

Comments
 (0)