Commit 5beb8fe

Fix testInvokeRecordInterceptorAllSkipped()
* When `AckMode.RECORD` is used, `early` is `false`, and the last method executed is `RecordInterceptor.afterRecord`, `KMLCT#testInvokeRecordInterceptorAllSkipped` was missing a `CountDownLatch` that must wait for `RecordInterceptor.afterRecord` to complete. **Auto-cherry-pick to `3.1.x`**
1 parent 2ad8b06 commit 5beb8fe
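
For context, here is a minimal, self-contained sketch (not the actual test) of the synchronization pattern the fix depends on: the asserting thread awaits a `CountDownLatch` that is counted down inside `RecordInterceptor.afterRecord`, so stopping the container and running the Mockito verifications cannot race the interceptor's final callback. The class name `AfterRecordLatchSketch`, the constant `EXPECTED_RECORDS`, and the plain worker thread standing in for the container's consumer thread are illustrative assumptions, not code from this commit.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.RecordInterceptor;

public class AfterRecordLatchSketch {

	static final int EXPECTED_RECORDS = 2; // the real test also waits for two afterRecord calls

	public static void main(String[] args) throws InterruptedException {
		CountDownLatch afterRecordLatch = new CountDownLatch(EXPECTED_RECORDS);

		RecordInterceptor<Integer, String> interceptor = new RecordInterceptor<Integer, String>() {

			@Override
			public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
					Consumer<Integer, String> consumer) {
				return null; // returning null skips the record, as in testInvokeRecordInterceptorAllSkipped
			}

			@Override
			public void afterRecord(ConsumerRecord<Integer, String> record, Consumer<Integer, String> consumer) {
				afterRecordLatch.countDown(); // the last per-record callback the test must wait for
			}

		};

		// Stand-in for the listener container's consumer thread.
		Thread consumerThread = new Thread(() -> {
			for (int i = 0; i < EXPECTED_RECORDS; i++) {
				ConsumerRecord<Integer, String> record = new ConsumerRecord<>("foo", 0, i, i, "bar");
				interceptor.intercept(record, null);
				interceptor.afterRecord(record, null);
			}
		});
		consumerThread.start();

		// The essence of the fix: block until afterRecord has run for every record
		// before stopping the container and verifying the consumer/interceptor.
		boolean completed = afterRecordLatch.await(10, TimeUnit.SECONDS);
		System.out.println("afterRecord completed for all records: " + completed);
	}

}

In the actual test below the same idea is applied twice: `afterRecordLatch` (count 2) gates `container.stop()`, and a second `closeLatch`, counted down by the mocked consumer's `close()`, gates the `inOrder.verify(consumer).close()` check.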

File tree

1 file changed (+15, -7 lines)

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 15 additions & 7 deletions
@@ -2769,7 +2769,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
 			rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
 			return null;
 		}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
-		final CountDownLatch resumeLatch = new CountDownLatch(1);
 		ContainerProperties containerProps = new ContainerProperties("foo");
 		containerProps.setGroupId("grp");
 		containerProps.setAckMode(AckMode.RECORD);
@@ -2780,7 +2779,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
 		container.start();
-		InOrder inOrder = inOrder(consumer);
 		assertThat(firstPoll.await(10, TimeUnit.SECONDS)).isNotNull();
 		container.pausePartition(tp0);
 		container.pausePartition(tp1);
@@ -2811,7 +2809,6 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
 		Consumer<Integer, String> consumer = mock(Consumer.class);
 		given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
-		AtomicBoolean first = new AtomicBoolean(true);
 		TopicPartition tp0 = new TopicPartition("foo", 0);
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
@@ -3462,7 +3459,6 @@ public void testCooperativeRebalance() throws Exception {
 		containerProps.setGroupId("grp");
 		containerProps.setClientId("clientId");
 		containerProps.setMessageListener((MessageListener<?, ?>) msg -> { });
-		Properties consumerProps = new Properties();
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
 		container.start();
@@ -3606,7 +3602,6 @@ else if (call == 1) {
 		}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
 		List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
 		AtomicBoolean firstCommit = new AtomicBoolean(true);
-		AtomicInteger commitCount = new AtomicInteger();
 		willAnswer(invoc -> {
 			commits.add(invoc.getArgument(0, Map.class));
 			if (!firstCommit.getAndSet(false)) {
@@ -3888,6 +3883,11 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
 			latch.countDown();
 			return null;
 		}).given(consumer).commitSync(any(), any());
+		CountDownLatch closeLatch = new CountDownLatch(1);
+		willAnswer(inv -> {
+			closeLatch.countDown();
+			return null;
+		}).given(consumer).close();
 		TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
 				new TopicPartitionOffset("foo", 0) };

@@ -3902,6 +3902,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
 			containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class));
 		}

+		CountDownLatch afterRecordLatch = new CountDownLatch(2);
 		RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

 			@Override
@@ -3912,6 +3913,10 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
 				return null;
 			}

+			public void afterRecord(ConsumerRecord<Integer, String> record, Consumer<Integer, String> consumer) {
+				afterRecordLatch.countDown();
+			}
+
 		});

 		KafkaMessageListenerContainer<Integer, String> container =
@@ -3920,6 +3925,9 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
 		container.setInterceptBeforeTx(early);
 		container.start();
 		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(afterRecordLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		container.stop();
+		assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();

 		InOrder inOrder = inOrder(recordInterceptor, consumer);
 		inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
@@ -3946,12 +3954,12 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
 			inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
 					any(Duration.class));
 		}
-		container.stop();
+		inOrder.verify(consumer).close();
 	}

 	@ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")
 	@ValueSource(booleans = { true, false })
-	@SuppressWarnings({ "unchecked", "deprecation" })
+	@SuppressWarnings("unchecked")
 	public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
 		Consumer<Integer, String> consumer = mock(Consumer.class);
