diff --git a/docs/source/api.rst b/docs/source/api.rst index fb168945..220991b9 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -1462,6 +1462,8 @@ following code: ... +.. _logging-ref: + ******* Logging ******* @@ -1472,6 +1474,11 @@ not able to connect to the database server or if undesired behavior is observed. There are different ways of enabling logging as listed below. +.. note:: + + For an improved logging experience with the async driver, please see + :ref:`async-logging-ref`. + Simple Approach =============== @@ -1484,8 +1491,8 @@ Context Manager :members: :special-members: __enter__, __exit__ -Full Controll -============= +Full Control +============ .. code-block:: python @@ -1502,7 +1509,7 @@ Full Controll logging.getLogger("neo4j").addHandler(handler) # make sure the logger logs on the desired log level logging.getLogger("neo4j").setLevel(logging.DEBUG) - # from now on, DEBUG logging to stderr is enabled in the driver + # from now on, DEBUG logging to stdout is enabled in the driver ********* diff --git a/docs/source/async_api.rst b/docs/source/async_api.rst index c39e7b77..f07ca72c 100644 --- a/docs/source/async_api.rst +++ b/docs/source/async_api.rst @@ -648,3 +648,51 @@ successfully executed on the server side or not, when a cancellation happens: ``await transaction.commit()`` and other methods can throw :exc:`asyncio.CancelledError` but still have managed to complete from the server's perspective. + + +.. _async-logging-ref: + +************* +Async Logging +************* + +For the most parts, logging works the same way as in the synchronous driver. +See :ref:`logging-ref` for more information. + +However, when following the manual approach to logging, it is recommended to +include information about the current async task in the log record. +Like so: + +.. code-block:: python + + import asyncio + import logging + import sys + + class TaskIdFilter(logging.Filter): + """Injecting async task id into log records.""" + + def filter(self, record): + try: + record.taskId = id(asyncio.current_task()) + except RuntimeError: + record.taskId = None + return True + + + # create a handler, e.g. to log to stdout + handler = logging.StreamHandler(sys.stdout) + # configure the handler to your liking + handler.setFormatter(logging.Formatter( + "[%(levelname)-8s] [Task %(taskId)-15s] %(asctime)s %(message)s" + # or when using threading AND asyncio + # "[%(levelname)-8s] [Thread %(thread)d] [Task %(taskId)-15s] " + # "%(asctime)s %(message)s" + )) + # attache the filter injecting the task id to the handler + handler.addFilter(TaskIdFilter()) + # add the handler to the driver's logger + logging.getLogger("neo4j").addHandler(handler) + # make sure the logger logs on the desired log level + logging.getLogger("neo4j").setLevel(logging.DEBUG) + # from now on, DEBUG logging to stdout is enabled in the driver diff --git a/neo4j/_async/io/_bolt.py b/neo4j/_async/io/_bolt.py index a83e58a6..6215506f 100644 --- a/neo4j/_async/io/_bolt.py +++ b/neo4j/_async/io/_bolt.py @@ -160,6 +160,10 @@ def __del__(self): if not asyncio.iscoroutinefunction(self.close): self.close() + @property + def connection_id(self): + return self.server_info._metadata.get("connection_id", "") + @property @abc.abstractmethod def supports_multiple_results(self): @@ -689,7 +693,8 @@ async def _set_defunct(self, message, error=None, silent=False): user_cancelled = isinstance(error, asyncio.CancelledError) if error: - log.debug("[#%04X] %r", self.local_port, error) + log.debug("[#%04X] _: error: %r", self.local_port, + error) if not user_cancelled: log.error(message) # We were attempting to receive data but the connection @@ -751,7 +756,7 @@ async def close(self): try: await self._send_all() except (OSError, BoltError, DriverError) as exc: - log.debug("[#%04X] ignoring failed close %r", + log.debug("[#%04X] _: ignoring failed close %r", self.local_port, exc) log.debug("[#%04X] C: ", self.local_port) try: @@ -770,7 +775,7 @@ def kill(self): try: self.socket.kill() except OSError as exc: - log.debug("[#%04X] ignoring failed kill %r", + log.debug("[#%04X] _: ignoring failed kill %r", self.local_port, exc) finally: self._closed = True diff --git a/neo4j/_async/io/_bolt3.py b/neo4j/_async/io/_bolt3.py index 072f7417..41835810 100644 --- a/neo4j/_async/io/_bolt3.py +++ b/neo4j/_async/io/_bolt3.py @@ -111,7 +111,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property diff --git a/neo4j/_async/io/_bolt4.py b/neo4j/_async/io/_bolt4.py index f82cf706..d523659b 100644 --- a/neo4j/_async/io/_bolt4.py +++ b/neo4j/_async/io/_bolt4.py @@ -69,7 +69,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -426,7 +426,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_async/io/_bolt5.py b/neo4j/_async/io/_bolt5.py index aa399e8a..664f133c 100644 --- a/neo4j/_async/io/_bolt5.py +++ b/neo4j/_async/io/_bolt5.py @@ -66,7 +66,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -104,7 +104,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_async/io/_pool.py b/neo4j/_async/io/_pool.py index 53a5d5e7..a637c262 100644 --- a/neo4j/_async/io/_pool.py +++ b/neo4j/_async/io/_pool.py @@ -110,9 +110,9 @@ async def _acquire_from_pool_checked( # `stale` but still alive. if log.isEnabledFor(logging.DEBUG): log.debug( - "[#%04X] C: removing old connection " + "[#%04X] _: removing old connection %s " "(closed=%s, defunct=%s, stale=%s, in_use=%s)", - connection.local_port, + connection.local_port, connection.connection_id, connection.closed(), connection.defunct(), connection.stale(), connection.in_use ) @@ -182,7 +182,7 @@ async def health_check(connection_, deadline_): if connection_.is_idle_for(liveness_check_timeout): with connection_deadline(connection_, deadline_): try: - log.debug("[#%04X] C: ", + log.debug("[#%04X] _: liveness check", connection_.local_port) await connection_.reset() except (OSError, ServiceUnavailable, SessionExpired): @@ -195,6 +195,9 @@ async def health_check(connection_, deadline_): address, health_check, deadline ) if connection: + log.debug("[#%04X] _: handing out existing connection " + "%s", connection.local_port, + connection.connection_id) return connection # all connections in pool are in-use with self.lock: @@ -209,10 +212,12 @@ async def health_check(connection_, deadline_): timeout == 0 # deadline expired or not await self.cond.wait(timeout) ): + log.debug("[#0000] _: acquisition timed out") raise ClientError( "failed to obtain a connection from the pool within " "{!r}s (timeout)".format(deadline.original_timeout) ) + log.debug("[#0000] _: trying to hand out new connection") return await connection_creator() @abc.abstractmethod @@ -239,8 +244,8 @@ def kill_and_release(self, *connections): if not (connection.defunct() or connection.closed()): log.debug( - "[#%04X] C: killing connection on release", - connection.local_port + "[#%04X] _: killing connection on release %s", + connection.local_port, connection.connection_id ) connection.kill() with self.lock: @@ -260,26 +265,32 @@ async def release(self, *connections): or connection.is_reset): if cancelled is not None: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) connection.kill() continue try: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) await connection.reset() except (Neo4jError, DriverError, BoltError) as e: - log.debug("Failed to reset connection on release: %r", e) + log.debug("[#%04X] _: failed to reset connection " + "on release: %r", connection.local_port, e) except asyncio.CancelledError as e: - log.debug("Cancelled reset connection on release: %r", e) + log.debug("[#%04X] _: cancelled reset connection " + "on release: %r", connection.local_port, e) cancelled = e connection.kill() with self.lock: for connection in connections: connection.in_use = False + log.debug( + "[#%04X] _: released %s", + connection.local_port, connection.connection_id + ) self.cond.notify_all() if cancelled is not None: raise cancelled @@ -346,7 +357,7 @@ async def close(self): """ Close all connections and empty the pool. This method is thread safe. """ - log.debug("[#0000] C: close") + log.debug("[#0000] _: close") try: connections = [] with self.lock: @@ -378,6 +389,7 @@ async def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, direct address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -393,6 +405,8 @@ async def acquire( ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. + log.debug("[#0000] _: acquire direct connection, " + "access_mode=%r, database=%r", access_mode, database) deadline = Deadline.from_timeout_or_deadline(timeout) return await self._acquire( self.address, deadline, liveness_check_timeout @@ -430,6 +444,7 @@ async def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, routing address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -442,7 +457,6 @@ def __init__(self, opener, pool_config, workspace_config, address): """ super().__init__(opener, pool_config, workspace_config) # Each database have a routing table, the default database is a special case. - log.debug("[#0000] C: routing address %r", address) self.address = address self.routing_tables = {} self.refresh_lock = AsyncRLock() @@ -485,6 +499,8 @@ async def fetch_routing_info( routing, or if routing support is broken or outdated """ deadline = Deadline.from_timeout_or_deadline(acquisition_timeout) + log.debug("[#0000] _: _acquire router connection, " + "database=%r, address=%r", database, address) cx = await self._acquire(address, deadline, None) try: routing_table = await cx.route( @@ -527,7 +543,8 @@ async def fetch_routing_table( except (ServiceUnavailable, SessionExpired): pass if not new_routing_info: - log.debug("Failed to fetch routing info %s", address) + log.debug("[#0000] _: failed to fetch routing info " + "from %r", address) return None else: servers = new_routing_info[0]["servers"] @@ -547,12 +564,14 @@ async def fetch_routing_table( # No routers if num_routers == 0: - log.debug("No routing servers returned from server %s", address) + log.debug("[#0000] _: no routing servers returned from " + "server %s", address) return None # No readers if num_readers == 0: - log.debug("No read servers returned from server %s", address) + log.debug("[#0000] _: no read servers returned from " + "server %s", address) return None # At least one of each is fine, so return this table @@ -567,9 +586,9 @@ async def _update_routing_table_from( :return: True if the routing table is successfully updated, otherwise False """ - log.debug("Attempting to update routing table from {}".format( - ", ".join(map(repr, routers))) - ) + if routers: + log.debug("[#0000] _: attempting to update routing " + "table from {}".format(", ".join(map(repr, routers)))) for router in routers: async for address in AsyncNetworkUtil.resolve_address( router, resolver=self.pool_config.resolver @@ -585,7 +604,8 @@ async def _update_routing_table_from( ) old_routing_table.update(new_routing_table) log.debug( - "[#0000] C: address=%r (%r)", + "[#0000] _: update routing table from " + "address=%r (%r)", address, self.routing_tables[new_database] ) if callable(database_callback): @@ -683,14 +703,19 @@ async def ensure_routing_table_is_fresh( for database_ in list(self.routing_tables.keys()): # Remove unused databases in the routing table # Remove the routing table after a timeout = TTL + 30s - log.debug("[#0000] C: database=%s", database_) + log.debug("[#0000] _: routing aged?, database=%s", + database) routing_table = self.routing_tables[database_] if routing_table.should_be_purged_from_memory(): + log.debug("[#0000] _: dropping routing table for " + "database=%s", database) del self.routing_tables[database_] routing_table = await self.get_or_create_routing_table(database) if routing_table.is_fresh(readonly=(access_mode == READ_ACCESS)): - # Readers are fresh. + # table is still valid + log.debug("[#0000] _: using existing routing table %r", + routing_table) return False await self.update_routing_table( @@ -740,9 +765,16 @@ async def acquire( raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) + from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) + # await self.ensure_routing_table_is_fresh( + # access_mode=access_mode, database=database, imp_user=None, + # bookmarks=bookmarks, acquisition_timeout=timeout + # ) + log.debug("[#0000] _: acquire routing connection, " + "access_mode=%r, database=%r", access_mode, database) await self.ensure_routing_table_is_fresh( access_mode=access_mode, database=database, imp_user=None, bookmarks=bookmarks, @@ -759,7 +791,8 @@ async def acquire( except (ReadServiceUnavailable, WriteServiceUnavailable) as err: raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err try: - log.debug("[#0000] C: database=%r address=%r", database, address) + log.debug("[#0000] _: acquire address, database=%r " + "address=%r", database, address) deadline = Deadline.from_timeout_or_deadline(timeout) # should always be a resolved address connection = await self._acquire( @@ -775,20 +808,20 @@ async def deactivate(self, address): if present, remove from the routing table and also closing all idle connections to that address. """ - log.debug("[#0000] C: Deactivating address %r", address) + log.debug("[#0000] _: deactivating address %r", address) # We use `discard` instead of `remove` here since the former # will not fail if the address has already been removed. for database in self.routing_tables.keys(): self.routing_tables[database].routers.discard(address) self.routing_tables[database].readers.discard(address) self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) await super(AsyncNeo4jPool, self).deactivate(address) def on_write_failure(self, address): """ Remove a writer address from the routing table, if present. """ - log.debug("[#0000] C: Removing writer %r", address) + log.debug("[#0000] _: removing writer %r", address) for database in self.routing_tables.keys(): self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) diff --git a/neo4j/_async/work/session.py b/neo4j/_async/work/session.py index 8c7b1c7f..cc539b64 100644 --- a/neo4j/_async/work/session.py +++ b/neo4j/_async/work/session.py @@ -138,11 +138,12 @@ def _handle_cancellation(self, message="General"): connection = self._connection self._connection = None if connection: - log.debug("[#%04X] %s cancellation clean-up", + log.debug("[#%04X] _: %s cancellation clean-up", connection.local_port, message) self._pool.kill_and_release(connection) else: - log.debug("[#0000] %s cancellation clean-up", message) + log.debug("[#0000] _: %s cancellation clean-up", + message) async def _result_closed(self): if self._auto_result: @@ -519,7 +520,7 @@ async def _run_transaction( try: await async_sleep(delay) except asyncio.CancelledError: - log.debug("[#0000] Retry cancelled") + log.debug("[#0000] _: retry cancelled") raise if errors: diff --git a/neo4j/_async/work/workspace.py b/neo4j/_async/work/workspace.py index 45f9e5dd..980a8aff 100644 --- a/neo4j/_async/work/workspace.py +++ b/neo4j/_async/work/workspace.py @@ -19,10 +19,10 @@ from __future__ import annotations import asyncio +import logging from ..._async_compat.util import AsyncUtil from ..._conf import WorkspaceConfig -from ..._deadline import Deadline from ..._meta import ( deprecation_warn, unclosed_resource_warn, @@ -36,6 +36,9 @@ from ..io import AsyncNeo4jPool +log = logging.getLogger("neo4j") + + class AsyncWorkspace: def __init__(self, pool, config): @@ -176,6 +179,7 @@ async def _connect(self, access_mode, **acquire_kwargs): # to try to fetch the home database. If provided by the server, # we shall use this database explicitly for all subsequent # actions within this session. + log.debug("[#0000] _: resolve home database") await self._pool.update_routing_table( database=self._config.database, imp_user=self._config.impersonated_user, diff --git a/neo4j/_async_compat/network/_bolt_socket.py b/neo4j/_async_compat/network/_bolt_socket.py index 3d7e6946..fee8cedf 100644 --- a/neo4j/_async_compat/network/_bolt_socket.py +++ b/neo4j/_async_compat/network/_bolt_socket.py @@ -238,7 +238,7 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl): return cls(reader, protocol, writer) except asyncio.TimeoutError: - log.debug("[#0000] C: %s", resolved_address) + log.debug("[#0000] S: %s", resolved_address) log.debug("[#0000] C: %s", resolved_address) if s: await cls.close_socket(s) @@ -246,7 +246,7 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl): "Timed out trying to establish connection to {!r}".format( resolved_address)) except asyncio.CancelledError: - log.debug("[#0000] C: %s", resolved_address) + log.debug("[#0000] S: %s", resolved_address) log.debug("[#0000] C: %s", resolved_address) if s: await cls.close_socket(s) @@ -260,7 +260,7 @@ async def _connect_secure(cls, resolved_address, timeout, keep_alive, ssl): address=(resolved_address.host_name, local_port) ) from error except OSError as error: - log.debug("[#0000] C: %s %s", type(error).__name__, + log.debug("[#0000] S: %s %s", type(error).__name__, " ".join(map(repr, error.args))) log.debug("[#0000] C: %s", resolved_address) if s: @@ -507,14 +507,14 @@ def _connect(cls, resolved_address, timeout, keep_alive): s.setsockopt(SOL_SOCKET, SO_KEEPALIVE, keep_alive) return s except SocketTimeout: - log.debug("[#0000] C: %s", resolved_address) + log.debug("[#0000] S: %s", resolved_address) log.debug("[#0000] C: %s", resolved_address) cls.close_socket(s) raise ServiceUnavailable( "Timed out trying to establish connection to {!r}".format( resolved_address)) except OSError as error: - log.debug("[#0000] C: %s %s", type(error).__name__, + log.debug("[#0000] S: %s %s", type(error).__name__, " ".join(map(repr, error.args))) log.debug("[#0000] C: %s", resolved_address) cls.close_socket(s) @@ -655,7 +655,7 @@ def connect(cls, address, *, timeout, custom_resolver, ssl_context, err_str = error.__class__.__name__ if str(error): err_str += ": " + str(error) - log.debug("[#%04X] C: %s", local_port, + log.debug("[#%04X] S: %s", local_port, err_str) if s: cls.close_socket(s) diff --git a/neo4j/_async_compat/network/_util.py b/neo4j/_async_compat/network/_util.py index f1d5ca4a..b1ad27e9 100644 --- a/neo4j/_async_compat/network/_util.py +++ b/neo4j/_async_compat/network/_util.py @@ -72,21 +72,27 @@ async def resolve_address(address, family=0, resolver=None): yield address return - log.debug("[#0000] C: %s", address) + log.debug("[#0000] _: in: %s", address) if resolver: if asyncio.iscoroutinefunction(resolver): resolved_addresses = await resolver(address) else: resolved_addresses = resolver(address) for address in map(addressing.Address, resolved_addresses): + log.debug("[#0000] _: custom resolver out: %s", + address) for resolved_address in await AsyncNetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address else: for resolved_address in await AsyncNetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address @@ -136,15 +142,21 @@ def resolve_address(address, family=0, resolver=None): yield address return - addressing.log.debug("[#0000] C: %s", address) + addressing.log.debug("[#0000] _: in: %s", address) if resolver: for address in map(addressing.Address, resolver(address)): + log.debug("[#0000] _: custom resolver out: %s", + address) for resolved_address in NetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address else: for resolved_address in NetworkUtil._dns_resolver( address, family=family ): + log.debug("[#0000] _: dns resolver out: %s", + resolved_address) yield resolved_address diff --git a/neo4j/_routing.py b/neo4j/_routing.py index a073dda3..4e562635 100644 --- a/neo4j/_routing.py +++ b/neo4j/_routing.py @@ -129,16 +129,17 @@ def is_fresh(self, readonly=False): """ Indicator for whether routing information is still usable. """ assert isinstance(readonly, bool) - log.debug("[#0000] C: Checking table freshness (readonly=%r)", readonly) expired = self.last_updated_time + self.ttl <= perf_counter() if readonly: has_server_for_mode = bool(self.readers) else: has_server_for_mode = bool(self.writers) - log.debug("[#0000] C: Table expired=%r", expired) - log.debug("[#0000] C: Table routers=%r", self.routers) - log.debug("[#0000] C: Table has_server_for_mode=%r", has_server_for_mode) - return not expired and self.routers and has_server_for_mode + res = not expired and self.routers and has_server_for_mode + log.debug("[#0000] _: checking table freshness " + "(readonly=%r): table expired=%r, " + "has_server_for_mode=%r, table routers=%r => %r", + readonly, expired, has_server_for_mode, self.routers, res) + return res def should_be_purged_from_memory(self): """ Check if the routing table is stale and not used for a long time and should be removed from memory. @@ -148,8 +149,11 @@ def should_be_purged_from_memory(self): """ from neo4j._conf import RoutingConfig perf_time = perf_counter() - log.debug("[#0000] C: last_updated_time=%r perf_time=%r", self.last_updated_time, perf_time) - return self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time + res = self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time + log.debug("[#0000] _: purge check: " + "last_updated_time=%r, ttl=%r, perf_time=%r => %r", + self.last_updated_time, self.ttl, perf_time, res) + return res def update(self, new_routing_table): """ Update the current routing table with new routing information @@ -161,7 +165,7 @@ def update(self, new_routing_table): self.initialized_without_writers = not self.writers self.last_updated_time = perf_counter() self.ttl = new_routing_table.ttl - log.debug("[#0000] S: table=%r", self) + log.debug("[#0000] _: updated table=%r", self) def servers(self): return set(self.routers) | set(self.writers) | set(self.readers) diff --git a/neo4j/_sync/io/_bolt.py b/neo4j/_sync/io/_bolt.py index 93e8207a..175e63b1 100644 --- a/neo4j/_sync/io/_bolt.py +++ b/neo4j/_sync/io/_bolt.py @@ -160,6 +160,10 @@ def __del__(self): if not asyncio.iscoroutinefunction(self.close): self.close() + @property + def connection_id(self): + return self.server_info._metadata.get("connection_id", "") + @property @abc.abstractmethod def supports_multiple_results(self): @@ -689,7 +693,8 @@ def _set_defunct(self, message, error=None, silent=False): user_cancelled = isinstance(error, asyncio.CancelledError) if error: - log.debug("[#%04X] %r", self.local_port, error) + log.debug("[#%04X] _: error: %r", self.local_port, + error) if not user_cancelled: log.error(message) # We were attempting to receive data but the connection @@ -751,7 +756,7 @@ def close(self): try: self._send_all() except (OSError, BoltError, DriverError) as exc: - log.debug("[#%04X] ignoring failed close %r", + log.debug("[#%04X] _: ignoring failed close %r", self.local_port, exc) log.debug("[#%04X] C: ", self.local_port) try: @@ -770,7 +775,7 @@ def kill(self): try: self.socket.kill() except OSError as exc: - log.debug("[#%04X] ignoring failed kill %r", + log.debug("[#%04X] _: ignoring failed kill %r", self.local_port, exc) finally: self._closed = True diff --git a/neo4j/_sync/io/_bolt3.py b/neo4j/_sync/io/_bolt3.py index 2db6d561..ba24eefc 100644 --- a/neo4j/_sync/io/_bolt3.py +++ b/neo4j/_sync/io/_bolt3.py @@ -111,7 +111,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property diff --git a/neo4j/_sync/io/_bolt4.py b/neo4j/_sync/io/_bolt4.py index 60911526..2ff95eb0 100644 --- a/neo4j/_sync/io/_bolt4.py +++ b/neo4j/_sync/io/_bolt4.py @@ -69,7 +69,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -426,7 +426,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_sync/io/_bolt5.py b/neo4j/_sync/io/_bolt5.py index a7180e5a..25283af6 100644 --- a/neo4j/_sync/io/_bolt5.py +++ b/neo4j/_sync/io/_bolt5.py @@ -66,7 +66,7 @@ def __init__(self, *args, **kwargs): ) def _on_server_state_change(self, old_state, new_state): - log.debug("[#%04X] State: %s > %s", self.local_port, + log.debug("[#%04X] _: state: %s > %s", self.local_port, old_state.name, new_state.name) @property @@ -104,7 +104,8 @@ def on_success(metadata): if isinstance(recv_timeout, int) and recv_timeout > 0: self.socket.settimeout(recv_timeout) else: - log.info("[#%04X] Server supplied an invalid value for " + log.info("[#%04X] _: Server supplied an " + "invalid value for " "connection.recv_timeout_seconds (%r). Make sure " "the server and network is set up correctly.", self.local_port, recv_timeout) diff --git a/neo4j/_sync/io/_pool.py b/neo4j/_sync/io/_pool.py index 468ce1dc..43fe68ab 100644 --- a/neo4j/_sync/io/_pool.py +++ b/neo4j/_sync/io/_pool.py @@ -110,9 +110,9 @@ def _acquire_from_pool_checked( # `stale` but still alive. if log.isEnabledFor(logging.DEBUG): log.debug( - "[#%04X] C: removing old connection " + "[#%04X] _: removing old connection %s " "(closed=%s, defunct=%s, stale=%s, in_use=%s)", - connection.local_port, + connection.local_port, connection.connection_id, connection.closed(), connection.defunct(), connection.stale(), connection.in_use ) @@ -182,7 +182,7 @@ def health_check(connection_, deadline_): if connection_.is_idle_for(liveness_check_timeout): with connection_deadline(connection_, deadline_): try: - log.debug("[#%04X] C: ", + log.debug("[#%04X] _: liveness check", connection_.local_port) connection_.reset() except (OSError, ServiceUnavailable, SessionExpired): @@ -195,6 +195,9 @@ def health_check(connection_, deadline_): address, health_check, deadline ) if connection: + log.debug("[#%04X] _: handing out existing connection " + "%s", connection.local_port, + connection.connection_id) return connection # all connections in pool are in-use with self.lock: @@ -209,10 +212,12 @@ def health_check(connection_, deadline_): timeout == 0 # deadline expired or not self.cond.wait(timeout) ): + log.debug("[#0000] _: acquisition timed out") raise ClientError( "failed to obtain a connection from the pool within " "{!r}s (timeout)".format(deadline.original_timeout) ) + log.debug("[#0000] _: trying to hand out new connection") return connection_creator() @abc.abstractmethod @@ -239,8 +244,8 @@ def kill_and_release(self, *connections): if not (connection.defunct() or connection.closed()): log.debug( - "[#%04X] C: killing connection on release", - connection.local_port + "[#%04X] _: killing connection on release %s", + connection.local_port, connection.connection_id ) connection.kill() with self.lock: @@ -260,26 +265,32 @@ def release(self, *connections): or connection.is_reset): if cancelled is not None: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) connection.kill() continue try: log.debug( - "[#%04X] C: released unclean connection", - connection.local_port + "[#%04X] _: released unclean connection %s", + connection.local_port, connection.connection_id ) connection.reset() except (Neo4jError, DriverError, BoltError) as e: - log.debug("Failed to reset connection on release: %r", e) + log.debug("[#%04X] _: failed to reset connection " + "on release: %r", connection.local_port, e) except asyncio.CancelledError as e: - log.debug("Cancelled reset connection on release: %r", e) + log.debug("[#%04X] _: cancelled reset connection " + "on release: %r", connection.local_port, e) cancelled = e connection.kill() with self.lock: for connection in connections: connection.in_use = False + log.debug( + "[#%04X] _: released %s", + connection.local_port, connection.connection_id + ) self.cond.notify_all() if cancelled is not None: raise cancelled @@ -346,7 +357,7 @@ def close(self): """ Close all connections and empty the pool. This method is thread safe. """ - log.debug("[#0000] C: close") + log.debug("[#0000] _: close") try: connections = [] with self.lock: @@ -378,6 +389,7 @@ def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, direct address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -393,6 +405,8 @@ def acquire( ): # The access_mode and database is not needed for a direct connection, # it's just there for consistency. + log.debug("[#0000] _: acquire direct connection, " + "access_mode=%r, database=%r", access_mode, database) deadline = Deadline.from_timeout_or_deadline(timeout) return self._acquire( self.address, deadline, liveness_check_timeout @@ -430,6 +444,7 @@ def opener(addr, timeout): ) pool = cls(opener, pool_config, workspace_config, address) + log.debug("[#0000] _: created, routing address %r", address) return pool def __init__(self, opener, pool_config, workspace_config, address): @@ -442,7 +457,6 @@ def __init__(self, opener, pool_config, workspace_config, address): """ super().__init__(opener, pool_config, workspace_config) # Each database have a routing table, the default database is a special case. - log.debug("[#0000] C: routing address %r", address) self.address = address self.routing_tables = {} self.refresh_lock = RLock() @@ -485,6 +499,8 @@ def fetch_routing_info( routing, or if routing support is broken or outdated """ deadline = Deadline.from_timeout_or_deadline(acquisition_timeout) + log.debug("[#0000] _: _acquire router connection, " + "database=%r, address=%r", database, address) cx = self._acquire(address, deadline, None) try: routing_table = cx.route( @@ -527,7 +543,8 @@ def fetch_routing_table( except (ServiceUnavailable, SessionExpired): pass if not new_routing_info: - log.debug("Failed to fetch routing info %s", address) + log.debug("[#0000] _: failed to fetch routing info " + "from %r", address) return None else: servers = new_routing_info[0]["servers"] @@ -547,12 +564,14 @@ def fetch_routing_table( # No routers if num_routers == 0: - log.debug("No routing servers returned from server %s", address) + log.debug("[#0000] _: no routing servers returned from " + "server %s", address) return None # No readers if num_readers == 0: - log.debug("No read servers returned from server %s", address) + log.debug("[#0000] _: no read servers returned from " + "server %s", address) return None # At least one of each is fine, so return this table @@ -567,9 +586,9 @@ def _update_routing_table_from( :return: True if the routing table is successfully updated, otherwise False """ - log.debug("Attempting to update routing table from {}".format( - ", ".join(map(repr, routers))) - ) + if routers: + log.debug("[#0000] _: attempting to update routing " + "table from {}".format(", ".join(map(repr, routers)))) for router in routers: for address in NetworkUtil.resolve_address( router, resolver=self.pool_config.resolver @@ -585,7 +604,8 @@ def _update_routing_table_from( ) old_routing_table.update(new_routing_table) log.debug( - "[#0000] C: address=%r (%r)", + "[#0000] _: update routing table from " + "address=%r (%r)", address, self.routing_tables[new_database] ) if callable(database_callback): @@ -683,14 +703,19 @@ def ensure_routing_table_is_fresh( for database_ in list(self.routing_tables.keys()): # Remove unused databases in the routing table # Remove the routing table after a timeout = TTL + 30s - log.debug("[#0000] C: database=%s", database_) + log.debug("[#0000] _: routing aged?, database=%s", + database) routing_table = self.routing_tables[database_] if routing_table.should_be_purged_from_memory(): + log.debug("[#0000] _: dropping routing table for " + "database=%s", database) del self.routing_tables[database_] routing_table = self.get_or_create_routing_table(database) if routing_table.is_fresh(readonly=(access_mode == READ_ACCESS)): - # Readers are fresh. + # table is still valid + log.debug("[#0000] _: using existing routing table %r", + routing_table) return False self.update_routing_table( @@ -740,9 +765,16 @@ def acquire( raise ClientError("'timeout' must be a float larger than 0; {}" .format(timeout)) + from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) + # await self.ensure_routing_table_is_fresh( + # access_mode=access_mode, database=database, imp_user=None, + # bookmarks=bookmarks, acquisition_timeout=timeout + # ) + log.debug("[#0000] _: acquire routing connection, " + "access_mode=%r, database=%r", access_mode, database) self.ensure_routing_table_is_fresh( access_mode=access_mode, database=database, imp_user=None, bookmarks=bookmarks, @@ -759,7 +791,8 @@ def acquire( except (ReadServiceUnavailable, WriteServiceUnavailable) as err: raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err try: - log.debug("[#0000] C: database=%r address=%r", database, address) + log.debug("[#0000] _: acquire address, database=%r " + "address=%r", database, address) deadline = Deadline.from_timeout_or_deadline(timeout) # should always be a resolved address connection = self._acquire( @@ -775,20 +808,20 @@ def deactivate(self, address): if present, remove from the routing table and also closing all idle connections to that address. """ - log.debug("[#0000] C: Deactivating address %r", address) + log.debug("[#0000] _: deactivating address %r", address) # We use `discard` instead of `remove` here since the former # will not fail if the address has already been removed. for database in self.routing_tables.keys(): self.routing_tables[database].routers.discard(address) self.routing_tables[database].readers.discard(address) self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) super(Neo4jPool, self).deactivate(address) def on_write_failure(self, address): """ Remove a writer address from the routing table, if present. """ - log.debug("[#0000] C: Removing writer %r", address) + log.debug("[#0000] _: removing writer %r", address) for database in self.routing_tables.keys(): self.routing_tables[database].writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_tables) + log.debug("[#0000] _: table=%r", self.routing_tables) diff --git a/neo4j/_sync/work/session.py b/neo4j/_sync/work/session.py index b5046a8c..21dfc526 100644 --- a/neo4j/_sync/work/session.py +++ b/neo4j/_sync/work/session.py @@ -138,11 +138,12 @@ def _handle_cancellation(self, message="General"): connection = self._connection self._connection = None if connection: - log.debug("[#%04X] %s cancellation clean-up", + log.debug("[#%04X] _: %s cancellation clean-up", connection.local_port, message) self._pool.kill_and_release(connection) else: - log.debug("[#0000] %s cancellation clean-up", message) + log.debug("[#0000] _: %s cancellation clean-up", + message) def _result_closed(self): if self._auto_result: @@ -519,7 +520,7 @@ def _run_transaction( try: sleep(delay) except asyncio.CancelledError: - log.debug("[#0000] Retry cancelled") + log.debug("[#0000] _: retry cancelled") raise if errors: diff --git a/neo4j/_sync/work/workspace.py b/neo4j/_sync/work/workspace.py index 844bb96c..df885c45 100644 --- a/neo4j/_sync/work/workspace.py +++ b/neo4j/_sync/work/workspace.py @@ -19,10 +19,10 @@ from __future__ import annotations import asyncio +import logging from ..._async_compat.util import Util from ..._conf import WorkspaceConfig -from ..._deadline import Deadline from ..._meta import ( deprecation_warn, unclosed_resource_warn, @@ -36,6 +36,9 @@ from ..io import Neo4jPool +log = logging.getLogger("neo4j") + + class Workspace: def __init__(self, pool, config): @@ -176,6 +179,7 @@ def _connect(self, access_mode, **acquire_kwargs): # to try to fetch the home database. If provided by the server, # we shall use this database explicitly for all subsequent # actions within this session. + log.debug("[#0000] _: resolve home database") self._pool.update_routing_table( database=self._config.database, imp_user=self._config.impersonated_user, diff --git a/neo4j/debug.py b/neo4j/debug.py index 50863ff0..7b7fc24a 100644 --- a/neo4j/debug.py +++ b/neo4j/debug.py @@ -18,11 +18,13 @@ from __future__ import annotations +import asyncio import typing as t from logging import ( CRITICAL, DEBUG, ERROR, + Filter, Formatter, getLogger, INFO, @@ -43,7 +45,7 @@ class ColourFormatter(Formatter): """ def format(self, record): - s = super(ColourFormatter, self).format(record) + s = super().format(record) if record.levelno == CRITICAL: return "\x1b[31;1m%s\x1b[0m" % s # bright red elif record.levelno == ERROR: @@ -58,6 +60,17 @@ def format(self, record): return s +class TaskIdFilter(Filter): + """Injecting async task id into log records.""" + + def filter(self, record): + try: + record.task = id(asyncio.current_task()) + except RuntimeError: + record.task = None + return True + + class Watcher: """Log watcher for easier logging setup. @@ -73,23 +86,41 @@ class Watcher: threads can lead to duplicate log messages as the context manager will enable logging for all threads. + .. note:: + The exact logging format is not part of the API contract and might + change at any time without notice. It is meant for debugging purposes + and human consumption only. + :param logger_names: Names of loggers to watch. :param default_level: Default minimum log level to show. - The level can be overridden by setting the level a level when calling + The level can be overridden by setting ``level`` when calling :meth:`.watch`. :param default_out: Default output stream for all loggers. - The level can be overridden by setting the level a level when calling + The level can be overridden by setting ``out`` when calling :meth:`.watch`. + :type default_out: stream or file-like object :param colour: Whether the log levels should be indicated with ANSI colour codes. + :param thread_info: whether to include information about the current + thread in the log message. Defaults to :const:`True`. + :param task_info: whether to include information about the current + async task in the log message. Defaults to :const:`True`. + + .. versionchanged:: + 5.3 + + * Added ``thread_info`` and ``task_info`` parameters. + * Logging format around thread and task information changed. """ def __init__( self, - *logger_names: str, + *logger_names: t.Optional[str], default_level: int = DEBUG, default_out: t.TextIO = stderr, - colour: bool = False + colour: bool = False, + thread_info: bool = True, + task_info: bool = True, ) -> None: super(Watcher, self).__init__() self.logger_names = logger_names @@ -97,8 +128,13 @@ def __init__( self.default_level = default_level self.default_out = default_out self._handlers: t.Dict[str, StreamHandler] = {} + self._task_info = task_info - format_ = "%(threadName)s(%(thread)d) %(asctime)s %(message)s" + format_ = "%(asctime)s %(message)s" + if task_info: + format_ = "[Task %(task)-15s] " + format_ + if thread_info: + format_ = "[Thread %(thread)d] " + format_ if not colour: format_ = "[%(levelname)-8s] " + format_ formatter_cls = ColourFormatter if colour else Formatter @@ -122,6 +158,7 @@ def watch( If :const:`None`, the ``default_level`` is used. :param out: Output stream for all loggers. If :const:`None`, the ``default_out`` is used. + :type out: stream or file-like object """ if level is None: level = self.default_level @@ -130,6 +167,8 @@ def watch( self.stop() handler = StreamHandler(out) handler.setFormatter(self.formatter) + if self._task_info: + handler.addFilter(TaskIdFilter()) for logger in self. _loggers: self._handlers[logger.name] = handler logger.addHandler(handler) @@ -145,10 +184,12 @@ def stop(self) -> None: def watch( - *logger_names: str, + *logger_names: t.Optional[str], level: int = DEBUG, out: t.TextIO = stderr, - colour: bool = False + colour: bool = False, + thread_info: bool = True, + task_info: bool = True, ) -> Watcher: """Quick wrapper for using :class:`.Watcher`. @@ -162,19 +203,42 @@ def watch( watch("neo4j") # from now on, DEBUG logging to stderr is enabled in the driver + .. note:: + The exact logging format is not part of the API contract and might + change at any time without notice. It is meant for debugging purposes + and human consumption only. + :param logger_names: Names of loggers to watch. - :type logger_names: str :param level: see ``default_level`` of :class:`.Watcher`. - :type level: int :param out: see ``default_out`` of :class:`.Watcher`. :type out: stream or file-like object :param colour: see ``colour`` of :class:`.Watcher`. - :type colour: bool + :param thread_info: see ``thread_info`` of :class:`.Watcher`. + :param task_info: see ``task_info`` of :class:`.Watcher`. :return: Watcher instance :rtype: :class:`.Watcher` + + .. versionchanged:: + 5.3 + + * Added ``thread_info`` and ``task_info`` parameters. + * Logging format around thread and task information changed. """ - watcher = Watcher(*logger_names, colour=colour, default_level=level, - default_out=out) + watcher = Watcher(*logger_names, default_level=level, default_out=out, + colour=colour, thread_info=thread_info, + task_info=task_info) watcher.watch() return watcher + + +class Connection: + def connect(self): + self.hello() # buffer HELLO message + self.logon() # buffer LOGON message + self.send_and_receive() # send HELLO and LOGON, receive 2x SUCCESS + + def reauth(self): + self.logoff() # buffer LOGOFF message + self.logon() # buffer LOGON message + self.send_and_receive() # send LOGOFF and LOGON, receive 2x SUCCESS diff --git a/tests/unit/async_/io/test_direct.py b/tests/unit/async_/io/test_direct.py index d8cfbf62..7729036e 100644 --- a/tests/unit/async_/io/test_direct.py +++ b/tests/unit/async_/io/test_direct.py @@ -53,6 +53,7 @@ def __init__(self, socket): self.socket = socket self.address = socket.getpeername() self.local_port = self.address[1] + self.connection_id = "bolt-1234" @property def is_reset(self): diff --git a/tests/unit/common/test_debug.py b/tests/unit/common/test_debug.py index b5350436..7454148e 100644 --- a/tests/unit/common/test_debug.py +++ b/tests/unit/common/test_debug.py @@ -18,6 +18,7 @@ from __future__ import annotations +import asyncio import io import logging import sys @@ -31,6 +32,8 @@ from neo4j import debug as neo4j_debug +from ..._async_compat import mark_async_test + if t.TYPE_CHECKING: @@ -40,7 +43,7 @@ def __call__(self, *args: str) -> t.Sequence[t.Any]: @pytest.fixture -def add_handler_mocker(mocker) -> _TSetupMockProtocol: +def logger_mocker(mocker) -> _TSetupMockProtocol: def setup_mock(*logger_names): loggers = [logging.getLogger(name) for name in logger_names] for logger in loggers: @@ -52,25 +55,25 @@ def setup_mock(*logger_names): return setup_mock -def test_watch_returns_watcher(add_handler_mocker) -> None: +def test_watch_returns_watcher(logger_mocker) -> None: logger_name = "neo4j" - add_handler_mocker(logger_name) + logger_mocker(logger_name) watcher = neo4j_debug.watch(logger_name) assert isinstance(watcher, neo4j_debug.Watcher) @pytest.mark.parametrize("logger_names", (("neo4j",), ("foobar",), ("neo4j", "foobar"))) -def test_watch_enables_logging(logger_names, add_handler_mocker) -> None: - loggers = add_handler_mocker(*logger_names) +def test_watch_enables_logging(logger_names, logger_mocker) -> None: + loggers = logger_mocker(*logger_names) neo4j_debug.watch(*logger_names) for logger in loggers: logger.addHandler.assert_called_once() -def test_watcher_watch_adds_logger(add_handler_mocker) -> None: +def test_watcher_watch_adds_logger(logger_mocker) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] watcher = neo4j_debug.Watcher(logger_name) logger.addHandler.assert_not_called() @@ -78,9 +81,9 @@ def test_watcher_watch_adds_logger(add_handler_mocker) -> None: logger.addHandler.assert_called_once() -def test_watcher_stop_removes_logger(add_handler_mocker) -> None: +def test_watcher_stop_removes_logger(logger_mocker) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] watcher = neo4j_debug.Watcher(logger_name) watcher.watch() @@ -115,10 +118,10 @@ def test_watcher_context_manager(mocker) -> None: ) ) def test_watcher_level( - add_handler_mocker, default_level, level, expected_level + logger_mocker, default_level, level, expected_level ) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] kwargs = {} if default_level is not None: kwargs["default_level"] = default_level @@ -147,10 +150,10 @@ def test_watcher_level( ) ) def test_watcher_out( - add_handler_mocker, default_out, out, expected_out + logger_mocker, default_out, out, expected_out ) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] + logger = logger_mocker(logger_name)[0] kwargs = {} if default_out is not None: kwargs["default_out"] = default_out @@ -166,12 +169,16 @@ def test_watcher_out( @pytest.mark.parametrize("colour", (True, False)) -def test_watcher_colour(add_handler_mocker, colour) -> None: +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +def test_watcher_colour(logger_mocker, colour, thread, task) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] - watcher = neo4j_debug.Watcher(logger_name, colour=colour) + logger = logger_mocker(logger_name)[0] + watcher = neo4j_debug.Watcher(logger_name, colour=colour, + thread_info=thread, task_info=task) watcher.watch() + logger.addHandler.assert_called_once() (handler,), _ = logger.addHandler.call_args assert isinstance(handler, logging.Handler) assert isinstance(handler.formatter, logging.Formatter) @@ -182,19 +189,71 @@ def test_watcher_colour(add_handler_mocker, colour) -> None: @pytest.mark.parametrize("colour", (True, False)) -def test_watcher_format(add_handler_mocker, colour) -> None: +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +def test_watcher_format(logger_mocker, colour, thread, task) -> None: logger_name = "neo4j" - logger = add_handler_mocker(logger_name)[0] - watcher = neo4j_debug.Watcher(logger_name, colour=colour) + logger = logger_mocker(logger_name)[0] + watcher = neo4j_debug.Watcher(logger_name, colour=colour, + thread_info=thread, task_info=task) watcher.watch() + logger.addHandler.assert_called_once() (handler,), _ = logger.addHandler.call_args assert isinstance(handler, logging.Handler) assert isinstance(handler.formatter, logging.Formatter) + expected_format = "%(asctime)s %(message)s" + if task: + expected_format = "[Task %(task)-15s] " + expected_format + if thread: + expected_format = "[Thread %(thread)d] " + expected_format + if not colour: + expected_format = "[%(levelname)-8s] " + expected_format # Don't look at me like that. It's just for testing. - format = handler.formatter._fmt - if colour: - assert format == "%(threadName)s(%(thread)d) %(asctime)s %(message)s" + format_ = handler.formatter._fmt + assert format_ == expected_format + + +def _assert_task_injection( + async_: bool, mocker, logger_mocker, colour: bool, thread: bool, task: bool +) -> None: + handler_cls_mock = mocker.patch("neo4j.debug.StreamHandler", autospec=True) + handler_mock = handler_cls_mock.return_value + logger_name = "neo4j" + watcher = neo4j_debug.Watcher(logger_name, colour=colour, + thread_info=thread, task_info=task) + record_mock = mocker.Mock(spec=logging.LogRecord) + assert not hasattr(record_mock, "task") + + watcher.watch() + + if task: + handler_mock.addFilter.assert_called_once() + (filter_,), _ = handler_mock.addFilter.call_args + assert isinstance(filter_, logging.Filter) + filter_.filter(record_mock) + if async_: + assert record_mock.task == id(asyncio.current_task()) + else: + assert record_mock.task is None else: - assert format == "[%(levelname)-8s] " \ - "%(threadName)s(%(thread)d) %(asctime)s %(message)s" + handler_mock.addFilter.assert_not_called() + + +@pytest.mark.parametrize("colour", (True, False)) +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +def test_watcher_task_injection( + mocker, logger_mocker, colour, thread, task +) -> None: + _assert_task_injection(False, mocker, logger_mocker, colour, thread, task) + + +@pytest.mark.parametrize("colour", (True, False)) +@pytest.mark.parametrize("thread", (True, False)) +@pytest.mark.parametrize("task", (True, False)) +@mark_async_test +async def test_async_watcher_task_injection( + mocker, logger_mocker, colour, thread, task +) -> None: + _assert_task_injection(True, mocker, logger_mocker, colour, thread, task) diff --git a/tests/unit/sync/io/test_direct.py b/tests/unit/sync/io/test_direct.py index 70c5478d..d54dfd73 100644 --- a/tests/unit/sync/io/test_direct.py +++ b/tests/unit/sync/io/test_direct.py @@ -53,6 +53,7 @@ def __init__(self, socket): self.socket = socket self.address = socket.getpeername() self.local_port = self.address[1] + self.connection_id = "bolt-1234" @property def is_reset(self):