Skip to content

Commit bdae852

Browse files
committed
feat(NODE-5019): add runCursorCommand API
1 parent d6d76b4 commit bdae852

File tree

11 files changed

+532
-10
lines changed

11 files changed

+532
-10
lines changed

src/cursor/run_command_cursor.ts

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
import type { BSONSerializeOptions, Document, Long } from '../bson';
2+
import type { Db } from '../db';
3+
import {
4+
MongoCursorExhaustedError,
5+
MongoRuntimeError,
6+
MongoUnexpectedServerResponseError
7+
} from '../error';
8+
import type { MongoClient } from '../mongo_client';
9+
import { executeOperation } from '../operations/execute_operation';
10+
import { GetMoreOperation } from '../operations/get_more';
11+
import { KillCursorsOperation } from '../operations/kill_cursors';
12+
import { RunCommandOperation } from '../operations/run_command';
13+
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
14+
import type { Server } from '../sdam/server';
15+
import type { ClientSession } from '../sessions';
16+
import { List, MongoDBNamespace, ns } from '../utils';
17+
18+
/** @public */
19+
export type RunCommandCursorOptions = {
20+
readPreference?: ReadPreferenceLike;
21+
session?: ClientSession;
22+
} & BSONSerializeOptions;
23+
24+
/** @internal */
25+
type RunCursorCommandResponse = {
26+
cursor: { id: bigint | Long | number; ns: string; firstBatch: Document[] };
27+
ok: 1;
28+
};
29+
30+
/** @public */
31+
export class RunCommandCursor {
32+
/** @internal */
33+
client: MongoClient;
34+
/** @internal */
35+
db: Db;
36+
/** @internal */
37+
options: RunCommandCursorOptions;
38+
/** @internal */
39+
documents: List<Document>;
40+
/** @internal */
41+
storedIterator: AsyncGenerator<Document> | null = null;
42+
/** @internal */
43+
getMoreOptions: { comment?: any; maxAwaitTimeMS?: number; batchSize?: number } = {};
44+
/** @internal */
45+
id: bigint | null = null;
46+
/** @internal */
47+
ns: MongoDBNamespace | null = null;
48+
/** @internal */
49+
server: Server | null = null;
50+
/** @internal */
51+
readPreference: ReadPreference;
52+
/** @internal */
53+
session: ClientSession;
54+
/** @internal */
55+
done = false;
56+
57+
readonly command: Readonly<Record<string, any>>;
58+
59+
set comment(comment: any) {
60+
this.getMoreOptions.comment = comment;
61+
}
62+
63+
get comment() {
64+
return this.getMoreOptions.comment;
65+
}
66+
67+
set maxTimeMS(maxAwaitTimeMS: number) {
68+
this.getMoreOptions.maxAwaitTimeMS = maxAwaitTimeMS;
69+
}
70+
71+
get maxTimeMS() {
72+
return this.getMoreOptions.maxAwaitTimeMS ?? 0;
73+
}
74+
75+
set batchSize(batchSize: number) {
76+
this.getMoreOptions.batchSize = batchSize;
77+
}
78+
79+
get batchSize() {
80+
return this.getMoreOptions.batchSize ?? 0;
81+
}
82+
83+
/** @internal */
84+
constructor(
85+
client: MongoClient,
86+
db: Db,
87+
command: Document | Map<string, any>,
88+
options: RunCommandCursorOptions = {}
89+
) {
90+
this.client = client;
91+
this.db = db;
92+
this.command =
93+
Symbol.toStringTag in command && command[Symbol.toStringTag] === 'Map'
94+
? Object.freeze(Object.fromEntries(command.entries()))
95+
: Object.freeze({ ...command });
96+
this.options = options;
97+
this.documents = new List();
98+
this.readPreference = ReadPreference.fromOptions(options) ?? ReadPreference.primary;
99+
this.session = options.session == null ? client.startSession({ owner: this }) : options.session;
100+
}
101+
102+
/** @internal */
103+
private static getIdFromResponse(response: { cursor: { id: number | Long | bigint } }): bigint {
104+
return typeof response.cursor.id === 'number'
105+
? BigInt(response.cursor.id)
106+
: typeof response.cursor.id === 'object'
107+
? response.cursor.id.toBigInt()
108+
: response.cursor.id;
109+
}
110+
111+
/** @internal */
112+
private async *iterator(): AsyncGenerator<Document> {
113+
if (this.done) {
114+
throw new MongoCursorExhaustedError('This cursor needs a nap');
115+
}
116+
117+
try {
118+
await this.runCommand();
119+
while (this.documents.length) {
120+
const doc = this.documents.shift();
121+
if (doc != null) {
122+
yield doc;
123+
}
124+
if (this.documents.length === 0 && this.id !== 0n) {
125+
await this.getMore();
126+
}
127+
}
128+
} finally {
129+
await this.close().catch(() => null);
130+
}
131+
}
132+
133+
/** @internal */
134+
private get asyncIterator(): AsyncGenerator<Document> {
135+
if (this.storedIterator == null) {
136+
this.storedIterator = this.iterator();
137+
}
138+
return this.storedIterator;
139+
}
140+
141+
/** @internal */
142+
private async runCommand() {
143+
const operation = new RunCommandOperation<RunCursorCommandResponse>(this.db, this.command, {
144+
...this.options,
145+
session: this.session,
146+
readPreference: this.readPreference
147+
});
148+
149+
const commandResponse = await executeOperation(this.client, operation);
150+
151+
if (commandResponse.cursor == null) {
152+
throw new MongoUnexpectedServerResponseError('command must return a cursor document');
153+
}
154+
155+
this.id = RunCommandCursor.getIdFromResponse(commandResponse);
156+
this.ns = ns(commandResponse.cursor.ns);
157+
this.documents.pushMany(commandResponse.cursor.firstBatch);
158+
this.server = operation.server;
159+
}
160+
161+
/** @internal */
162+
private async getMore() {
163+
if (this.ns == null || this.id == null || this.server == null) {
164+
throw new MongoRuntimeError('getMore cannot be invoked with null namespace, id, nor server');
165+
}
166+
167+
const getMoreResponse = await executeOperation(
168+
this.client,
169+
new GetMoreOperation(this.ns, this.id, this.server, {
170+
...this.options,
171+
session: this.session,
172+
readPreference: this.readPreference,
173+
...this.getMoreOptions
174+
})
175+
);
176+
177+
this.documents.pushMany(getMoreResponse.cursor.nextBatch);
178+
this.id = RunCommandCursor.getIdFromResponse(getMoreResponse);
179+
}
180+
181+
/** @internal */
182+
async start(): Promise<void> {
183+
const result = await this.asyncIterator.next();
184+
if (!result.done) this.documents.unshift(result.value);
185+
}
186+
187+
async *[Symbol.asyncIterator](): AsyncGenerator<Document> {
188+
if (this.done) {
189+
throw new MongoCursorExhaustedError('This cursor needs a nap');
190+
}
191+
yield* this.asyncIterator;
192+
}
193+
194+
async next(): Promise<Document | null> {
195+
if (this.done) {
196+
throw new MongoCursorExhaustedError('This cursor needs a nap');
197+
}
198+
199+
const result = await this.asyncIterator.next();
200+
201+
if (result.done) {
202+
return null;
203+
}
204+
205+
return result.value;
206+
}
207+
208+
async toArray() {
209+
const documents = new List();
210+
for await (const doc of this) {
211+
documents.push(doc);
212+
}
213+
return documents.toArray();
214+
}
215+
216+
async close() {
217+
if (this.done) {
218+
return;
219+
}
220+
if (this.id != null && this.id !== 0n && this.ns != null && this.server != null) {
221+
const options = this.session.hasEnded ? {} : { session: this.session };
222+
const killCursorsOperation = new KillCursorsOperation(this.id, this.ns, this.server, options);
223+
await executeOperation(this.client, killCursorsOperation).catch(() => null);
224+
}
225+
if (this.session.owner === this) {
226+
await this.session.endSession().catch(() => null);
227+
}
228+
this.done = true;
229+
}
230+
}

