
RFC: handle batch messages in parallel in batch module #1540

Closed
@jeromevdl

Description


Is your feature request related to a problem? Please describe.

At the moment, the batch module processes messages sequentially (code). Processing them in parallel could improve performance.

Describe the solution you'd like

  • The BatchMessageHandler could provide a processBatchInParallel method with the same signature as processBatch but with a different behaviour (parallel processing instead of serial).
  • Instead of iterating through the list of messages, we could use CompletableFutures. It would be something like this (probably not that easy, but it's a start):
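```java
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Thread pool sized at twice the number of available vCPUs
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

// One async task per SQS message (event, context and processTheMessage... come from the surrounding handler)
List<CompletableFuture<Optional<SQSBatchResponse.BatchItemFailure>>> collect = event.getRecords().stream()
        .map(sqsMessage -> CompletableFuture.supplyAsync(
                () -> processTheMessageAndReturnOptionalOfBatchItemFailure(sqsMessage, context), executor)
        ).collect(Collectors.toList());

// Wait for every task to complete, then gather their results
CompletableFuture<List<Optional<SQSBatchResponse.BatchItemFailure>>> listCompletableFuture = CompletableFuture
        .allOf(collect.toArray(new CompletableFuture[0]))
        .thenApply(unused -> collect
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

// Keep only the failed messages; get() throws the checked InterruptedException and ExecutionException
List<SQSBatchResponse.BatchItemFailure> batchItemFailures =
        listCompletableFuture.get().stream().filter(Optional::isPresent).map(Optional::get)
                .collect(Collectors.toList());

// Release the pool's threads once the batch is done, so they do not leak across invocations
executor.shutdown();
```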
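To make the proposal more concrete, here is a minimal, self-contained sketch of what a processBatchInParallel-style helper could look like. It is not the Powertools API: the class ParallelBatchSketch, its generic signature and the toy handler are hypothetical, and plain integers stand in for SQS messages so the example runs without the AWS libraries. It assumes, as in the snippet above, that the per-message handler catches its own exceptions and returns an Optional failure.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper sketching a processBatchInParallel-style method.
// M stands for the message type (e.g. an SQS message), F for the failure type (e.g. a BatchItemFailure).
final class ParallelBatchSketch {

    static <M, F> List<F> processBatchInParallel(List<M> messages,
                                                 Function<M, Optional<F>> handler,
                                                 int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Submit one task per message to the dedicated pool
            List<CompletableFuture<Optional<F>>> futures = messages.stream()
                    .map(m -> CompletableFuture.supplyAsync(() -> handler.apply(m), executor))
                    .collect(Collectors.toList());
            // join() each future in submission order, so failures keep the input order
            return futures.stream()
                    .map(CompletableFuture::join)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .collect(Collectors.toList());
        } finally {
            // Always release the pool's threads, even if a join() throws
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Toy handler: odd ids are reported as failures
        Function<Integer, Optional<String>> handler =
                id -> id % 2 == 1 ? Optional.of("failed-" + id) : Optional.empty();
        List<String> failures = processBatchInParallel(List.of(1, 2, 3, 4, 5), handler, 4);
        System.out.println(failures); // [failed-1, failed-3, failed-5]
    }
}
```

Unlike the snippet above, this version skips the allOf(...).thenApply(...) step: calling join() on each future already waits for all of them, and the try/finally guarantees the pool is shut down.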

Describe alternatives you've considered
Java streams provide a parallel() method, which runs on the common ForkJoinPool and is therefore bounded by the number of vCPUs (Runtime.getRuntime().availableProcessors()). With CompletableFuture, we can choose the number of threads in the executor, potentially more than the number of vCPUs. We should probably run some load tests on Lambda to check whether that is actually better, because parallel() is probably much easier to implement.
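For comparison, the parallel-stream alternative could be sketched as follows, again with hypothetical names (ParallelStreamBatchSketch, processBatchWithParallelStream) and integers standing in for SQS messages. It assumes the same handler contract: each call returns an Optional failure instead of throwing.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of the parallel-stream alternative.
final class ParallelStreamBatchSketch {

    static <M, F> List<F> processBatchWithParallelStream(List<M> messages,
                                                         Function<M, Optional<F>> handler) {
        // parallelStream() runs on the JVM-wide common ForkJoinPool, whose default
        // parallelism is derived from availableProcessors(); there is no pool size to tune here
        return messages.parallelStream()
                .map(handler)
                .filter(Optional::isPresent)
                .map(Optional::get)
                // collect() on an ordered source keeps encounter order, even in parallel
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Toy handler: odd ids are reported as failures
        Function<Integer, Optional<String>> handler =
                id -> id % 2 == 1 ? Optional.of("failed-" + id) : Optional.empty();
        System.out.println(processBatchWithParallelStream(List.of(1, 2, 3, 4, 5), handler));
        // [failed-1, failed-3, failed-5]
    }
}
```

The code is shorter, but the degree of parallelism is tied to the vCPU count. If handlers mostly wait on I/O (for example, calls to other services), a dedicated executor with more threads may do better; that is the kind of difference the load tests would need to confirm.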

Additional context

Metadata


Assignees

Type

No type

Projects

Status

Coming soon

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
