From b7c4f09fc875097f6690687c86d9681a4b6cb648 Mon Sep 17 00:00:00 2001 From: jmarkin Date: Sun, 16 Apr 2023 00:30:37 +0300 Subject: [PATCH 1/4] fix local pool addresses --- .gitignore | 1 + asyncpg/connect_utils.py | 27 +++++++++++++++++++-------- asyncpg/connection.py | 2 ++ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 21286094..def5770d 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ docs/_build /.eggs /.vscode /.mypy_cache +.venv diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 40905edf..4fe3c6e6 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -10,6 +10,9 @@ import enum import functools import getpass +import inspect +import logging +import random import os import pathlib import platform @@ -23,12 +26,10 @@ import typing import urllib.parse import warnings -import inspect -from . import compat -from . import exceptions -from . import protocol +from . import compat, exceptions, protocol +logger = logging.getLogger(__name__) class SSLMode(enum.IntEnum): disable = 0 @@ -867,21 +868,27 @@ async def __connect_addr( return con -async def _connect(*, loop, timeout, connection_class, record_class, **kwargs): +async def _connect(*, loop, timeout, connection_class, record_class, connect_timeout=60, **kwargs): if loop is None: loop = asyncio.get_event_loop() addrs, params, config = _parse_connect_arguments(timeout=timeout, **kwargs) - - last_error = None + + random.shuffle(addrs) + last_error = ConnectionError(f"Can't conenct to all hosts {addrs}") addr = None for addr in addrs: + if addr in _connect._skip_addresses: + current_time = time.monotonic() + if current_time < _connect._skip_addresses[addr]: + del _connect._skip_addresses[addr] + continue before = time.monotonic() try: return await _connect_addr( addr=addr, loop=loop, - timeout=timeout, + timeout=connect_timeout, params=params, config=config, connection_class=connection_class, @@ -889,11 +896,15 @@ async def _connect(*, loop, timeout, connection_class, record_class, **kwargs): ) except (OSError, asyncio.TimeoutError, ConnectionError) as ex: last_error = ex + _connect._skip_addresses[addr] = time.monotonic() + connect_timeout + logger.warning("Can't connect to %s: %s", addr, ex, exc_info=True) finally: timeout -= time.monotonic() - before raise last_error +_connect._skip_addresses = {} + async def _cancel(*, loop, addr, params: _ConnectionParameters, backend_pid, backend_secret): diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 73cb6e66..91667a28 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -1784,6 +1784,7 @@ async def connect(dsn=None, *, database=None, loop=None, timeout=60, + connect_timeout=60, statement_cache_size=100, max_cached_statement_lifetime=300, max_cacheable_statement_size=1024 * 15, @@ -2104,6 +2105,7 @@ async def connect(dsn=None, *, ssl=ssl, direct_tls=direct_tls, database=database, + connect_timeout=connect_timeout, server_settings=server_settings, command_timeout=command_timeout, statement_cache_size=statement_cache_size, From aa9d4070b2e05994163e62db034aa2d100c96136 Mon Sep 17 00:00:00 2001 From: jmarkin Date: Sun, 16 Apr 2023 00:49:22 +0300 Subject: [PATCH 2/4] fix --- asyncpg/connect_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 4fe3c6e6..1d9741d6 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -873,9 +873,9 @@ async def _connect(*, loop, timeout, connection_class, record_class, connect_tim loop = asyncio.get_event_loop() addrs, params, config = _parse_connect_arguments(timeout=timeout, **kwargs) - + random.shuffle(addrs) - last_error = ConnectionError(f"Can't conenct to all hosts {addrs}") + last_error = ConnectionError(f"Can't connect to all hosts {addrs}") addr = None for addr in addrs: if addr in _connect._skip_addresses: From 610d340d23b41549fca27bb237a7aba6027718a9 Mon Sep 17 00:00:00 2001 From: jmarkin Date: Sun, 16 Apr 2023 01:22:13 +0300 Subject: [PATCH 3/4] remove skip_addresses --- asyncpg/connect_utils.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 1d9741d6..752861ea 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -878,11 +878,6 @@ async def _connect(*, loop, timeout, connection_class, record_class, connect_tim last_error = ConnectionError(f"Can't connect to all hosts {addrs}") addr = None for addr in addrs: - if addr in _connect._skip_addresses: - current_time = time.monotonic() - if current_time < _connect._skip_addresses[addr]: - del _connect._skip_addresses[addr] - continue before = time.monotonic() try: return await _connect_addr( @@ -896,15 +891,12 @@ async def _connect(*, loop, timeout, connection_class, record_class, connect_tim ) except (OSError, asyncio.TimeoutError, ConnectionError) as ex: last_error = ex - _connect._skip_addresses[addr] = time.monotonic() + connect_timeout logger.warning("Can't connect to %s: %s", addr, ex, exc_info=True) finally: timeout -= time.monotonic() - before raise last_error -_connect._skip_addresses = {} - async def _cancel(*, loop, addr, params: _ConnectionParameters, backend_pid, backend_secret): From 34f0fe2a69abffc72b73da359d12e9fc3b2fbd7b Mon Sep 17 00:00:00 2001 From: jmarkin Date: Sun, 16 Apr 2023 21:50:49 +0300 Subject: [PATCH 4/4] fix parse conenct argument --- asyncpg/connect_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 8204eb1c..f36851b3 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -887,12 +887,14 @@ async def _connect(*, loop, timeout, connection_class, record_class, connect_tim if loop is None: loop = asyncio.get_event_loop() - addrs, params, config = _parse_connect_arguments(timeout=timeout, **kwargs) + addrs, params, config = _parse_connect_arguments(timeout=connect_timeout, **kwargs) random.shuffle(addrs) last_error = ConnectionError(f"Can't connect to all hosts {addrs}") addr = None for addr in addrs: + if timeout <= 0: + raise ConnectionError("Timeout") before = time.monotonic() try: return await _connect_addr(