
Async redis subscription inside websocket doesn't shut down properly  #2523

Closed
@wholmen

Version: 4.4.0

Platform: Python 3.11.0, ubuntu

Description:

I am using async redis to subscribe to a channel within a FastAPI websocket connection. It works fine, but I cannot make it shut down properly: when the server shuts down, the loop awaiting get_message does not stop.

I have tried sending SIGTERM, but the reader function does not seem to yield to it. The server logs the following message before the SIGTERM is handled:

INFO:     Waiting for background tasks to complete. (CTRL+C to force quit)

Does anyone have a suggestion for where to start looking?

@router.websocket("/ws")
async def websocket_labtest(websocket: WebSocket):
    await websocket.accept()

    async_redis_client: aioredis.client.Redis = aioredis.from_url(
        url=settings.redis_url, port=settings.redis_port, password=settings.redis_password, decode_responses=True
    )
    channelname: str = f"channel:{events.LabtestAddedEvent.__name__}"
    redis_channel = async_redis_client.pubsub()

    async def reader(channel: aioredis.client.PubSub):
        while True:
            try:
                async with async_timeout.timeout(1):
                    message = await channel.get_message(ignore_subscribe_messages=True)
                    if message is not None:
                        response = json.loads(message["data"])
                        await websocket.send_json(response)
                    await asyncio.sleep(0.01)

            except WebSocketDisconnect:
                print("hit websocket disconnect")
            except asyncio.TimeoutError:
                pass
            except asyncio.CancelledError:
                print("Cancelled")
                break
            except aioredis.PubSubError:
                print("Pubsub error")
                break

    async with redis_channel as p:
        await p.subscribe(channelname)
        await reader(p)
        await p.unsubscribe(channelname)

    await redis_channel.close()
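
One possible place to start looking, offered as a sketch rather than a confirmed fix: in the reader above, the WebSocketDisconnect branch only prints and does not break, so the `while True` loop keeps polling after the client has gone. The example below shows a generic asyncio pattern in which the loop checks a stop flag between polls and does not catch `asyncio.CancelledError`, so it can be ended either by setting the flag or by cancelling the task. `FakePubSub`, `send`, and the `stop` event are hypothetical stand-ins introduced for illustration; they are not part of redis-py or FastAPI, and how the flag gets set on server shutdown is left open.

```python
import asyncio
import json


class FakePubSub:
    """Hypothetical stand-in for a pub/sub channel (not the redis-py API)."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def get_message(self, timeout=1.0):
        # Return a queued message if there is one; otherwise wait for
        # `timeout` seconds and report that nothing arrived.
        if self._messages:
            return self._messages.pop(0)
        await asyncio.sleep(timeout)
        return None


async def reader(channel, send, stop: asyncio.Event):
    """Forward messages until `stop` is set.

    CancelledError is deliberately not caught, so task.cancel() also
    ends the loop promptly.
    """
    while not stop.is_set():
        message = await channel.get_message(timeout=0.05)
        if message is not None:
            await send(json.loads(message["data"]))


async def main():
    sent = []

    async def send(payload):
        # Hypothetical stand-in for websocket.send_json.
        sent.append(payload)

    stop = asyncio.Event()
    channel = FakePubSub([{"data": '{"id": 1}'}, {"data": '{"id": 2}'}])
    task = asyncio.create_task(reader(channel, send, stop))

    await asyncio.sleep(0.2)  # let the reader drain the queued messages
    stop.set()                # what a shutdown hook or disconnect handler would do
    await asyncio.wait_for(task, timeout=1.0)
    return sent, task.done()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the websocket handler, the same idea would mean leaving the loop when the client disconnects and keeping the unsubscribe and close calls in a `finally` block so they run on every exit path. Whether that alone resolves the SIGTERM behaviour described above is not verified here.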
