Description
Version 2.17.28
When using the flatMapIterable method on QueryPublisher (the return value of the queryPaginator method on the async DDB client), there is a bug in the underlying FlatteningSubscriber in the AWS SDK.
When we take the publisher produced by this and feed it into Reactor, we see that, because of an unrelated flatMap / publish in the way we happened to set up our Flux, Reactor makes an initial request call for X and then immediately another request call for Y.
What this seems to do is enter the request method of the inline Subscription implementation in FlatteningSubscriber twice, back to back.
This means the first call makes a request "up" to the mapper, and the second one immediately hits the else branch and goes to fulfillDemand(). This then sees that currentBatch is empty, of course, because we have not even had a page back yet in this double tap. So it also makes a request "up" for another page!
Now our demand is at X+Y and we sit and wait. When onNext is called by the mapper because a QueryResponse came back with a page of results, we are destined to fail if back pressure builds. The upstream demand is now 2 pages, which I think the code does not account for, because it would not normally happen unless you hit the request method multiple times before the first page arrives, or if an empty page happened to end at the exact point of demand being met.
In my example X is 8 and Y is 256, making a total demand of 264.
onNext is called for the first page, as I said, and is also called again with the subsequent page, but that second call is locked out.
Page 1 in my case had 79 results, and because the batch was then empty and demand was still high, it requested another (3rd) page at the end of fulfillDemand. Then it unlocks and the queued-up onNext for page 2 comes in and enters fulfillDemand. In my case page 2 had 21 results, so at the end 100 items have been sent onward and the batch is empty, so it, too, requests another page, now the 4th.
It then exits and unlocks for the queued-up 3rd page to come in to onNext, where it enters fulfillDemand again. This page has 225 results, so the remaining demand of 164 is satisfied. Demand is left at 0 because no more requests have come in, as the downstream is saturated with IO backpressure. So we have 61 items (225 - 164) left from this 3rd page in the currentBatch list.
As soon as demand hit 0, fulfillDemand completed, and so did the onNext that called it for page 3. But a 4th page was requested when the 2nd page completed, and page 4 comes in to onNext and... destroys currentBatch by replacing it with the 4th page. It does not check whether the batch is empty and add to it, because the code presumes the batch must be empty for a request to have been made. That is wrong when more than 1 request call happens before the 1st page of results.
So, those 61 items? Gone.
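To make the sequence above easier to check, here is a minimal single-threaded sketch of the behaviour as I have described it. It is not the SDK source: the class names ReplacingFlattener and DoubleTapLossDemo are made up for illustration, locking and completion handling are left out, and the size of page 4 (10 items) is an assumption, since only pages 1 to 3 matter for the arithmetic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, single-threaded model of the behaviour described in this report.
// Hypothetical class; NOT the SDK's FlatteningSubscriber source.
class ReplacingFlattener {
    long demand = 0;                       // items the downstream has asked for
    int pagesInFlight = 0;                 // pages requested upstream, not yet delivered
    boolean requestedFirstPage = false;
    Queue<Integer> currentBatch = new ArrayDeque<>();
    final List<Integer> emitted = new ArrayList<>();
    int overwritten = 0;                   // items discarded when a batch is replaced

    void request(long n) {
        demand += n;
        if (!requestedFirstPage) {
            requestedFirstPage = true;
            pagesInFlight++;               // first call: ask upstream for page 1
        } else {
            fulfillDemand();               // later calls take the "else" path
        }
    }

    void onNext(List<Integer> page) {
        pagesInFlight--;
        overwritten += currentBatch.size();      // count what is about to be lost
        currentBatch = new ArrayDeque<>(page);   // replaces the batch without checking it
        fulfillDemand();
    }

    void fulfillDemand() {
        while (demand > 0 && !currentBatch.isEmpty()) {
            demand--;
            emitted.add(currentBatch.poll());
        }
        if (currentBatch.isEmpty() && demand > 0) {
            pagesInFlight++;               // empty batch plus demand: ask for another page
        }
    }
}

class DoubleTapLossDemo {
    static ReplacingFlattener run() {
        ReplacingFlattener f = new ReplacingFlattener();
        f.request(8);                      // X
        f.request(256);                    // Y, before any page has arrived
        int[] pageSizes = {79, 21, 225, 10};     // page 4 size (10) is an assumption
        int nextItem = 0;
        for (int size : pageSizes) {
            if (f.pagesInFlight == 0) break;     // upstream only sends requested pages
            List<Integer> page = new ArrayList<>();
            for (int i = 0; i < size; i++) page.add(nextItem++);
            f.onNext(page);
        }
        return f;
    }

    public static void main(String[] args) {
        ReplacingFlattener f = run();
        // prints "emitted=264 overwritten=61"
        System.out.println("emitted=" + f.emitted.size() + " overwritten=" + f.overwritten);
    }
}
```

In this model the two back-to-back request calls leave two pages in flight, so page 4 is already on its way when page 3 leaves 61 items behind, and the replacement in onNext discards them.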
This is repeatable every time, but I can't give you a query that you can run on any arbitrary database to see it.
I just read the code and figured out that this would happen when I went looking for my missing 61 orders!
My solution was not to use the AWS SDK V2 flatMapIterable on the QueryPublisher, and instead to hand that publisher over to Project Reactor right away and have Flux.flatMapIterable take over the work, because it would seem to manage its own queue rather than replace a batch.
I could try to write some code but, if you read and understand this, the bug relies on:
- more results than demand
- demand not rising initially with backpressure
- double tap request (seems very very easy to double or triple tap with project reactor or even if manually making some consumer)
- there being a number of results remaining on page N where N is not the final page of results (more than 1 page for sure)
So, the cause is that a double (or more) tap to request before the first page hits onNext sets you up for failure by requesting more pages. And when a page comes in, no check is made for an empty currentBatch; if it is not empty, its contents are lost.
Resolution: the simplest fix would be to concatenate currentBatch with the incoming iterable if the batch happens not to be empty as it should be. Or, disallow more requests upstream if we have not yet received a page: record that a request has been made but not yet satisfied, and add the delta to demand but do not push more demand up. That seems less easy but more logical: if a request has been made upstream for 1 (page/iterable), then until one comes down, do not request more, because the code cannot handle more than one hitting onNext if the demand ever gets met.
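Both suggestions can be sketched against the same simplified model. This is only an illustration under my own assumptions, not a patch for the SDK: SafeFlattener and SafeFlattenerDemo are hypothetical names, the model is single-threaded with no locking or completion handling, and page 4 is again assumed to hold 10 items. With onePageInFlight set to false it shows the first option (append to the batch); with true it also applies the second option (never ask upstream while a page is still outstanding).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the two proposed fixes; not SDK code.
class SafeFlattener {
    private final boolean onePageInFlight; // true: also apply option 2
    long demand = 0;
    int pagesInFlight = 0;
    final Queue<Integer> currentBatch = new ArrayDeque<>();
    final List<Integer> emitted = new ArrayList<>();

    SafeFlattener(boolean onePageInFlight) {
        this.onePageInFlight = onePageInFlight;
    }

    void request(long n) {
        demand += n;                       // always record the extra demand
        fulfillDemand();
    }

    void onNext(List<Integer> page) {
        pagesInFlight--;
        currentBatch.addAll(page);         // option 1: append, never replace
        fulfillDemand();
    }

    private void fulfillDemand() {
        while (demand > 0 && !currentBatch.isEmpty()) {
            demand--;
            emitted.add(currentBatch.poll());
        }
        // option 2: do not push more demand up while a page is outstanding
        boolean mayRequest = !onePageInFlight || pagesInFlight == 0;
        if (currentBatch.isEmpty() && demand > 0 && mayRequest) {
            pagesInFlight++;
        }
    }
}

class SafeFlattenerDemo {
    static List<Integer> run(boolean onePageInFlight) {
        SafeFlattener f = new SafeFlattener(onePageInFlight);
        int[] pageSizes = {79, 21, 225, 10};   // page 4 size (10) is an assumption
        int[] cursor = {0, 0};                 // {next page index, next item value}
        f.request(8);                          // X
        f.request(256);                        // Y, the double tap
        deliver(f, pageSizes, cursor);
        f.request(100);                        // downstream recovers and asks for more
        deliver(f, pageSizes, cursor);
        return f.emitted;
    }

    // Hands over pages only while the flattener has requested them.
    static void deliver(SafeFlattener f, int[] pageSizes, int[] cursor) {
        while (f.pagesInFlight > 0 && cursor[0] < pageSizes.length) {
            int size = pageSizes[cursor[0]++];
            List<Integer> page = new ArrayList<>();
            for (int i = 0; i < size; i++) page.add(cursor[1]++);
            f.onNext(page);
        }
    }

    public static void main(String[] args) {
        System.out.println("append only:        " + run(false).size());
        System.out.println("one page in flight: " + run(true).size());
    }
}
```

In this model both variants hand all 335 items downstream in order once more demand arrives. The second variant also avoids fetching page 4 before anyone has asked for its items, which is why it seems the more logical of the two to me.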