37
37
import java .util .List ;
38
38
import java .util .Map ;
39
39
import java .util .concurrent .Executors ;
40
+ import java .util .concurrent .ScheduledExecutorService ;
41
+ import java .util .concurrent .ThreadFactory ;
40
42
import java .util .concurrent .atomic .AtomicBoolean ;
41
43
import java .util .concurrent .atomic .AtomicInteger ;
42
44
import java .util .concurrent .atomic .AtomicReference ;
@@ -62,7 +64,12 @@ public class RecoveryClusterTest {
62
64
EnvironmentBuilder environmentBuilder ;
63
65
static List <Level > logLevels ;
64
66
static List <Class <?>> logClasses =
65
- List .of (ProducersCoordinator .class , ConsumersCoordinator .class , StreamEnvironment .class );
67
+ List .of (
68
+ ProducersCoordinator .class ,
69
+ ConsumersCoordinator .class ,
70
+ StreamEnvironment .class ,
71
+ AsyncRetry .class );
72
+ ScheduledExecutorService scheduledExecutorService ;
66
73
67
74
@ BeforeAll
68
75
static void initAll () {
@@ -72,10 +79,17 @@ static void initAll() {
72
79
73
80
@ BeforeEach
74
81
void init (TestInfo info ) {
82
+ int availableProcessors = Runtime .getRuntime ().availableProcessors ();
83
+ LOGGER .info ("Available processors: {}" , availableProcessors );
84
+ ThreadFactory threadFactory =
85
+ new Utils .NamedThreadFactory ("rabbitmq-stream-environment-scheduler-" );
86
+ scheduledExecutorService =
87
+ Executors .newScheduledThreadPool (availableProcessors * 2 , threadFactory );
75
88
environmentBuilder =
76
89
Environment .builder ()
77
90
.recoveryBackOffDelayPolicy (BACK_OFF_DELAY_POLICY )
78
91
.topologyUpdateBackOffDelayPolicy (BACK_OFF_DELAY_POLICY )
92
+ .scheduledExecutorService (scheduledExecutorService )
79
93
.netty ()
80
94
.eventLoopGroup (eventLoopGroup )
81
95
.environmentBuilder ();
@@ -87,6 +101,9 @@ void tearDown() {
87
101
if (environment != null ) {
88
102
environment .close ();
89
103
}
104
+ if (scheduledExecutorService != null ) {
105
+ scheduledExecutorService .shutdownNow ();
106
+ }
90
107
}
91
108
92
109
@ AfterAll
@@ -104,6 +121,10 @@ static void tearDownAll() {
104
121
"true,false" ,
105
122
})
106
123
void clusterRestart (boolean useLoadBalancer , boolean forceLeader ) throws InterruptedException {
124
+ LOGGER .info (
125
+ "Cluster restart test, use load balancer {}, force leader {}" ,
126
+ useLoadBalancer ,
127
+ forceLeader );
107
128
int streamCount = 10 ;
108
129
int producerCount = streamCount * 2 ;
109
130
int consumerCount = streamCount * 2 ;
0 commit comments