From 001005233f02cab49e884b8c2986256cc0a42989 Mon Sep 17 00:00:00 2001 From: abilan Date: Wed, 7 Jun 2023 16:59:05 -0400 Subject: [PATCH] Fix DSL for inner bean names generation The `IntegrationFlowBeanPostProcessor.processIntegrationComponentSpec()` uses a wrong `generateBeanName()` for component to register making the provided `id` as a prefix * Use `generateBeanName(Object instance, String prefix, @Nullable String fallbackId, boolean useFlowIdAsPrefix)` instead to properly "fallback" to the provided name **Cherry-pick to `6.1.x`** --- .../IntegrationFlowBeanPostProcessor.java | 2 +- .../integration/kafka/dsl/KafkaDslTests.java | 28 +++++++++++-------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java index cf91e734f45..0c1cbb058d0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java @@ -356,7 +356,7 @@ private void processIntegrationComponentSpec(String beanName, IntegrationCompone .filter(component -> noBeanPresentForComponent(component.getKey(), beanName)) .forEach(component -> registerComponent(component.getKey(), - generateBeanName(component.getKey(), component.getValue()))); + generateBeanName(component.getKey(), beanName, component.getValue(), false))); } } diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java index bf2037ee1de..bc4ef5e0296 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java @@ -244,8 +244,10 @@ void testGateways() throws Exception { } @Test - void channels(@Autowired MessageChannel topic6Channel, @Autowired PollableKafkaChannel topic8Channel, - @Autowired PollableKafkaChannel topic9Channel) { + void channels(@Qualifier("topic6Channel") MessageChannel topic6Channel, + @Qualifier("topic8Channel") PollableKafkaChannel topic8Channel, + @Qualifier("topic9Channel") PollableKafkaChannel topic9Channel) { + topic6Channel.send(new GenericMessage<>("foo")); Message received = topic8Channel.receive(); assertThat(received) @@ -344,7 +346,8 @@ public PollableChannel futuresChannel() { } @Bean - public IntegrationFlow sendToKafkaFlow() { + public IntegrationFlow sendToKafkaFlow( + KafkaProducerMessageHandlerSpec kafkaMessageHandlerTopic2) { return f -> f .split(p -> Stream.generate(() -> p).limit(101).iterator(), null) .enrichHeaders(h -> h.header(KafkaIntegrationHeaders.FUTURE_TOKEN, "foo")) @@ -353,17 +356,15 @@ public IntegrationFlow sendToKafkaFlow() { kafkaMessageHandler(producerFactory(), TEST_TOPIC1) .timestampExpression("T(Long).valueOf('1487694048633')"), e -> e.id("kafkaProducer1"))) - .subscribe(sf -> sf.handle( - kafkaMessageHandler(producerFactory(), TEST_TOPIC2) - .flush(msg -> true) - .timestamp(m -> 1487694048644L), - e -> e.id("kafkaProducer2"))) + .subscribe(sf -> sf.handle(kafkaMessageHandlerTopic2, e -> e.id("kafkaProducer2"))) ); } @Bean - public DefaultKafkaHeaderMapper mapper() { - return new DefaultKafkaHeaderMapper(); + public KafkaProducerMessageHandlerSpec kafkaMessageHandlerTopic2() { + return kafkaMessageHandler(producerFactory(), TEST_TOPIC2) + .flush(msg -> true) + .timestamp(m -> 1487694048644L); } private KafkaProducerMessageHandlerSpec kafkaMessageHandler( @@ -382,6 +383,11 @@ public DefaultKafkaHeaderMapper mapper() { .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic)); } + @Bean + public DefaultKafkaHeaderMapper mapper() { + return new DefaultKafkaHeaderMapper(); + } + @Bean public IntegrationFlow sourceFlow() { @@ -427,7 +433,7 @@ public KafkaMessageSource channelSource(ConsumerFactory template, ConcurrentKafkaListenerContainerFactory containerFactory, - KafkaMessageSource channelSource, + @Qualifier("channelSource") KafkaMessageSource channelSource, PublishSubscribeKafkaChannel publishSubscribeKafkaChannel) { return IntegrationFlow.from(topic6Channel(template, containerFactory))