|
1 | 1 | import asyncio
|
2 | 2 | import functools
|
3 | 3 | import socket
|
| 4 | +import sys |
4 | 5 | from typing import Optional
|
| 6 | +from unittest.mock import patch |
5 | 7 |
|
6 | 8 | import async_timeout
|
7 | 9 | import pytest
|
@@ -914,3 +916,76 @@ async def loop_step_listen(self):
|
914 | 916 | return True
|
915 | 917 | except asyncio.TimeoutError:
|
916 | 918 | return False
|
| 919 | + |
| 920 | + |
| 921 | +@pytest.mark.xfail |
| 922 | +@pytest.mark.onlynoncluster |
| 923 | +class TestBaseException: |
| 924 | + @pytest.mark.skipif( |
| 925 | + sys.version_info < (3, 8), reason="requires python 3.8 or higher" |
| 926 | + ) |
| 927 | + async def test_outer_timeout(self, r: redis.Redis): |
| 928 | + """ |
| 929 | + Using asyncio_timeout manually outside the inner method timeouts works. |
| 930 | + This works on Python versions 3.8 and greater, at which time asyncio. |
| 931 | + CancelledError became a BaseException instead of an Exception before. |
| 932 | + """ |
| 933 | + pubsub = r.pubsub() |
| 934 | + await pubsub.subscribe("foo") |
| 935 | + assert pubsub.connection.is_connected |
| 936 | + |
| 937 | + async def get_msg_or_timeout(timeout=0.1): |
| 938 | + async with async_timeout.timeout(timeout): |
| 939 | + # blocking method to return messages |
| 940 | + while True: |
| 941 | + response = await pubsub.parse_response(block=True) |
| 942 | + message = await pubsub.handle_message( |
| 943 | + response, ignore_subscribe_messages=False |
| 944 | + ) |
| 945 | + if message is not None: |
| 946 | + return message |
| 947 | + |
| 948 | + # get subscribe message |
| 949 | + msg = await get_msg_or_timeout(10) |
| 950 | + assert msg is not None |
| 951 | + # timeout waiting for another message which never arrives |
| 952 | + assert pubsub.connection.is_connected |
| 953 | + with pytest.raises(asyncio.TimeoutError): |
| 954 | + await get_msg_or_timeout() |
| 955 | + # the timeout on the read should not cause disconnect |
| 956 | + assert pubsub.connection.is_connected |
| 957 | + |
| 958 | + async def test_base_exception(self, r: redis.Redis): |
| 959 | + """ |
| 960 | + Manually trigger a BaseException inside the parser's .read_response method |
| 961 | + and verify that it isn't caught |
| 962 | + """ |
| 963 | + pubsub = r.pubsub() |
| 964 | + await pubsub.subscribe("foo") |
| 965 | + assert pubsub.connection.is_connected |
| 966 | + |
| 967 | + async def get_msg(): |
| 968 | + # blocking method to return messages |
| 969 | + while True: |
| 970 | + response = await pubsub.parse_response(block=True) |
| 971 | + message = await pubsub.handle_message( |
| 972 | + response, ignore_subscribe_messages=False |
| 973 | + ) |
| 974 | + if message is not None: |
| 975 | + return message |
| 976 | + |
| 977 | + # get subscribe message |
| 978 | + msg = await get_msg() |
| 979 | + assert msg is not None |
| 980 | + # timeout waiting for another message which never arrives |
| 981 | + assert pubsub.connection.is_connected |
| 982 | + with patch("redis.asyncio.connection.PythonParser.read_response") as mock1: |
| 983 | + mock1.side_effect = BaseException("boom") |
| 984 | + with patch("redis.asyncio.connection.HiredisParser.read_response") as mock2: |
| 985 | + mock2.side_effect = BaseException("boom") |
| 986 | + |
| 987 | + with pytest.raises(BaseException): |
| 988 | + await get_msg() |
| 989 | + |
| 990 | + # the timeout on the read should not cause disconnect |
| 991 | + assert pubsub.connection.is_connected |
0 commit comments