Skip to content

Commit fe4821c

Browse files
authored
refactor: remove cursorState from Cursor and wire protcol methods (#2549)
Cursors presently have an internal state tracking object called `cursorState` which has a high degree of coupling with multiple components in the system, and is source of great confusion and bugs. This patch removes the `cursorState` completely, mounting its relevant properties directly on the `CoreCursor` type and removing it from the signature of the `query`, `killCursors`, and `getMore` wire protocol methods. Many changes were required as part of this refactorinig, but most notable is cleanup around cursor `close`. The internal call to kill the cursor raced with the `_endsession` method, which uncovered a number of failures in our test suite when fixed. NODE-2810
1 parent a2c113f commit fe4821c

23 files changed

+1146
-1275
lines changed

src/change_stream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,8 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
451451
}
452452

453453
cacheResumeToken(resumeToken: ResumeToken): void {
454-
if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
455-
this.resumeToken = this.cursorState.postBatchResumeToken;
454+
if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
455+
this.resumeToken = this.postBatchResumeToken;
456456
} else {
457457
this.resumeToken = resumeToken;
458458
}
@@ -462,7 +462,7 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
462462
_processBatch(batchName: string, response?: Document): void {
463463
const cursor = response?.cursor || {};
464464
if (cursor.postBatchResumeToken) {
465-
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;
465+
this.postBatchResumeToken = cursor.postBatchResumeToken;
466466

467467
if (cursor[batchName].length === 0) {
468468
this.resumeToken = cursor.postBatchResumeToken;

src/cmap/connection.ts

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,18 @@ import {
1212
MongoWriteConcernError
1313
} from '../error';
1414
import type { BinMsg, WriteProtocolMessageType, Response } from './commands';
15-
import type { Document } from '../bson';
15+
import type { Document, Long } from '../bson';
1616
import type { AutoEncrypter } from '../deps';
1717
import type { ConnectionOptions as TLSConnectionOptions } from 'tls';
1818
import type { TcpNetConnectOpts, IpcNetConnectOpts } from 'net';
1919
import type { Server } from '../sdam/server';
2020
import type { MongoCredentials } from './auth/mongo_credentials';
2121
import type { CommandOptions } from './wire_protocol/command';
22-
import type { QueryOptions } from './wire_protocol/query';
23-
import type { InternalCursorState } from '../cursor/core_cursor';
2422
import type { GetMoreOptions } from './wire_protocol/get_more';
2523
import type { InsertOptions, UpdateOptions, RemoveOptions } from './wire_protocol/index';
2624
import type { Stream } from './connect';
2725
import type { LoggerOptions } from '../logger';
26+
import type { FindOptions } from '../operations/find';
2827

2928
const kStream = Symbol('stream');
3029
const kQueue = Symbol('queue');
@@ -247,28 +246,16 @@ export class Connection extends EventEmitter {
247246
wp.command(makeServerTrampoline(this), ns, cmd, options as CommandOptions, callback);
248247
}
249248

250-
query(
251-
ns: string,
252-
cmd: Document,
253-
cursorState: InternalCursorState,
254-
options: QueryOptions,
255-
callback: Callback
256-
): void {
257-
wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback);
249+
query(ns: string, cmd: Document, options: FindOptions, callback: Callback): void {
250+
wp.query(makeServerTrampoline(this), ns, cmd, options, callback);
258251
}
259252

260-
getMore(
261-
ns: string,
262-
cursorState: InternalCursorState,
263-
batchSize: number,
264-
options: GetMoreOptions,
265-
callback: Callback
266-
): void {
267-
wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback);
253+
getMore(ns: string, cursorId: Long, options: GetMoreOptions, callback: Callback<Document>): void {
254+
wp.getMore(makeServerTrampoline(this), ns, cursorId, options, callback);
268255
}
269256

270-
killCursors(ns: string, cursorState: InternalCursorState, callback: Callback): void {
271-
wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback);
257+
killCursors(ns: string, cursorIds: Long[], options: CommandOptions, callback: Callback): void {
258+
wp.killCursors(makeServerTrampoline(this), ns, cursorIds, options, callback);
272259
}
273260

274261
insert(ns: string, ops: Document[], options: InsertOptions, callback: Callback): void {

src/cmap/wire_protocol/get_more.ts

Lines changed: 26 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,95 +1,64 @@
11
import { GetMore } from '../commands';
22
import { Long, Document, pluckBSONSerializeOptions } from '../../bson';
3-
import { MongoError, MongoNetworkError } from '../../error';
3+
import { MongoError } from '../../error';
44
import { applyCommonQueryOptions } from './shared';
55
import { maxWireVersion, collectionNamespace, Callback } from '../../utils';
66
import { command, CommandOptions } from './command';
77
import type { Server } from '../../sdam/server';
8-
import type { InternalCursorState } from '../../cursor/core_cursor';
98

109
/** @internal */
11-
export type GetMoreOptions = CommandOptions;
10+
export interface GetMoreOptions extends CommandOptions {
11+
batchSize?: number;
12+
maxTimeMS?: number;
13+
maxAwaitTimeMS?: number;
14+
comment?: Document;
15+
}
1216

1317
export function getMore(
1418
server: Server,
1519
ns: string,
16-
cursorState: InternalCursorState,
17-
batchSize: number,
20+
cursorId: Long,
1821
options: GetMoreOptions,
1922
callback: Callback<Document>
2023
): void {
2124
options = options || {};
2225

26+
const fullResult = typeof options.fullResult === 'boolean' ? options.fullResult : false;
2327
const wireVersion = maxWireVersion(server);
24-
const queryCallback: Callback<Document> = function (err, response) {
25-
if (err || !response) return callback(err);
26-
27-
// If we have a timed out query or a cursor that was killed
28-
if (response.cursorNotFound) {
29-
return callback(new MongoNetworkError('cursor killed or timed out'));
30-
}
31-
32-
if (wireVersion < 4) {
33-
const cursorId =
34-
typeof response.cursorId === 'number'
35-
? Long.fromNumber(response.cursorId)
36-
: response.cursorId;
37-
38-
cursorState.documents = response.documents;
39-
cursorState.cursorId = cursorId;
40-
41-
callback();
42-
return;
43-
}
44-
45-
// We have an error detected
46-
if (response.ok === 0) {
47-
return callback(new MongoError(response));
48-
}
49-
50-
// Ensure we have a Long valid cursor id
51-
const cursorId =
52-
typeof response.cursor.id === 'number'
53-
? Long.fromNumber(response.cursor.id)
54-
: response.cursor.id;
55-
56-
cursorState.documents = response.cursor.nextBatch;
57-
cursorState.cursorId = cursorId;
58-
59-
callback(undefined, response);
60-
};
61-
62-
if (!cursorState.cursorId) {
28+
if (!cursorId) {
6329
callback(new MongoError('Invalid internal cursor state, no known cursor id'));
6430
return;
6531
}
6632

67-
const cursorId =
68-
cursorState.cursorId instanceof Long
69-
? cursorState.cursorId
70-
: Long.fromNumber((cursorState.cursorId as unknown) as number);
71-
7233
if (wireVersion < 4) {
73-
const getMoreOp = new GetMore(ns, cursorId, { numberToReturn: batchSize });
34+
const getMoreOp = new GetMore(ns, cursorId, { numberToReturn: options.batchSize });
7435
const queryOptions = applyCommonQueryOptions(
7536
{},
76-
Object.assign({ bsonOptions: pluckBSONSerializeOptions(options) }, cursorState)
37+
Object.assign(options, { ...pluckBSONSerializeOptions(options) })
7738
);
7839

7940
queryOptions.fullResult = true;
8041
queryOptions.command = true;
81-
server.s.pool.write(getMoreOp, queryOptions, queryCallback);
42+
server.s.pool.write(getMoreOp, queryOptions, (err, response) => {
43+
if (fullResult) return callback(err, response);
44+
if (err) return callback(err);
45+
callback(undefined, { cursor: { id: response.cursorId, nextBatch: response.documents } });
46+
});
47+
8248
return;
8349
}
8450

8551
const getMoreCmd: Document = {
8652
getMore: cursorId,
87-
collection: collectionNamespace(ns),
88-
batchSize: Math.abs(batchSize)
53+
collection: collectionNamespace(ns)
8954
};
9055

91-
if (cursorState.cmd.tailable && typeof cursorState.cmd.maxAwaitTimeMS === 'number') {
92-
getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS;
56+
if (typeof options.batchSize === 'number') {
57+
getMoreCmd.batchSize = Math.abs(options.batchSize);
58+
}
59+
60+
if (typeof options.maxAwaitTimeMS === 'number') {
61+
getMoreCmd.maxTimeMS = options.maxAwaitTimeMS;
9362
}
9463

9564
const commandOptions = Object.assign(
@@ -100,9 +69,5 @@ export function getMore(
10069
options
10170
);
10271

103-
if (cursorState.session) {
104-
commandOptions.session = cursorState.session;
105-
}
106-
107-
command(server, ns, getMoreCmd, commandOptions, queryCallback);
72+
command(server, ns, getMoreCmd, commandOptions, callback);
10873
}

src/cmap/wire_protocol/kill_cursors.ts

Lines changed: 10 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,52 +3,28 @@ import { maxWireVersion, collectionNamespace, Callback } from '../../utils';
33
import { command, CommandOptions } from './command';
44
import { MongoError, MongoNetworkError } from '../../error';
55
import type { Server } from '../../sdam/server';
6-
import type { InternalCursorState } from '../../cursor/core_cursor';
7-
import type { ClientSession } from '../../sessions';
8-
9-
interface KillCursorOptions {
10-
session?: ClientSession;
11-
immediateRelease: boolean;
12-
noResponse: boolean;
13-
}
6+
import type { Long } from '../../bson';
147

158
export function killCursors(
169
server: Server,
1710
ns: string,
18-
cursorState: InternalCursorState,
11+
cursorIds: Long[],
12+
options: CommandOptions,
1913
callback: Callback
2014
): void {
2115
callback = typeof callback === 'function' ? callback : () => undefined;
22-
23-
if (!cursorState.cursorId) {
24-
callback(new MongoError('Invalid internal cursor state, no known cursor id'));
25-
return;
16+
if (!cursorIds || !Array.isArray(cursorIds)) {
17+
throw new TypeError('Invalid list of cursor ids provided: ' + cursorIds);
2618
}
2719

28-
const cursorIds = [cursorState.cursorId];
29-
3020
if (maxWireVersion(server) < 4) {
3121
const pool = server.s.pool;
3222
const killCursor = new KillCursor(ns, cursorIds);
33-
const options: KillCursorOptions = {
34-
immediateRelease: true,
35-
noResponse: true
36-
};
3723

38-
if (typeof cursorState.session === 'object') {
39-
options.session = cursorState.session;
40-
}
41-
42-
if (pool && pool.isConnected()) {
43-
try {
44-
pool.write(killCursor, options, callback);
45-
} catch (err) {
46-
if (typeof callback === 'function') {
47-
callback(err, null);
48-
} else {
49-
console.warn(err);
50-
}
51-
}
24+
try {
25+
pool.write(killCursor, { noResponse: true, ...options }, callback);
26+
} catch (err) {
27+
callback(err);
5228
}
5329

5430
return;
@@ -59,12 +35,7 @@ export function killCursors(
5935
cursors: cursorIds
6036
};
6137

62-
const options: CommandOptions = { fullResult: true };
63-
if (typeof cursorState.session === 'object') {
64-
options.session = cursorState.session;
65-
}
66-
67-
command(server, ns, killCursorCmd, options, (err, response) => {
38+
command(server, ns, killCursorCmd, { fullResult: true, ...options }, (err, response) => {
6839
if (err || !response) {
6940
return callback(err);
7041
}

0 commit comments

Comments
 (0)