Skip to content

Commit dfae1a2

Browse files
artembilangaryrussell
authored andcommitted
GH-8573: Fix KafkaMessageSource samples in docs (#8575)
Fixes #8573 * Also add a Kotlin DSL sample **Cherry-pick to `6.0.x` & `5.5.x`**
1 parent bdce2b9 commit dfae1a2

File tree

1 file changed

+49
-11
lines changed

1 file changed

+49
-11
lines changed

src/reference/asciidoc/kafka.adoc

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -373,15 +373,42 @@ The `KafkaMessageSource` provides a pollable channel adapter implementation.
373373
==== Java Configuration
374374

375375
====
376-
[source, java]
376+
[source, java, role="primary"]
377+
.Java DSL
378+
----
379+
@Bean
380+
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
381+
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
382+
e -> e.poller(Pollers.fixedDelay(5000)))
383+
.handle(System.out::println)
384+
.get();
385+
}
386+
----
387+
[source, kotlin, role="secondary"]
388+
.Kotlin
389+
----
390+
@Bean
391+
fun sourceFlow(cf: ConsumerFactory<String, String>) =
392+
integrationFlow(Kafka.inboundChannelAdapter(cf,
393+
ConsumerProperties(TEST_TOPIC3).also {
394+
it.groupId = "kotlinMessageSourceGroup"
395+
}),
396+
{ poller(Pollers.fixedDelay(100)) }) {
397+
handle { m ->
398+
399+
}
400+
}
401+
----
402+
[source, java, role="secondary"]
403+
.Java
377404
----
378405
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
379406
@Bean
380407
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
381-
KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
382-
source.setGroupId("myGroupId");
383-
source.setClientId("myClientId");
384-
return source;
408+
ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
409+
consumerProperties.setGroupId("myGroupId");
410+
consumerProperties.setClientId("myClientId");
411+
retunr new KafkaMessageSource<>(cf, consumerProperties);
385412
}
386413
----
387414
====
@@ -420,18 +447,29 @@ public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
420447
<int-kafka:inbound-channel-adapter
421448
id="adapter1"
422449
consumer-factory="consumerFactory"
450+
consumer-properties="consumerProperties1"
423451
ack-factory="ackFactory"
424-
topics="topic1"
425452
channel="inbound"
426-
client-id="client"
427-
group-id="group"
428453
message-converter="converter"
429454
payload-type="java.lang.String"
430455
raw-header="true"
431-
auto-startup="false"
432-
rebalance-listener="rebal">
456+
auto-startup="false">
433457
<int:poller fixed-delay="5000"/>
434458
</int-kafka:inbound-channel-adapter>
459+
460+
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
461+
<constructor-arg>
462+
<map>
463+
<entry key="max.poll.records" value="1"/>
464+
</map>
465+
</constructor-arg>
466+
</bean>
467+
468+
<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
469+
<constructor-arg name="topics" value="topic1"/>
470+
<property name="groupId" value="group"/>
471+
<property name="clientId" value="client"/>
472+
</bean>
435473
----
436474
====
437475

@@ -446,7 +484,7 @@ IMPORTANT: The gateway does not accept requests until the reply container has be
446484
It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway.
447485

448486
The `KafkaProducerMessageHandler` `sendTimeoutExpression` default is `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
449-
This has been changed for consistency because you may get unexpected behavior (Spring may timeout the send, while it is actually, eventually, successful).
487+
This has been changed for consistency because you may get unexpected behavior (Spring may time out the `send()`, while it is actually, eventually, successful).
450488
IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.
451489

452490
==== Java Configuration

0 commit comments

Comments
 (0)