Skip to content

Commit bb83edb

Browse files
committed
perf: workaround creating many AbortControllers
1 parent c36c103 commit bb83edb

File tree

4 files changed

+101
-28
lines changed

4 files changed

+101
-28
lines changed

src/cmap/connection.ts

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { on } from 'stream';
21
import { clearTimeout, setTimeout } from 'timers';
32
import { promisify } from 'util';
43

@@ -61,6 +60,7 @@ import type { ClientMetadata } from './handshake/client_metadata';
6160
import { MessageStream, type OperationDescription } from './message_stream';
6261
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
6362
import { decompressResponse } from './wire_protocol/compression';
63+
import { onData } from './wire_protocol/on_data';
6464
import { getReadPreference, isSharded } from './wire_protocol/shared';
6565

6666
/** @internal */
@@ -1052,9 +1052,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
10521052
signal: this.controller.signal
10531053
});
10541054

1055-
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
1056-
this.controller = new AbortController();
1057-
10581055
if (options.noResponse) {
10591056
yield { ok: 1 };
10601057
return;
@@ -1080,9 +1077,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
10801077
}
10811078
}
10821079

1083-
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
1084-
this.controller = new AbortController();
1085-
10861080
yield document;
10871081
this.controller.signal.throwIfAborted();
10881082

@@ -1205,11 +1199,11 @@ const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
12051199
*/
12061200
export async function* readWireProtocolMessages(
12071201
connection: ModernConnection,
1208-
{ signal }: { signal?: AbortSignal } = {}
1202+
{ signal }: { signal: AbortSignal }
12091203
): AsyncGenerator<Buffer> {
12101204
const bufferPool = new BufferPool();
12111205
const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize;
1212-
for await (const [chunk] of on(connection.socket, 'data', { signal })) {
1206+
for await (const chunk of onData(connection.socket, { signal })) {
12131207
if (connection.delayedTimeoutId) {
12141208
clearTimeout(connection.delayedTimeoutId);
12151209
connection.delayedTimeoutId = null;
@@ -1277,7 +1271,7 @@ export async function writeCommand(
12771271
*/
12781272
export async function* readMany(
12791273
connection: ModernConnection,
1280-
options: { signal?: AbortSignal } = {}
1274+
options: { signal: AbortSignal }
12811275
): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
12821276
for await (const message of readWireProtocolMessages(connection, options)) {
12831277
const response = await decompressResponse(message);
@@ -1288,19 +1282,3 @@ export async function* readMany(
12881282
}
12891283
}
12901284
}
1291-
1292-
/**
1293-
* @internal
1294-
*
1295-
* Reads a single wire protocol message out of a connection.
1296-
*/
1297-
export async function read(
1298-
connection: ModernConnection,
1299-
options: { signal?: AbortSignal } = {}
1300-
): Promise<OpMsgResponse | OpQueryResponse> {
1301-
for await (const value of readMany(connection, options)) {
1302-
return value;
1303-
}
1304-
1305-
throw new MongoRuntimeError('unable to read message off of connection');
1306-
}

src/cmap/wire_protocol/on_data.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import { type EventEmitter } from 'events';
2+
3+
import { List, promiseWithResolvers } from '../../utils';
4+
5+
export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
6+
const signal = options.signal;
7+
signal.throwIfAborted();
8+
9+
// Preparing controlling queues and variables
10+
const unconsumedEvents = new List<Buffer>();
11+
const unconsumedPromises = new List<
12+
Omit<ReturnType<typeof promiseWithResolvers<IteratorResult<Buffer>>>, 'promise'>
13+
>();
14+
let error: Error | null = null;
15+
let finished = false;
16+
17+
const iterator: AsyncGenerator<Buffer> = {
18+
next() {
19+
// First, we consume all unread events
20+
const value = unconsumedEvents.shift();
21+
if (value != null) {
22+
return Promise.resolve({ value, done: false });
23+
}
24+
25+
// Then we error, if an error happened
26+
// This happens one time if at all, because after 'error'
27+
// we stop listening
28+
if (error != null) {
29+
const p = Promise.reject(error);
30+
// Only the first element errors
31+
error = null;
32+
return p;
33+
}
34+
35+
// If the iterator is finished, resolve to done
36+
if (finished) return closeHandler();
37+
38+
// Wait until an event happens
39+
const { promise, resolve, reject } = promiseWithResolvers<IteratorResult<Buffer>>();
40+
unconsumedPromises.push({ resolve, reject });
41+
return promise;
42+
},
43+
44+
return() {
45+
return closeHandler();
46+
},
47+
48+
throw(err: Error) {
49+
errorHandler(err);
50+
return Promise.resolve({ value: undefined, done: true });
51+
},
52+
53+
[Symbol.asyncIterator]() {
54+
return this;
55+
}
56+
};
57+
58+
// Adding event handlers
59+
emitter.on('data', eventHandler);
60+
emitter.on('error', errorHandler);
61+
signal.addEventListener('abort', abortListener, { once: true });
62+
63+
return iterator;
64+
65+
function abortListener() {
66+
errorHandler(signal.reason);
67+
}
68+
69+
function eventHandler(value: Buffer) {
70+
const promise = unconsumedPromises.shift();
71+
if (promise != null) promise.resolve({ value, done: false });
72+
else unconsumedEvents.push(value);
73+
}
74+
75+
function errorHandler(err: Error) {
76+
const promise = unconsumedPromises.shift();
77+
if (promise != null) promise.reject(err);
78+
else error = err;
79+
void closeHandler();
80+
}
81+
82+
function closeHandler() {
83+
// Adding event handlers
84+
emitter.off('data', eventHandler);
85+
emitter.off('error', errorHandler);
86+
signal.removeEventListener('abort', abortListener);
87+
finished = true;
88+
const doneResult = { value: undefined, done: finished } as const;
89+
90+
for (const promise of unconsumedPromises) {
91+
promise.resolve(doneResult);
92+
}
93+
94+
return Promise.resolve(doneResult);
95+
}
96+
}

src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@ export type {
270270
DestroyOptions,
271271
ModernConnection,
272272
ProxyOptions,
273-
read,
274273
readMany,
275274
writeCommand
276275
} from './cmap/connection';

src/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1330,7 +1330,7 @@ export async function abortable<T>(
13301330
}
13311331
}
13321332

1333-
function promiseWithResolvers<T>() {
1333+
export function promiseWithResolvers<T>() {
13341334
let resolve: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[0];
13351335
let reject: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[1];
13361336
const promise = new Promise<T>(function withResolversExecutor(promiseResolve, promiseReject) {

0 commit comments

Comments
 (0)