9
9
import asyncpg
10
10
import collections
11
11
import collections .abc
12
+ import contextlib
12
13
import functools
13
14
import itertools
14
15
import inspect
@@ -226,10 +227,9 @@ def add_query_logger(self, callback):
226
227
"""Add a logger that will be called when queries are executed.
227
228
228
229
:param callable callback:
229
- A callable or a coroutine function receiving two arguments:
230
- **connection**: a Connection the callback is registered with.
231
- **query**: a LoggedQuery containing the query, args, timeout, and
232
- elapsed.
230
+ A callable or a coroutine function receiving one argument:
231
+ **record**: a LoggedQuery containing `query`, `args`, `timeout`,
232
+ `elapsed`, `addr`, `params`, and `exception`.
233
233
234
234
.. versionadded:: 0.29.0
235
235
"""
@@ -339,9 +339,8 @@ async def execute(self, query: str, *args, timeout: float=None) -> str:
339
339
self ._check_open ()
340
340
341
341
if not args :
342
- with utils . timer () as t :
342
+ with self . _time_and_log ( query , args , timeout ) :
343
343
result = await self ._protocol .query (query , timeout )
344
- self ._log_query (query , args , timeout , t .elapsed )
345
344
return result
346
345
347
346
_ , status , _ = await self ._execute (
@@ -1412,6 +1411,7 @@ def _cleanup(self):
1412
1411
self ._mark_stmts_as_closed ()
1413
1412
self ._listeners .clear ()
1414
1413
self ._log_listeners .clear ()
1414
+ self ._query_loggers .clear ()
1415
1415
self ._clean_tasks ()
1416
1416
1417
1417
def _clean_tasks (self ):
@@ -1695,15 +1695,15 @@ async def _execute(
1695
1695
)
1696
1696
return result
1697
1697
1698
+ @contextlib .contextmanager
1698
1699
def logger (self , callback ):
1699
1700
"""Context manager that adds `callback` to the list of query loggers,
1700
1701
and removes it upon exit.
1701
1702
1702
1703
:param callable callback:
1703
- A callable or a coroutine function receiving two arguments:
1704
- **connection**: a Connection the callback is registered with.
1705
- **query**: a LoggedQuery containing the query, args, timeout, and
1706
- elapsed.
1704
+ A callable or a coroutine function receiving one argument:
1705
+ **record**: a LoggedQuery containing `query`, `args`, `timeout`,
1706
+ `elapsed`, `addr`, and `params`.
1707
1707
1708
1708
Example:
1709
1709
@@ -1721,18 +1721,35 @@ def __call__(self, conn, record):
1721
1721
1722
1722
.. versionadded:: 0.29.0
1723
1723
"""
1724
- return _LoggingContext (self , callback )
1725
-
1726
- def _log_query (self , query , args , timeout , elapsed ):
1727
- if not self ._query_loggers :
1728
- return
1729
- con_ref = self ._unwrap ()
1730
- record = LoggedQuery (query , args , timeout , elapsed )
1731
- for cb in self ._query_loggers :
1732
- if cb .is_async :
1733
- self ._loop .create_task (cb .cb (con_ref , record ))
1734
- else :
1735
- self ._loop .call_soon (cb .cb , con_ref , record )
1724
+ self .add_query_logger (callback )
1725
+ yield callback
1726
+ self .remove_query_logger (callback )
1727
+
1728
+ @contextlib .contextmanager
1729
+ def _time_and_log (self , query , args , timeout ):
1730
+ start = time .monotonic ()
1731
+ exception = None
1732
+ try :
1733
+ yield
1734
+ except Exception as ex :
1735
+ exception = ex
1736
+ raise
1737
+ finally :
1738
+ elapsed = time .monotonic () - start
1739
+ record = LoggedQuery (
1740
+ query = query ,
1741
+ args = args ,
1742
+ timeout = timeout ,
1743
+ elapsed = elapsed ,
1744
+ addr = self ._addr ,
1745
+ params = self ._params ,
1746
+ exception = exception ,
1747
+ )
1748
+ for cb in self ._query_loggers :
1749
+ if cb .is_async :
1750
+ self ._loop .create_task (cb .cb (record ))
1751
+ else :
1752
+ self ._loop .call_soon (cb .cb , record )
1736
1753
1737
1754
async def __execute (
1738
1755
self ,
@@ -1748,25 +1765,23 @@ async def __execute(
1748
1765
executor = lambda stmt , timeout : self ._protocol .bind_execute (
1749
1766
stmt , args , '' , limit , return_status , timeout )
1750
1767
timeout = self ._protocol ._get_timeout (timeout )
1751
- with utils . timer () as t :
1768
+ with self . _time_and_log ( query , args , timeout ) :
1752
1769
result , stmt = await self ._do_execute (
1753
1770
query ,
1754
1771
executor ,
1755
1772
timeout ,
1756
1773
record_class = record_class ,
1757
1774
ignore_custom_codec = ignore_custom_codec ,
1758
1775
)
1759
- self ._log_query (query , args , timeout , t .elapsed )
1760
1776
return result , stmt
1761
1777
1762
1778
async def _executemany (self , query , args , timeout ):
1763
1779
executor = lambda stmt , timeout : self ._protocol .bind_execute_many (
1764
1780
stmt , args , '' , timeout )
1765
1781
timeout = self ._protocol ._get_timeout (timeout )
1766
1782
with self ._stmt_exclusive_section :
1767
- with utils . timer () as t :
1783
+ with self . _time_and_log ( query , args , timeout ) :
1768
1784
result , _ = await self ._do_execute (query , executor , timeout )
1769
- self ._log_query (query , args , timeout , t .elapsed )
1770
1785
return result
1771
1786
1772
1787
async def _do_execute (
@@ -2401,25 +2416,10 @@ class _ConnectionProxy:
2401
2416
2402
2417
LoggedQuery = collections .namedtuple (
2403
2418
'LoggedQuery' ,
2404
- ['query' , 'args' , 'timeout' , 'elapsed' ])
2419
+ ['query' , 'args' , 'timeout' , 'elapsed' , 'exception' , 'addr' , 'params' ])
2405
2420
LoggedQuery .__doc__ = 'Log record of an executed query.'
2406
2421
2407
2422
2408
- class _LoggingContext :
2409
- __slots__ = ('_conn' , '_cb' )
2410
-
2411
- def __init__ (self , conn , callback ):
2412
- self ._conn = conn
2413
- self ._cb = callback
2414
-
2415
- def __enter__ (self ):
2416
- self ._conn .add_query_logger (self ._cb )
2417
- return self ._cb
2418
-
2419
- def __exit__ (self , * exc_info ):
2420
- self ._conn .remove_query_logger (self ._cb )
2421
-
2422
-
2423
2423
ServerCapabilities = collections .namedtuple (
2424
2424
'ServerCapabilities' ,
2425
2425
['advisory_locks' , 'notifications' , 'plpgsql' , 'sql_reset' ,
0 commit comments