Connection pool __aenter__ gets a connection from the queue but does not return it to the user. #955

Closed
@matemax

Description

  • asyncpg version: 0.26.0
  • PostgreSQL version: 12
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: no, local installation
  • Python version: 3.9
  • Platform: linux, centos8
  • Do you use pgbouncer?: no
  • Did you install asyncpg with pip?: we use poetry
  • If you built asyncpg locally, which version of Cython did you use?: no
  • Can the issue be reproduced under both asyncio and uvloop?: didn't try, the problem is hard to reproduce; we use uvloop

We use a connection pool. After a while the pool runs out of connections and our app hangs (every new response has status code 500 and the error "can not get connection from pool"). We observe this behavior when Postgres spends a long time processing several queries simultaneously and we try to get a new connection from the pool with a timeout. The problem is not stable and can happen roughly once a week.

We debugged it and saw the following oddity.

Our code:

ctx: PoolAcquireContext = self.pool.acquire(timeout=timeoutBudget)
try:
    connection = await ctx.__aenter__()
    logger.debug("connection was received")
except asyncio.TimeoutError:
    ...

asyncpg code (with a log line we added):

    async def _acquire(self, timeout):
        async def _acquire_impl():
            ch = await self._queue.get()  # type: PoolConnectionHolder
            try:
                proxy = await ch.acquire()  # type: PoolConnectionProxy
            except (Exception, asyncio.CancelledError):
                self._queue.put_nowait(ch)
                raise
            else:
                # Record the timeout, as we will apply it by default
                # in release().
                ch._timeout = timeout
                logger.debug("connection was gotten from queue")
                return proxy

        if self._closing:
            raise exceptions.InterfaceError('pool is closing')
        self._check_init()

        if timeout is None:
            return await _acquire_impl()
        else:
            return await compat.wait_for(
                _acquire_impl(), timeout=timeout)

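After the app hung, we counted the "connection was received" messages (call this A) and the "connection was gotten from queue" messages (call this B). B - A = connection pool size. In other words, every connection in the pool had been taken from the queue without ever being handed to our code.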

Our hypothesis is that the problem is here: the future completes, but the timeout occurs before compat.wait_for returns the result.
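To make the suspected class of race concrete, here is a toy sketch with no asyncpg involved. It assumes the inner step runs as its own task and the outer awaiter is cancelled after the inner task has finished but before the outer one resumes. The names `take`, `naive_outer`, and `demo` are made up for illustration, and this is not confirmed to be what happens inside compat.wait_for.

```python
import asyncio


async def take(queue: asyncio.Queue) -> str:
    # Stands in for the inner acquire step: removes one item from the queue.
    return await queue.get()


async def naive_outer(queue: asyncio.Queue) -> str:
    # Runs the inner step as a separate task and awaits it, without handling
    # a cancellation that arrives after the inner task has already finished.
    inner = asyncio.ensure_future(take(queue))
    return await inner


async def demo() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    outer = asyncio.ensure_future(naive_outer(queue))

    # Let the outer task start and the inner task block on queue.get().
    for _ in range(3):
        await asyncio.sleep(0)

    queue.put_nowait("conn-1")  # wakes the inner task
    await asyncio.sleep(0)      # the inner task finishes and takes "conn-1"
    outer.cancel()              # arrives before the outer task sees the result

    try:
        await outer
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True

    # The item left the queue, yet no caller ever received it.
    return {"cancelled": cancelled, "queue_empty": queue.empty()}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy, the item is removed from the queue and nobody receives it, which is the same shape as B growing while A does not.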

We tried the following code instead:

ctx: PoolAcquireContext = self.pool.acquire()
connection = None
try:
    async with async_timeout.timeout(timeoutBudget):
        connection = await ctx.__aenter__()
except aTimeoutError as e:
    if connection:
        # The connection is valid, but the pool returned it too late: the exception
        # was raised in the timeout's __aexit__. Return the connection to the pool.
        await ctx.__aexit__()

except asyncio.CancelledError:
    # The user cancelled the request.
    if connection:
        await ctx.__aexit__()

[async-timeout](https://github.com/aio-libs/async-timeout) does not create an additional task for the ctx.__aenter__() coroutine. This code works for us (but maybe we're just lucky).
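The "give it back if it arrived too late" idea can also be sketched in a toy setting without asyncpg. This assumes the acquire step runs in a child task. The names `take`, `careful_outer`, and `demo` are hypothetical, and the queue stands in for the pool. It is only an illustration of the pattern, not a proposed patch.

```python
import asyncio


async def take(queue: asyncio.Queue) -> str:
    # Stands in for the inner acquire step: removes one item from the queue.
    return await queue.get()


async def careful_outer(queue: asyncio.Queue) -> str:
    inner = asyncio.ensure_future(take(queue))
    try:
        return await inner
    except asyncio.CancelledError:
        if inner.done() and not inner.cancelled() and inner.exception() is None:
            # The item arrived, but we were cancelled before we could use it:
            # put it back so it is not lost.
            queue.put_nowait(inner.result())
        else:
            inner.cancel()
        raise


async def demo() -> int:
    queue: asyncio.Queue = asyncio.Queue()
    outer = asyncio.ensure_future(careful_outer(queue))

    # Let the outer task start and the inner task block on queue.get().
    for _ in range(3):
        await asyncio.sleep(0)

    queue.put_nowait("conn-1")  # wakes the inner task
    await asyncio.sleep(0)      # the inner task finishes and takes "conn-1"
    outer.cancel()              # arrives before the outer task sees the result

    try:
        await outer
    except asyncio.CancelledError:
        pass

    # Number of items back in the queue after the cancelled acquire.
    return queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the cancelled caller still does not get the item, but the item ends up back in the queue instead of disappearing.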
