Skip to content

Allow Aggregator complete groups based on timeout [INT-3420] #7390

Closed
@spring-operator

Description

@spring-operator

Eamonn Linehan opened INT-3420 and commented

I have a use case where I have a very large file that I would like to split into many messages, do some processing and then continue the workflow after all the messages split from the file have been processed.

To do this I have implemented a streaming message splitter similar to the patch described in this issue: #4662

Because the streaming splitter does not know the group size that header is not set on the split messages.

To be able to detect when all the processing is complete for these split messages I have to rely on a timeout. For example, if the completion of processing of split messages for a particular group has stopped for more then X seconds then we can assume that all the messages have finished and do the aggregation (I also added a payload transformation to a custom message store to prevent the entire payload being stored in memory).

However, setting the "group-timeout" on an aggregator expires rather than completes the group of messages removing all references to the group from the message store completely.

In this case, if an additional message arrives belonging to this same group it is an error. We have timed out the group too soon and an exception should be thrown. However currently a new group is formed with the same correlation ID and aggregation can happen again.

It should be possible to have a group released due to group-timeout to be marked as 'complete' instead of expired. This way, late arriving messages for an "expired" group would be sent to the discard channel.


Affects: 4.0 GA

Reference URL: http://forum.spring.io/forum/spring-projects/integration/749322-aggregator-allows-messages-sent-to-complete-groups-to-form-new-groups

Issue Links:

Referenced from: pull request #1216

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions