Skip to content

feat(NODE-2014)!: return executor result from withSession and withTransaction #3783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
}

/** @public */
export type WithSessionCallback = (session: ClientSession) => Promise<any>;
export type WithSessionCallback<T = unknown> = (session: ClientSession) => Promise<T>;

/** @internal */
export interface MongoClientPrivate {
Expand Down Expand Up @@ -605,29 +605,30 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
}

/**
* Runs a given operation with an implicitly created session. The lifetime of the session
* will be handled without the need for user interaction.
* A convenience method for creating and handling the clean up of a ClientSession.
* The session will always be ended when the executor finishes.
*
* NOTE: presently the operation MUST return a Promise (either explicit or implicitly as an async function)
*
* @param options - Optional settings for the command
* @param callback - An callback to execute with an implicitly created session
* @param executor - An executor function that all operations using the provided session must be invoked in
* @param options - optional settings for the session
*/
async withSession(callback: WithSessionCallback): Promise<void>;
async withSession(options: ClientSessionOptions, callback: WithSessionCallback): Promise<void>;
async withSession(
optionsOrOperation: ClientSessionOptions | WithSessionCallback,
callback?: WithSessionCallback
): Promise<void> {
async withSession<T = any>(executor: WithSessionCallback<T>): Promise<T>;
async withSession<T = any>(
options: ClientSessionOptions,
executor: WithSessionCallback<T>
): Promise<T>;
async withSession<T = any>(
optionsOrExecutor: ClientSessionOptions | WithSessionCallback<T>,
executor?: WithSessionCallback<T>
): Promise<T> {
const options = {
// Always define an owner
owner: Symbol(),
// If it's an object inherit the options
...(typeof optionsOrOperation === 'object' ? optionsOrOperation : {})
...(typeof optionsOrExecutor === 'object' ? optionsOrExecutor : {})
};

const withSessionCallback =
typeof optionsOrOperation === 'function' ? optionsOrOperation : callback;
typeof optionsOrExecutor === 'function' ? optionsOrExecutor : executor;

if (withSessionCallback == null) {
throw new MongoInvalidArgumentError('Missing required callback parameter');
Expand All @@ -636,7 +637,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
const session = this.startSession(options);

try {
await withSessionCallback(session);
return await withSessionCallback(session);
} finally {
try {
await session.endSession();
Expand Down
66 changes: 34 additions & 32 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export interface ClientSessionOptions {
}

/** @public */
export type WithTransactionCallback<T = void> = (session: ClientSession) => Promise<T>;
export type WithTransactionCallback<T = any> = (session: ClientSession) => Promise<T>;

/** @public */
export type ClientSessionEvents = {
Expand Down Expand Up @@ -432,18 +432,16 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
}

/**
* Runs a provided callback within a transaction, retrying either the commitTransaction operation
* or entire transaction as needed (and when the error permits) to better ensure that
* the transaction can complete successfully.
* Starts a transaction and runs a provided function, ensuring the commitTransaction is always attempted when all operations run in the function have completed.
*
* **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
* Any callbacks that do not return a Promise will result in undefined behavior.
*
* @remarks
* This function:
* - Will return the command response from the final commitTransaction if every operation is successful (can be used as a truthy object)
* - Will return `undefined` if the transaction is explicitly aborted with `await session.abortTransaction()`
* - Will throw if one of the operations throws or `throw` statement is used inside the `withTransaction` callback
* - If all operations successfully complete and the `commitTransaction` operation is successful, then this function will return the result of the provided function.
* - If the transaction is unable to complete or an error is thrown from within the provided function, then this function will throw an error.
* - If the transaction is manually aborted within the provided function it will not throw.
* - May be called multiple times if the driver needs to attempt to retry the operations.
*
* Checkout a descriptive example here:
* @see https://www.mongodb.com/developer/quickstart/node-transactions/
Expand All @@ -452,7 +450,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
* @param options - optional settings for the transaction
* @returns A raw command response or undefined
*/
async withTransaction<T = void>(
async withTransaction<T = any>(
fn: WithTransactionCallback<T>,
options?: TransactionOptions
): Promise<Document | undefined> {
Expand Down Expand Up @@ -543,25 +541,29 @@ function attemptTransactionCommit<T>(
session: ClientSession,
startTime: number,
fn: WithTransactionCallback<T>,
options?: TransactionOptions
result: any,
options: TransactionOptions
): Promise<T> {
return session.commitTransaction().catch((err: MongoError) => {
if (
err instanceof MongoError &&
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
!isMaxTimeMSExpiredError(err)
) {
if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
return attemptTransactionCommit(session, startTime, fn, options);
}
return session.commitTransaction().then(
() => result,
(err: MongoError) => {
if (
err instanceof MongoError &&
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
!isMaxTimeMSExpiredError(err)
) {
if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
return attemptTransactionCommit(session, startTime, fn, result, options);
}

if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
return attemptTransaction(session, startTime, fn, options);
if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
return attemptTransaction(session, startTime, fn, options);
}
}
}

throw err;
});
throw err;
}
);
}

const USER_EXPLICIT_TXN_END_STATES = new Set<TxnState>([
Expand All @@ -574,11 +576,11 @@ function userExplicitlyEndedTransaction(session: ClientSession) {
return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
}

function attemptTransaction<TSchema>(
function attemptTransaction<T>(
session: ClientSession,
startTime: number,
fn: WithTransactionCallback<TSchema>,
options?: TransactionOptions
fn: WithTransactionCallback<T>,
options: TransactionOptions = {}
): Promise<any> {
session.startTransaction(options);

Expand All @@ -591,18 +593,18 @@ function attemptTransaction<TSchema>(

if (!isPromiseLike(promise)) {
session.abortTransaction().catch(() => null);
throw new MongoInvalidArgumentError(
'Function provided to `withTransaction` must return a Promise'
return Promise.reject(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why'd you revert the throw?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it ends up being equivalent only because the caller is now an async function, but throwing here is incorrect since it changes the expectation that this function is always promise returning. I don't feel strongly about keeping this, esp with the ticket to refactor this properly coming up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just curious, this change is fine. It'll be moot once Malik gets to this code anyways

new MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise')
);
}

return promise.then(
() => {
result => {
if (userExplicitlyEndedTransaction(session)) {
return;
return result;
}

return attemptTransactionCommit(session, startTime, fn, options);
return attemptTransactionCommit(session, startTime, fn, result, options);
},
err => {
function maybeRetryOrThrow(err: MongoError): Promise<any> {
Expand Down
8 changes: 8 additions & 0 deletions test/integration/sessions/sessions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ describe('Sessions Spec', function () {

describe('withSession', function () {
let client: MongoClient;

beforeEach(async function () {
client = await this.configuration.newClient().connect();
});
Expand Down Expand Up @@ -184,6 +185,13 @@ describe('Sessions Spec', function () {
expect(client.s.sessionPool.sessions).to.have.length(1);
expect(sessionWasEnded).to.be.true;
});

it('resolves with the value the callback returns', async () => {
const result = await client.withSession(async session => {
return client.db('test').collection('foo').find({}, { session }).toArray();
});
expect(result).to.be.an('array');
});
});

context('unacknowledged writes', () => {
Expand Down
55 changes: 49 additions & 6 deletions test/integration/transactions/transactions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
MongoNetworkError,
type ServerSessionPool
} from '../../mongodb';
import { type FailPoint } from '../../tools/utils';

describe('Transactions', function () {
describe('withTransaction', function () {
Expand Down Expand Up @@ -90,33 +91,33 @@ describe('Transactions', function () {
await client.close();
});

it('should return undefined when transaction is aborted explicitly', async () => {
it('returns result of executor when transaction is aborted explicitly', async () => {
const session = client.startSession();

const withTransactionResult = await session
.withTransaction(async session => {
await collection.insertOne({ a: 1 }, { session });
await collection.findOne({ a: 1 }, { session });
await session.abortTransaction();
return 'aborted!';
})
.finally(async () => await session.endSession());

expect(withTransactionResult).to.be.undefined;
expect(withTransactionResult).to.equal('aborted!');
});

it('should return raw command when transaction is successfully committed', async () => {
it('returns result of executor when transaction is successfully committed', async () => {
const session = client.startSession();

const withTransactionResult = await session
.withTransaction(async session => {
await collection.insertOne({ a: 1 }, { session });
await collection.findOne({ a: 1 }, { session });
return 'committed!';
})
.finally(async () => await session.endSession());

expect(withTransactionResult).to.exist;
expect(withTransactionResult).to.be.an('object');
expect(withTransactionResult).to.have.property('ok', 1);
expect(withTransactionResult).to.equal('committed!');
});

it('should throw when transaction is aborted due to an error', async () => {
Expand All @@ -136,6 +137,48 @@ describe('Transactions', function () {
});
}
);

context('when retried', { requires: { mongodb: '>=4.2.0', topology: '!single' } }, () => {
let client: MongoClient;
let collection: Collection<{ a: number }>;

beforeEach(async function () {
client = this.configuration.newClient();

await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 },
data: {
failCommands: ['commitTransaction'],
errorCode: 24,
errorLabels: ['TransientTransactionError'],
closeConnection: false
}
} as FailPoint);

collection = await client.db('withTransaction').createCollection('withTransactionRetry');
});

afterEach(async () => {
await client?.close();
});

it('returns the value of the final call to the executor', async () => {
const session = client.startSession();

let counter = 0;
const withTransactionResult = await session
.withTransaction(async session => {
await collection.insertOne({ a: 1 }, { session });
counter += 1;
return counter;
})
.finally(async () => await session.endSession());

expect(counter).to.equal(3);
expect(withTransactionResult).to.equal(3);
});
});
});

describe('startTransaction', function () {
Expand Down
15 changes: 4 additions & 11 deletions test/tools/unified-spec-runner/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -593,18 +593,11 @@ operations.set('withTransaction', async ({ entities, operation, client, testConf
maxCommitTimeMS: operation.arguments!.maxCommitTimeMS
};

let errorFromOperations = null;
const result = await session.withTransaction(async () => {
errorFromOperations = await (async () => {
for (const callbackOperation of operation.arguments!.callback) {
await executeOperationAndCheck(callbackOperation, entities, client, testConfig);
}
})().catch(error => error);
await session.withTransaction(async () => {
for (const callbackOperation of operation.arguments!.callback) {
await executeOperationAndCheck(callbackOperation, entities, client, testConfig);
}
}, options);

if (result == null || errorFromOperations) {
throw errorFromOperations ?? Error('transaction not committed');
}
});

operations.set('countDocuments', async ({ entities, operation }) => {
Expand Down
10 changes: 10 additions & 0 deletions test/types/sessions.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ expectType<ClientSession>(
})
);
expectError(client.startSession({ defaultTransactionOptions: { readConcern: 1 } }));

let something: any;
expectType<number>(await client.withSession(async () => 2));
expectType<string>(await client.withSession<string>(async () => something));
const untypedFn: any = () => 2;
expectType<any>(await client.withSession(untypedFn));
const unknownFn: () => Promise<unknown> = async () => 2;
expectType<unknown>(await client.withSession(unknownFn));
// Not a promise returning function
expectError(await client.withSession(() => null));