diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/TaskletStepExceptionTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/TaskletStepExceptionTests.java index 5bc1fc695f..d4a01e8fc9 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/TaskletStepExceptionTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/TaskletStepExceptionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2008-2022 the original author or authors. + * Copyright 2008-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,6 +63,7 @@ * @author David Turanski * @author Mahmoud Ben Hassine * @author Parikshit Dutta + * @author Elimelec Burghelea */ class TaskletStepExceptionTests { @@ -212,8 +213,8 @@ public void close() throws ItemStreamException { taskletStep.execute(stepExecution); assertEquals(FAILED, stepExecution.getStatus()); - assertTrue(stepExecution.getFailureExceptions().contains(taskletException)); - assertTrue(stepExecution.getFailureExceptions().contains(exception)); + assertEquals(stepExecution.getFailureExceptions().get(0), taskletException); + assertEquals(stepExecution.getFailureExceptions().get(1).getSuppressed()[0], exception); assertEquals(2, jobRepository.getUpdateCount()); } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java index ef6e917b70..e429a30e42 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/TaskletStepTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2023 the original author or authors. + * Copyright 2006-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -761,7 +761,7 @@ public void close() throws ItemStreamException { Throwable ex = stepExecution.getFailureExceptions().get(0); // The original rollback was caused by this one: - assertEquals("Bar", ex.getMessage()); + assertEquals("Bar", ex.getSuppressed()[0].getMessage()); } @Test @@ -791,7 +791,7 @@ public void close() throws ItemStreamException { assertEquals("", msg); Throwable ex = stepExecution.getFailureExceptions().get(0); // The original rollback was caused by this one: - assertEquals("Bar", ex.getMessage()); + assertEquals("Bar", ex.getSuppressed()[0].getMessage()); } /** diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemReader.java index 06148a346c..8da25504fc 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemReader.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2024-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ package org.springframework.batch.item.support; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -27,6 +28,7 @@ * implementation is not thread-safe. * * @author Mahmoud Ben Hassine + * @author Elimelec Burghelea * @param type of objects to read * @since 5.2 */ @@ -79,8 +81,22 @@ public void update(ExecutionContext executionContext) throws ItemStreamException @Override public void close() throws ItemStreamException { + List exceptions = new ArrayList<>(); + for (ItemStreamReader delegate : delegates) { - delegate.close(); + try { + delegate.close(); + } + catch (Exception e) { + exceptions.add(e); + } + } + + if (!exceptions.isEmpty()) { + String message = String.format("Failed to close %d delegate(s) due to exceptions", exceptions.size()); + ItemStreamException holder = new ItemStreamException(message); + exceptions.forEach(holder::addSuppressed); + throw holder; } } diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemStream.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemStream.java index e773bf8616..8cb2a1c83c 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemStream.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/support/CompositeItemStream.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2022 the original author or authors. + * Copyright 2006-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ * * @author Dave Syer * @author Mahmoud Ben Hassine - * + * @author Elimelec Burghelea */ public class CompositeItemStream implements ItemStream { @@ -102,13 +102,26 @@ public void update(ExecutionContext executionContext) { /** * Broadcast the call to close. * @throws ItemStreamException thrown if one of the {@link ItemStream}s in the list - * fails to close. This is a sequential operation so all itemStreams in the list after - * the one that failed to close will remain open. + * fails to close. */ @Override public void close() throws ItemStreamException { + List exceptions = new ArrayList<>(); + for (ItemStream itemStream : streams) { - itemStream.close(); + try { + itemStream.close(); + } + catch (Exception e) { + exceptions.add(e); + } + } + + if (!exceptions.isEmpty()) { + String message = String.format("Failed to close %d delegate(s) due to exceptions", exceptions.size()); + ItemStreamException holder = new ItemStreamException(message); + exceptions.forEach(holder::addSuppressed); + throw holder; } } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemReaderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemReaderTests.java index 3775c4299c..70091a0afc 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemReaderTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemReaderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 the original author or authors. + * Copyright 2024-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,14 @@ import java.util.Arrays; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.ItemStreamReader; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -32,6 +35,7 @@ * Test class for {@link CompositeItemReader}. * * @author Mahmoud Ben Hassine + * @author Elimelec Burghelea */ public class CompositeItemReaderTests { @@ -107,4 +111,27 @@ void testCompositeItemReaderClose() { verify(reader2).close(); } + @Test + void testCompositeItemReaderCloseWithDelegateThatThrowsException() { + // given + ItemStreamReader reader1 = mock(); + ItemStreamReader reader2 = mock(); + CompositeItemReader compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2)); + + doThrow(new ItemStreamException("A failure")).when(reader1).close(); + + // when + try { + compositeItemReader.close(); + Assertions.fail("Expected an ItemStreamException"); + } + catch (ItemStreamException ignored) { + + } + + // then + verify(reader1).close(); + verify(reader2).close(); + } + } \ No newline at end of file diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemStreamTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemStreamTests.java index 5f1be03821..3861ca0f8d 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemStreamTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/CompositeItemStreamTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2022 the original author or authors. + * Copyright 2006-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,19 +15,25 @@ */ package org.springframework.batch.item.support; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + import java.util.ArrayList; import java.util.List; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStream; +import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.ItemStreamSupport; -import static org.junit.jupiter.api.Assertions.assertEquals; - /** * @author Dave Syer - * + * @author Elimelec Burghelea */ class CompositeItemStreamTests { @@ -90,6 +96,40 @@ public void close() { assertEquals(1, list.size()); } + @Test + void testClose2Delegates() { + ItemStream reader1 = Mockito.mock(ItemStream.class); + ItemStream reader2 = Mockito.mock(ItemStream.class); + manager.register(reader1); + manager.register(reader2); + + manager.close(); + + verify(reader1, times(1)).close(); + verify(reader2, times(1)).close(); + } + + @Test + void testClose2DelegatesThatThrowsException() { + ItemStream reader1 = Mockito.mock(ItemStream.class); + ItemStream reader2 = Mockito.mock(ItemStream.class); + manager.register(reader1); + manager.register(reader2); + + doThrow(new ItemStreamException("A failure")).when(reader1).close(); + + try { + manager.close(); + Assertions.fail("Expected an ItemStreamException"); + } + catch (ItemStreamException ignored) { + + } + + verify(reader1, times(1)).close(); + verify(reader2, times(1)).close(); + } + @Test void testCloseDoesNotUnregister() { manager.setStreams(new ItemStream[] { new ItemStreamSupport() {