diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 4322fdde096..53289da1d33 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -360,22 +360,37 @@ The `KafkaMessageSource` provides a pollable channel adapter implementation. ---- @Bean public IntegrationFlow flow(ConsumerFactory cf) { - return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, "myTopic") - .groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000))) + return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")), + e -> e.poller(Pollers.fixedDelay(5000))) .handle(System.out::println) .get(); } ---- +[source, kotlin, role="secondary"] +.Kotlin +---- +@Bean +fun sourceFlow(cf: ConsumerFactory) = + integrationFlow(Kafka.inboundChannelAdapter(cf, + ConsumerProperties(TEST_TOPIC3).also { + it.groupId = "kotlinMessageSourceGroup" + }), + { poller(Pollers.fixedDelay(100)) }) { + handle { m -> + + } + } +---- [source, java, role="secondary"] .Java ---- @InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000")) @Bean public KafkaMessageSource source(ConsumerFactory cf) { - KafkaMessageSource source = new KafkaMessageSource<>(cf, "myTopic"); - source.setGroupId("myGroupId"); - source.setClientId("myClientId"); - return source; + ConsumerProperties consumerProperties = new ConsumerProperties("myTopic"); + consumerProperties.setGroupId("myGroupId"); + consumerProperties.setClientId("myClientId"); + retunr new KafkaMessageSource<>(cf, consumerProperties); } ---- [source, xml, role="secondary"] @@ -384,18 +399,29 @@ public KafkaMessageSource source(ConsumerFactory + auto-startup="false"> + + + + + + + + + + + + + + ---- ==== @@ -421,7 +447,7 @@ IMPORTANT: The gateway does not accept requests until the reply container has be It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway. The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework. -This has been changed for consistency because you may get unexpected behavior (Spring may timeout the `send()`, while it is actually, eventually, successful). +This has been changed for consistency because you may get unexpected behavior (Spring may time out the `send()`, while it is actually, eventually, successful). IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures. [[kafka-outbound-gateway-configuration]]