Skip to content

Commit bfe6125

Browse files
committed
Fix AbstractConsumerSeekAwareTests for race condition
Looks like there is some delay between calling seek and actual consume. * Just increase latch timeout for 30 second
1 parent 1ab37ab commit bfe6125

File tree

1 file changed

+36
-21
lines changed

1 file changed

+36
-21
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
/**
5555
* @author Borahm Lee
56+
* @author Artem Bilan
5657
* @since 3.3
5758
*/
5859
@DirtiesContext
@@ -73,48 +74,59 @@ class AbstractConsumerSeekAwareTests {
7374

7475
@Test
7576
public void checkCallbacksAndTopicPartitions() {
76-
await().timeout(Duration.ofSeconds(5)).untilAsserted(() -> {
77-
Map<ConsumerSeekCallback, List<TopicPartition>> callbacksAndTopics = multiGroupListener.getCallbacksAndTopics();
78-
Set<ConsumerSeekCallback> registeredCallbacks = callbacksAndTopics.keySet();
79-
Set<TopicPartition> registeredTopicPartitions = callbacksAndTopics.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
80-
81-
Map<TopicPartition, List<ConsumerSeekCallback>> topicsAndCallbacks = multiGroupListener.getTopicsAndCallbacks();
82-
Set<TopicPartition> getTopicPartitions = topicsAndCallbacks.keySet();
83-
Set<ConsumerSeekCallback> getCallbacks = topicsAndCallbacks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
84-
85-
assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty();
86-
assertThat(registeredTopicPartitions).containsExactlyInAnyOrderElementsOf(getTopicPartitions).hasSize(3);
87-
});
77+
await().timeout(Duration.ofSeconds(5))
78+
.untilAsserted(() -> {
79+
Map<ConsumerSeekCallback, List<TopicPartition>> callbacksAndTopics =
80+
multiGroupListener.getCallbacksAndTopics();
81+
Set<ConsumerSeekCallback> registeredCallbacks = callbacksAndTopics.keySet();
82+
Set<TopicPartition> registeredTopicPartitions =
83+
callbacksAndTopics.values()
84+
.stream()
85+
.flatMap(Collection::stream)
86+
.collect(Collectors.toSet());
87+
88+
Map<TopicPartition, List<ConsumerSeekCallback>> topicsAndCallbacks =
89+
multiGroupListener.getTopicsAndCallbacks();
90+
Set<TopicPartition> getTopicPartitions = topicsAndCallbacks.keySet();
91+
Set<ConsumerSeekCallback> getCallbacks =
92+
topicsAndCallbacks.values()
93+
.stream()
94+
.flatMap(Collection::stream)
95+
.collect(Collectors.toSet());
96+
97+
assertThat(registeredCallbacks).containsExactlyInAnyOrderElementsOf(getCallbacks).isNotEmpty();
98+
assertThat(registeredTopicPartitions).containsExactlyInAnyOrderElementsOf(getTopicPartitions);
99+
});
88100
}
89101

90102
@Test
91103
void seekForAllGroups() throws Exception {
92104
template.send(TOPIC, "test-data");
93105
template.send(TOPIC, "test-data");
94-
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
95-
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
106+
assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue();
107+
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();
96108

97109
MultiGroupListener.latch1 = new CountDownLatch(2);
98110
MultiGroupListener.latch2 = new CountDownLatch(2);
99111

100112
multiGroupListener.seekToBeginning();
101-
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
102-
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
113+
assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue();
114+
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();
103115
}
104116

105117
@Test
106118
void seekForSpecificGroup() throws Exception {
107119
template.send(TOPIC, "test-data");
108120
template.send(TOPIC, "test-data");
109-
assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
110-
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
121+
assertThat(MultiGroupListener.latch1.await(30, TimeUnit.SECONDS)).isTrue();
122+
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();
111123

112124
MultiGroupListener.latch1 = new CountDownLatch(2);
113125
MultiGroupListener.latch2 = new CountDownLatch(2);
114126

115127
multiGroupListener.seekToBeginningForGroup("group2");
116-
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
117-
assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse();
128+
assertThat(MultiGroupListener.latch2.await(30, TimeUnit.SECONDS)).isTrue();
129+
assertThat(MultiGroupListener.latch1.await(1, TimeUnit.SECONDS)).isFalse();
118130
assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2);
119131
}
120132

@@ -128,7 +140,8 @@ static class Config {
128140
@Bean
129141
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
130142
ConsumerFactory<String, String> consumerFactory) {
131-
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
143+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
144+
new ConcurrentKafkaListenerContainerFactory<>();
132145
factory.setConsumerFactory(consumerFactory);
133146
return factory;
134147
}
@@ -172,7 +185,9 @@ void seekToBeginningForGroup(String groupIdForSeek) {
172185
}
173186
});
174187
}
188+
175189
}
190+
176191
}
177192

178193
}

0 commit comments

Comments
 (0)