@@ -46,8 +46,8 @@ class Connection(metaclass=ConnectionMeta):
46
46
'_listeners' , '_server_version' , '_server_caps' ,
47
47
'_intro_query' , '_reset_query' , '_proxy' ,
48
48
'_stmt_exclusive_section' , '_config' , '_params' , '_addr' ,
49
- '_log_listeners' , '_cancellations ' , '_source_traceback ' ,
50
- '__weakref__' )
49
+ '_log_listeners' , '_cleanup_listeners ' , '_cancellations ' ,
50
+ '_source_traceback' , ' __weakref__' )
51
51
52
52
def __init__ (self , protocol , transport , loop ,
53
53
addr : (str , int ) or str ,
@@ -78,6 +78,7 @@ def __init__(self, protocol, transport, loop,
78
78
self ._listeners = {}
79
79
self ._log_listeners = set ()
80
80
self ._cancellations = set ()
81
+ self ._cleanup_listeners = set ()
81
82
82
83
settings = self ._protocol .get_settings ()
83
84
ver_string = settings .server_version
@@ -178,6 +179,19 @@ def remove_log_listener(self, callback):
178
179
"""
179
180
self ._log_listeners .discard (callback )
180
181
182
+ def add_close_listener (self , callback ):
183
+ """Add a listener that will be called when the the connection is closing.
184
+
185
+ :param callable callback:
186
+ A callable receiving one argument:
187
+ **connection**: a Connection the callback is registered with.
188
+ """
189
+ self ._cleanup_listeners .add (callback )
190
+
191
+ def remove_close_listener (self , callback ):
192
+ """Remove a listening callback for the connection closing."""
193
+ self ._cleanup_listeners .discard (callback )
194
+
181
195
def get_server_pid (self ):
182
196
"""Return the PID of the Postgres server the connection is bound to."""
183
197
return self ._protocol .get_server_pid ()
@@ -1120,6 +1134,7 @@ def _abort(self):
1120
1134
self ._protocol = None
1121
1135
1122
1136
def _cleanup (self ):
1137
+ self ._call_cleanup_listeners ()
1123
1138
# Free the resources associated with this connection.
1124
1139
# This must be called when a connection is terminated.
1125
1140
@@ -1237,6 +1252,23 @@ def _call_log_listener(self, cb, con_ref, message):
1237
1252
'exception' : ex
1238
1253
})
1239
1254
1255
+ def _call_cleanup_listeners (self ):
1256
+ if not self ._cleanup_listeners :
1257
+ return
1258
+
1259
+ con_ref = self ._unwrap ()
1260
+ for cb in self ._cleanup_listeners :
1261
+ try :
1262
+ cb (con_ref )
1263
+ except Exception as ex :
1264
+ self ._loop .call_exception_handler ({
1265
+ 'message' : 'Unhandled exception in asyncpg connection '
1266
+ 'connection closed callback {!r}' .format (cb ),
1267
+ 'exception' : ex
1268
+ })
1269
+
1270
+ self ._cleanup_listeners .clear ()
1271
+
1240
1272
def _process_notification (self , pid , channel , payload ):
1241
1273
if channel not in self ._listeners :
1242
1274
return
0 commit comments