From 0f74b9ef695cbddf73be972322a20e5cc05f898b Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Wed, 2 Nov 2022 09:12:12 +0100 Subject: [PATCH 1/7] Logging overhaul * Add more logging to the pool when acquiring and releasing connections. * Improved log format: * All debug messages should not start with `[#hhhh] x:` where `hhhh` is the hex representation of the local port number of the connection or 0000 if there is none. `x` is one of `C` if it's the driver writing/opening/closing onto the socket, `S` if it's the driver reading/receiving an error from the socket and `_` if it's not communication between the server and the driver. * Reduced logging noise in routing table. * API docs: explain how to manually log the async task id. * Logging helpers `neo4j.debug`: * Add config options to decide if thread id and/or task id should be logged. * Their docs make clear that the logging format is not following semver. * Changed logging format to accommodate the new config options. --- docs/source/api.rst | 13 ++- docs/source/async_api.rst | 49 +++++++++++ neo4j/_async/io/_bolt.py | 11 ++- neo4j/_async/io/_bolt3.py | 2 +- neo4j/_async/io/_bolt4.py | 5 +- neo4j/_async/io/_bolt5.py | 5 +- neo4j/_async/io/_pool.py | 77 ++++++++++------ neo4j/_async/work/session.py | 7 +- neo4j/_async/work/workspace.py | 6 +- neo4j/_async_compat/network/_bolt_socket.py | 12 +-- neo4j/_async_compat/network/_util.py | 16 +++- neo4j/_routing.py | 20 +++-- neo4j/_sync/io/_bolt.py | 11 ++- neo4j/_sync/io/_bolt3.py | 2 +- neo4j/_sync/io/_bolt4.py | 5 +- neo4j/_sync/io/_bolt5.py | 5 +- neo4j/_sync/io/_pool.py | 77 ++++++++++------ neo4j/_sync/work/session.py | 7 +- neo4j/_sync/work/workspace.py | 6 +- neo4j/debug.py | 97 ++++++++++++++++++--- 20 files changed, 324 insertions(+), 109 deletions(-) diff --git a/docs/source/api.rst b/docs/source/api.rst index fb168945..220991b9 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -1462,6 +1462,8 @@ following code: ... +.. _logging-ref: + ******* Logging ******* @@ -1472,6 +1474,11 @@ not able to connect to the database server or if undesired behavior is observed. There are different ways of enabling logging as listed below. +.. note:: + + For an improved logging experience with the async driver, please see + :ref:`async-logging-ref`. + Simple Approach =============== @@ -1484,8 +1491,8 @@ Context Manager :members: :special-members: __enter__, __exit__ -Full Controll -============= +Full Control +============ .. code-block:: python @@ -1502,7 +1509,7 @@ Full Controll logging.getLogger("neo4j").addHandler(handler) # make sure the logger logs on the desired log level logging.getLogger("neo4j").setLevel(logging.DEBUG) - # from now on, DEBUG logging to stderr is enabled in the driver + # from now on, DEBUG logging to stdout is enabled in the driver ********* diff --git a/docs/source/async_api.rst b/docs/source/async_api.rst index c39e7b77..0dbd2598 100644 --- a/docs/source/async_api.rst +++ b/docs/source/async_api.rst @@ -648,3 +648,52 @@ successfully executed on the server side or not, when a cancellation happens: ``await transaction.commit()`` and other methods can throw :exc:`asyncio.CancelledError` but still have managed to complete from the server's perspective. + + +.. _async-logging-ref: + +************* +Async Logging +************* + +For the most parts, logging works the same way as in the synchronous driver. +See :ref:`logging-ref` for more information. + +However, when following the manual approach to logging, it is recommended to +include information about the current async task in the log record. +Like so: + +.. code-block:: python + + import asyncio + import logging + import sys + + class TaskIdFilter(logging.Filter): + """Injecting async task id into log records.""" + + def filter(self, record): + try: + record.taskId = id(asyncio.current_task()) + except RuntimeError: + record.taskId = None + return True + + + # attache the filter injecting the task id to the driver's logger + logging.getLogger("neo4j").addFilter(TaskIdFilter()) + + # create a handler, e.g. to log to stdout + handler = logging.StreamHandler(sys.stdout) + # configure the handler to your liking + handler.setFormatter(logging.Formatter( + "[%(levelname)-8s] [Task %(taskId)-15s] %(asctime)s %(message)s" + # or when using threading AND asyncio + # "[%(levelname)-8s] [Thread %(thread)d] [Task %(taskId)-15s] " + # "%(asctime)s %(message)s" + )) + # add the handler to the driver's logger + logging.getLogger("neo4j").addHandler(handler) + # make sure the logger logs on the desired log level + logging.getLogger("neo4j").setLevel(logging.DEBUG) + # from now on, DEBUG logging to stdout is enabled in the driver diff --git a/neo4j/_async/io/_bolt.py b/neo4j/_async/io/_bolt.py index a83e58a6..6215506f 100644 --- a/neo4j/_async/io/_bolt.py +++ b/neo4j/_async/io/_bolt.py @@ -160,6 +160,10 @@ def __del__(self): if not asyncio.iscoroutinefunction(self.close): self.close() + @property + def connection_id(self): + return self.server_info._metadata.get("connection_id", "") + @property @abc.abstractmethod def supports_multiple_results(self): @@ -689,7 +693,8 @@ async def _set_defunct(self, message, error=None, silent=False): user_cancelled = isinstance(error, asyncio.CancelledError) if error: - log.debug("[#%04X] %r", self.local_port, error) + log.debug("[#%04X] _: error: %r", self.local_port, + error) if not user_cancelled: log.error(message) # We were attempting to receive data but the connection @@ -751,7 +756,7 @@ async def close(self): try: await self._send_all() except (OSError, BoltError, DriverError) as exc: - log.debug("[#%04X] ignoring failed close %r", + log.debug("[#%04X] _: ignoring failed close %r", self.local_port, exc) log.debug("[#%04X] C: ", self.local_port) try: @@ -770,7 +775,7 @@ def kill(self): try: self.socket.kill() except OSError as exc: - log.debug("[#%04X] ignoring failed kill %r", + log.debug("[#%04X] _: ignoring failed kill %r", self.local_port, exc) finally: self._closed = True diff --git a/neo4j/_async/io/_bolt3.py b/neo4j/_async/io/_bolt3.py index 072f7417..41835810 100644 --- a/neo4j/_async/io/_bolt3.py +++ b/neo4j/_async/io/_bolt3.py @@ -111,7 +111,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property diff --git a/neo4j/_async/io/_bolt4.py b/neo4j/_async/io/_bolt4.py index f82cf706..d523659b 100644 --- a/neo4j/_async/io/_bolt4.py +++ b/neo4j/_async/io/_bolt4.py @@ -69,7 +69,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -426,7 +426,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_async/io/_bolt5.py b/neo4j/_async/io/_bolt5.py index aa399e8a..664f133c 100644 --- a/neo4j/_async/io/_bolt5.py +++ b/neo4j/_async/io/_bolt5.py @@ -66,7 +66,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -104,7 +104,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_async/io/_pool.py b/neo4j/_async/io/_pool.py index f61ebd46..6d7cbe54 100644 --- a/neo4j/_async/io/_pool.py +++ b/neo4j/_async/io/_pool.py @@ -110,9 +110,9 @@ async def _acquire_from_pool_checked( # `stale` but still alive. if log.isEnabledFor(logging.DEBUG): log.debug( - "[#%04X] C: removing old connection " + "[#%04X] _: removing old connection %s " "(closed=%s, defunct=%s, stale=%s, in_use=%s)", - connection.local_port, + connection.local_port, connection.connection_id, connection.closed(), connection.defunct(), connection.stale(), connection.in_use ) @@ -182,7 +182,7 @@ async def health_check(connection_, deadline_): if connection_.is_idle_for(liveness_check_timeout): with connection_deadline(connection_, deadline_): try: - log.debug("[#%04X] C: ", + log.debug("[#%04X] _: liveness check", connection_.local_port) await connection_.reset() except (OSError, ServiceUnavailable, SessionExpired): @@ -195,6 +195,9 @@ async def health_check(connection_, deadline_): address, health_check, deadline ) if connection: + log.debug("[#%04X] _: handing out existing connection " + "%s", connection.local_port, + connection.connection_id) return connection # all connections in pool are in-use with self.lock: @@ -209,10 +212,12 @@ async def health_check(connection_, deadline_): timeout == 0 # deadline expired or not await self.cond.wait(timeout) ): + log.debug("[#0000] _: acquisition timed out") raise ClientError( "failed to obtain a connection from the pool within " "{!r}s (timeout)".format(deadline.original_timeout) ) + log.debug("[#0000] _: trying to hand out new connection") return await connection_creator() @abc.abstractmethod @@ -239,8 +244,8 @@ def kill_and_release(self, *connections): if not (connection.defunct() or connection.closed()): log.debug( - "[#%04X] C: killing connection on release", - connection.local_port + "[#%04X] _: killing connection on release %s", + connection.local_port, connection.connection_id ) connection.kill() with self.lock: @@ -260,26 +265,32 @@ async def release(self, *connections): or connection.is_reset): if cancelled is not None: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) connection.kill() continue try: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) await connection.reset() except (Neo4jError, DriverError, BoltError) as e: - log.debug("Failed to reset connection on release: %r", e) + log.debug("[#%04X] _: failed to reset connection " + "on release: %r", connection.local_port, e) except asyncio.CancelledError as e: - log.debug("Cancelled reset connection on release: %r", e) + log.debug("[#%04X] _: cancelled reset connection " + "on release: %r", connection.local_port, e) cancelled = e connection.kill() with self.lock: for connection in connections: connection.in_use = False + log.debug( + "[#%04X] _: released %s", + connection.local_port, connection.connection_id + ) self.cond.notify_all() if cancelled is not None: raise cancelled @@ -349,7 +360,7 @@ async def close(self): """ Close all connections and empty the pool. This method is thread safe. """ - log.debug("[#0000] C: close") + log.debug("[#0000] _: close") try: connections = [] with self.lock: @@ -381,6 +392,7 @@ async def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, direct address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -396,6 +408,7 @@ async def acquire( ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. + log.debug("[#0000] _: acquire direct connection") deadline = Deadline.from_timeout_or_deadline(timeout) return await self._acquire( self.address, deadline, liveness_check_timeout @@ -433,6 +446,7 @@ async def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, routing address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -445,7 +459,6 @@ def __init__(self, opener, pool_config, workspace_config, address): """ super().__init__(opener, pool_config, workspace_config) # Each database have a routing table, the default database is a special case. - log.debug("[#0000] C: routing address %r", address) self.address = address self.routing_tables = {workspace_config.database: RoutingTable(database=workspace_config.database, routers=[address])} self.refresh_lock = AsyncRLock() @@ -552,7 +565,8 @@ async def fetch_routing_table( except (ServiceUnavailable, SessionExpired): pass if not new_routing_info: - log.debug("Failed to fetch routing info %s", address) + log.debug("[#0000] _: failed to fetch routing info " + "from %r", address) return None else: servers = new_routing_info[0]["servers"] @@ -572,12 +586,14 @@ async def fetch_routing_table( # No routers if num_routers == 0: - log.debug("No routing servers returned from server %s", address) + log.debug("[#0000] _: no routing servers returned from " + "server %s", address) return None # No readers if num_readers == 0: - log.debug("No read servers returned from server %s", address) + log.debug("[#0000] _: no read servers returned from " + "server %s", address) return None # At least one of each is fine, so return this table @@ -592,9 +608,9 @@ async def _update_routing_table_from( :return: True if the routing table is successfully updated, otherwise False """ - log.debug("Attempting to update routing table from {}".format( - ", ".join(map(repr, routers))) - ) + if routers: + log.debug("[#0000] _: attempting to update routing " + "table from {}".format(", ".join(map(repr, routers)))) for router in routers: async for address in AsyncNetworkUtil.resolve_address( router, resolver=self.pool_config.resolver @@ -610,7 +626,8 @@ async def _update_routing_table_from( ) old_routing_table.update(new_routing_table) log.debug( - "[#0000] C: address=%r (%r)", + "[#0000] _: update routing table from " + "address=%r (%r)", address, self.routing_tables[new_database] ) if callable(database_callback): @@ -720,9 +737,12 @@ async def ensure_routing_table_is_fresh( for database in list(self.routing_tables.keys()): # Remove unused databases in the routing table # Remove the routing table after a timeout = TTL + 30s - log.debug("[#0000] C: database=%s", database) + log.debug("[#0000] _: routing aged, database=%s", + database) if (self.routing_tables[database].should_be_purged_from_memory() and database != self.workspace_config.database): + log.debug("[#0000] _: dropping routing table for " + "database=%s", database) del self.routing_tables[database] return True @@ -761,10 +781,12 @@ async def acquire( raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) + log.debug("[#0000] _: acquire routing connection") + from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) async with self.refresh_lock: - log.debug("[#0000] C: %r", + log.debug("[#0000] _: routing table ensure fresh %r", self.routing_tables) await self.ensure_routing_table_is_fresh( access_mode=access_mode, database=database, imp_user=None, @@ -781,7 +803,8 @@ async def acquire( except (ReadServiceUnavailable, WriteServiceUnavailable) as err: raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err try: - log.debug("[#0000] C: database=%r address=%r", database, address) + log.debug("[#0000] _: acquire address, database=%r " + "address=%r", database, address) deadline = Deadline.from_timeout_or_deadline(timeout) # should always be a resolved address connection = await self._acquire( @@ -797,20 +820,20 @@ async def deactivate(self, address): if present, remove from the routing table and also closing all idle connections to that address. """ - log.debug("[#0000] C: Deactivating address %r", address) + log.debug("[#0000] _: deactivating address %r", address) # We use `discard` instead of `remove` here since the former # will not fail if the address has already been removed. for database in self.routing_tables.keys(): self.routing_tables[database].routers.discard(address) self.routing_tables[database].readers.discard(address) self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) await super(AsyncNeo4jPool, self).deactivate(address) def on_write_failure(self, address): """ Remove a writer address from the routing table, if present. """ - log.debug("[#0000] C: Removing writer %r", address) + log.debug("[#0000] _: removing writer %r", address) for database in self.routing_tables.keys(): self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) diff --git a/neo4j/_async/work/session.py b/neo4j/_async/work/session.py index 6d5d40ef..9356a737 100644 --- a/neo4j/_async/work/session.py +++ b/neo4j/_async/work/session.py @@ -136,11 +136,12 @@ def _handle_cancellation(self, message="General"): connection = self._connection self._connection = None if connection: - log.debug("[#%04X] %s cancellation clean-up", + log.debug("[#%04X] _: %s cancellation clean-up", connection.local_port, message) self._pool.kill_and_release(connection) else: - log.debug("[#0000] %s cancellation clean-up", message) + log.debug("[#0000] _: %s cancellation clean-up", + message) async def _result_closed(self): if self._auto_result: @@ -512,7 +513,7 @@ async def _run_transaction( try: await async_sleep(delay) except asyncio.CancelledError: - log.debug("[#0000] Retry cancelled") + log.debug("[#0000] _: retry cancelled") raise if errors: diff --git a/neo4j/_async/work/workspace.py b/neo4j/_async/work/workspace.py index 45f9e5dd..980a8aff 100644 --- a/neo4j/_async/work/workspace.py +++ b/neo4j/_async/work/workspace.py @@ -19,10 +19,10 @@ from __future__ import annotations import asyncio +import logging from ..._async_compat.util import AsyncUtil from ..._conf import WorkspaceConfig -from ..._deadline import Deadline from ..._meta import ( deprecation_warn, unclosed_resource_warn, @@ -36,6 +36,9 @@ from ..io import AsyncNeo4jPool +log = logging.getLogger("neo4j") + + class AsyncWorkspace: def __init__(self, pool, config): @@ -176,6 +179,7 @@ async def _connect(self, access_mode, **acquire_kwargs): # to try to fetch the home database. If provided by the server, # we shall use this database explicitly for all subsequent # actions within this session. + log.debug("[#0000] _: resolve home database") await self._pool.update_routing_table( database=self._config.database, imp_user=self._config.impersonated_user, diff --git a/neo4j/_async_compat/network/_bolt_socket.py b/neo4j/_async_compat/network/_bolt_socket.py index 3d7e6946..fee8cedf 100644 --- a/neo4j/_async_compat/network/_bolt_socket.py +++ b/neo4j/_async_compat/network/_bolt_socket.py @@ -238,7 +238,7 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl): return cls(reader, protocol, writer) except asyncio.TimeoutError: - log.debug("[#0000] C: %s", resolved_address) + log.debug("[#0000] S: %s", resolved_address) log.debug("[#0000] C: %s", resolved_address) if s: await cls.close_socket(s) @@ -246,7 +246,7 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl): "Timed out trying to establish connection to {!r}".format( resolved_address)) except asyncio.CancelledError: - log.debug("[#0000] C: %s", resolved_address) + log.debug("[#0000] S: %s", resolved_address) log.debug("[#0000] C: %s", resolved_address) if s: await cls.close_socket(s) @@ -260,7 +260,7 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl): address=(resolved_address.host_name, local_port) ) from error except OSError as error: - log.debug("[#0000] C: %s %s", type(error).__name__, + log.debug("[#0000] S: %s %s", type(error).__name__, " ".join(map(repr, error.args))) log.debug("[#0000] C: %s", resolved_address) if s: @@ -507,14 +507,14 @@ def _connect(cls, resolved_address, timeout, keep_alive): s.setsockopt(SOL_SOCKET, SO_KEEPALIVE, keep_alive) return s except SocketTimeout: - log.debug("[#0000] C: %s", resolved_address) + log.debug("[#0000] S: %s", resolved_address) log.debug("[#0000] C: %s", resolved_address) cls.close_socket(s) raise ServiceUnavailable( "Timed out trying to establish connection to {!r}".format( resolved_address)) except OSError as error: - log.debug("[#0000] C: %s %s", type(error).__name__, + log.debug("[#0000] S: %s %s", type(error).__name__, " ".join(map(repr, error.args))) log.debug("[#0000] C: %s", resolved_address) cls.close_socket(s) @@ -655,7 +655,7 @@ def connect(cls, address, *, timeout, custom_resolver, ssl_context, err_str = error.__class__.__name__ if str(error): err_str += ": " + str(error) - log.debug("[#%04X] C: %s", local_port, + log.debug("[#%04X] S: %s", local_port, err_str) if s: cls.close_socket(s) diff --git a/neo4j/_async_compat/network/_util.py b/neo4j/_async_compat/network/_util.py index f1d5ca4a..b1ad27e9 100644 --- a/neo4j/_async_compat/network/_util.py +++ b/neo4j/_async_compat/network/_util.py @@ -72,21 +72,27 @@ async def resolve_address(address, family=0, resolver=None): yield address return - log.debug("[#0000] C: %s", address) + log.debug("[#0000] _: in: %s", address) if resolver: if asyncio.iscoroutinefunction(resolver): resolved_addresses = await resolver(address) else: resolved_addresses = resolver(address) for address in map(addressing.Address, resolved_addresses): + log.debug("[#0000] _: custom resolver out: %s", + address) for resolved_address in await AsyncNetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address else: for resolved_address in await AsyncNetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address @@ -136,15 +142,21 @@ def resolve_address(address, family=0, resolver=None): yield address return - addressing.log.debug("[#0000] C: %s", address) + addressing.log.debug("[#0000] _: in: %s", address) if resolver: for address in map(addressing.Address, resolver(address)): + log.debug("[#0000] _: custom resolver out: %s", + address) for resolved_address in NetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address else: for resolved_address in NetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address diff --git a/neo4j/_routing.py b/neo4j/_routing.py index a073dda3..4e562635 100644 --- a/neo4j/_routing.py +++ b/neo4j/_routing.py @@ -129,16 +129,17 @@ def is_fresh(self, readonly=False): """ Indicator for whether routing information is still usable. """ assert isinstance(readonly, bool) - log.debug("[#0000] C: Checking table freshness (readonly=%r)", readonly) expired = self.last_updated_time + self.ttl <= perf_counter() if readonly: has_server_for_mode = bool(self.readers) else: has_server_for_mode = bool(self.writers) - log.debug("[#0000] C: Table expired=%r", expired) - log.debug("[#0000] C: Table routers=%r", self.routers) - log.debug("[#0000] C: Table has_server_for_mode=%r", has_server_for_mode) - return not expired and self.routers and has_server_for_mode + res = not expired and self.routers and has_server_for_mode + log.debug("[#0000] _: checking table freshness " + "(readonly=%r): table expired=%r, " + "has_server_for_mode=%r, table routers=%r => %r", + readonly, expired, has_server_for_mode, self.routers, res) + return res def should_be_purged_from_memory(self): """ Check if the routing table is stale and not used for a long time and should be removed from memory. @@ -148,8 +149,11 @@ def should_be_purged_from_memory(self): """ from neo4j._conf import RoutingConfig perf_time = perf_counter() - log.debug("[#0000] C: last_updated_time=%r perf_time=%r", self.last_updated_time, perf_time) - return self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time + res = self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time + log.debug("[#0000] _: purge check: " + "last_updated_time=%r, ttl=%r, perf_time=%r => %r", + self.last_updated_time, self.ttl, perf_time, res) + return res def update(self, new_routing_table): """ Update the current routing table with new routing information @@ -161,7 +165,7 @@ def update(self, new_routing_table): self.initialized_without_writers = not self.writers self.last_updated_time = perf_counter() self.ttl = new_routing_table.ttl - log.debug("[#0000] S: table=%r", self) + log.debug("[#0000] _: updated table=%r", self) def servers(self): return set(self.routers) | set(self.writers) | set(self.readers) diff --git a/neo4j/_sync/io/_bolt.py b/neo4j/_sync/io/_bolt.py index 93e8207a..175e63b1 100644 --- a/neo4j/_sync/io/_bolt.py +++ b/neo4j/_sync/io/_bolt.py @@ -160,6 +160,10 @@ def __del__(self): if not asyncio.iscoroutinefunction(self.close): self.close() + @property + def connection_id(self): + return self.server_info._metadata.get("connection_id", "") + @property @abc.abstractmethod def supports_multiple_results(self): @@ -689,7 +693,8 @@ def _set_defunct(self, message, error=None, silent=False): user_cancelled = isinstance(error, asyncio.CancelledError) if error: - log.debug("[#%04X] %r", self.local_port, error) + log.debug("[#%04X] _: error: %r", self.local_port, + error) if not user_cancelled: log.error(message) # We were attempting to receive data but the connection @@ -751,7 +756,7 @@ def close(self): try: self._send_all() except (OSError, BoltError, DriverError) as exc: - log.debug("[#%04X] ignoring failed close %r", + log.debug("[#%04X] _: ignoring failed close %r", self.local_port, exc) log.debug("[#%04X] C: ", self.local_port) try: @@ -770,7 +775,7 @@ def kill(self): try: self.socket.kill() except OSError as exc: - log.debug("[#%04X] ignoring failed kill %r", + log.debug("[#%04X] _: ignoring failed kill %r", self.local_port, exc) finally: self._closed = True diff --git a/neo4j/_sync/io/_bolt3.py b/neo4j/_sync/io/_bolt3.py index 2db6d561..ba24eefc 100644 --- a/neo4j/_sync/io/_bolt3.py +++ b/neo4j/_sync/io/_bolt3.py @@ -111,7 +111,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property diff --git a/neo4j/_sync/io/_bolt4.py b/neo4j/_sync/io/_bolt4.py index 60911526..2ff95eb0 100644 --- a/neo4j/_sync/io/_bolt4.py +++ b/neo4j/_sync/io/_bolt4.py @@ -69,7 +69,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -426,7 +426,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_sync/io/_bolt5.py b/neo4j/_sync/io/_bolt5.py index a7180e5a..25283af6 100644 --- a/neo4j/_sync/io/_bolt5.py +++ b/neo4j/_sync/io/_bolt5.py @@ -66,7 +66,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -104,7 +104,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_sync/io/_pool.py b/neo4j/_sync/io/_pool.py index 0a379009..491917d7 100644 --- a/neo4j/_sync/io/_pool.py +++ b/neo4j/_sync/io/_pool.py @@ -110,9 +110,9 @@ def _acquire_from_pool_checked( # `stale` but still alive. if log.isEnabledFor(logging.DEBUG): log.debug( - "[#%04X] C: removing old connection " + "[#%04X] _: removing old connection %s " "(closed=%s, defunct=%s, stale=%s, in_use=%s)", - connection.local_port, + connection.local_port, connection.connection_id, connection.closed(), connection.defunct(), connection.stale(), connection.in_use ) @@ -182,7 +182,7 @@ def health_check(connection_, deadline_): if connection_.is_idle_for(liveness_check_timeout): with connection_deadline(connection_, deadline_): try: - log.debug("[#%04X] C: ", + log.debug("[#%04X] _: liveness check", connection_.local_port) connection_.reset() except (OSError, ServiceUnavailable, SessionExpired): @@ -195,6 +195,9 @@ def health_check(connection_, deadline_): address, health_check, deadline ) if connection: + log.debug("[#%04X] _: handing out existing connection " + "%s", connection.local_port, + connection.connection_id) return connection # all connections in pool are in-use with self.lock: @@ -209,10 +212,12 @@ def health_check(connection_, deadline_): timeout == 0 # deadline expired or not self.cond.wait(timeout) ): + log.debug("[#0000] _: acquisition timed out") raise ClientError( "failed to obtain a connection from the pool within " "{!r}s (timeout)".format(deadline.original_timeout) ) + log.debug("[#0000] _: trying to hand out new connection") return connection_creator() @abc.abstractmethod @@ -239,8 +244,8 @@ def kill_and_release(self, *connections): if not (connection.defunct() or connection.closed()): log.debug( - "[#%04X] C: killing connection on release", - connection.local_port + "[#%04X] _: killing connection on release %s", + connection.local_port, connection.connection_id ) connection.kill() with self.lock: @@ -260,26 +265,32 @@ def release(self, *connections): or connection.is_reset): if cancelled is not None: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) connection.kill() continue try: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) connection.reset() except (Neo4jError, DriverError, BoltError) as e: - log.debug("Failed to reset connection on release: %r", e) + log.debug("[#%04X] _: failed to reset connection " + "on release: %r", connection.local_port, e) except asyncio.CancelledError as e: - log.debug("Cancelled reset connection on release: %r", e) + log.debug("[#%04X] _: cancelled reset connection " + "on release: %r", connection.local_port, e) cancelled = e connection.kill() with self.lock: for connection in connections: connection.in_use = False + log.debug( + "[#%04X] _: released %s", + connection.local_port, connection.connection_id + ) self.cond.notify_all() if cancelled is not None: raise cancelled @@ -349,7 +360,7 @@ def close(self): """ Close all connections and empty the pool. This method is thread safe. """ - log.debug("[#0000] C: close") + log.debug("[#0000] _: close") try: connections = [] with self.lock: @@ -381,6 +392,7 @@ def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, direct address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -396,6 +408,7 @@ def acquire( ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. + log.debug("[#0000] _: acquire direct connection") deadline = Deadline.from_timeout_or_deadline(timeout) return self._acquire( self.address, deadline, liveness_check_timeout @@ -433,6 +446,7 @@ def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, routing address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -445,7 +459,6 @@ def __init__(self, opener, pool_config, workspace_config, address): """ super().__init__(opener, pool_config, workspace_config) # Each database have a routing table, the default database is a special case. - log.debug("[#0000] C: routing address %r", address) self.address = address self.routing_tables = {workspace_config.database: RoutingTable(database=workspace_config.database, routers=[address])} self.refresh_lock = RLock() @@ -552,7 +565,8 @@ def fetch_routing_table( except (ServiceUnavailable, SessionExpired): pass if not new_routing_info: - log.debug("Failed to fetch routing info %s", address) + log.debug("[#0000] _: failed to fetch routing info " + "from %r", address) return None else: servers = new_routing_info[0]["servers"] @@ -572,12 +586,14 @@ def fetch_routing_table( # No routers if num_routers == 0: - log.debug("No routing servers returned from server %s", address) + log.debug("[#0000] _: no routing servers returned from " + "server %s", address) return None # No readers if num_readers == 0: - log.debug("No read servers returned from server %s", address) + log.debug("[#0000] _: no read servers returned from " + "server %s", address) return None # At least one of each is fine, so return this table @@ -592,9 +608,9 @@ def _update_routing_table_from( :return: True if the routing table is successfully updated, otherwise False """ - log.debug("Attempting to update routing table from {}".format( - ", ".join(map(repr, routers))) - ) + if routers: + log.debug("[#0000] _: attempting to update routing " + "table from {}".format(", ".join(map(repr, routers)))) for router in routers: for address in NetworkUtil.resolve_address( router, resolver=self.pool_config.resolver @@ -610,7 +626,8 @@ def _update_routing_table_from( ) old_routing_table.update(new_routing_table) log.debug( - "[#0000] C: address=%r (%r)", + "[#0000] _: update routing table from " + "address=%r (%r)", address, self.routing_tables[new_database] ) if callable(database_callback): @@ -720,9 +737,12 @@ def ensure_routing_table_is_fresh( for database in list(self.routing_tables.keys()): # Remove unused databases in the routing table # Remove the routing table after a timeout = TTL + 30s - log.debug("[#0000] C: database=%s", database) + log.debug("[#0000] _: routing aged, database=%s", + database) if (self.routing_tables[database].should_be_purged_from_memory() and database != self.workspace_config.database): + log.debug("[#0000] _: dropping routing table for " + "database=%s", database) del self.routing_tables[database] return True @@ -761,10 +781,12 @@ def acquire( raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) + log.debug("[#0000] _: acquire routing connection") + from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) with self.refresh_lock: - log.debug("[#0000] C: %r", + log.debug("[#0000] _: routing table ensure fresh %r", self.routing_tables) self.ensure_routing_table_is_fresh( access_mode=access_mode, database=database, imp_user=None, @@ -781,7 +803,8 @@ def acquire( except (ReadServiceUnavailable, WriteServiceUnavailable) as err: raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err try: - log.debug("[#0000] C: database=%r address=%r", database, address) + log.debug("[#0000] _: acquire address, database=%r " + "address=%r", database, address) deadline = Deadline.from_timeout_or_deadline(timeout) # should always be a resolved address connection = self._acquire( @@ -797,20 +820,20 @@ def deactivate(self, address): if present, remove from the routing table and also closing all idle connections to that address. """ - log.debug("[#0000] C: Deactivating address %r", address) + log.debug("[#0000] _: deactivating address %r", address) # We use `discard` instead of `remove` here since the former # will not fail if the address has already been removed. for database in self.routing_tables.keys(): self.routing_tables[database].routers.discard(address) self.routing_tables[database].readers.discard(address) self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) super(Neo4jPool, self).deactivate(address) def on_write_failure(self, address): """ Remove a writer address from the routing table, if present. """ - log.debug("[#0000] C: Removing writer %r", address) + log.debug("[#0000] _: removing writer %r", address) for database in self.routing_tables.keys(): self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) diff --git a/neo4j/_sync/work/session.py b/neo4j/_sync/work/session.py index 05f34d30..078b1957 100644 --- a/neo4j/_sync/work/session.py +++ b/neo4j/_sync/work/session.py @@ -136,11 +136,12 @@ def _handle_cancellation(self, message="General"): connection = self._connection self._connection = None if connection: - log.debug("[#%04X] %s cancellation clean-up", + log.debug("[#%04X] _: %s cancellation clean-up", connection.local_port, message) self._pool.kill_and_release(connection) else: - log.debug("[#0000] %s cancellation clean-up", message) + log.debug("[#0000] _: %s cancellation clean-up", + message) def _result_closed(self): if self._auto_result: @@ -512,7 +513,7 @@ def _run_transaction( try: sleep(delay) except asyncio.CancelledError: - log.debug("[#0000] Retry cancelled") + log.debug("[#0000] _: retry cancelled") raise if errors: diff --git a/neo4j/_sync/work/workspace.py b/neo4j/_sync/work/workspace.py index 844bb96c..df885c45 100644 --- a/neo4j/_sync/work/workspace.py +++ b/neo4j/_sync/work/workspace.py @@ -19,10 +19,10 @@ from __future__ import annotations import asyncio +import logging from ..._async_compat.util import Util from ..._conf import WorkspaceConfig -from ..._deadline import Deadline from ..._meta import ( deprecation_warn, unclosed_resource_warn, @@ -36,6 +36,9 @@ from ..io import Neo4jPool +log = logging.getLogger("neo4j") + + class Workspace: def __init__(self, pool, config): @@ -176,6 +179,7 @@ def _connect(self, access_mode, **acquire_kwargs): # to try to fetch the home database. If provided by the server, # we shall use this database explicitly for all subsequent # actions within this session. + log.debug("[#0000] _: resolve home database") self._pool.update_routing_table( database=self._config.database, imp_user=self._config.impersonated_user, diff --git a/neo4j/debug.py b/neo4j/debug.py index 8934e69a..a4017344 100644 --- a/neo4j/debug.py +++ b/neo4j/debug.py @@ -18,11 +18,13 @@ from __future__ import annotations +import asyncio import typing as t from logging import ( CRITICAL, DEBUG, ERROR, + Filter, Formatter, getLogger, INFO, @@ -43,7 +45,7 @@ class ColourFormatter(Formatter): """ def format(self, record): - s = super(ColourFormatter, self).format(record) + s = super().format(record) if record.levelno == CRITICAL: return "\x1b[31;1m%s\x1b[0m" % s # bright red elif record.levelno == ERROR: @@ -58,6 +60,17 @@ def format(self, record): return s +class TaskIdFilter(Filter): + """Injecting async task id into log records.""" + + def filter(self, record): + try: + record.taskId = id(asyncio.current_task()) + except RuntimeError: + record.taskId = None + return True + + class Watcher: """Log watcher for easier logging setup. @@ -73,15 +86,31 @@ class Watcher: threads can lead to duplicate log messages as the context manager will enable logging for all threads. + .. note:: + The exact logging format is not part of the API contract and might + change at any time without notice. It is meant for debugging purposes + and human consumption only. + :param logger_names: Names of loggers to watch. :param default_level: Default minimum log level to show. - The level can be overridden by setting the level a level when calling + The level can be overridden by setting ``level`` when calling :meth:`.watch`. :param default_out: Default output stream for all loggers. - The level can be overridden by setting the level a level when calling + The level can be overridden by setting ``out`` when calling :meth:`.watch`. + :type default_out: stream or file-like object :param colour: Whether the log levels should be indicated with ANSI colour codes. + :param thread_info: whether to include information about the current + thread in the log message. Defaults to :const:`True`. + :param task_info: whether to include information about the current + async task in the log message. Defaults to :const:`True`. + + .. versionchanged:: + 5.3 + + * Added ``thread_info`` and ``task_info`` parameters. + * Logging format around thread and task information changed. """ def __init__( @@ -89,7 +118,9 @@ def __init__( *logger_names: str, default_level: int = DEBUG, default_out: t.TextIO = stderr, - colour: bool = False + colour: bool = False, + thread_info: bool = True, + task_info: bool = True, ) -> None: super(Watcher, self).__init__() self.logger_names = logger_names @@ -97,8 +128,14 @@ def __init__( self.default_level = default_level self.default_out = default_out self._handlers: t.Dict[str, StreamHandler] = {} - - format_ = "%(threadName)s(%(thread)d) %(asctime)s %(message)s" + self._filters: t.Dict[str, TaskIdFilter] = {} + self._task_info = task_info + + format_ = "%(asctime)s %(message)s" + if task_info: + format_ = "[Task %(taskId)-15s] " + format_ + if thread_info: + format_ = "[Thread %(thread)d] " + format_ if not colour: format_ = "[%(levelname)-8s] " + format_ formatter_cls = ColourFormatter if colour else Formatter @@ -120,6 +157,7 @@ def watch(self, level: int = None, out: t.TextIO = None): If :const:`None`, the ``default_level`` is used. :param out: Output stream for all loggers. If :const:`None`, the ``default_out`` is used. + :type out: stream or file-like object """ if level is None: level = self.default_level @@ -128,7 +166,12 @@ def watch(self, level: int = None, out: t.TextIO = None): self.stop() handler = StreamHandler(out) handler.setFormatter(self.formatter) + if self._task_info: + filter_ = TaskIdFilter() for logger in self. _loggers: + if self._task_info: + self._filters[logger.name] = filter_ + logger.addFilter(filter_) self._handlers[logger.name] = handler logger.addHandler(handler) logger.setLevel(level) @@ -140,13 +183,20 @@ def stop(self) -> None: logger.removeHandler(self._handlers.pop(logger.name)) except KeyError: pass + if self._task_info: + try: + logger.removeFilter(self._filters.pop(logger.name)) + except KeyError: + pass def watch( *logger_names: str, level: int = DEBUG, out: t.TextIO = stderr, - colour: bool = False + colour: bool = False, + thread_info: bool = True, + task_info: bool = True, ) -> Watcher: """Quick wrapper for using :class:`.Watcher`. @@ -160,19 +210,42 @@ def watch( watch("neo4j") # from now on, DEBUG logging to stderr is enabled in the driver + .. note:: + The exact logging format is not part of the API contract and might + change at any time without notice. It is meant for debugging purposes + and human consumption only. + :param logger_names: Names of loggers to watch. - :type logger_names: str :param level: see ``default_level`` of :class:`.Watcher`. - :type level: int :param out: see ``default_out`` of :class:`.Watcher`. :type out: stream or file-like object :param colour: see ``colour`` of :class:`.Watcher`. - :type colour: bool + :param thread_info: see ``thread_info`` of :class:`.Watcher`. + :param task_info: see ``task_info`` of :class:`.Watcher`. :return: Watcher instance :rtype: :class:`.Watcher` + + .. versionchanged:: + 5.3 + + * Added ``thread_info`` and ``task_info`` parameters. + * Logging format around thread and task information changed. """ - watcher = Watcher(*logger_names, colour=colour, default_level=level, - default_out=out) + watcher = Watcher(*logger_names, default_level=level, default_out=out, + colour=colour, thread_info=thread_info, + task_info=task_info) watcher.watch() return watcher + + +class Connection: + def connect(self): + self.hello() # buffer HELLO message + self.logon() # buffer LOGON message + self.send_and_receive() # send HELLO and LOGON, receive 2x SUCCESS + + def reauth(self): + self.logoff() # buffer LOGOFF message + self.logon() # buffer LOGON message + self.send_and_receive() # send LOGOFF and LOGON, receive 2x SUCCESS From 9c1da19b0b35aab8cf9d0e9459cdb886c860557f Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Wed, 2 Nov 2022 12:37:05 +0100 Subject: [PATCH 2/7] Adjust unit tests --- neo4j/debug.py | 6 +- tests/unit/async_/io/test_direct.py | 1 + tests/unit/common/test_debug.py | 109 ++++++++++++++++++++++------ tests/unit/sync/io/test_direct.py | 1 + 4 files changed, 90 insertions(+), 27 deletions(-) diff --git a/neo4j/debug.py b/neo4j/debug.py index a4017344..9d92b2c5 100644 --- a/neo4j/debug.py +++ b/neo4j/debug.py @@ -65,9 +65,9 @@ class TaskIdFilter(Filter): def filter(self, record): try: - record.taskId = id(asyncio.current_task()) + record.task = id(asyncio.current_task()) except RuntimeError: - record.taskId = None + record.task = None return True @@ -133,7 +133,7 @@ def __init__( format_ = "%(asctime)s %(message)s" if task_info: - format_ = "[Task %(taskId)-15s] " + format_ + format_ = "[Task %(task)-15s] " + format_ if thread_info: format_ = "[Thread %(thread)d] " + format_ if not colour: diff --git a/tests/unit/async_/io/test_direct.py b/tests/unit/async_/io/test_direct.py index d8cfbf62..7729036e 100644 --- a/tests/unit/async_/io/test_direct.py +++ b/tests/unit/async_/io/test_direct.py @@ -53,6 +53,7 @@ def __init__(self, socket): self.socket = socket self.address = socket.getpeername() self.local_port = self.address[1] + self.connection_id = "bolt-1234" @property def is_reset(self): diff --git a/tests/unit/common/test_debug.py b/tests/unit/common/test_debug.py index b5350436..fe7d6a9b 100644 --- a/tests/unit/common/test_debug.py +++ b/tests/unit/common/test_debug.py @@ -18,6 +18,7 @@ from __future__ import annotations +import asyncio import io import logging import sys @@ -31,6 +32,8 @@ from neo4j import debug as neo4j_debug +from ..._async_compat import mark_async_test + if t.TYPE_CHECKING: @@ -40,11 +43,12 @@ def __call__(self, *args: str) -> t.Sequence[t.Any]: @pytest.fixture -def add_handler_mocker(mocker) -> _TSetupMockProtocol: +def logger_mocker(mocker) -> _TSetupMockProtocol: def setup_mock(*logger_names): loggers = [logging.getLogger(name) for name in logger_names] for logger in loggers: logger.addHandler = mocker.Mock() + logger.addFilter = mocker.Mock() logger.removeHandler = mocker.Mock() logger.setLevel = mocker.Mock() return loggers @@ -52,25 +56,25 @@ def setup_mock(*logger_names): return setup_mock -def test_watch_returns_watcher(add_handler_mocker) -> None: +def test_watch_returns_watcher(logger_mocker) -> None: logger_name = "neo4j" - add_handler_mocker(logger_name) + logger_mocker(logger_name) watcher = neo4j_debug.watch(logger_name) assert isinstance(watcher, neo4j_debug.Watcher) @pytest.mark.parametrize("logger_names", (("neo4j",), ("foobar",), ("neo4j", "foobar"))) -def test_watch_enables_logging(logger_names, add_handler_mocker) -> None: - loggers = add_handler_mocker(*logger_names) +def test_watch_enables_logging(logger_names, logger_mocker) -> None: + loggers = logger_mocker(*logger_names) neo4j_debug.watch(*logger_names) for logger in loggers: logger.addHandler.assert_called_once() -def test_watcher_watch_adds_logger(add_handler_mocker) -> None: +def test_watcher_watch_adds_logger(logger_mocker) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] watcher = neo4j_debug.Watcher(logger_name) logger.addHandler.assert_not_called() @@ -78,9 +82,9 @@ def test_watcher_watch_adds_logger(add_handler_mocker) -> None: logger.addHandler.assert_called_once() -def test_watcher_stop_removes_logger(add_handler_mocker) -> None: +def test_watcher_stop_removes_logger(logger_mocker) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] watcher = neo4j_debug.Watcher(logger_name) watcher.watch() @@ -115,10 +119,10 @@ def test_watcher_context_manager(mocker) -> None: ) ) def test_watcher_level( - add_handler_mocker, default_level, level, expected_level + logger_mocker, default_level, level, expected_level ) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] kwargs = {} if default_level is not None: kwargs["default_level"] = default_level @@ -147,10 +151,10 @@ def test_watcher_level( ) ) def test_watcher_out( - add_handler_mocker, default_out, out, expected_out + logger_mocker, default_out, out, expected_out ) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] kwargs = {} if default_out is not None: kwargs["default_out"] = default_out @@ -166,10 +170,13 @@ def test_watcher_out( @pytest.mark.parametrize("colour", (True, False)) -def test_watcher_colour(add_handler_mocker, colour) -> None: +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +def test_watcher_colour(logger_mocker, colour, thread, task) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] - watcher = neo4j_debug.Watcher(logger_name, colour=colour) + logger = logger_mocker(logger_name)[0] + watcher = neo4j_debug.Watcher(logger_name, colour=colour, + thread_info=thread, task_info=task) watcher.watch() (handler,), _ = logger.addHandler.call_args @@ -182,19 +189,73 @@ def test_watcher_colour(add_handler_mocker, colour) -> None: @pytest.mark.parametrize("colour", (True, False)) -def test_watcher_format(add_handler_mocker, colour) -> None: +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +def test_watcher_format(logger_mocker, colour, thread, task) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] - watcher = neo4j_debug.Watcher(logger_name, colour=colour) + logger = logger_mocker(logger_name)[0] + watcher = neo4j_debug.Watcher(logger_name, colour=colour, + thread_info=thread, task_info=task) watcher.watch() (handler,), _ = logger.addHandler.call_args assert isinstance(handler, logging.Handler) assert isinstance(handler.formatter, logging.Formatter) + expected_format = "%(asctime)s %(message)s" + if task: + expected_format = "[Task %(task)-15s] " + expected_format + if thread: + expected_format = "[Thread %(thread)d] " + expected_format + if not colour: + expected_format = "[%(levelname)-8s] " + expected_format # Don't look at me like that. It's just for testing. - format = handler.formatter._fmt - if colour: - assert format == "%(threadName)s(%(thread)d) %(asctime)s %(message)s" + format_ = handler.formatter._fmt + assert format_ == expected_format + + +@pytest.mark.parametrize("colour", (True, False)) +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +def test_watcher_task_injection( + mocker, logger_mocker, colour, thread, task +) -> None: + logger_name = "neo4j" + logger = logger_mocker(logger_name)[0] + watcher = neo4j_debug.Watcher(logger_name, colour=colour, + thread_info=thread, task_info=task) + record_mock = mocker.Mock(spec=logging.LogRecord) + assert not hasattr(record_mock, "task") + + watcher.watch() + + if task: + (filter_,), _ = logger.addFilter.call_args + assert isinstance(filter_, logging.Filter) + assert record_mock.task is None + else: + logger.addFilter.assert_not_called() + + +@pytest.mark.parametrize("colour", (True, False)) +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +@mark_async_test +async def test_async_watcher_task_injection( + mocker, logger_mocker, colour, thread, task +) -> None: + logger_name = "neo4j" + logger = logger_mocker(logger_name)[0] + watcher = neo4j_debug.Watcher(logger_name, colour=colour, + thread_info=thread, task_info=task) + record_mock = mocker.Mock(spec=logging.LogRecord) + assert not hasattr(record_mock, "task") + + watcher.watch() + + if task: + (filter_,), _ = logger.addFilter.call_args + assert isinstance(filter_, logging.Filter) + filter_.filter(record_mock) + assert record_mock.task == id(asyncio.current_task()) else: - assert format == "[%(levelname)-8s] " \ - "%(threadName)s(%(thread)d) %(asctime)s %(message)s" + logger.addFilter.assert_not_called() diff --git a/tests/unit/sync/io/test_direct.py b/tests/unit/sync/io/test_direct.py index 70c5478d..d54dfd73 100644 --- a/tests/unit/sync/io/test_direct.py +++ b/tests/unit/sync/io/test_direct.py @@ -53,6 +53,7 @@ def __init__(self, socket): self.socket = socket self.address = socket.getpeername() self.local_port = self.address[1] + self.connection_id = "bolt-1234" @property def is_reset(self): From 988d75563bd6cdc6091b5ebbf82aa997b1ae931c Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Wed, 2 Nov 2022 16:27:44 +0100 Subject: [PATCH 3/7] Fix missing call in unit test --- tests/unit/common/test_debug.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/common/test_debug.py b/tests/unit/common/test_debug.py index fe7d6a9b..37429963 100644 --- a/tests/unit/common/test_debug.py +++ b/tests/unit/common/test_debug.py @@ -48,7 +48,7 @@ def setup_mock(*logger_names): loggers = [logging.getLogger(name) for name in logger_names] for logger in loggers: logger.addHandler = mocker.Mock() - logger.addFilter = mocker.Mock() + logger.addFilter = mocker.Mock(side_effect=logger.addFilter) logger.removeHandler = mocker.Mock() logger.setLevel = mocker.Mock() return loggers @@ -231,6 +231,7 @@ def test_watcher_task_injection( if task: (filter_,), _ = logger.addFilter.call_args assert isinstance(filter_, logging.Filter) + filter_.filter(record_mock) assert record_mock.task is None else: logger.addFilter.assert_not_called() From d9520e10793794a70f02e3ba0a981080e3db8e12 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 4 Nov 2022 10:55:51 +0100 Subject: [PATCH 4/7] Pool: log db and access mode on acquire --- neo4j/_async/io/_pool.py | 6 ++++-- neo4j/_sync/io/_pool.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/neo4j/_async/io/_pool.py b/neo4j/_async/io/_pool.py index 6d7cbe54..bf7c149e 100644 --- a/neo4j/_async/io/_pool.py +++ b/neo4j/_async/io/_pool.py @@ -408,7 +408,8 @@ async def acquire( ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. - log.debug("[#0000] _: acquire direct connection") + log.debug("[#0000] _: acquire direct connection, " + "access_mode=%r, database=%r", access_mode, database) deadline = Deadline.from_timeout_or_deadline(timeout) return await self._acquire( self.address, deadline, liveness_check_timeout @@ -781,7 +782,8 @@ async def acquire( raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) - log.debug("[#0000] _: acquire routing connection") + log.debug("[#0000] _: acquire routing connection, " + "access_mode=%r, database=%r", access_mode, database) from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) diff --git a/neo4j/_sync/io/_pool.py b/neo4j/_sync/io/_pool.py index 491917d7..a8e18440 100644 --- a/neo4j/_sync/io/_pool.py +++ b/neo4j/_sync/io/_pool.py @@ -408,7 +408,8 @@ def acquire( ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. - log.debug("[#0000] _: acquire direct connection") + log.debug("[#0000] _: acquire direct connection, " + "access_mode=%r, database=%r", access_mode, database) deadline = Deadline.from_timeout_or_deadline(timeout) return self._acquire( self.address, deadline, liveness_check_timeout @@ -781,7 +782,8 @@ def acquire( raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) - log.debug("[#0000] _: acquire routing connection") + log.debug("[#0000] _: acquire routing connection, " + "access_mode=%r, database=%r", access_mode, database) from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) From 3f7a6c70e7b918dc58679c877761accbd83c0017 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 4 Nov 2022 11:14:05 +0100 Subject: [PATCH 5/7] routing pool: log acquiring connection to router --- neo4j/_async/io/_pool.py | 2 ++ neo4j/_sync/io/_pool.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/neo4j/_async/io/_pool.py b/neo4j/_async/io/_pool.py index bf7c149e..f45c741b 100644 --- a/neo4j/_async/io/_pool.py +++ b/neo4j/_async/io/_pool.py @@ -524,6 +524,8 @@ async def fetch_routing_info( routing, or if routing support is broken or outdated """ deadline = Deadline.from_timeout_or_deadline(acquisition_timeout) + log.debug("[#0000] _: _acquire router connection, " + "database=%r, address=%r", database, address) cx = await self._acquire(address, deadline, None) try: routing_table = await cx.route( diff --git a/neo4j/_sync/io/_pool.py b/neo4j/_sync/io/_pool.py index a8e18440..59a09109 100644 --- a/neo4j/_sync/io/_pool.py +++ b/neo4j/_sync/io/_pool.py @@ -524,6 +524,8 @@ def fetch_routing_info( routing, or if routing support is broken or outdated """ deadline = Deadline.from_timeout_or_deadline(acquisition_timeout) + log.debug("[#0000] _: _acquire router connection, " + "database=%r, address=%r", database, address) cx = self._acquire(address, deadline, None) try: routing_table = cx.route( From 7281bfacf517c7d9b9318ea7f654ebd0a2ae3264 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 4 Nov 2022 19:37:20 +0100 Subject: [PATCH 6/7] Improve docs, logging helper, and type hints --- docs/source/async_api.rst | 5 ++--- neo4j/debug.py | 15 +++------------ 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/docs/source/async_api.rst b/docs/source/async_api.rst index 0dbd2598..f07ca72c 100644 --- a/docs/source/async_api.rst +++ b/docs/source/async_api.rst @@ -680,9 +680,6 @@ Like so: return True - # attache the filter injecting the task id to the driver's logger - logging.getLogger("neo4j").addFilter(TaskIdFilter()) - # create a handler, e.g. to log to stdout handler = logging.StreamHandler(sys.stdout) # configure the handler to your liking @@ -692,6 +689,8 @@ Like so: # "[%(levelname)-8s] [Thread %(thread)d] [Task %(taskId)-15s] " # "%(asctime)s %(message)s" )) + # attache the filter injecting the task id to the handler + handler.addFilter(TaskIdFilter()) # add the handler to the driver's logger logging.getLogger("neo4j").addHandler(handler) # make sure the logger logs on the desired log level diff --git a/neo4j/debug.py b/neo4j/debug.py index 9d92b2c5..a670f316 100644 --- a/neo4j/debug.py +++ b/neo4j/debug.py @@ -115,7 +115,7 @@ class Watcher: def __init__( self, - *logger_names: str, + *logger_names: t.Optional[str], default_level: int = DEBUG, default_out: t.TextIO = stderr, colour: bool = False, @@ -128,7 +128,6 @@ def __init__( self.default_level = default_level self.default_out = default_out self._handlers: t.Dict[str, StreamHandler] = {} - self._filters: t.Dict[str, TaskIdFilter] = {} self._task_info = task_info format_ = "%(asctime)s %(message)s" @@ -167,11 +166,8 @@ def watch(self, level: int = None, out: t.TextIO = None): handler = StreamHandler(out) handler.setFormatter(self.formatter) if self._task_info: - filter_ = TaskIdFilter() + handler.addFilter(TaskIdFilter()) for logger in self. _loggers: - if self._task_info: - self._filters[logger.name] = filter_ - logger.addFilter(filter_) self._handlers[logger.name] = handler logger.addHandler(handler) logger.setLevel(level) @@ -183,15 +179,10 @@ def stop(self) -> None: logger.removeHandler(self._handlers.pop(logger.name)) except KeyError: pass - if self._task_info: - try: - logger.removeFilter(self._filters.pop(logger.name)) - except KeyError: - pass def watch( - *logger_names: str, + *logger_names: t.Optional[str], level: int = DEBUG, out: t.TextIO = stderr, colour: bool = False, From 69991fb5ed7be64301b6f79357102f825ad55c4d Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Mon, 7 Nov 2022 05:25:52 +0100 Subject: [PATCH 7/7] Adjust unit tests --- tests/unit/common/test_debug.py | 49 ++++++++++++++++----------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/tests/unit/common/test_debug.py b/tests/unit/common/test_debug.py index 37429963..7454148e 100644 --- a/tests/unit/common/test_debug.py +++ b/tests/unit/common/test_debug.py @@ -48,7 +48,6 @@ def setup_mock(*logger_names): loggers = [logging.getLogger(name) for name in logger_names] for logger in loggers: logger.addHandler = mocker.Mock() - logger.addFilter = mocker.Mock(side_effect=logger.addFilter) logger.removeHandler = mocker.Mock() logger.setLevel = mocker.Mock() return loggers @@ -179,6 +178,7 @@ def test_watcher_colour(logger_mocker, colour, thread, task) -> None: thread_info=thread, task_info=task) watcher.watch() + logger.addHandler.assert_called_once() (handler,), _ = logger.addHandler.call_args assert isinstance(handler, logging.Handler) assert isinstance(handler.formatter, logging.Formatter) @@ -198,6 +198,7 @@ def test_watcher_format(logger_mocker, colour, thread, task) -> None: thread_info=thread, task_info=task) watcher.watch() + logger.addHandler.assert_called_once() (handler,), _ = logger.addHandler.call_args assert isinstance(handler, logging.Handler) assert isinstance(handler.formatter, logging.Formatter) @@ -213,14 +214,12 @@ def test_watcher_format(logger_mocker, colour, thread, task) -> None: assert format_ == expected_format -@pytest.mark.parametrize("colour", (True, False)) -@pytest.mark.parametrize("thread", (True, False)) -@pytest.mark.parametrize("task", (True, False)) -def test_watcher_task_injection( - mocker, logger_mocker, colour, thread, task +def _assert_task_injection( + async_: bool, mocker, logger_mocker, colour: bool, thread: bool, task: bool ) -> None: + handler_cls_mock = mocker.patch("neo4j.debug.StreamHandler", autospec=True) + handler_mock = handler_cls_mock.return_value logger_name = "neo4j" - logger = logger_mocker(logger_name)[0] watcher = neo4j_debug.Watcher(logger_name, colour=colour, thread_info=thread, task_info=task) record_mock = mocker.Mock(spec=logging.LogRecord) @@ -229,34 +228,32 @@ def test_watcher_task_injection( watcher.watch() if task: - (filter_,), _ = logger.addFilter.call_args + handler_mock.addFilter.assert_called_once() + (filter_,), _ = handler_mock.addFilter.call_args assert isinstance(filter_, logging.Filter) filter_.filter(record_mock) - assert record_mock.task is None + if async_: + assert record_mock.task == id(asyncio.current_task()) + else: + assert record_mock.task is None else: - logger.addFilter.assert_not_called() + handler_mock.addFilter.assert_not_called() @pytest.mark.parametrize("colour", (True, False)) @pytest.mark.parametrize("thread", (True, False)) @pytest.mark.parametrize("task", (True, False)) -@mark_async_test -async def test_async_watcher_task_injection( +def test_watcher_task_injection( mocker, logger_mocker, colour, thread, task ) -> None: - logger_name = "neo4j" - logger = logger_mocker(logger_name)[0] - watcher = neo4j_debug.Watcher(logger_name, colour=colour, - thread_info=thread, task_info=task) - record_mock = mocker.Mock(spec=logging.LogRecord) - assert not hasattr(record_mock, "task") + _assert_task_injection(False, mocker, logger_mocker, colour, thread, task) - watcher.watch() - if task: - (filter_,), _ = logger.addFilter.call_args - assert isinstance(filter_, logging.Filter) - filter_.filter(record_mock) - assert record_mock.task == id(asyncio.current_task()) - else: - logger.addFilter.assert_not_called() +@pytest.mark.parametrize("colour", (True, False)) +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +@mark_async_test +async def test_async_watcher_task_injection( + mocker, logger_mocker, colour, thread, task +) -> None: + _assert_task_injection(True, mocker, logger_mocker, colour, thread, task)