From a8cad265c6da08fef2d0c7bc3c3256fb3ab55f6e Mon Sep 17 00:00:00 2001 From: Jiandong Ma Date: Fri, 2 May 2025 22:55:20 +0800 Subject: [PATCH 1/3] Fix setReleaseStrategy with correct parameter propagation Fixes: #10003 Issue Link: https://github.com/spring-projects/spring-integration/issues/10003 Signed-off-by: Jiandong Ma --- .../AbstractCorrelatingMessageHandler.java | 2 +- .../integration/aggregator/AggregatorTests.java | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index 68433fecd22..01d1f44d060 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -299,7 +299,7 @@ public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) { */ public void setReleasePartialSequences(boolean releasePartialSequences) { if (!this.releaseStrategySet && releasePartialSequences) { - setReleaseStrategy(new SequenceSizeReleaseStrategy()); + setReleaseStrategy(new SequenceSizeReleaseStrategy(releasePartialSequences)); } this.releasePartialSequences = releasePartialSequences; } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java index 12369cd16a8..858b6a5b8db 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -129,7 +129,6 @@ public void testAggPerf() throws InterruptedException, ExecutionException, Timeo } @Test - @Disabled("Time sensitive") public void testAggPerfDefaultPartial() throws InterruptedException, ExecutionException, TimeoutException { AggregatingMessageHandler handler = new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor()); handler.setCorrelationStrategy(message -> "foo"); @@ -152,28 +151,30 @@ public void testAggPerfDefaultPartial() throws InterruptedException, ExecutionEx store.setMessageGroupFactory(messageGroupFactory); handler.setMessageStore(store); + handler.setBeanFactory(mock(BeanFactory.class)); + handler.afterPropertiesSet(); StopWatch stopwatch = new StopWatch(); stopwatch.start(); - for (int i = 0; i < 120000; i++) { - if (i % 10000 == 0) { + for (int i = 0; i < 1200; i++) { + if (i % 100 == 0) { stopwatch.stop(); logger.warn("Sent " + i + " in " + stopwatch.getTotalTimeSeconds() + - " (10k in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)"); + " (100 in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)"); stopwatch.start(); } handler.handleMessage(MessageBuilder.withPayload("foo") - .setSequenceSize(120000) + .setSequenceSize(1200) .setSequenceNumber(i + 1) .build()); } stopwatch.stop(); - logger.warn("Sent " + 120000 + " in " + stopwatch.getTotalTimeSeconds() + + logger.warn("Sent " + 1200 + " in " + stopwatch.getTotalTimeSeconds() + " (10k in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)"); Collection result = resultFuture.get(10, TimeUnit.SECONDS); assertThat(result).isNotNull(); - assertThat(result.size()).isEqualTo(120000); + assertThat(result.size()).isEqualTo(1); assertThat(stopwatch.getTotalTimeSeconds()).isLessThan(60.0); // actually < 2.0, was many minutes } From 6e183e532db4ed68f223c66b3850831cb0e48023 Mon Sep 17 00:00:00 2001 From: Jiandong Ma Date: Fri, 2 May 2025 23:25:55 +0800 Subject: [PATCH 2/3] address PR comments Signed-off-by: Jiandong Ma --- .../integration/aggregator/AggregatorTests.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java index 858b6a5b8db..17fd6c3086b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java @@ -129,10 +129,10 @@ public void testAggPerf() throws InterruptedException, ExecutionException, Timeo } @Test + @Disabled("Time sensitive") public void testAggPerfDefaultPartial() throws InterruptedException, ExecutionException, TimeoutException { AggregatingMessageHandler handler = new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor()); handler.setCorrelationStrategy(message -> "foo"); - handler.setReleasePartialSequences(true); DirectChannel outputChannel = new DirectChannel(); handler.setOutputChannel(outputChannel); @@ -156,25 +156,25 @@ public void testAggPerfDefaultPartial() throws InterruptedException, ExecutionEx StopWatch stopwatch = new StopWatch(); stopwatch.start(); - for (int i = 0; i < 1200; i++) { - if (i % 100 == 0) { + for (int i = 0; i < 120000; i++) { + if (i % 10000 == 0) { stopwatch.stop(); logger.warn("Sent " + i + " in " + stopwatch.getTotalTimeSeconds() + - " (100 in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)"); + " (10k in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)"); stopwatch.start(); } handler.handleMessage(MessageBuilder.withPayload("foo") - .setSequenceSize(1200) + .setSequenceSize(120000) .setSequenceNumber(i + 1) .build()); } stopwatch.stop(); - logger.warn("Sent " + 1200 + " in " + stopwatch.getTotalTimeSeconds() + + logger.warn("Sent " + 120000 + " in " + stopwatch.getTotalTimeSeconds() + " (10k in " + stopwatch.lastTaskInfo().getTimeMillis() + "ms)"); Collection result = resultFuture.get(10, TimeUnit.SECONDS); assertThat(result).isNotNull(); - assertThat(result.size()).isEqualTo(1); + assertThat(result.size()).isEqualTo(120000); assertThat(stopwatch.getTotalTimeSeconds()).isLessThan(60.0); // actually < 2.0, was many minutes } From fac8baa93698d22ab9f7bc3cd6ae70806d65b0d5 Mon Sep 17 00:00:00 2001 From: Jiandong Ma Date: Fri, 2 May 2025 23:28:32 +0800 Subject: [PATCH 3/3] address PR comment Signed-off-by: Jiandong Ma --- .../springframework/integration/aggregator/AggregatorTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java index 17fd6c3086b..0e667a06d5f 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java @@ -151,8 +151,6 @@ public void testAggPerfDefaultPartial() throws InterruptedException, ExecutionEx store.setMessageGroupFactory(messageGroupFactory); handler.setMessageStore(store); - handler.setBeanFactory(mock(BeanFactory.class)); - handler.afterPropertiesSet(); StopWatch stopwatch = new StopWatch(); stopwatch.start();