Skip to content

Commit 62ae458

Browse files
huisamspring-builds
authored andcommitted
GH-3277 : Fix coroutine detection logic in MessagingMessageListenerAdapter
Fixes: #3277 * Fix coroutine detection logic in `MessagingMessageListenerAdapter` by continuation class type * Use proper `@since 3.2.1` for new API (cherry picked from commit abaa465)
1 parent 7f14105 commit 62ae458

File tree

4 files changed

+25
-9
lines changed

4 files changed

+25
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
*
3636
* @author Gary Russell
3737
* @author Wang Zhiyang
38+
* @author Huijin Hong
3839
* @since 2.5
3940
*
4041
*/
@@ -124,4 +125,13 @@ public static boolean isCompletableFuture(Class<?> resultType) {
124125
return CompletableFuture.class.isAssignableFrom(resultType);
125126
}
126127

128+
/**
129+
* Return the true when type is {@code Continuation}.
130+
* @param parameterType {@code MethodParameter} parameter type.
131+
* @return type is {@code Continuation}.
132+
* @since 3.2.1
133+
*/
134+
public static boolean isKotlinContinuation(Class<?> parameterType) {
135+
return "kotlin.coroutines.Continuation".equals(parameterType.getName());
136+
}
127137
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* but for regular {@link HandlerMethodArgumentResolver} contract.
3131
*
3232
* @author Wang Zhiyang
33+
* @author Huijin Hong
3334
*
3435
* @since 3.2
3536
*
@@ -39,7 +40,7 @@ public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodA
3940

4041
@Override
4142
public boolean supportsParameter(MethodParameter parameter) {
42-
return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName());
43+
return AdapterUtils.isKotlinContinuation(parameter.getParameterType());
4344
}
4445

4546
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.kafka.common.TopicPartition;
3838

3939
import org.springframework.context.expression.MapAccessor;
40-
import org.springframework.core.KotlinDetector;
4140
import org.springframework.core.MethodParameter;
4241
import org.springframework.core.log.LogAccessor;
4342
import org.springframework.expression.BeanResolver;
@@ -90,6 +89,7 @@
9089
* @author Venil Noronha
9190
* @author Nathan Xu
9291
* @author Wang ZhiYang
92+
* @author Huijin Hong
9393
*/
9494
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, AsyncRepliesAware {
9595

@@ -763,8 +763,8 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
763763
isNotConvertible |= isAck;
764764
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
765765
isNotConvertible |= isConsumer;
766-
boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType());
767-
isNotConvertible |= isCoroutines;
766+
boolean isKotlinContinuation = AdapterUtils.isKotlinContinuation(methodParameter.getParameterType());
767+
isNotConvertible |= isKotlinContinuation;
768768
boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class);
769769
this.hasMetadataParameter |= isMeta;
770770
isNotConvertible |= isMeta;
@@ -783,7 +783,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
783783
break;
784784
}
785785
}
786-
else if (isAck || isCoroutines || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
786+
else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
787787
allowedBatchParameters++;
788788
}
789789
}

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
3636
import org.springframework.kafka.core.KafkaTemplate
3737
import org.springframework.kafka.core.ProducerFactory
3838
import org.springframework.kafka.listener.*
39+
import org.springframework.kafka.support.converter.JsonMessageConverter
3940
import org.springframework.kafka.test.EmbeddedKafkaBroker
4041
import org.springframework.kafka.test.context.EmbeddedKafka
4142
import org.springframework.test.annotation.DirtiesContext
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit
4748

4849
/**
4950
* @author Gary Russell
51+
* @author Huijin Hong
5052
* @since 2.2
5153
*/
5254

@@ -63,7 +65,7 @@ class EnableKafkaKotlinTests {
6365

6466
@Test
6567
fun `test listener`() {
66-
this.template.send("kotlinTestTopic1", "foo")
68+
this.template.send("kotlinTestTopic1", "{\"data\":\"foo\"}")
6769
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
6870
assertThat(this.config.received).isEqualTo("foo")
6971
}
@@ -173,6 +175,7 @@ class EnableKafkaKotlinTests {
173175
= ConcurrentKafkaListenerContainerFactory()
174176
factory.consumerFactory = kcf()
175177
factory.setCommonErrorHandler(eh)
178+
factory.setRecordMessageConverter(JsonMessageConverter())
176179
return factory
177180
}
178181

@@ -186,9 +189,11 @@ class EnableKafkaKotlinTests {
186189
return factory
187190
}
188191

192+
data class TestKafkaMessage(val data: String)
193+
189194
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory")
190-
fun listen(value: String) {
191-
this.received = value
195+
fun listen(value: TestKafkaMessage) {
196+
this.received = value.data
192197
this.latch1.countDown()
193198
}
194199

0 commit comments

Comments
 (0)