Skip to content

Warn about event loop changes using Redis/FakeRedis #622

Open
@M1ha-Shvn

Description

@M1ha-Shvn

Hi.
I'm using fakeredis to run tests via pytest-asyncio in FastAPI application.

Library versions:

fakeredis==2.19.0
fastapi==0.103.1
hiredis==2.2.3
lupa==2.0
pytest-asyncio==0.21.1
redis==5.0.0

Everything works well in python 3.9 and earlier. But in 3.10 and 3.11 I start getting exceptions:

test setup failed
event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
request = <SubRequest 'storage_clean_up' for <Function test_read_from_cache__cache_save_failure>>
kwargs = {}
setup = <function _wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup at 0x7feb442d5900>

    @functools.wraps(fixture)
    def _async_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ):
        func = _perhaps_rebind_fixture_func(
            fixture, request.instance, fixturedef.unittest
        )
    
        async def setup():
            res = await func(**_add_kwargs(func, kwargs, event_loop, request))
            return res
    
>       return event_loop.run_until_complete(setup())

../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:326: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323: in setup
    res = await func(**_add_kwargs(func, kwargs, event_loop, request))
/opt/project/core/storages/tests/fixtures.py:23: in storage_clean_up
    await cls.storage.truncate()
/opt/project/core/storages/redis.py:285: in truncate
    await client.flushall()
../pip/lib/python3.10/site-packages/redis/asyncio/client.py:545: in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:1111: in get_connection
    if await connection.can_read_destructive():
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:472: in can_read_destructive
    return await self._parser.can_read_destructive()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:179: in can_read_destructive
    return await self.read_from_socket()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
../pip/lib/python3.10/site-packages/fakeredis/aioredis.py:83: in read
    return await self._socket.responses.get()  # type:ignore
/usr/local/lib/python3.10/asyncio/queues.py:156: in get
    getter = self._get_loop().create_future()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7feb429a9c60 maxsize=0 tasks=4>

    def _get_loop(self):
        loop = events._get_running_loop()
    
        if self._loop is None:
            with _global_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
>           raise RuntimeError(f'{self!r} is bound to a different event loop')
E           RuntimeError: <Queue at 0x7feb429a9c60 maxsize=0 tasks=4> is bound to a different event loop

/usr/local/lib/python3.10/asyncio/mixins.py:30: RuntimeError

If I disable FakeRedis and use redis-py in tests without isolation, I start getting this error:

self = Connection<host=redis,port=6379,db=0>, disable_decoding = False
timeout = None

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: bool = True,
        push_request: Optional[bool] = False,
    ):
        """Read the response from a previously sent command"""
        read_timeout = timeout if timeout is not None else self.socket_timeout
        host_error = self._host_error()
        try:
            if (
                read_timeout is not None
                and self.protocol in ["3", 3]
                and not HIREDIS_AVAILABLE
            ):
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
            elif read_timeout is not None:
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding
                    )
            elif self.protocol in ["3", 3] and not HIREDIS_AVAILABLE:
                response = await self._parser.read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
>               response = await self._parser.read_response(
                    disable_decoding=disable_decoding
                )

/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:509: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:203: in read_response
    await self.read_from_socket()
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
/usr/local/lib/python3.10/asyncio/streams.py:669: in read
    await self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <StreamReader transport=<_SelectorSocketTransport closing fd=11>>
func_name = 'read'

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.
    
        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')
    
        assert not self._eof, '_wait_for_data after EOF'
    
        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()
    
        self._waiter = self._loop.create_future()
        try:
>           await self._waiter
E           RuntimeError: Task <Task pending name='Task-3' coro=<_wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup() running at /app/pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323> cb=[_run_until_complete_cb() at /usr/local/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

/usr/local/lib/python3.10/asyncio/streams.py:501: RuntimeError

During handling of the above exception, another exception occurred:

cls = <core.storages.tests.test_storages.TestRedisStorage object at 0x7f8c542801c0>

    @classmethod
    @pytest_asyncio.fixture(autouse=True)
    async def storage_clean_up(cls):
>       await cls.storage.truncate()

test_storages.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../redis.py:282: in truncate
    await client.flushall()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:550: in execute_command
    return await conn.retry.call_with_retry(
/app/pip/lib/python3.10/site-packages/redis/asyncio/retry.py:59: in call_with_retry
    return await do()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:524: in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:571: in parse_response
    response = await connection.read_response()
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:529: in read_response
    await self.disconnect(nowait=True)
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:385: in disconnect
    self._writer.close()  # type: ignore[union-attr]
/usr/local/lib/python3.10/asyncio/streams.py:337: in close
    return self._transport.close()
/usr/local/lib/python3.10/asyncio/selector_events.py:706: in close
    self._loop.call_soon(self._call_connection_lost, None)
/usr/local/lib/python3.10/asyncio/base_events.py:753: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/local/lib/python3.10/asyncio/base_events.py:515: RuntimeError

Firstly, I thought, it was a problem in Fakeredis and created an issue there. But now I suppose, that it is something connected with pytest-asyncio as:

  1. Tests fail in base redis library too
  2. Redis-py does not fail without tests, my project works well while testing the server manually without tests

Can you suggest anything and where to dig more?
Thanks

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions