Skip to content

Commit 517a826

Browse files
authored
Merge branch 'main' into NODE-4686/CLAM-logging-with-serverConnectionid
2 parents e9d4f8d + 86e2659 commit 517a826

File tree

64 files changed

+61818
-197
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+61818
-197
lines changed

.github/workflows/release-nightly.yml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
on:
2-
# TODO: We can reenable cron when needed
3-
# schedule:
4-
# # Timezone is UTC
5-
# # https://crontab.guru/#0_0_*_*_*
6-
# # At 00:00 every day.
7-
# - cron: '0 0 * * *'
2+
schedule:
3+
# Timezone is UTC
4+
# https://crontab.guru/#0_0_*_*_*
5+
# At 00:00 every day.
6+
- cron: '0 0 * * *'
87

98
# Allows us to manually trigger a nightly
109
# Since npm prevents duplicate releases we can run this at any time

src/bulk/common.ts

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -579,23 +579,18 @@ function executeCommands(
579579
}
580580

581581
try {
582-
if (isInsertBatch(batch)) {
583-
executeOperation(
584-
bulkOperation.s.collection.client,
585-
new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
586-
resultHandler
587-
);
588-
} else if (isUpdateBatch(batch)) {
589-
executeOperation(
590-
bulkOperation.s.collection.client,
591-
new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
592-
resultHandler
593-
);
594-
} else if (isDeleteBatch(batch)) {
595-
executeOperation(
596-
bulkOperation.s.collection.client,
597-
new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
598-
resultHandler
582+
const operation = isInsertBatch(batch)
583+
? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
584+
: isUpdateBatch(batch)
585+
? new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
586+
: isDeleteBatch(batch)
587+
? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
588+
: null;
589+
590+
if (operation != null) {
591+
executeOperation(bulkOperation.s.collection.client, operation).then(
592+
result => resultHandler(undefined, result),
593+
error => resultHandler(error)
599594
);
600595
}
601596
} catch (err) {

src/operations/execute_operation.ts

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
} from '../sdam/server_selection';
2626
import type { Topology } from '../sdam/topology';
2727
import type { ClientSession } from '../sessions';
28-
import { type Callback, maybeCallback, supportsRetryableWrites } from '../utils';
28+
import { supportsRetryableWrites } from '../utils';
2929
import { AbstractOperation, Aspect } from './operation';
3030

3131
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
@@ -51,36 +51,23 @@ export interface ExecutionResult {
5151
* @internal
5252
*
5353
* @remarks
54-
* This method reduces large amounts of duplication in the entire codebase by providing
55-
* a single point for determining whether callbacks or promises should be used. Additionally
56-
* it allows for a single point of entry to provide features such as implicit sessions, which
54+
* Allows for a single point of entry to provide features such as implicit sessions, which
5755
* are required by the Driver Sessions specification in the event that a ClientSession is
58-
* not provided
56+
* not provided.
5957
*
60-
* @param topology - The topology to execute this operation on
58+
* The expectation is that this function:
59+
* - Connects the MongoClient if it has not already been connected
60+
* - Creates a session if none is provided and cleans up the session it creates
61+
* - Selects a server based on readPreference or various factors
62+
* - Retries an operation if it fails for certain errors, see {@link retryOperation}
63+
*
64+
* @typeParam T - The operation's type
65+
* @typeParam TResult - The type of the operation's result, calculated from T
66+
*
67+
* @param client - The MongoClient to execute this operation with
6168
* @param operation - The operation to execute
62-
* @param callback - The command result callback
6369
*/
64-
export function executeOperation<
65-
T extends AbstractOperation<TResult>,
66-
TResult = ResultTypeFromOperation<T>
67-
>(client: MongoClient, operation: T): Promise<TResult>;
68-
export function executeOperation<
69-
T extends AbstractOperation<TResult>,
70-
TResult = ResultTypeFromOperation<T>
71-
>(client: MongoClient, operation: T, callback: Callback<TResult>): void;
72-
export function executeOperation<
73-
T extends AbstractOperation<TResult>,
74-
TResult = ResultTypeFromOperation<T>
75-
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
76-
export function executeOperation<
77-
T extends AbstractOperation<TResult>,
78-
TResult = ResultTypeFromOperation<T>
79-
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
80-
return maybeCallback(() => executeOperationAsync(client, operation), callback);
81-
}
82-
83-
async function executeOperationAsync<
70+
export async function executeOperation<
8471
T extends AbstractOperation<TResult>,
8572
TResult = ResultTypeFromOperation<T>
8673
>(client: MongoClient, operation: T): Promise<TResult> {

src/sessions.ts

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -754,45 +754,46 @@ function endTransaction(
754754
command.recoveryToken = session.transaction.recoveryToken;
755755
}
756756

757+
const handleFirstCommandAttempt = (error?: Error) => {
758+
if (command.abortTransaction) {
759+
// always unpin on abort regardless of command outcome
760+
session.unpin();
761+
}
762+
763+
if (error instanceof MongoError && isRetryableWriteError(error)) {
764+
// SPEC-1185: apply majority write concern when retrying commitTransaction
765+
if (command.commitTransaction) {
766+
// per txns spec, must unpin session in this case
767+
session.unpin({ force: true });
768+
769+
command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
770+
w: 'majority'
771+
});
772+
}
773+
774+
executeOperation(
775+
session.client,
776+
new RunAdminCommandOperation(command, {
777+
session,
778+
readPreference: ReadPreference.primary,
779+
bypassPinningCheck: true
780+
})
781+
).then(() => commandHandler(), commandHandler);
782+
return;
783+
}
784+
785+
commandHandler(error);
786+
};
787+
757788
// send the command
758789
executeOperation(
759790
session.client,
760791
new RunAdminCommandOperation(command, {
761792
session,
762793
readPreference: ReadPreference.primary,
763794
bypassPinningCheck: true
764-
}),
765-
error => {
766-
if (command.abortTransaction) {
767-
// always unpin on abort regardless of command outcome
768-
session.unpin();
769-
}
770-
771-
if (error instanceof MongoError && isRetryableWriteError(error)) {
772-
// SPEC-1185: apply majority write concern when retrying commitTransaction
773-
if (command.commitTransaction) {
774-
// per txns spec, must unpin session in this case
775-
session.unpin({ force: true });
776-
777-
command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
778-
w: 'majority'
779-
});
780-
}
781-
782-
return executeOperation(
783-
session.client,
784-
new RunAdminCommandOperation(command, {
785-
session,
786-
readPreference: ReadPreference.primary,
787-
bypassPinningCheck: true
788-
}),
789-
commandHandler
790-
);
791-
}
792-
793-
commandHandler(error);
794-
}
795-
);
795+
})
796+
).then(() => handleFirstCommandAttempt(), handleFirstCommandAttempt);
796797
}
797798

798799
/** @public */

src/utils.ts

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -330,30 +330,6 @@ export function* makeCounter(seed = 0): Generator<number> {
330330
}
331331
}
332332

333-
/**
334-
* Helper for handling legacy callback support.
335-
*/
336-
export function maybeCallback<T>(promiseFn: () => Promise<T>, callback: null): Promise<T>;
337-
export function maybeCallback<T>(
338-
promiseFn: () => Promise<T>,
339-
callback?: Callback<T>
340-
): Promise<T> | void;
341-
export function maybeCallback<T>(
342-
promiseFn: () => Promise<T>,
343-
callback?: Callback<T> | null
344-
): Promise<T> | void {
345-
const promise = promiseFn();
346-
if (callback == null) {
347-
return promise;
348-
}
349-
350-
promise.then(
351-
result => callback(undefined, result),
352-
error => callback(error)
353-
);
354-
return;
355-
}
356-
357333
/**
358334
* Synchronously Generate a UUIDv4
359335
* @internal

test/integration/client-side-operations-timeout/.gitkeep

Whitespace-only changes.

0 commit comments

Comments
 (0)