Description
Eamonn Linehan opened INT-3420 and commented
I have a use case in which I split a very large file into many messages, process each of them, and then continue the workflow only after all the messages split from the file have been processed.
To do this I have implemented a streaming message splitter similar to the patch described in this issue: #4662
Because the streaming splitter does not know the group size, the sequence size header is not set on the split messages.
To detect when processing is complete for all of these split messages, I have to rely on a timeout. For example, if no processed split messages have arrived for a particular group for more than X seconds, we can assume that all the messages have finished and do the aggregation. (I also added a payload transformation to a custom message store to prevent the entire payload from being stored in memory.)
However, setting the "group-timeout" on an aggregator expires the group of messages rather than completing it, which removes all references to the group from the message store.
In this case, an additional message arriving for the same group is an error: we have timed out the group too soon, and an exception should be thrown. Currently, however, a new group is formed with the same correlation ID and aggregation can happen again.
It should be possible to have a group that is released due to group-timeout marked as 'complete' instead of expired. That way, late-arriving messages for such a timed-out group would be sent to the discard channel.
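To make the difference concrete, here is a minimal plain-Java sketch of the two behaviours. It is a toy model, not Spring Integration code: the class `TimeoutAggregatorSketch`, its `expireOnTimeout` flag, and the `released`/`discarded` lists are all hypothetical names that stand in for the aggregator, the requested option, and the output and discard channels.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of an aggregator with a group timeout (hypothetical, not the framework's API).
class TimeoutAggregatorSketch {
    private final long groupTimeoutMillis;
    // true  = behaviour described as current: a timed-out group is forgotten entirely
    // false = behaviour requested here: a timed-out group is remembered as complete
    private final boolean expireOnTimeout;

    private final Map<String, List<String>> open = new LinkedHashMap<>();
    private final Map<String, Long> lastArrival = new LinkedHashMap<>();
    private final Set<String> completed = new HashSet<>();

    final List<String> released = new ArrayList<>();  // stands in for the output channel
    final List<String> discarded = new ArrayList<>(); // stands in for the discard channel

    TimeoutAggregatorSketch(long groupTimeoutMillis, boolean expireOnTimeout) {
        this.groupTimeoutMillis = groupTimeoutMillis;
        this.expireOnTimeout = expireOnTimeout;
    }

    // Accept one split message; 'now' is passed in so the sketch needs no real clock.
    void receive(String correlationId, String payload, long now) {
        sweep(now);
        if (completed.contains(correlationId)) {
            discarded.add(payload); // late arrival for a group already marked complete
            return;
        }
        open.computeIfAbsent(correlationId, k -> new ArrayList<>()).add(payload);
        lastArrival.put(correlationId, now);
    }

    // Release every group that has been idle for at least the timeout.
    void sweep(long now) {
        Iterator<Map.Entry<String, Long>> it = lastArrival.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= groupTimeoutMillis) {
                String id = entry.getKey();
                released.add(id + "=" + String.join(",", open.remove(id)));
                it.remove();
                if (!expireOnTimeout) {
                    completed.add(id); // keep a marker so late messages are rejected
                }
            }
        }
    }

    public static void main(String[] args) {
        for (boolean expire : new boolean[] {true, false}) {
            TimeoutAggregatorSketch agg = new TimeoutAggregatorSketch(1000, expire);
            agg.receive("file-1", "a", 0);
            agg.receive("file-1", "b", 500);
            agg.receive("file-1", "late", 2000); // arrives after the group timed out
            agg.sweep(4000);
            System.out.println("expireOnTimeout=" + expire
                    + " released=" + agg.released + " discarded=" + agg.discarded);
        }
    }
}
```

With `expireOnTimeout` set to `true`, the late message silently starts a second group under the same correlation ID and is aggregated again. With `false`, the same message ends up in `discarded`, which is the outcome requested above.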
Affects: 4.0 GA
Reference URL: http://forum.spring.io/forum/spring-projects/integration/749322-aggregator-allows-messages-sent-to-complete-groups-to-form-new-groups
Issue Links:
- Time-based ReleaseStrategy [INT-2595] #6575
- support Iterator for splitters [INT-651] #4662
- Add expire-groups-upon-timeout to the <resequencer> [INT-3529] #7493
Referenced from: pull request #1216