Skip to content

Commit 0668cd8

Browse files
fix(NODE-5374): do not apply cursor transform in Cursor.hasNext (#3746)
Co-authored-by: Durran Jordan <durran@gmail.com>
1 parent ccc5e30 commit 0668cd8

File tree

4 files changed

+313
-176
lines changed

4 files changed

+313
-176
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 99 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ export abstract class AbstractCursor<
361361
return true;
362362
}
363363

364-
const doc = await nextAsync<TSchema>(this, true);
364+
const doc = await next<TSchema>(this, { blocking: true, transform: false });
365365

366366
if (doc) {
367367
this[kDocuments].unshift(doc);
@@ -377,7 +377,7 @@ export abstract class AbstractCursor<
377377
throw new MongoCursorExhaustedError();
378378
}
379379

380-
return nextAsync(this, true);
380+
return next(this, { blocking: true, transform: true });
381381
}
382382

383383
/**
@@ -388,7 +388,7 @@ export abstract class AbstractCursor<
388388
throw new MongoCursorExhaustedError();
389389
}
390390

391-
return nextAsync(this, false);
391+
return next(this, { blocking: false, transform: true });
392392
}
393393

394394
/**
@@ -680,88 +680,112 @@ export abstract class AbstractCursor<
680680
}
681681
}
682682

683-
function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
684-
const doc = cursor[kDocuments].shift();
685-
686-
if (doc && cursor[kTransform]) {
687-
return cursor[kTransform](doc) as T;
688-
}
689-
690-
return doc;
691-
}
692-
693-
const nextAsync = promisify(
694-
next as <T>(
695-
cursor: AbstractCursor<T>,
696-
blocking: boolean,
697-
callback: (e: Error, r: T | null) => void
698-
) => void
699-
);
700-
701683
/**
702684
* @param cursor - the cursor on which to call `next`
703685
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
704686
* is available. Generally, this flag is set to `false` because if the getMore returns no documents,
705687
* the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and
706688
* `tryNext`, for example) blocking is necessary because a getMore returning no documents does
707689
* not indicate the end of the cursor.
708-
* @param callback - callback to return the result to the caller
709-
* @returns
690+
* @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists)
691+
* @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means
692+
* the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer.
710693
*/
711-
export function next<T>(
694+
async function next<T>(
712695
cursor: AbstractCursor<T>,
713-
blocking: boolean,
714-
callback: Callback<T | null>
715-
): void {
696+
{
697+
blocking,
698+
transform
699+
}: {
700+
blocking: boolean;
701+
transform: boolean;
702+
}
703+
): Promise<T | null> {
716704
const cursorId = cursor[kId];
717705
if (cursor.closed) {
718-
return callback(undefined, null);
706+
return null;
719707
}
720708

721709
if (cursor[kDocuments].length !== 0) {
722-
callback(undefined, nextDocument<T>(cursor));
723-
return;
710+
const doc = cursor[kDocuments].shift();
711+
712+
if (doc != null && transform && cursor[kTransform]) {
713+
try {
714+
return cursor[kTransform](doc);
715+
} catch (error) {
716+
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
717+
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
718+
// error instead.
719+
});
720+
throw error;
721+
}
722+
}
723+
724+
return doc;
724725
}
725726

726727
if (cursorId == null) {
727728
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
728-
cursor[kInit](err => {
729-
if (err) return callback(err);
730-
return next(cursor, blocking, callback);
731-
});
732-
733-
return;
729+
const init = promisify(cb => cursor[kInit](cb));
730+
await init();
731+
return next(cursor, { blocking, transform });
734732
}
735733

736734
if (cursorIsDead(cursor)) {
737-
return cleanupCursor(cursor, undefined, () => callback(undefined, null));
735+
// if the cursor is dead, we clean it up
736+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
737+
// and we should surface the error
738+
await cleanupCursorAsync(cursor, {});
739+
return null;
738740
}
739741

740742
// otherwise need to call getMore
741743
const batchSize = cursor[kOptions].batchSize || 1000;
742-
cursor._getMore(batchSize, (error, response) => {
743-
if (response) {
744-
const cursorId =
745-
typeof response.cursor.id === 'number'
746-
? Long.fromNumber(response.cursor.id)
747-
: typeof response.cursor.id === 'bigint'
748-
? Long.fromBigInt(response.cursor.id)
749-
: response.cursor.id;
744+
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
745+
cursor._getMore(batchSize, cb)
746+
);
750747

751-
cursor[kDocuments].pushMany(response.cursor.nextBatch);
752-
cursor[kId] = cursorId;
748+
let response: Document | undefined;
749+
try {
750+
response = await getMore(batchSize);
751+
} catch (error) {
752+
if (error) {
753+
await cleanupCursorAsync(cursor, { error }).catch(() => {
754+
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
755+
// error instead.
756+
});
757+
throw error;
753758
}
759+
}
754760

755-
if (error || cursorIsDead(cursor)) {
756-
return cleanupCursor(cursor, { error }, () => callback(error, nextDocument<T>(cursor)));
757-
}
761+
if (response) {
762+
const cursorId =
763+
typeof response.cursor.id === 'number'
764+
? Long.fromNumber(response.cursor.id)
765+
: typeof response.cursor.id === 'bigint'
766+
? Long.fromBigInt(response.cursor.id)
767+
: response.cursor.id;
758768

759-
if (cursor[kDocuments].length === 0 && blocking === false) {
760-
return callback(undefined, null);
761-
}
769+
cursor[kDocuments].pushMany(response.cursor.nextBatch);
770+
cursor[kId] = cursorId;
771+
}
762772

763-
next(cursor, blocking, callback);
764-
});
773+
if (cursorIsDead(cursor)) {
774+
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
775+
// we intentionally clean up the cursor to release its session back into the pool before the cursor
776+
// is iterated. This prevents a cursor that is exhausted on the server from holding
777+
// onto a session indefinitely until the AbstractCursor is iterated.
778+
//
779+
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
780+
// and we should surface the error
781+
await cleanupCursorAsync(cursor, {});
782+
}
783+
784+
if (cursor[kDocuments].length === 0 && blocking === false) {
785+
return null;
786+
}
787+
788+
return next(cursor, { blocking, transform });
765789
}
766790

