diff --git a/tests/test_asyncio/test_connect.py b/tests/test_asyncio/test_connect.py index bead7208f5..0b2d7c2afa 100644 --- a/tests/test_asyncio/test_connect.py +++ b/tests/test_asyncio/test_connect.py @@ -62,6 +62,7 @@ async def test_tcp_ssl_connect(tcp_address): socket_timeout=10, ) await _assert_connect(conn, tcp_address, certfile=certfile, keyfile=keyfile) + await conn.disconnect() async def _assert_connect(conn, server_address, certfile=None, keyfile=None): diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index d1aad796e7..9ee6db1566 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -254,9 +254,8 @@ async def do_close(): async def do_read(): return await conn.read_response() - reader = mock.AsyncMock() - writer = mock.AsyncMock() - writer.transport = mock.Mock() + reader = mock.Mock(spec=asyncio.StreamReader) + writer = mock.Mock(spec=asyncio.StreamWriter) writer.transport.get_extra_info.side_effect = None # for HiredisParser diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index ff588861e4..35707553f8 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -49,22 +49,22 @@ def set_delay(self, delay: float = 0.0): async def handle(self, reader, writer): # establish connection to redis redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) - try: - pipe1 = asyncio.create_task( - self.pipe(reader, redis_writer, "to redis:", self.send_event) - ) - pipe2 = asyncio.create_task(self.pipe(redis_reader, writer, "from redis:")) - await asyncio.gather(pipe1, pipe2) - finally: - redis_writer.close() + pipe1 = asyncio.create_task( + self.pipe(reader, redis_writer, "to redis:", self.send_event) + ) + pipe2 = asyncio.create_task(self.pipe(redis_reader, writer, "from redis:")) + await asyncio.gather(pipe1, pipe2) async def stop(self): - # clean up enough so that we can reuse the looper + # shutdown the server self.task.cancel() try: await self.task except asyncio.CancelledError: pass + await self.server.wait_closed() + # do we need to close individual connections too? + # prudently close all async generators loop = self.server.get_loop() await loop.shutdown_asyncgens() @@ -75,16 +75,25 @@ async def pipe( name="", event: asyncio.Event = None, ): - while True: - data = await reader.read(1000) - if not data: - break - # print(f"{name} read {len(data)} delay {self.delay}") - if event: - event.set() - await asyncio.sleep(self.delay) - writer.write(data) - await writer.drain() + try: + while True: + data = await reader.read(1000) + if not data: + break + # print(f"{name} read {len(data)} delay {self.delay}") + if event: + event.set() + await asyncio.sleep(self.delay) + writer.write(data) + await writer.drain() + finally: + try: + writer.close() + await writer.wait_closed() + except RuntimeError: + # ignore errors on close pertaining to no event loop. Don't want + # to clutter the test output with errors if being garbage collected + pass @pytest.mark.onlynoncluster