From e6a84f41befc759cfca2e13398e6f2ba500c71da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 16:43:30 +0000 Subject: [PATCH 01/11] Redis.from_url() should have an auto-closing connection pool. --- redis/asyncio/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 111df24185..867be53cf2 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -146,10 +146,12 @@ class initializer. In the case of conflicting arguments, querystring """ single_connection_client = kwargs.pop("single_connection_client", False) connection_pool = ConnectionPool.from_url(url, **kwargs) - return cls( + redis = cls( connection_pool=connection_pool, single_connection_client=single_connection_client, ) + redis.auto_close_connection_pool = True + return redis def __init__( self, From 4efb8810f7159cf3b5dd2f150b78985662649d08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 16:44:23 +0000 Subject: [PATCH 02/11] An async socket now closes the write stream (synchronously) when it is deleted. A StreamWriter does not reliably close its socket when it is collected. --- redis/asyncio/connection.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 22c5030e6c..9098fd2573 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -207,6 +207,9 @@ def __repr__(self): repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces())) return f"{self.__class__.__name__}<{repr_args}>" + def __del__(self): + self._close_socket() + @abstractmethod def repr_pieces(self): pass @@ -377,6 +380,22 @@ async def disconnect(self, nowait: bool = False) -> None: f"Timed out closing connection after {self.socket_connect_timeout}" ) from None + def _close_socket(self): + """Close the socket directly. Used during garbage collection to + make sure the underlying socket is released. This does not happen + reliably when the stream is garbage collected. + """ + if self._writer: + if os.getpid() == self.pid: + try: + self._writer.close() + except RuntimeError: + # This may fail if the event loop is already closed, + # even though this is not an async call. In this + # case, just ignore the error, since it is during + # exit anyway. + pass + async def _send_ping(self): """Send PING, expect PONG in return""" await self.send_command("PING", check_health=False) From 2cb665116e6b9d69bebb74abf2bbbd27ff23cf2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 16:45:17 +0000 Subject: [PATCH 03/11] Add tests --- tests/test_asyncio/test_connection.py | 51 +++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 09960fd7e2..47b3d1b431 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -1,7 +1,7 @@ import asyncio import socket import types -from unittest.mock import patch +from unittest.mock import Mock, patch import pytest import redis @@ -12,7 +12,12 @@ _AsyncRESPBase, ) from redis.asyncio import Redis -from redis.asyncio.connection import Connection, UnixDomainSocketConnection +from redis.asyncio.connection import ( + AbstractConnection, + Connection, + UnixDomainSocketConnection, + parse_url, +) from redis.asyncio.retry import Retry from redis.backoff import NoBackoff from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError @@ -278,3 +283,45 @@ async def open_connection(*args, **kwargs): def test_create_single_connection_client_from_url(): client = Redis.from_url("redis://localhost:6379/0?", single_connection_client=True) assert client.single_connection_client is True + + +@pytest.mark.parametrize("from_url", (True, False)) +async def test_pool_auto_close(request, from_url): + """Verify that basic Redis instances have auto_close_connection_pool set to True""" + + url: str = request.config.getoption("--redis-url") + url_args = parse_url(url) + + async def get_redis_connection(): + if from_url: + return Redis.from_url(url) + return Redis(**url_args) + + r1 = await get_redis_connection() + assert r1.auto_close_connection_pool is True + + +@pytest.mark.parametrize("from_url", (True, False)) +async def test_connection_socket_cleanup(request, from_url): + """Verify that connections are cleaned up when they + are garbage collected + """ + url: str = request.config.getoption("--redis-url") + url_args = parse_url(url) + + async def get_redis_connection(): + if from_url: + return Redis.from_url(url) + return Redis(**url_args) + + async def do_something(redis): + await redis.incr("counter") + await redis.close() + + mock = Mock() + with patch.object(AbstractConnection, "_close_socket", mock): + r1 = await get_redis_connection() + await do_something(r1) + r1 = None + + assert mock.call_count == 1 From f32cbd70f541d6cd15ff5c52192082ceb51fd046 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 16:45:53 +0000 Subject: [PATCH 04/11] Skip a test not available on all platforms. --- tests/test_connect.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/tests/test_connect.py b/tests/test_connect.py index b233c67e83..f07750dc80 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -61,6 +61,8 @@ def test_tcp_ssl_connect(tcp_address): def _assert_connect(conn, server_address, certfile=None, keyfile=None): if isinstance(server_address, str): + if not _RedisUDSServer: + pytest.skip("Unix domain sockets are not supported on this platform") server = _RedisUDSServer(server_address, _RedisRequestHandler) else: server = _RedisTCPServer( @@ -113,24 +115,29 @@ def get_request(self): return connstream, fromaddr -class _RedisUDSServer(socketserver.UnixStreamServer): - def __init__(self, *args, **kw) -> None: - self._ready_event = threading.Event() - self._stop_requested = False - super().__init__(*args, **kw) +if hasattr(socket, "UnixStreamServer"): - def service_actions(self): - self._ready_event.set() + class _RedisUDSServer(socketserver.UnixStreamServer): + def __init__(self, *args, **kw) -> None: + self._ready_event = threading.Event() + self._stop_requested = False + super().__init__(*args, **kw) - def wait_online(self): - self._ready_event.wait() + def service_actions(self): + self._ready_event.set() - def stop(self): - self._stop_requested = True - self.shutdown() + def wait_online(self): + self._ready_event.wait() - def is_serving(self): - return not self._stop_requested + def stop(self): + self._stop_requested = True + self.shutdown() + + def is_serving(self): + return not self._stop_requested + +else: + _RedisUDSServer = None class _RedisRequestHandler(socketserver.StreamRequestHandler): From 6329a514a40650fabe2a4f716d42de1a1c7c9a82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 16:56:36 +0000 Subject: [PATCH 05/11] changes --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 49f87cd35d..879722caf1 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Fix #2831, closing connections and sockets when garbage collected. * Fix incorrect redis.asyncio.Cluster type hint for `retry_on_error` * Fix dead weakref in sentinel connection causing ReferenceError (#2767) * Fix #2768, Fix KeyError: 'first-entry' in parse_xinfo_stream. From ed1b5c0bf7660e773e7dd9cca08ec5abcf95740f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 18:41:57 +0000 Subject: [PATCH 06/11] only test on non-cluster --- tests/test_asyncio/test_connection.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 47b3d1b431..564f309fcc 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -301,6 +301,7 @@ async def get_redis_connection(): assert r1.auto_close_connection_pool is True +@pytest.mark.onlynoncluster @pytest.mark.parametrize("from_url", (True, False)) async def test_connection_socket_cleanup(request, from_url): """Verify that connections are cleaned up when they From b81b77eb9ef9562b764be0d9d5f2e105cf9a3f54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 19:50:46 +0000 Subject: [PATCH 07/11] cope with classes that don't have _writer --- redis/asyncio/connection.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 9098fd2573..51c267c95f 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -383,12 +383,16 @@ async def disconnect(self, nowait: bool = False) -> None: def _close_socket(self): """Close the socket directly. Used during garbage collection to make sure the underlying socket is released. This does not happen - reliably when the stream is garbage collected. + reliably when the stream is garbage collected. This is a safety + precaution, correct use of the library should ensure that + sockets are disconnected properly. """ - if self._writer: + # some test classes don't even have this + writer = getattr(self, "_writer", None) + if writer: if os.getpid() == self.pid: try: - self._writer.close() + writer.close() except RuntimeError: # This may fail if the event loop is already closed, # even though this is not an async call. In this From 8fd35371af51c5d6a50c6c3cc42481101e936242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 19 Jul 2023 19:53:51 +0000 Subject: [PATCH 08/11] don't test GC behaviour on pypy --- tests/test_asyncio/test_connection.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 564f309fcc..3e641a27d5 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -1,4 +1,5 @@ import asyncio +import platform import socket import types from unittest.mock import Mock, patch @@ -307,6 +308,8 @@ async def test_connection_socket_cleanup(request, from_url): """Verify that connections are cleaned up when they are garbage collected """ + if platform.python_implementation() != "CPython": + pytest.skip("only works on CPython") url: str = request.config.getoption("--redis-url") url_args = parse_url(url) From a80c64c24295e524a6c13378e31a1bfef5046be8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 20 Jul 2023 10:18:45 +0000 Subject: [PATCH 09/11] Allow the "auto_close_connection_pool" argument to Redis.from_url --- redis/asyncio/client.py | 3 ++- tests/test_asyncio/test_connection.py | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 867be53cf2..a380f286c8 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -145,12 +145,13 @@ class initializer. In the case of conflicting arguments, querystring """ single_connection_client = kwargs.pop("single_connection_client", False) + auto_close_connection_pool = kwargs.pop("auto_close_connection_pool", True) connection_pool = ConnectionPool.from_url(url, **kwargs) redis = cls( connection_pool=connection_pool, single_connection_client=single_connection_client, ) - redis.auto_close_connection_pool = True + redis.auto_close_connection_pool = auto_close_connection_pool return redis def __init__( diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 3e641a27d5..c4816a7e53 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -300,6 +300,26 @@ async def get_redis_connection(): r1 = await get_redis_connection() assert r1.auto_close_connection_pool is True + await r1.close() + + +@pytest.mark.parametrize("from_url", (True, False)) +async def test_pool_auto_close_disable(request, from_url): + """Verify that auto_close_connection_pool can be disabled""" + + url: str = request.config.getoption("--redis-url") + url_args = parse_url(url) + + async def get_redis_connection(): + if from_url: + return Redis.from_url(url, auto_close_connection_pool=False) + url_args["auto_close_connection_pool"] = False + return Redis(**url_args) + + r1 = await get_redis_connection() + assert r1.auto_close_connection_pool is False + await r1.connection_pool.disconnect() + await r1.close() @pytest.mark.onlynoncluster From 2d8a12842085324f136186b1655f9c89b4ece36a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 20 Jul 2023 10:22:18 +0000 Subject: [PATCH 10/11] Remove the __del__ method again from asyincio.Connection --- redis/asyncio/connection.py | 23 ---------------- tests/test_asyncio/test_connection.py | 39 ++------------------------- 2 files changed, 2 insertions(+), 60 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 51c267c95f..22c5030e6c 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -207,9 +207,6 @@ def __repr__(self): repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces())) return f"{self.__class__.__name__}<{repr_args}>" - def __del__(self): - self._close_socket() - @abstractmethod def repr_pieces(self): pass @@ -380,26 +377,6 @@ async def disconnect(self, nowait: bool = False) -> None: f"Timed out closing connection after {self.socket_connect_timeout}" ) from None - def _close_socket(self): - """Close the socket directly. Used during garbage collection to - make sure the underlying socket is released. This does not happen - reliably when the stream is garbage collected. This is a safety - precaution, correct use of the library should ensure that - sockets are disconnected properly. - """ - # some test classes don't even have this - writer = getattr(self, "_writer", None) - if writer: - if os.getpid() == self.pid: - try: - writer.close() - except RuntimeError: - # This may fail if the event loop is already closed, - # even though this is not an async call. In this - # case, just ignore the error, since it is during - # exit anyway. - pass - async def _send_ping(self): """Send PING, expect PONG in return""" await self.send_command("PING", check_health=False) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index c4816a7e53..9a729392b8 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -1,8 +1,7 @@ import asyncio -import platform import socket import types -from unittest.mock import Mock, patch +from unittest.mock import patch import pytest import redis @@ -13,12 +12,7 @@ _AsyncRESPBase, ) from redis.asyncio import Redis -from redis.asyncio.connection import ( - AbstractConnection, - Connection, - UnixDomainSocketConnection, - parse_url, -) +from redis.asyncio.connection import Connection, UnixDomainSocketConnection, parse_url from redis.asyncio.retry import Retry from redis.backoff import NoBackoff from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError @@ -320,32 +314,3 @@ async def get_redis_connection(): assert r1.auto_close_connection_pool is False await r1.connection_pool.disconnect() await r1.close() - - -@pytest.mark.onlynoncluster -@pytest.mark.parametrize("from_url", (True, False)) -async def test_connection_socket_cleanup(request, from_url): - """Verify that connections are cleaned up when they - are garbage collected - """ - if platform.python_implementation() != "CPython": - pytest.skip("only works on CPython") - url: str = request.config.getoption("--redis-url") - url_args = parse_url(url) - - async def get_redis_connection(): - if from_url: - return Redis.from_url(url) - return Redis(**url_args) - - async def do_something(redis): - await redis.incr("counter") - await redis.close() - - mock = Mock() - with patch.object(AbstractConnection, "_close_socket", mock): - r1 = await get_redis_connection() - await do_something(r1) - r1 = None - - assert mock.call_count == 1 From 34a69ff58c9606fc2cf66f188e820f790a8655e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 28 Jul 2023 10:23:59 +0000 Subject: [PATCH 11/11] Add arguments to `asyncio.Redis.from_url()` which are not passed to `ConnectionPool` Documentation specifies that the `kwargs` are pssed to `ConnectionPool` but some arguments are not meant for it. --- CHANGES | 2 +- redis/asyncio/client.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/CHANGES b/CHANGES index 879722caf1..363f2b927d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,4 @@ - * Fix #2831, closing connections and sockets when garbage collected. + * Fix #2831, add auto_close_connection_pool=True arg to asyncio.Redis.from_url() * Fix incorrect redis.asyncio.Cluster type hint for `retry_on_error` * Fix dead weakref in sentinel connection causing ReferenceError (#2767) * Fix #2768, Fix KeyError: 'first-entry' in parse_xinfo_stream. diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index a380f286c8..31e27a4462 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -104,7 +104,13 @@ class Redis( response_callbacks: MutableMapping[Union[str, bytes], ResponseCallbackT] @classmethod - def from_url(cls, url: str, **kwargs): + def from_url( + cls, + url: str, + single_connection_client: bool = False, + auto_close_connection_pool: bool = True, + **kwargs, + ): """ Return a Redis client object configured from the given URL @@ -144,8 +150,6 @@ class initializer. In the case of conflicting arguments, querystring arguments always win. """ - single_connection_client = kwargs.pop("single_connection_client", False) - auto_close_connection_pool = kwargs.pop("auto_close_connection_pool", True) connection_pool = ConnectionPool.from_url(url, **kwargs) redis = cls( connection_pool=connection_pool,