diff --git a/src/bulk/common.ts b/src/bulk/common.ts index e4da3762656..8d29804b07d 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -2,14 +2,14 @@ import { PromiseProvider } from '../promise_provider'; import { Long, ObjectId, Document, BSONSerializeOptions, resolveBSONOptions } from '../bson'; import { MongoError, MongoWriteConcernError, AnyError } from '../error'; import { - applyWriteConcern, applyRetryableWrites, executeLegacyOperation, hasAtomicOperators, Callback, MongoDBNamespace, maxWireVersion, - getTopology + getTopology, + resolveOptions } from '../utils'; import { executeOperation } from '../operations/execute_operation'; import { InsertOperation } from '../operations/insert'; @@ -571,14 +571,10 @@ function executeCommands( executeCommands(bulkOperation, options, callback); } - const finalOptions = Object.assign( - { ordered: bulkOperation.isOrdered }, - bulkOperation.bsonOptions, - options - ); - if (bulkOperation.s.writeConcern != null) { - finalOptions.writeConcern = bulkOperation.s.writeConcern; - } + const finalOptions = resolveOptions(bulkOperation, { + ...options, + ordered: bulkOperation.isOrdered + }); if (finalOptions.bypassDocumentValidation !== true) { delete finalOptions.bypassDocumentValidation; @@ -935,10 +931,9 @@ export abstract class BulkOperationBase { // + 1 bytes for null terminator const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2; - // Final options for retryable writes and write concern + // Final options for retryable writes let finalOptions = Object.assign({}, options); finalOptions = applyRetryableWrites(finalOptions, collection.s.db); - finalOptions = applyWriteConcern(finalOptions, { collection: collection }, options); // Final results const bulkResult: BulkResult = { @@ -983,7 +978,7 @@ export abstract class BulkOperationBase { // Options options: finalOptions, // BSON options - bsonOptions: resolveBSONOptions(options, collection), + bsonOptions: resolveBSONOptions(options), // Current operation currentOp, // Executed @@ -1169,19 +1164,18 @@ export abstract class BulkOperationBase { return this.s.bsonOptions; } + get writeConcern(): WriteConcern | undefined { + return this.s.writeConcern; + } + /** An internal helper method. Do not invoke directly. Will be going away in the future */ - execute( - _writeConcern?: WriteConcern, - options?: BulkWriteOptions, - callback?: Callback - ): Promise | void { + execute(options?: BulkWriteOptions, callback?: Callback): Promise | void { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; - if (typeof _writeConcern === 'function') { - callback = _writeConcern as Callback; - } else if (_writeConcern && typeof _writeConcern === 'object') { - this.s.writeConcern = _writeConcern; + const writeConcern = WriteConcern.fromOptions(options); + if (writeConcern) { + this.s.writeConcern = writeConcern; } if (this.s.executed) { diff --git a/src/collection.ts b/src/collection.ts index abc1ca6da32..63409148625 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -315,7 +315,7 @@ export class Collection implements OperationParent { return executeOperation( getTopology(this), - new InsertManyOperation(this, docs, options), + new InsertManyOperation(this, docs, resolveOptions(this, options)), callback ); } @@ -375,7 +375,7 @@ export class Collection implements OperationParent { return executeOperation( getTopology(this), - new BulkWriteOperation(this, operations, options), + new BulkWriteOperation(this, operations, resolveOptions(this, options)), callback ); } @@ -1312,12 +1312,12 @@ export class Collection implements OperationParent { /** Initiate an Out of order batch write operation. All operations will be buffered into insert/update/remove commands executed out of order. */ initializeUnorderedBulkOp(options?: BulkWriteOptions): any { - return new UnorderedBulkOperation(this, options ?? {}); + return new UnorderedBulkOperation(this, resolveOptions(this, options)); } /** Initiate an In order bulk write operation. Operations will be serially executed in the order they are added, creating a new operation for each switch in types. */ initializeOrderedBulkOp(options?: BulkWriteOptions): any { - return new OrderedBulkOperation(this, options ?? {}); + return new OrderedBulkOperation(this, resolveOptions(this, options)); } /** Get the db scoped logger */ diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts index 6838f6fe5eb..e2c829fbbe0 100644 --- a/src/operations/bulk_write.ts +++ b/src/operations/bulk_write.ts @@ -1,7 +1,5 @@ -import { applyRetryableWrites, applyWriteConcern, Callback } from '../utils'; -import { OperationBase } from './operation'; -import { resolveBSONOptions } from '../bson'; -import { WriteConcern } from '../write_concern'; +import { applyRetryableWrites, Callback } from '../utils'; +import { Aspect, defineAspects, OperationBase } from './operation'; import type { Collection } from '../collection'; import type { BulkOperationBase, @@ -25,9 +23,6 @@ export class BulkWriteOperation extends OperationBase): void { @@ -50,15 +45,11 @@ export class BulkWriteOperation extends OperationBase { + bulk.execute(finalOptions, (err, r) => { // We have connection level error if (!r && err) { return callback(err); @@ -69,3 +60,5 @@ export class BulkWriteOperation extends OperationBase extends OperationBase { ns: MongoDBNamespace; - readPreference: ReadPreference; readConcern?: ReadConcern; writeConcern?: WriteConcern; explain: boolean; @@ -73,21 +70,13 @@ export abstract class CommandOperation< : new MongoDBNamespace('admin', '$cmd'); } - this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION) - ? ReadPreference.primary - : ReadPreference.fromOptions(options) ?? ReadPreference.primary; this.readConcern = ReadConcern.fromOptions(options); this.writeConcern = WriteConcern.fromOptions(options); - this.bsonOptions = resolveBSONOptions(options); this.explain = false; this.fullResponse = options && typeof options.fullResponse === 'boolean' ? options.fullResponse : false; - // TODO: A lot of our code depends on having the read preference in the options. This should - // go away, but also requires massive test rewrites. - this.options.readPreference = this.readPreference; - // TODO(NODE-2056): make logger another "inheritable" property if (parent && parent.logger) { this.logger = parent.logger; diff --git a/src/operations/common_functions.ts b/src/operations/common_functions.ts index 69b1befc0d4..3b24a996a06 100644 --- a/src/operations/common_functions.ts +++ b/src/operations/common_functions.ts @@ -1,11 +1,5 @@ import { MongoError } from '../error'; -import { - applyRetryableWrites, - applyWriteConcern, - decorateWithCollation, - Callback, - getTopology -} from '../utils'; +import { applyRetryableWrites, decorateWithCollation, Callback, getTopology } from '../utils'; import type { Document } from '../bson'; import type { Db } from '../db'; import type { ClientSession } from '../sessions'; @@ -136,10 +130,9 @@ export function removeDocuments( // Create an empty options object if the provided one is null options = options || {}; - // Final options for retryable writes and write concern + // Final options for retryable writes let finalOptions = Object.assign({}, options); finalOptions = applyRetryableWrites(finalOptions, coll.s.db); - finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options); // If selector is null set empty if (selector == null) selector = {}; @@ -218,16 +211,9 @@ export function updateDocuments( if (document == null || typeof document !== 'object') return callback(new TypeError('document must be a valid JavaScript object')); - // Final options for retryable writes and write concern + // Final options for retryable writes let finalOptions = Object.assign({}, options); finalOptions = applyRetryableWrites(finalOptions, coll.s.db); - finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options); - - // Do we return the actual result document - // Either use override on the function, or go back to default on either the collection - // level or db - finalOptions.serializeFunctions = - options.serializeFunctions || coll.bsonOptions.serializeFunctions; // Execute the operation const op: Document = { q: selector, u: document }; diff --git a/src/operations/find_and_modify.ts b/src/operations/find_and_modify.ts index 58c21856997..05794c64d69 100644 --- a/src/operations/find_and_modify.ts +++ b/src/operations/find_and_modify.ts @@ -3,7 +3,6 @@ import { maxWireVersion, applyRetryableWrites, decorateWithCollation, - applyWriteConcern, hasAtomicOperators, Callback } from '../utils'; @@ -107,9 +106,8 @@ export class FindAndModifyOperation extends CommandOperation): void { @@ -73,3 +70,5 @@ function mapInsertManyResults(docs: Document[], r: BulkWriteResult): InsertManyR return finalResult; } + +defineAspects(InsertManyOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index f34b162fb75..c11ff4d846b 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,6 +1,6 @@ -import { ReadPreference } from '../read_preference'; +import { ReadPreference, ReadPreferenceLike } from '../read_preference'; import type { ClientSession } from '../sessions'; -import type { Document, BSONSerializeOptions } from '../bson'; +import { Document, BSONSerializeOptions, resolveBSONOptions } from '../bson'; import type { MongoDBNamespace, Callback } from '../utils'; import type { Server } from '../sdam/server'; @@ -24,6 +24,9 @@ export interface OperationOptions extends BSONSerializeOptions { explain?: boolean; willRetryWrites?: boolean; + + /** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */ + readPreference?: ReadPreferenceLike; } /** @@ -49,7 +52,16 @@ export abstract class OperationBase< constructor(options: T = {} as T) { this.options = Object.assign({}, options); - this.readPreference = ReadPreference.primary; + + this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION) + ? ReadPreference.primary + : ReadPreference.fromOptions(options) ?? ReadPreference.primary; + // TODO: A lot of our code depends on having the read preference in the options. This should + // go away, but also requires massive test rewrites. + this.options.readPreference = this.readPreference; + + // Pull the BSON serialize options from the already-resolved options + this.bsonOptions = resolveBSONOptions(options); } abstract execute(server: Server, callback: Callback): void;