src/db.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { Collection, CollectionOptions } from './collection';
55
import * as CONSTANTS from './constants';
66
import { AggregationCursor } from './cursor/aggregation_cursor';
77
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
8+
import { RunCommandCursor, type RunCommandCursorOptions } from './cursor/run_command_cursor';
89
import { MongoAPIError, MongoInvalidArgumentError } from './error';
910
import type { MongoClient, PkFactory } from './mongo_client';
1011
import type { TODO_NODE_3286 } from './mongo_types';
@@ -523,6 +524,22 @@ export class Db {
523524

524525
return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
525526
}
527+
528+
/**
529+
* A low level cursor API providing basic driver functionality.
530+
* - ClientSession management
531+
* - ReadPreference for server selection
532+
* - Running getMores automatically when a local batch is exhausted
533+
*
534+
* @param command - The command that will start a cursor on the server.
535+
* @param options - Configurations for running the command, bson options will apply to getMores
536+
*/
537+
runCursorCommand(
538+
command: Document | Map<string, any>,
539+
options?: RunCommandCursorOptions
540+
): RunCommandCursor {
541+
return new RunCommandCursor(this.s.client, this, command, options);
542+
}
526543
}
527544

528545
// TODO(NODE-3484): Refactor into MongoDBNamespace

src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { AggregationCursor } from './cursor/aggregation_cursor';
88
import { FindCursor } from './cursor/find_cursor';
99
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
1010
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
11+
import type { RunCommandCursor } from './cursor/run_command_cursor';
1112
import { Db } from './db';
1213
import { GridFSBucket } from './gridfs';
1314
import { GridFSBucketReadStream } from './gridfs/download';
@@ -87,6 +88,7 @@ export {
8788
ListIndexesCursor,
8889
MongoClient,
8990
OrderedBulkOperation,
91+
RunCommandCursor,
9092
UnorderedBulkOperation
9193
};
9294

