Skip to content

Dev/no lock #2308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ async def on_connect(self, connection: Connection) -> None:
# regardless of the server type. If this is a primary connection,
# READONLY would not affect executing write commands.
await connection.send_command("READONLY")
if str_if_bytes(await connection.read_response_without_lock()) != "OK":
if str_if_bytes(await connection.read_response()) != "OK":
raise ConnectionError("READONLY command failed")

def get_nodes(self) -> List["ClusterNode"]:
Expand Down Expand Up @@ -866,11 +866,9 @@ async def parse_response(
) -> Any:
try:
if NEVER_DECODE in kwargs:
response = await connection.read_response_without_lock(
disable_decoding=True
)
response = await connection.read_response(disable_decoding=True)
else:
response = await connection.read_response_without_lock()
response = await connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in kwargs:
return kwargs[EMPTY_RESPONSE]
Expand Down
35 changes: 0 additions & 35 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,6 @@ def __init__(
self.set_parser(parser_class)
self._connect_callbacks: List[weakref.WeakMethod[ConnectCallbackT]] = []
self._buffer_cutoff = 6000
self._lock = asyncio.Lock()

def __repr__(self):
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
Expand Down Expand Up @@ -940,39 +939,6 @@ async def can_read(self, timeout: float = 0):
)

async def read_response(self, disable_decoding: bool = False):
"""Read the response from a previously sent command"""
try:
async with self._lock:
if self.socket_timeout:
async with async_timeout.timeout(self.socket_timeout):
response = await self._parser.read_response(
disable_decoding=disable_decoding
)
else:
response = await self._parser.read_response(
disable_decoding=disable_decoding
)
except asyncio.TimeoutError:
await self.disconnect()
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
except OSError as e:
await self.disconnect()
raise ConnectionError(
f"Error while reading from {self.host}:{self.port} : {e.args}"
)
except BaseException:
await self.disconnect()
raise

if self.health_check_interval:
next_time = asyncio.get_running_loop().time() + self.health_check_interval
self.next_health_check = next_time

if isinstance(response, ResponseError):
raise response from None
return response