767791
function cursorIsDead(cursor: AbstractCursor): boolean {
@@ -781,6 +805,10 @@ function cleanupCursor(
781805
const server = cursor[kServer];
782806
const session = cursor[kSession];
783807
const error = options?.error;
808+
809+
// Cursors only emit closed events once the client-side cursor has been exhausted fully or there
810+
// was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we
811+
// cleanup the cursor but don't emit a `close` event.
784812
const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;
785813

786814
if (error) {
@@ -881,8 +909,21 @@ class ReadableCursorStream extends Readable {
881909
}
882910

883911
private _readNext() {
884-
next(this._cursor, true, (err, result) => {
885-
if (err) {
912+
next(this._cursor, { blocking: true, transform: true }).then(
913+
result => {
914+
if (result == null) {
915+
this.push(null);
916+
} else if (this.destroyed) {
917+
this._cursor.close().catch(() => null);
918+
} else {
919+
if (this.push(result)) {
920+
return this._readNext();
921+
}
922+
923+
this._readInProgress = false;
924+
}
925+
},
926+
err => {
886927
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
887928
// desired behavior is that a stream ends cleanly when a user explicitly closes
888929
// a client during iteration. Alternatively, we could do the "right" thing and
@@ -911,18 +952,6 @@ class ReadableCursorStream extends Readable {
911952
// See NODE-4475.
912953
return this.destroy(err);
913954
}
914-
915-
if (result == null) {
916-
this.push(null);
917-
} else if (this.destroyed) {
918-
this._cursor.close().catch(() => null);
919-
} else {
920-
if (this.push(result)) {
921-
return this._readNext();
922-
}
923-
924-
this._readInProgress = false;
925-
}
926-
});
955+
);
927956
}
928957
}

test/integration/crud/misc_cursors.test.js

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1708,7 +1708,6 @@ describe('Cursor', function () {
17081708
expect(cursor).property('closed', false);
17091709

17101710
const willClose = once(cursor, 'close');
1711-
const willEnd = once(stream, 'end');
17121711

17131712
const dataEvents = on(stream, 'data');
17141713

@@ -1722,13 +1721,9 @@ describe('Cursor', function () {
17221721
// After 5 successful data events, destroy stream
17231722
stream.destroy();
17241723

1725-
// We should get an end event on the stream and a close event on the cursor
1726-
// We should **not** get an 'error' event,
1724+
// We should get a close event on the stream and a close event on the cursor
1725+
// We should **not** get an 'error' or an 'end' event,
17271726
// the following will throw if either stream or cursor emitted an 'error' event
1728-
await Promise.race([
1729-
willEnd,
1730-
sleep(100).then(() => Promise.reject(new Error('end event never emitted')))
1731-
]);
17321727
await Promise.race([
17331728
willClose,
17341729
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
@@ -3589,11 +3584,8 @@ describe('Cursor', function () {
35893584
await client.close();
35903585
});
35913586

3592-
it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) {
3593-
const configuration = this.configuration;
3594-
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });
3595-
3596-
const db = client.db(configuration.db);
3587+
it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () {
3588+
const db = client.db(this.configuration.db);
35973589
const collection = db.collection('cursor_session_tests2');
35983590

35993591
const docs = [
@@ -3604,25 +3596,20 @@ describe('Cursor', function () {
36043596
{ a: 9, b: 10 }
36053597
];
36063598

3607-
collection.insertMany(docs, err => {
3608-
expect(err).to.not.exist;
3609-
const cursor = collection.find({}, { batchSize: 3 });
3610-
cursor.next(function () {
3611-
expect(client.s.activeSessions.size).to.equal(1);
3612-
cursor.next(function () {
3613-
expect(client.s.activeSessions.size).to.equal(1);
3614-
cursor.next(function () {
3615-
expect(client.s.activeSessions.size).to.equal(1);
3616-
cursor.next(function () {
3617-
expect(client.s.activeSessions.size).to.equal(0);
3618-
cursor.close(() => {
3619-
client.close(done);
3620-
});
3621-
});
3622-
});
3623-
});
3624-
});
3625-
});
3599+
await collection.insertMany(docs);
3600+
3601+
const cursor = await collection.find({}, { batchSize: 3 });
3602+
for (let i = 0; i < 3; ++i) {
3603+
await cursor.next();
3604+
expect(client.s.activeSessions.size).to.equal(1);
3605+
}
3606+
3607+
await cursor.next();
3608+
expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal(
3609+
0
3610+
);
3611+
3612+
await cursor.close();
36263613
});
36273614

36283615
describe('#clone', function () {

0 commit comments

Comments
 (0)