25
25
import java .util .concurrent .TimeUnit ;
26
26
import java .util .stream .Stream ;
27
27
28
+ import org .apache .commons .logging .Log ;
29
+ import org .apache .commons .logging .LogFactory ;
28
30
import org .apache .kafka .clients .consumer .ConsumerConfig ;
29
31
import org .apache .kafka .clients .consumer .ConsumerRebalanceListener ;
30
32
import org .apache .kafka .clients .producer .ProducerConfig ;
98
100
@ DirtiesContext
99
101
public class KafkaDslTests {
100
102
103
+ private static final Log log = LogFactory .getLog (KafkaDslTests .class );
104
+
101
105
static final String TEST_TOPIC1 = "test-topic1" ;
102
106
103
107
static final String TEST_TOPIC2 = "test-topic2" ;
@@ -179,6 +183,7 @@ void testKafkaAdapters() throws Exception {
179
183
for (int i = 0 ; i < 100 ; i ++) {
180
184
Message <?> receive = this .listeningFromKafkaResults1 .receive (20000 );
181
185
assertThat (receive ).isNotNull ();
186
+ log .warn ("Received '%s' for index '%d'" .formatted (receive , i ));
182
187
assertThat (receive .getPayload ()).isEqualTo ("FOO" );
183
188
MessageHeaders headers = receive .getHeaders ();
184
189
assertThat (headers .containsKey (KafkaHeaders .ACKNOWLEDGMENT )).isTrue ();
@@ -196,6 +201,7 @@ void testKafkaAdapters() throws Exception {
196
201
for (int i = 0 ; i < 100 ; i ++) {
197
202
Message <?> receive = this .listeningFromKafkaResults2 .receive (20000 );
198
203
assertThat (receive ).isNotNull ();
204
+ log .warn ("Received '%s' for index '%d'" .formatted (receive , i ));
199
205
assertThat (receive .getPayload ()).isEqualTo ("FOO" );
200
206
MessageHeaders headers = receive .getHeaders ();
201
207
assertThat (headers .containsKey (KafkaHeaders .ACKNOWLEDGMENT )).isTrue ();
0 commit comments