async def read_response_without_lock(self, disable_decoding: bool = False):
"""Read the response from a previously sent command"""
try:
if self.socket_timeout:
Expand Down Expand Up @@ -1241,7 +1207,6 @@ def __init__(
self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000
self._lock = asyncio.Lock()

def repr_pieces(self) -> Iterable[Tuple[str, Union[str, int]]]:
pieces = [("path", self.path), ("db", self.db)]
Expand Down
48 changes: 22 additions & 26 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
connection = mock.AsyncMock()
connection.is_connected = True
connection.read_response_without_lock.return_value = response
connection.read_response.return_value = response
while node._free:
node._free.pop()
node._free.append(connection)
Expand All @@ -130,7 +130,7 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode:
connection = mock.AsyncMock()
connection.is_connected = True
connection.read_response_without_lock.side_effect = exc
connection.read_response.side_effect = exc
while node._free:
node._free.pop()
node._free.append(connection)
Expand Down Expand Up @@ -275,16 +275,12 @@ async def test_max_connections(
for node in rc.get_nodes():
assert node.max_connections == 10

with mock.patch.object(
Connection, "read_response_without_lock"
) as read_response_without_lock:
with mock.patch.object(Connection, "read_response") as read_response:

async def read_response_without_lock_mocked(
*args: Any, **kwargs: Any
) -> None:
async def read_response_mocked(*args: Any, **kwargs: Any) -> None:
await asyncio.sleep(10)

read_response_without_lock.side_effect = read_response_without_lock_mocked
read_response.side_effect = read_response_mocked

with pytest.raises(MaxConnectionsError):
await asyncio.gather(
Expand Down Expand Up @@ -316,10 +312,10 @@ async def test_execute_command_node_flag_primaries(self, r: RedisCluster) -> Non
assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True
for primary in primaries:
conn = primary._free.pop()
assert conn.read_response_without_lock.called is True
assert conn.read_response.called is True
for replica in replicas:
conn = replica._free.pop()
assert conn.read_response_without_lock.called is not True
assert conn.read_response.called is not True

async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None:
"""
Expand All @@ -333,10 +329,10 @@ async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None
assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True
for replica in replicas:
conn = replica._free.pop()
assert conn.read_response_without_lock.called is True
assert conn.read_response.called is True
for primary in primaries:
conn = primary._free.pop()
assert conn.read_response_without_lock.called is not True
assert conn.read_response.called is not True

await r.close()

Expand All @@ -348,7 +344,7 @@ async def test_execute_command_node_flag_all_nodes(self, r: RedisCluster) -> Non
assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True
for node in r.get_nodes():
conn = node._free.pop()
assert conn.read_response_without_lock.called is True
assert conn.read_response.called is True

async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
"""
Expand All @@ -359,7 +355,7 @@ async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
called_count = 0
for node in r.get_nodes():
conn = node._free.pop()
if conn.read_response_without_lock.called is True:
if conn.read_response.called is True:
called_count += 1
assert called_count == 1

Expand All @@ -372,7 +368,7 @@ async def test_execute_command_default_node(self, r: RedisCluster) -> None:
mock_node_resp(def_node, "PONG")
assert await r.ping() is True
conn = def_node._free.pop()
assert conn.read_response_without_lock.called
assert conn.read_response.called

async def test_ask_redirection(self, r: RedisCluster) -> None:
"""
Expand Down Expand Up @@ -516,7 +512,7 @@ async def test_reading_from_replicas_in_round_robin(self) -> None:
with mock.patch.multiple(
Connection,
send_command=mock.DEFAULT,
read_response_without_lock=mock.DEFAULT,
read_response=mock.DEFAULT,
_connect=mock.DEFAULT,
can_read=mock.DEFAULT,
on_connect=mock.DEFAULT,
Expand Down Expand Up @@ -548,7 +544,7 @@ def execute_command_mock_third(self, *args, **options):
# so we'll mock some of the Connection's functions to allow it
execute_command.side_effect = execute_command_mock_first
mocks["send_command"].return_value = True
mocks["read_response_without_lock"].return_value = "OK"
mocks["read_response"].return_value = "OK"
mocks["_connect"].return_value = True
mocks["can_read"].return_value = False
mocks["on_connect"].return_value = True
Expand Down Expand Up @@ -857,8 +853,8 @@ async def test_cluster_delslots(self) -> None:
node0 = r.get_node(default_host, 7000)
node1 = r.get_node(default_host, 7001)
assert await r.cluster_delslots(0, 8192) == [True, True]
assert node0._free.pop().read_response_without_lock.called
assert node1._free.pop().read_response_without_lock.called
assert node0._free.pop().read_response.called
assert node1._free.pop().read_response.called

await r.close()

Expand Down Expand Up @@ -1027,7 +1023,7 @@ async def test_cluster_setslot_stable(self, r: RedisCluster) -> None:
node = r.nodes_manager.get_node_from_slot(12182)
mock_node_resp(node, "OK")
assert await r.cluster_setslot_stable(12182) is True
assert node._free.pop().read_response_without_lock.called
assert node._free.pop().read_response.called

@skip_if_redis_enterprise()
async def test_cluster_replicas(self, r: RedisCluster) -> None:
Expand Down Expand Up @@ -1069,7 +1065,7 @@ async def test_readonly(self) -> None:
for res in all_replicas_results.values():
assert res is True
for replica in r.get_replicas():
assert replica._free.pop().read_response_without_lock.called
assert replica._free.pop().read_response.called

await r.close()

Expand All @@ -1082,7 +1078,7 @@ async def test_readwrite(self) -> None:
for res in all_replicas_results.values():
assert res is True
for replica in r.get_replicas():
assert replica._free.pop().read_response_without_lock.called
assert replica._free.pop().read_response.called

await r.close()

Expand Down Expand Up @@ -2441,8 +2437,8 @@ async def test_asking_error(self, r: RedisCluster) -> None:
mock_node_resp_exc(first_node, AskError(ask_msg))
mock_node_resp(ask_node, "MOCK_OK")
res = await pipe.get(key).execute()
assert first_node._free.pop().read_response_without_lock.await_count
assert ask_node._free.pop().read_response_without_lock.await_count
assert first_node._free.pop().read_response.await_count
assert ask_node._free.pop().read_response.await_count
assert res == ["MOCK_OK"]

async def test_moved_redirection_on_slave_with_default(
Expand Down Expand Up @@ -2497,7 +2493,7 @@ async def test_readonly_pipeline_from_readonly_client(
executed_on_replica = False
for node in slot_nodes:
if node.server_type == REPLICA:
if node._free.pop().read_response_without_lock.await_count:
if node._free.pop().read_response.await_count:
executed_on_replica = True
break
assert executed_on_replica
Expand Down
4 changes: 4 additions & 0 deletions tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ async def test_socket_param_regression(r):


async def test_can_run_concurrent_commands(r):
if getattr(r, "connection", None) is not None:
# Concurrent commands are only supported on pooled or cluster connections
# since there is no synchronization on a single connection.
pytest.skip("pool only")
assert await r.ping() is True
assert all(await asyncio.gather(*(r.ping() for _ in range(10))))

Expand Down