Skip to content

Commit 2da8f1a

Browse files
fix multiple transform bug
1 parent 680c393 commit 2da8f1a

File tree

3 files changed

+65
-51
lines changed

3 files changed

+65
-51
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ export abstract class AbstractCursor<
361361
return true;
362362
}
363363

364-
const doc = await next<TSchema>(this, true);
364+
const doc = await next<TSchema>(this, true, false);
365365

366366
if (doc) {
367367
this[kDocuments].unshift(doc);
@@ -680,48 +680,47 @@ export abstract class AbstractCursor<
680680
}
681681
}
682682

683-
function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
684-
const doc = cursor[kDocuments].shift();
685-
686-
if (doc && cursor[kTransform]) {
687-
return cursor[kTransform](doc) as T;
688-
}
689-
690-
return doc;
691-
}
692-
693683
/**
694684
* @param cursor - the cursor on which to call `next`
695685
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
696686
* is available. Generally, this flag is set to `false` because if the getMore returns no documents,
697687
* the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and
698688
* `tryNext`, for example) blocking is necessary because a getMore returning no documents does
699689
* not indicate the end of the cursor.
690+
* @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists)
700691
* @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means
701692
* the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer.
702693
*/
703-
async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T | null> {
694+
async function next<T>(
695+
cursor: AbstractCursor<T>,
696+
blocking: boolean,
697+
transform = true
698+
): Promise<T | null> {
704699
const cursorId = cursor[kId];
705700
if (cursor.closed) {
706701
return null;
707702
}
708703

709704
if (cursor[kDocuments].length !== 0) {
710-
return nextDocument<T>(cursor);
705+
const doc = cursor[kDocuments].shift();
706+
707+
if (doc != null && transform && cursor[kTransform]) {
708+
return cursor[kTransform](doc);
709+
}
710+
711+
return doc;
711712
}
712713

713714
if (cursorId == null) {
714715
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
715716
const init = promisify(cb => cursor[kInit](cb));
716717
await init();
717-
return next(cursor, blocking);
718+
return next(cursor, blocking, transform);
718719
}
719720

720721
if (cursorIsDead(cursor)) {
721-
try {
722-
await cleanupCursorAsync(cursor, undefined);
723-
// eslint-disable-next-line no-empty
724-
} catch {}
722+
// if the cursor is dead, we clean it up
723+
await cleanupCursorAsync(cursor);
725724
return null;
726725
}
727726

@@ -735,11 +734,8 @@ async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T
735734
try {
736735
response = await getMore(batchSize);
737736
} catch (error) {
738-
if (error || cursorIsDead(cursor)) {
739-
try {
740-
await cleanupCursorAsync(cursor, { error });
741-
// eslint-disable-next-line no-empty
742-
} catch {}
737+
if (error) {
738+
await cleanupCursorAsync(cursor, { error });
743739
throw error;
744740
}
745741
}
@@ -756,19 +752,40 @@ async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T
756752
cursor[kId] = cursorId;
757753
}
758754

755+
if (cursorIsDead(cursor)) {
756+
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
757+
// we intentionally clean up the cursor to release its session back into the pool before the cursor
758+
// is iterated. This prevents a cursor that is exhausted on the server from holding
759+
// onto a session indefinitely until the AbstractCursor is iterated.
760+
await cleanupCursorAsync(cursor);
761+
}
762+
759763
if (cursor[kDocuments].length === 0 && blocking === false) {
760764
return null;
761765
}
762766

763-
return next(cursor, blocking);
767+
return next(cursor, blocking, transform);
764768
}
765769

766770
function cursorIsDead(cursor: AbstractCursor): boolean {
767771
const cursorId = cursor[kId];
768772
return !!cursorId && cursorId.isZero();
769773
}
770774

771-
const cleanupCursorAsync = promisify(cleanupCursor);
775+
const cleanupCursorAsyncInternal = promisify(cleanupCursor);
776+
777+
async function cleanupCursorAsync<T>(
778+
cursor: AbstractCursor<T>,
779+
options: { needsToEmitClosed?: boolean; error?: AnyError } = {}
780+
): Promise<void> {
781+
try {
782+
await cleanupCursorAsyncInternal(cursor, options);
783+
} catch {
784+
// `cleanupCursor` never throws but we can't really test that.
785+
// so this is a hack to ensure that any upstream consumers
786+
// can safely guarantee on this wrapper never throwing.
787+
}
788+
}
772789

773790
function cleanupCursor(
774791
cursor: AbstractCursor,

test/integration/crud/misc_cursors.test.js

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1708,7 +1708,7 @@ describe('Cursor', function () {
17081708
expect(cursor).property('closed', false);
17091709

17101710
const willClose = once(cursor, 'close');
1711-
const willEnd = once(stream, 'end');
1711+
const willEnd = once(stream, 'close');
17121712

17131713
const dataEvents = on(stream, 'data');
17141714

@@ -1722,16 +1722,16 @@ describe('Cursor', function () {
17221722
// After 5 successful data events, destroy stream
17231723
stream.destroy();
17241724

1725-
// We should get an end event on the stream and a close event on the cursor
1725+
// We should get a a close event on the stream and a close event on the cursor
17261726
// We should **not** get an 'error' event,
17271727
// the following will throw if either stream or cursor emitted an 'error' event
17281728
await Promise.race([
17291729
willEnd,
1730-
sleep(100).then(() => Promise.reject(new Error('end event never emitted')))
1730+
sleep(100, { ref: false }).then(() => Promise.reject(new Error('end event never emitted')))
17311731
]);
17321732
await Promise.race([
17331733
willClose,
1734-
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
1734+
sleep(100, { ref: false }).then(() => Promise.reject(new Error('close event never emitted')))
17351735
]);
17361736
});
17371737

@@ -3589,7 +3589,7 @@ describe('Cursor', function () {
35893589
await client.close();
35903590
});
35913591

3592-
it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) {
3592+
it('should return implicit session to pool when client-side cursor exhausts results after a getMore', async function () {
35933593
const configuration = this.configuration;
35943594
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });
35953595

@@ -3604,25 +3604,22 @@ describe('Cursor', function () {
36043604
{ a: 9, b: 10 }
36053605
];
36063606

3607-
collection.insertMany(docs, err => {
3608-
expect(err).to.not.exist;
3609-
const cursor = collection.find({}, { batchSize: 3 });
3610-
cursor.next(function () {
3611-
expect(client.s.activeSessions.size).to.equal(1);
3612-
cursor.next(function () {
3613-
expect(client.s.activeSessions.size).to.equal(1);
3614-
cursor.next(function () {
3615-
expect(client.s.activeSessions.size).to.equal(1);
3616-
cursor.next(function () {
3617-
expect(client.s.activeSessions.size).to.equal(0);
3618-
cursor.close(() => {
3619-
client.close(done);
3620-
});
3621-
});
3622-
});
3623-
});
3624-
});
3625-
});
3607+
await collection.insertMany(docs);
3608+
3609+
// TODO - talk to Neal about this test
3610+
const cursor = await collection.find({}, { batchSize: 3 });
3611+
for (let i = 0; i < 3; ++i) {
3612+
await cursor.next();
3613+
expect(client.s.activeSessions.size).to.equal(1);
3614+
}
3615+
3616+
await cursor.next();
3617+
expect(client.s.activeSessions.size, 'session not checked in after cursor exhausted').to.equal(
3618+
0
3619+
);
3620+
3621+
await cursor.close();
3622+
await client.close();
36263623
});
36273624

36283625
describe('#clone', function () {

test/integration/node-specific/cursor_stream.test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,10 @@ describe('Cursor Streams', function () {
297297
stream.on('error', err => (error = err));
298298
cursor.on('close', function () {
299299
// NOTE: use `setImmediate` here because the stream implementation uses `nextTick` to emit the error
300-
setImmediate(() => {
300+
setTimeout(() => {
301301
expect(error).to.exist;
302302
client.close(done);
303-
});
303+
}, 50);
304304
});
305305

306306
stream.pipe(process.stdout);

0 commit comments

Comments
 (0)