24
24
import java .net .ServerSocket ;
25
25
import java .net .Socket ;
26
26
import java .nio .ByteBuffer ;
27
- import java .nio .channels .AsynchronousFileChannel ;
28
- import java .nio .channels .Channel ;
29
- import java .nio .channels .CompletionHandler ;
30
- import java .nio .channels .FileLock ;
31
- import java .nio .channels .OverlappingFileLockException ;
27
+ import java .nio .channels .*;
32
28
import java .nio .file .FileSystems ;
33
29
import java .nio .file .Files ;
34
30
import java .nio .file .Path ;
49
45
import java .util .Objects ;
50
46
import java .util .Optional ;
51
47
import java .util .UUID ;
52
- import java .util .concurrent .Phaser ;
53
- import java .util .concurrent .TimeUnit ;
48
+ import java .util .concurrent .*;
54
49
import java .util .concurrent .atomic .AtomicBoolean ;
50
+ import java .util .concurrent .atomic .AtomicInteger ;
55
51
import java .util .concurrent .locks .Lock ;
56
52
import java .util .concurrent .locks .ReentrantLock ;
57
53
@@ -87,6 +83,8 @@ public class EmbeddedPostgres implements Closeable
87
83
private static final String PG_SUPERUSER = "postgres" ;
88
84
private static final Duration DEFAULT_PG_STARTUP_WAIT = Duration .ofSeconds (10 );
89
85
private static final String LOCK_FILE_NAME = "epg-lock" ;
86
+ private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor ) Executors .newCachedThreadPool ();
87
+ private static final AtomicInteger active = new AtomicInteger (0 );
90
88
91
89
private final File pgDir ;
92
90
@@ -108,6 +106,8 @@ public class EmbeddedPostgres implements Closeable
108
106
private final ProcessBuilder .Redirect errorRedirector ;
109
107
private final ProcessBuilder .Redirect outputRedirector ;
110
108
109
+ private Process pgProcess ;
110
+
111
111
EmbeddedPostgres (File parentDirectory , File dataDirectory , boolean cleanDataDirectory ,
112
112
Map <String , String > postgresConfig , Map <String , String > localeConfig , int port , Map <String , String > connectConfig ,
113
113
PgBinaryResolver pgBinaryResolver , ProcessBuilder .Redirect errorRedirector , ProcessBuilder .Redirect outputRedirector ) throws IOException
@@ -237,16 +237,19 @@ private void lock() throws IOException
237
237
}
238
238
}
239
239
240
- private void initdb ()
241
- {
240
+ private void initdb () throws IOException {
242
241
final StopWatch watch = new StopWatch ();
243
242
watch .start ();
244
243
List <String > args = new ArrayList <>();
245
244
args .addAll (Arrays .asList (
246
245
"-A" , "trust" , "-U" , PG_SUPERUSER ,
247
246
"-D" , dataDirectory .getPath (), "-E" , "UTF-8" ));
248
247
args .addAll (createLocaleOptions ());
249
- system (INIT_DB , args );
248
+ try {
249
+ system (INIT_DB , args , true , true ).exit .get ();
250
+ } catch (InterruptedException | ExecutionException e ) {
251
+ throw new IOException (e .getMessage ());
252
+ }
250
253
LOG .info ("{} initdb completed in {}" , instanceId , watch );
251
254
}
252
255
@@ -262,16 +265,13 @@ private void startPostmaster() throws IOException
262
265
args .addAll (Arrays .asList ("-D" , dataDirectory .getPath ()));
263
266
args .addAll (createInitOptions ());
264
267
265
- final ProcessBuilder builder = new ProcessBuilder ();
266
- POSTGRES .applyTo (builder , args );
268
+ SystemResult result = system (POSTGRES , args );
267
269
268
- builder .redirectErrorStream (true );
269
- builder .redirectError (errorRedirector );
270
- builder .redirectOutput (outputRedirector );
271
- final Process postmaster = builder .start ();
272
-
273
- if (outputRedirector .type () == ProcessBuilder .Redirect .Type .PIPE ) {
274
- ProcessOutputLogger .logOutput (LOG , postmaster , POSTGRES .processName ());
270
+ final Process postmaster ;
271
+ try {
272
+ postmaster = result .initProcess .get ();
273
+ } catch (InterruptedException | ExecutionException e ) {
274
+ throw new IOException (e .getMessage ());
275
275
}
276
276
277
277
LOG .info ("{} postmaster started as {} on port {}. Waiting up to {} for server startup to finish." , instanceId , postmaster .toString (), port , pgStartupWait );
@@ -311,7 +311,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException
311
311
Throwable lastCause = null ;
312
312
final long start = System .nanoTime ();
313
313
final long maxWaitNs = TimeUnit .NANOSECONDS .convert (pgStartupWait .toMillis (), TimeUnit .MILLISECONDS );
314
- while (System .nanoTime () - start < maxWaitNs ) {
314
+ while (System .nanoTime () - start < ( maxWaitNs * Math . max ( THREAD_POOL . getActiveCount (), 1 )) ) {
315
315
try {
316
316
verifyReady ();
317
317
LOG .info ("{} postmaster startup finished in {}" , instanceId , watch );
@@ -328,7 +328,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException
328
328
return ;
329
329
}
330
330
}
331
- throw new IOException ("Gave up waiting for server to start after " + pgStartupWait .toMillis () + "ms" , lastCause );
331
+ throw new IOException ("Gave up waiting for " + instanceId + " server to start after " + ( pgStartupWait .toMillis () * Math . max ( THREAD_POOL . getActiveCount (), 1 ) ) + "ms" , lastCause );
332
332
}
333
333
334
334
private void verifyReady () throws SQLException
@@ -383,7 +383,8 @@ public void close() throws IOException
383
383
final StopWatch watch = new StopWatch ();
384
384
watch .start ();
385
385
try {
386
- pgCtl (dataDirectory , "stop" );
386
+ if (pgProcess != null )
387
+ pgProcess .destroy ();
387
388
LOG .info ("{} shut down postmaster in {}" , instanceId , watch );
388
389
} catch (final Exception e ) {
389
390
LOG .error ("Could not stop postmaster " + instanceId , e );
@@ -408,15 +409,18 @@ public void close() throws IOException
408
409
}
409
410
}
410
411
411
- private void pgCtl (File dir , String action )
412
- {
412
+ private void pgCtl (File dir , String action ) throws IOException {
413
413
final List <String > args = new ArrayList <>();
414
414
args .addAll (Arrays .asList (
415
415
"-D" , dir .getPath (), action ,
416
416
"-m" , PG_STOP_MODE , "-t" ,
417
417
PG_STOP_WAIT_S , "-w"
418
418
));
419
- system (PG_CTL , args );
419
+ try {
420
+ system (PG_CTL , args , true ).exit .get ();
421
+ } catch (InterruptedException | ExecutionException e ) {
422
+ throw new IOException (e .getMessage ());
423
+ }
420
424
}
421
425
422
426
private void cleanOldDataDirectories (File parentDirectory )
@@ -610,29 +614,121 @@ public int hashCode() {
610
614
}
611
615
}
612
616
613
- private void system (Command command , List <String > args )
617
+ private int systemThread (ProcessBuilder builder , Command command , SystemResult result , boolean retry , boolean clean ) {
618
+ AtomicInteger exit = new AtomicInteger (-1 );
619
+ int retries = 0 ;
620
+ while (exit .get () != 0 ) {
621
+ final Process [] process = new Process [1 ];
622
+ CompletableFuture <Process > initProcess ;
623
+ if (retries == 0 ) {
624
+ initProcess = result .initProcess ;
625
+ } else {
626
+ initProcess = new CompletableFuture <>();
627
+ }
628
+ int lastActive = THREAD_POOL .getActiveCount ();
629
+ if (lastActive > active .get ())
630
+ active .set (lastActive );
631
+ Callable <Process > task = () -> {
632
+ try {
633
+ process [0 ] = builder .start ();
634
+ pgProcess = process [0 ];
635
+ result .process = process [0 ];
636
+ result .initProcess = initProcess ;
637
+ initProcess .complete (process [0 ]);
638
+
639
+ if (outputRedirector .type () == ProcessBuilder .Redirect .Type .PIPE ) {
640
+ ProcessOutputLogger .logOutput (LOG , process [0 ], command .processName ());
641
+ }
642
+ return process [0 ];
643
+ } catch (IOException e ) {
644
+ initProcess .completeExceptionally (e );
645
+ throw e ;
646
+ }
647
+ };
648
+ Future <Process > thread = THREAD_POOL .submit (task );
649
+ if (retry ) {
650
+ try {
651
+ exit .set (thread .get ().waitFor ());
652
+ } catch (InterruptedException | ExecutionException e ) {
653
+ e .printStackTrace ();
654
+ }
655
+ if (0 != exit .get ()) {
656
+ LOG .info ("Active threads running {}" , THREAD_POOL .getActiveCount ());
657
+ int currentActive = THREAD_POOL .getActiveCount ();
658
+ if (currentActive >= lastActive )
659
+ lastActive = currentActive ;
660
+ if (lastActive > active .get ())
661
+ active .set (lastActive );
662
+ if (lastActive >= active .get () - 1 && active .get () > 0 && THREAD_POOL .getActiveCount () <= THREAD_POOL .getPoolSize ()) {
663
+ THREAD_POOL .setMaximumPoolSize (active .decrementAndGet ());
664
+ LOG .info ("Reduced thread pool size to {}" , active .get ());
665
+ }
666
+ retries ++;
667
+ if (clean ) {
668
+ try {
669
+ FileUtils .cleanDirectory (dataDirectory );
670
+ } catch (IOException e ) {
671
+ LOG .error ("Could not clean up directory {} for retry" , dataDirectory .getAbsolutePath ());
672
+ result .initProcess .completeExceptionally (e );
673
+ result .exit .completeExceptionally (e );
674
+ break ;
675
+ }
676
+ }
677
+ }
678
+ } else {
679
+ try {
680
+ thread .wait ();
681
+ } catch (InterruptedException e ) {
682
+ result .initProcess .completeExceptionally (new IOException ());
683
+ result .exit .completeExceptionally (new IOException ("Failed to execute: " + command .processName ()));
684
+ e .printStackTrace ();
685
+ break ;
686
+ }
687
+ }
688
+
689
+ if (retries >= 10 ) {
690
+ result .initProcess .completeExceptionally (new IOException ());
691
+ result .exit .completeExceptionally (new IOException ("Failed to execute: " + command .processName () + ", too many failures." ));
692
+ break ;
693
+ }
694
+
695
+ if (!retry )
696
+ break ;
697
+ }
698
+ return exit .get ();
699
+ }
700
+
701
+ private SystemResult system (Command command , List <String > args , boolean retry , boolean clean )
614
702
{
615
- try {
616
- final ProcessBuilder builder = new ProcessBuilder ();
703
+ final ProcessBuilder builder = new ProcessBuilder ();
617
704
618
- command .applyTo (builder , args );
619
- builder .redirectErrorStream (true );
620
- builder .redirectError (errorRedirector );
621
- builder .redirectOutput (outputRedirector );
705
+ command .applyTo (builder , args );
706
+ builder .redirectErrorStream (true );
707
+ builder .redirectError (errorRedirector );
708
+ builder .redirectOutput (outputRedirector );
622
709
623
- final Process process = builder .start ();
710
+ SystemResult result = new SystemResult ();
711
+ result .initProcess = new CompletableFuture <>();
712
+ result .builder = builder ;
713
+ result .exit = CompletableFuture .supplyAsync (() -> systemThread (builder , command , result , retry , clean ));
714
+ return result ;
715
+ }
624
716
625
- if (outputRedirector .type () == ProcessBuilder .Redirect .Type .PIPE ) {
626
- ProcessOutputLogger .logOutput (LOG , process , command .processName ());
627
- }
628
- if (0 != process .waitFor ()) {
629
- throw new IllegalStateException (String .format ("Process %s failed" , builder .command ()));
630
- }
631
- } catch (final RuntimeException e ) { // NOPMD
632
- throw e ;
633
- } catch (final Exception e ) {
634
- throw new RuntimeException (e );
635
- }
717
+ private SystemResult system (Command command , List <String > args , boolean retry )
718
+ {
719
+ return system (command , args , retry , false );
720
+ }
721
+
722
+ private SystemResult system (Command command , List <String > args )
723
+ {
724
+ return system (command , args , false , false );
725
+ }
726
+
727
+ private class SystemResult {
728
+ ProcessBuilder builder ;
729
+ CompletableFuture <Process > initProcess ;
730
+ Process process ;
731
+ CompletableFuture <Integer > exit ;
636
732
}
637
733
638
734
private static void mkdirs (File dir )
0 commit comments