Skip to content

Commit 0998a95

Browse files
committed
Fix KafkaDslTests for splitter race condition
It looks like `Stream.generate()` might producer items not in the expected order. So, use `Stream.toList()` to be sure in the sequence size, and then `resequence()` to be sure that items are emitted downstream in the proper sequence order.
1 parent 8d7eb7c commit 0998a95

File tree

1 file changed

+2
-1
lines changed
  • spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl

1 file changed

+2
-1
lines changed

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,8 @@ public PollableChannel futuresChannel() {
349349
public IntegrationFlow sendToKafkaFlow(
350350
KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandlerTopic2) {
351351
return f -> f
352-
.splitWith(s -> s.function(p -> Stream.generate(() -> p).limit(101).iterator()))
352+
.splitWith(s -> s.function(p -> Stream.generate(() -> p).limit(101).toList()))
353+
.resequence()
353354
.enrichHeaders(h -> h.header(KafkaIntegrationHeaders.FUTURE_TOKEN, "foo"))
354355
.publishSubscribeChannel(c -> c
355356
.subscribe(sf -> sf.handle(

0 commit comments

Comments
 (0)