Description
Is your feature request related to a problem? Please describe.
At the moment, the batch module processes messages sequentially (code), which could be improved with parallel processing for better performance.
Describe the solution you'd like
- The `BatchMessageHandler` could provide a `processBatchInParallel` method with the same signature as `processBatch` but with a different behaviour (parallel processing instead of serial).
- Instead of iterating through the list of messages, we could use a `CompletableFuture`. It would be something like this (probably not that easy, but it's a start):
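```java
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Thread pool sized at 2x the vCPUs (I/O-bound work may benefit from more threads than cores)
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
List<SQSBatchResponse.BatchItemFailure> batchItemFailures;
try {
    // Submit one asynchronous task per SQS message
    List<CompletableFuture<Optional<SQSBatchResponse.BatchItemFailure>>> futures = event.getRecords().stream()
            .map(sqsMessage -> CompletableFuture.supplyAsync(
                    () -> processTheMessageAndReturnOptionalOfBatchItemFailure(sqsMessage, context), executor))
            .collect(Collectors.toList());
    // Wait for every task, then keep only the failures; join() avoids get()'s checked exceptions
    batchItemFailures = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(unused -> futures.stream()
                    .map(CompletableFuture::join)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .collect(Collectors.toList()))
            .join();
} finally {
    executor.shutdown();
}
```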
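To make the proposed `processBatchInParallel` API a bit more concrete, here is a minimal, self-contained sketch of how it could sit next to a serial `processBatch`. It is not the Powertools implementation: the class name `ParallelBatchSketch`, the generic message type `M` and failure type `F` (standing in for `SQSMessage` and `SQSBatchResponse.BatchItemFailure`), the `processor` function and the `threads` parameter are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for the batch handler (not the Powertools API):
// M is the message type, F the failure type reported back to the caller.
public class ParallelBatchSketch {

    // Serial behaviour: process messages one after another.
    static <M, F> List<F> processBatch(List<M> messages, Function<M, Optional<F>> processor) {
        List<F> failures = new ArrayList<>();
        for (M message : messages) {
            processor.apply(message).ifPresent(failures::add);
        }
        return failures;
    }

    // Parallel behaviour: same inputs and output, one CompletableFuture per message.
    // The 'threads' parameter is a hypothetical knob for the executor size.
    static <M, F> List<F> processBatchInParallel(List<M> messages, Function<M, Optional<F>> processor, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Optional<F>>> futures = messages.stream()
                    .map(m -> CompletableFuture.supplyAsync(() -> processor.apply(m), executor))
                    .collect(Collectors.toList());
            // Joining the futures in submission order keeps failures in input order
            return futures.stream()
                    .map(CompletableFuture::join)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6);
        // Hypothetical processor: even ids "fail" and are reported by id
        Function<Integer, Optional<String>> processor =
                id -> id % 2 == 0 ? Optional.of("failed-" + id) : Optional.empty();
        System.out.println(processBatch(ids, processor));
        System.out.println(processBatchInParallel(ids, processor, 4));
    }
}
```

Both calls print `[failed-2, failed-4, failed-6]`: because the failures are read back by joining the futures in submission order, the parallel variant reports them in the same order as the serial one. If `processor` throws, `join()` rethrows the exception wrapped in a `CompletionException`, so a real implementation would likely need to catch exceptions per message and turn them into batch item failures.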
Describe alternatives you've considered
Streams provide a `parallel()` method, which runs the work on the common `ForkJoinPool`, whose size is based on the number of vCPUs (`Runtime.getRuntime().availableProcessors()`). With `CompletableFuture`, we can choose the number of threads in the executor, potentially more than the number of vCPUs. We should probably run some load tests on Lambda to check whether that is actually better, because `parallel()` is probably much easier to implement.
Additional context