@@ -274,6 +276,7 @@ export type {
274276
ChangeStreamAggregateRawResult,
275277
ChangeStreamCursorOptions
276278
} from './cursor/change_stream_cursor';
279+
export type { RunCommandCursorOptions } from './cursor/run_command_cursor';
277280
export type { DbOptions, DbPrivate } from './db';
278281
export type { AutoEncrypter, AutoEncryptionOptions, AutoEncryptionTlsOptions } from './deps';
279282
export type { Encrypter, EncrypterOptions } from './encrypter';

src/operations/get_more.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export interface GetMoreOptions extends OperationOptions {
2626
* @internal
2727
*/
2828
export interface GetMoreCommand {
29-
getMore: Long;
29+
getMore: Long | bigint;
3030
collection: string;
3131
batchSize?: number;
3232
maxTimeMS?: number;
@@ -36,10 +36,15 @@ export interface GetMoreCommand {
3636

3737
/** @internal */
3838
export class GetMoreOperation extends AbstractOperation {
39-
cursorId: Long;
39+
cursorId: Long | bigint;
4040
override options: GetMoreOptions;
4141

42-
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) {
42+
constructor(
43+
ns: MongoDBNamespace,
44+
cursorId: Long | bigint,
45+
server: Server,
46+
options: GetMoreOptions
47+
) {
4348
super(options);
4449

4550
this.options = options;
@@ -63,7 +68,11 @@ export class GetMoreOperation extends AbstractOperation {
6368
);
6469
}
6570

66-
if (this.cursorId == null || this.cursorId.isZero()) {
71+
if (
72+
this.cursorId == null ||
73+
this.cursorId === 0n ||
74+
(typeof this.cursorId === 'object' && this.cursorId.isZero())
75+
) {
6776
return callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
6877
}
6978

src/operations/kill_cursors.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@ import { AbstractOperation, Aspect, defineAspects, OperationOptions } from './op
1111
*/
1212
interface KillCursorsCommand {
1313
killCursors: string;
14-
cursors: Long[];
14+
cursors: (Long | bigint)[];
1515
comment?: unknown;
1616
}
1717

1818
export class KillCursorsOperation extends AbstractOperation {
19-
cursorId: Long;
20-
21-
constructor(cursorId: Long, ns: MongoDBNamespace, server: Server, options: OperationOptions) {
19+
cursorId: Long | bigint;
20+
21+
constructor(
22+
cursorId: Long | bigint,
23+
ns: MongoDBNamespace,
24+
server: Server,
25+
options: OperationOptions
26+
) {
2227
super(options);
2328
this.ns = ns;
2429
this.cursorId = cursorId;

src/sessions.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { ConnectionPoolMetrics } from './cmap/metrics';
66
import { isSharded } from './cmap/wire_protocol/shared';
77
import { PINNED, UNPINNED } from './constants';
88
import type { AbstractCursor } from './cursor/abstract_cursor';
9+
import type { RunCommandCursor } from './cursor/run_command_cursor';
910
import {
1011
AnyError,
1112
MongoAPIError,
@@ -53,7 +54,7 @@ export interface ClientSessionOptions {
5354
defaultTransactionOptions?: TransactionOptions;
5455

5556
/** @internal */
56-
owner?: symbol | AbstractCursor;
57+
owner?: symbol | AbstractCursor | RunCommandCursor;
5758
/** @internal */
5859
explicit?: boolean;
5960
/** @internal */
@@ -108,7 +109,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
108109
operationTime?: Timestamp;
109110
explicit: boolean;
110111
/** @internal */
111-
owner?: symbol | AbstractCursor;
112+
owner?: symbol | AbstractCursor | RunCommandCursor;
112113
defaultTransactionOptions: TransactionOptions;
113114
transaction: Transaction;
114115
/** @internal */

test/mongodb.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ export * from '../src/cursor/change_stream_cursor';
138138
export * from '../src/cursor/find_cursor';
139139
export * from '../src/cursor/list_collections_cursor';
140140
export * from '../src/cursor/list_indexes_cursor';
141+
export * from '../src/cursor/run_command_cursor';
141142
export * from '../src/db';
142143
export * from '../src/deps';
143144
export * from '../src/encrypter';

0 commit comments

Comments
 (0)