@@ -77,6 +77,7 @@ def __init__(self, data_dir, *, pg_config_path=None):
77
77
self ._daemon_pid = None
78
78
self ._daemon_process = None
79
79
self ._connection_addr = None
80
+ self ._connection_spec_override = None
80
81
81
82
def is_managed (self ):
82
83
return True
@@ -111,9 +112,9 @@ def get_status(self):
111
112
process .returncode , stderr ))
112
113
113
114
async def connect (self , loop = None , ** kwargs ):
114
- conn_addr = self .get_connection_addr ()
115
- return await asyncpg . connect (
116
- host = conn_addr [ 0 ], port = conn_addr [ 1 ], loop = loop , ** kwargs )
115
+ conn_info = self .get_connection_spec ()
116
+ conn_info . update ( kwargs )
117
+ return await asyncpg . connect ( loop = loop , ** conn_info )
117
118
118
119
def init (self , ** settings ):
119
120
"""Initialize cluster."""
@@ -130,14 +131,16 @@ def init(self, **settings):
130
131
131
132
process = subprocess .run (
132
133
[self ._pg_ctl , 'init' , '-D' , self ._data_dir ] + extra_args ,
133
- stdout = subprocess .PIPE , stderr = subprocess .PIPE )
134
+ stdout = subprocess .PIPE , stderr = subprocess .STDOUT )
134
135
135
- stderr = process .stderr
136
+ output = process .stdout
136
137
137
138
if process .returncode != 0 :
138
139
raise ClusterError (
139
- 'pg_ctl init exited with status {:d}: {}' .format (
140
- process .returncode , stderr .decode ()))
140
+ 'pg_ctl init exited with status {:d}:\n {}' .format (
141
+ process .returncode , output .decode ()))
142
+
143
+ return output .decode ()
141
144
142
145
def start (self , wait = 60 , * , server_settings = {}, ** opts ):
143
146
"""Start the cluster."""
@@ -213,15 +216,27 @@ def destroy(self):
213
216
else :
214
217
raise ClusterError ('cannot destroy {} cluster' .format (status ))
215
218
216
- def get_connection_addr (self ):
219
+ def _get_connection_spec (self ):
220
+ if self ._connection_addr is None :
221
+ self ._connection_addr = self ._connection_addr_from_pidfile ()
222
+
223
+ if self ._connection_addr is not None :
224
+ if self ._connection_spec_override :
225
+ args = self ._connection_addr .copy ()
226
+ args .update (self ._connection_spec_override )
227
+ return args
228
+ else :
229
+ return self ._connection_addr
230
+
231
+ def get_connection_spec (self ):
217
232
status = self .get_status ()
218
233
if status != 'running' :
219
234
raise ClusterError ('cluster is not running' )
220
235
221
- if self ._connection_addr is None :
222
- self ._connection_addr = self ._connection_addr_from_pidfile ()
236
+ return self ._get_connection_spec ()
223
237
224
- return self ._connection_addr ['host' ], self ._connection_addr ['port' ]
238
+ def override_connection_spec (self , ** kwargs ):
239
+ self ._connection_spec_override = kwargs
225
240
226
241
def reset_hba (self ):
227
242
"""Remove all records from pg_hba.conf."""
@@ -345,9 +360,8 @@ def _test_connection(self, timeout=60):
345
360
try :
346
361
for i in range (timeout ):
347
362
if self ._connection_addr is None :
348
- self ._connection_addr = \
349
- self ._connection_addr_from_pidfile ()
350
- if self ._connection_addr is None :
363
+ conn_spec = self ._get_connection_spec ()
364
+ if conn_spec is None :
351
365
time .sleep (1 )
352
366
continue
353
367
@@ -441,21 +455,14 @@ def __init__(self, *,
441
455
442
456
443
457
class RunningCluster (Cluster ):
444
- def __init__ (self , host = None , port = None ):
445
- if host is None :
446
- host = os .environ .get ('PGHOST' ) or 'localhost'
447
-
448
- if port is None :
449
- port = os .environ .get ('PGPORT' ) or 5432
450
-
451
- self .host = host
452
- self .port = port
458
+ def __init__ (self , ** kwargs ):
459
+ self .conn_spec = kwargs
453
460
454
461
def is_managed (self ):
455
462
return False
456
463
457
- def get_connection_addr (self ):
458
- return self .host , self . port
464
+ def get_connection_spec (self ):
465
+ return self .conn_spec
459
466
460
467
def get_status (self ):
461
468
return 'running'
0 commit comments