@@ -30,8 +30,8 @@ public class JavaNioTest {
30
30
31
31
@ BeforeEach
32
32
public void init () throws Exception {
33
- ConnectionFactory connectionFactory = new ConnectionFactory ();
34
- connectionFactory .useNio ();
33
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
34
+ .useNio ();
35
35
testConnection = connectionFactory .newConnection ();
36
36
}
37
37
@@ -46,8 +46,8 @@ public void tearDown() throws Exception {
46
46
@ Test
47
47
public void connection () throws Exception {
48
48
CountDownLatch latch = new CountDownLatch (1 );
49
- ConnectionFactory connectionFactory = new ConnectionFactory ();
50
- connectionFactory .useNio ();
49
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
50
+ .useNio ();
51
51
Connection connection = null ;
52
52
try {
53
53
connection = basicGetBasicConsume (connectionFactory , "nio.queue" , latch );
@@ -61,9 +61,9 @@ public void connection() throws Exception {
61
61
@ Test
62
62
public void twoConnections () throws IOException , TimeoutException , InterruptedException {
63
63
CountDownLatch latch = new CountDownLatch (2 );
64
- ConnectionFactory connectionFactory = new ConnectionFactory ();
65
- connectionFactory .useNio ();
66
- connectionFactory .setNioParams (new NioParams ().setNbIoThreads (4 ));
64
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
65
+ .useNio ()
66
+ .setNioParams (new NioParams ().setNbIoThreads (4 ));
67
67
Connection connection1 = null ;
68
68
Connection connection2 = null ;
69
69
try {
@@ -82,8 +82,8 @@ public void twoConnections() throws IOException, TimeoutException, InterruptedEx
82
82
public void twoConnectionsWithNioExecutor () throws IOException , TimeoutException , InterruptedException {
83
83
CountDownLatch latch = new CountDownLatch (2 );
84
84
ExecutorService nioExecutor = Executors .newFixedThreadPool (5 );
85
- ConnectionFactory connectionFactory = new ConnectionFactory ();
86
- connectionFactory .useNio ();
85
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
86
+ .useNio ();
87
87
Connection connection1 = null ;
88
88
Connection connection2 = null ;
89
89
try {
@@ -101,8 +101,8 @@ public void twoConnectionsWithNioExecutor() throws IOException, TimeoutException
101
101
102
102
@ Test
103
103
public void shutdownListenerCalled () throws IOException , TimeoutException , InterruptedException {
104
- ConnectionFactory connectionFactory = new ConnectionFactory ();
105
- connectionFactory .useNio ();
104
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
105
+ .useNio ();
106
106
Connection connection = connectionFactory .newConnection ();
107
107
try {
108
108
final CountDownLatch latch = new CountDownLatch (1 );
@@ -122,8 +122,8 @@ public void shutdownCompleted(ShutdownSignalException cause) {
122
122
123
123
@ Test
124
124
public void nioLoopCleaning () throws Exception {
125
- ConnectionFactory connectionFactory = new ConnectionFactory ();
126
- connectionFactory .useNio ();
125
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
126
+ .useNio ();
127
127
for (int i = 0 ; i < 10 ; i ++) {
128
128
Connection connection = connectionFactory .newConnection ();
129
129
connection .abort ();
@@ -139,20 +139,20 @@ public void messageSize() throws Exception {
139
139
140
140
@ Test
141
141
public void byteBufferFactory () throws Exception {
142
- ConnectionFactory cf = new ConnectionFactory ();
143
- cf .useNio ();
142
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
143
+ .useNio ();
144
144
int baseCapacity = 32768 ;
145
145
NioParams nioParams = new NioParams ();
146
146
nioParams .setReadByteBufferSize (baseCapacity / 2 );
147
147
nioParams .setWriteByteBufferSize (baseCapacity / 4 );
148
148
List <ByteBuffer > byteBuffers = new CopyOnWriteArrayList <>();
149
- cf .setNioParams (nioParams .setByteBufferFactory (new DefaultByteBufferFactory (capacity -> {
149
+ connectionFactory .setNioParams (nioParams .setByteBufferFactory (new DefaultByteBufferFactory (capacity -> {
150
150
ByteBuffer bb = ByteBuffer .allocate (capacity );
151
151
byteBuffers .add (bb );
152
152
return bb ;
153
153
})));
154
154
155
- try (Connection c = cf .newConnection ()) {
155
+ try (Connection c = connectionFactory .newConnection ()) {
156
156
sendAndVerifyMessage (c , 100 );
157
157
}
158
158
@@ -165,27 +165,27 @@ public void byteBufferFactory() throws Exception {
165
165
166
166
@ Test
167
167
public void directByteBuffers () throws Exception {
168
- ConnectionFactory cf = new ConnectionFactory ();
169
- cf .useNio ();
170
- cf .setNioParams (new NioParams ().setByteBufferFactory (new DefaultByteBufferFactory (capacity -> ByteBuffer .allocateDirect (capacity ))));
171
- try (Connection c = cf .newConnection ()) {
168
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
169
+ .useNio ()
170
+ .setNioParams (new NioParams ().setByteBufferFactory (new DefaultByteBufferFactory (capacity -> ByteBuffer .allocateDirect (capacity ))));
171
+ try (Connection c = connectionFactory .newConnection ()) {
172
172
sendAndVerifyMessage (c , 100 );
173
173
}
174
174
}
175
175
176
176
@ Test
177
177
public void customWriteQueue () throws Exception {
178
- ConnectionFactory cf = new ConnectionFactory ();
179
- cf .useNio ();
180
178
AtomicInteger count = new AtomicInteger (0 );
181
- cf .setNioParams (new NioParams ().setWriteQueueFactory (ctx -> {
182
- count .incrementAndGet ();
183
- return new BlockingQueueNioQueue (
184
- new LinkedBlockingQueue <>(ctx .getNioParams ().getWriteQueueCapacity ()),
185
- ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
186
- );
187
- }));
188
- try (Connection c = cf .newConnection ()) {
179
+ ConnectionFactory connectionFactory = new ConnectionFactory ()
180
+ .useNio ()
181
+ .setNioParams (new NioParams ().setWriteQueueFactory (ctx -> {
182
+ count .incrementAndGet ();
183
+ return new BlockingQueueNioQueue (
184
+ new LinkedBlockingQueue <>(ctx .getNioParams ().getWriteQueueCapacity ()),
185
+ ctx .getNioParams ().getWriteEnqueuingTimeoutInMs ()
186
+ );
187
+ }));
188
+ try (Connection c = connectionFactory .newConnection ()) {
189
189
sendAndVerifyMessage (c , 100 );
190
190
}
191
191
assertEquals (1 , count .get ());
0 commit comments