From 4638e0ab566af59d59a224d8c3827294c20dc51a Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Mon, 9 Nov 2020 15:04:09 -0500 Subject: [PATCH 1/6] use resolveOptions with bulk ops --- src/bulk/common.ts | 25 ++++++++++++------------- src/collection.ts | 8 ++++---- src/operations/bulk_write.ts | 9 +++------ src/operations/common_functions.ts | 14 +++----------- src/operations/find_and_modify.ts | 4 +--- src/operations/insert.ts | 5 ++--- src/operations/insert_many.ts | 4 ++-- 7 files changed, 27 insertions(+), 42 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index e4da3762656..83eb2633150 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -2,14 +2,14 @@ import { PromiseProvider } from '../promise_provider'; import { Long, ObjectId, Document, BSONSerializeOptions, resolveBSONOptions } from '../bson'; import { MongoError, MongoWriteConcernError, AnyError } from '../error'; import { - applyWriteConcern, applyRetryableWrites, executeLegacyOperation, hasAtomicOperators, Callback, MongoDBNamespace, maxWireVersion, - getTopology + getTopology, + resolveOptions } from '../utils'; import { executeOperation } from '../operations/execute_operation'; import { InsertOperation } from '../operations/insert'; @@ -571,14 +571,10 @@ function executeCommands( executeCommands(bulkOperation, options, callback); } - const finalOptions = Object.assign( - { ordered: bulkOperation.isOrdered }, - bulkOperation.bsonOptions, - options - ); - if (bulkOperation.s.writeConcern != null) { - finalOptions.writeConcern = bulkOperation.s.writeConcern; - } + const finalOptions = resolveOptions(bulkOperation, { + ordered: bulkOperation.isOrdered, + ...options + }); if (finalOptions.bypassDocumentValidation !== true) { delete finalOptions.bypassDocumentValidation; @@ -935,10 +931,9 @@ export abstract class BulkOperationBase { // + 1 bytes for null terminator const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2; - // Final options for retryable writes and write concern + // Final options for retryable writes let finalOptions = Object.assign({}, options); finalOptions = applyRetryableWrites(finalOptions, collection.s.db); - finalOptions = applyWriteConcern(finalOptions, { collection: collection }, options); // Final results const bulkResult: BulkResult = { @@ -983,7 +978,7 @@ export abstract class BulkOperationBase { // Options options: finalOptions, // BSON options - bsonOptions: resolveBSONOptions(options, collection), + bsonOptions: resolveBSONOptions(options), // Current operation currentOp, // Executed @@ -1169,6 +1164,10 @@ export abstract class BulkOperationBase { return this.s.bsonOptions; } + get writeConcern(): WriteConcern | undefined { + return this.s.writeConcern; + } + /** An internal helper method. Do not invoke directly. Will be going away in the future */ execute( _writeConcern?: WriteConcern, diff --git a/src/collection.ts b/src/collection.ts index abc1ca6da32..63409148625 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -315,7 +315,7 @@ export class Collection implements OperationParent { return executeOperation( getTopology(this), - new InsertManyOperation(this, docs, options), + new InsertManyOperation(this, docs, resolveOptions(this, options)), callback ); } @@ -375,7 +375,7 @@ export class Collection implements OperationParent { return executeOperation( getTopology(this), - new BulkWriteOperation(this, operations, options), + new BulkWriteOperation(this, operations, resolveOptions(this, options)), callback ); } @@ -1312,12 +1312,12 @@ export class Collection implements OperationParent { /** Initiate an Out of order batch write operation. All operations will be buffered into insert/update/remove commands executed out of order. */ initializeUnorderedBulkOp(options?: BulkWriteOptions): any { - return new UnorderedBulkOperation(this, options ?? {}); + return new UnorderedBulkOperation(this, resolveOptions(this, options)); } /** Initiate an In order bulk write operation. Operations will be serially executed in the order they are added, creating a new operation for each switch in types. */ initializeOrderedBulkOp(options?: BulkWriteOptions): any { - return new OrderedBulkOperation(this, options ?? {}); + return new OrderedBulkOperation(this, resolveOptions(this, options)); } /** Get the db scoped logger */ diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts index 6838f6fe5eb..1a9798362e6 100644 --- a/src/operations/bulk_write.ts +++ b/src/operations/bulk_write.ts @@ -1,4 +1,4 @@ -import { applyRetryableWrites, applyWriteConcern, Callback } from '../utils'; +import { applyRetryableWrites, Callback } from '../utils'; import { OperationBase } from './operation'; import { resolveBSONOptions } from '../bson'; import { WriteConcern } from '../write_concern'; @@ -26,8 +26,8 @@ export class BulkWriteOperation extends OperationBase): void { @@ -50,11 +50,8 @@ export class BulkWriteOperation extends OperationBase): void { From 46e09ae470732ad481a4869699fee2f623e65bfe Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Mon, 9 Nov 2020 16:23:30 -0500 Subject: [PATCH 2/6] delete leftover serialize --- src/operations/common_functions.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/operations/common_functions.ts b/src/operations/common_functions.ts index 93fab69056b..3b24a996a06 100644 --- a/src/operations/common_functions.ts +++ b/src/operations/common_functions.ts @@ -215,12 +215,6 @@ export function updateDocuments( let finalOptions = Object.assign({}, options); finalOptions = applyRetryableWrites(finalOptions, coll.s.db); - // Do we return the actual result document - // Either use override on the function, or go back to default on either the collection - // level or db - finalOptions.serializeFunctions = - options.serializeFunctions || coll.bsonOptions.serializeFunctions; - // Execute the operation const op: Document = { q: selector, u: document }; op.upsert = options.upsert !== void 0 ? !!options.upsert : false; From c8af9897020e483ef76da165a3eeb6e213b674e5 Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Mon, 9 Nov 2020 16:23:44 -0500 Subject: [PATCH 3/6] move bson resolve to operation base --- src/operations/bulk_write.ts | 4 ---- src/operations/command.ts | 3 +-- src/operations/insert_many.ts | 5 +---- src/operations/operation.ts | 5 ++++- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts index 1a9798362e6..9bbeb9316cd 100644 --- a/src/operations/bulk_write.ts +++ b/src/operations/bulk_write.ts @@ -1,6 +1,5 @@ import { applyRetryableWrites, Callback } from '../utils'; import { OperationBase } from './operation'; -import { resolveBSONOptions } from '../bson'; import { WriteConcern } from '../write_concern'; import type { Collection } from '../collection'; import type { @@ -25,9 +24,6 @@ export class BulkWriteOperation extends OperationBase): void { diff --git a/src/operations/command.ts b/src/operations/command.ts index 0d89485c680..95fbeb011f6 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -7,7 +7,7 @@ import { commandSupportsReadConcern } from '../sessions'; import { MongoError } from '../error'; import type { Logger } from '../logger'; import type { Server } from '../sdam/server'; -import { BSONSerializeOptions, Document, resolveBSONOptions } from '../bson'; +import type { BSONSerializeOptions, Document } from '../bson'; import type { CollationOptions } from '../cmap/wire_protocol/write_command'; import type { ReadConcernLike } from './../read_concern'; @@ -78,7 +78,6 @@ export abstract class CommandOperation< : ReadPreference.fromOptions(options) ?? ReadPreference.primary; this.readConcern = ReadConcern.fromOptions(options); this.writeConcern = WriteConcern.fromOptions(options); - this.bsonOptions = resolveBSONOptions(options); this.explain = false; this.fullResponse = diff --git a/src/operations/insert_many.ts b/src/operations/insert_many.ts index ecea6194faa..7f0481aac2f 100644 --- a/src/operations/insert_many.ts +++ b/src/operations/insert_many.ts @@ -4,7 +4,7 @@ import { MongoError } from '../error'; import { prepareDocs } from './common_functions'; import type { Callback } from '../utils'; import type { Collection } from '../collection'; -import { ObjectId, Document, resolveBSONOptions } from '../bson'; +import type { ObjectId, Document } from '../bson'; import type { BulkWriteResult, BulkWriteOptions } from '../bulk/common'; import type { Server } from '../sdam/server'; @@ -30,9 +30,6 @@ export class InsertManyOperation extends OperationBase): void { diff --git a/src/operations/operation.ts b/src/operations/operation.ts index f34b162fb75..e49c73a8c9b 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,6 +1,6 @@ import { ReadPreference } from '../read_preference'; import type { ClientSession } from '../sessions'; -import type { Document, BSONSerializeOptions } from '../bson'; +import { Document, BSONSerializeOptions, resolveBSONOptions } from '../bson'; import type { MongoDBNamespace, Callback } from '../utils'; import type { Server } from '../sdam/server'; @@ -50,6 +50,9 @@ export abstract class OperationBase< constructor(options: T = {} as T) { this.options = Object.assign({}, options); this.readPreference = ReadPreference.primary; + + // Pull the BSON serialize options from the already-resolved options + this.bsonOptions = resolveBSONOptions(options); } abstract execute(server: Server, callback: Callback): void; From 67e571b33b1473158da1f7f78afe5971dda38851 Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Mon, 9 Nov 2020 17:13:46 -0500 Subject: [PATCH 4/6] bug fix for read preference on writes --- src/cmap/wire_protocol/write_command.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/cmap/wire_protocol/write_command.ts b/src/cmap/wire_protocol/write_command.ts index 18fbcecf3dd..4718e6de1ba 100644 --- a/src/cmap/wire_protocol/write_command.ts +++ b/src/cmap/wire_protocol/write_command.ts @@ -4,6 +4,7 @@ import { command, CommandOptions } from './command'; import type { Server } from '../../sdam/server'; import type { Document, BSONSerializeOptions } from '../../bson'; import type { WriteConcern } from '../../write_concern'; +import { ReadPreference } from '../../read_preference'; /** @public */ export interface CollationOptions { @@ -64,6 +65,12 @@ export function writeCommand( writeCommand.bypassDocumentValidation = options.bypassDocumentValidation; } + // A read preference may be present in the options, inherited from a transaction or parent. + // For write commands, we need to ensure it is ReadPreference.primary, so we override it here. + if (options.readPreference) { + options.readPreference = ReadPreference.primary; + } + const commandOptions = Object.assign( { checkKeys: type === 'insert', From d231eb9c731ac9675d2bd55430eca65429231044 Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Wed, 11 Nov 2020 11:52:15 -0500 Subject: [PATCH 5/6] respond to comments --- src/bulk/common.ts | 18 +++++++----------- src/cmap/wire_protocol/write_command.ts | 7 ------- src/operations/bulk_write.ts | 8 ++++---- src/operations/command.ts | 12 +----------- src/operations/insert_many.ts | 4 +++- src/operations/operation.ts | 13 +++++++++++-- 6 files changed, 26 insertions(+), 36 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 83eb2633150..e3de8acc750 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -21,6 +21,7 @@ import type { Topology } from '../sdam/topology'; import type { CommandOperationOptions } from '../operations/command'; import type { CollationOptions } from '../cmap/wire_protocol/write_command'; import type { Hint } from '../operations/operation'; +import { ReadPreference } from '../read_preference'; // Error codes const WRITE_CONCERN_ERROR = 64; @@ -572,8 +573,8 @@ function executeCommands( } const finalOptions = resolveOptions(bulkOperation, { - ordered: bulkOperation.isOrdered, - ...options + ...options, + ordered: bulkOperation.isOrdered }); if (finalOptions.bypassDocumentValidation !== true) { @@ -1169,18 +1170,13 @@ export abstract class BulkOperationBase { } /** An internal helper method. Do not invoke directly. Will be going away in the future */ - execute( - _writeConcern?: WriteConcern, - options?: BulkWriteOptions, - callback?: Callback - ): Promise | void { + execute(options?: BulkWriteOptions, callback?: Callback): Promise | void { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; - if (typeof _writeConcern === 'function') { - callback = _writeConcern as Callback; - } else if (_writeConcern && typeof _writeConcern === 'object') { - this.s.writeConcern = _writeConcern; + const writeConcern = WriteConcern.fromOptions(options); + if (writeConcern) { + this.s.writeConcern = writeConcern; } if (this.s.executed) { diff --git a/src/cmap/wire_protocol/write_command.ts b/src/cmap/wire_protocol/write_command.ts index 4718e6de1ba..18fbcecf3dd 100644 --- a/src/cmap/wire_protocol/write_command.ts +++ b/src/cmap/wire_protocol/write_command.ts @@ -4,7 +4,6 @@ import { command, CommandOptions } from './command'; import type { Server } from '../../sdam/server'; import type { Document, BSONSerializeOptions } from '../../bson'; import type { WriteConcern } from '../../write_concern'; -import { ReadPreference } from '../../read_preference'; /** @public */ export interface CollationOptions { @@ -65,12 +64,6 @@ export function writeCommand( writeCommand.bypassDocumentValidation = options.bypassDocumentValidation; } - // A read preference may be present in the options, inherited from a transaction or parent. - // For write commands, we need to ensure it is ReadPreference.primary, so we override it here. - if (options.readPreference) { - options.readPreference = ReadPreference.primary; - } - const commandOptions = Object.assign( { checkKeys: type === 'insert', diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts index 9bbeb9316cd..e2c829fbbe0 100644 --- a/src/operations/bulk_write.ts +++ b/src/operations/bulk_write.ts @@ -1,6 +1,5 @@ import { applyRetryableWrites, Callback } from '../utils'; -import { OperationBase } from './operation'; -import { WriteConcern } from '../write_concern'; +import { Aspect, defineAspects, OperationBase } from './operation'; import type { Collection } from '../collection'; import type { BulkOperationBase, @@ -48,10 +47,9 @@ export class BulkWriteOperation extends OperationBase { + bulk.execute(finalOptions, (err, r) => { // We have connection level error if (!r && err) { return callback(err); @@ -62,3 +60,5 @@ export class BulkWriteOperation extends OperationBase extends OperationBase { ns: MongoDBNamespace; - readPreference: ReadPreference; readConcern?: ReadConcern; writeConcern?: WriteConcern; explain: boolean; @@ -73,9 +70,6 @@ export abstract class CommandOperation< : new MongoDBNamespace('admin', '$cmd'); } - this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION) - ? ReadPreference.primary - : ReadPreference.fromOptions(options) ?? ReadPreference.primary; this.readConcern = ReadConcern.fromOptions(options); this.writeConcern = WriteConcern.fromOptions(options); @@ -83,10 +77,6 @@ export abstract class CommandOperation< this.fullResponse = options && typeof options.fullResponse === 'boolean' ? options.fullResponse : false; - // TODO: A lot of our code depends on having the read preference in the options. This should - // go away, but also requires massive test rewrites. - this.options.readPreference = this.readPreference; - // TODO(NODE-2056): make logger another "inheritable" property if (parent && parent.logger) { this.logger = parent.logger; diff --git a/src/operations/insert_many.ts b/src/operations/insert_many.ts index 7f0481aac2f..b9ec116efe0 100644 --- a/src/operations/insert_many.ts +++ b/src/operations/insert_many.ts @@ -1,4 +1,4 @@ -import { OperationBase } from './operation'; +import { Aspect, defineAspects, OperationBase } from './operation'; import { BulkWriteOperation } from './bulk_write'; import { MongoError } from '../error'; import { prepareDocs } from './common_functions'; @@ -70,3 +70,5 @@ function mapInsertManyResults(docs: Document[], r: BulkWriteResult): InsertManyR return finalResult; } + +defineAspects(InsertManyOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index e49c73a8c9b..c11ff4d846b 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -1,4 +1,4 @@ -import { ReadPreference } from '../read_preference'; +import { ReadPreference, ReadPreferenceLike } from '../read_preference'; import type { ClientSession } from '../sessions'; import { Document, BSONSerializeOptions, resolveBSONOptions } from '../bson'; import type { MongoDBNamespace, Callback } from '../utils'; @@ -24,6 +24,9 @@ export interface OperationOptions extends BSONSerializeOptions { explain?: boolean; willRetryWrites?: boolean; + + /** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */ + readPreference?: ReadPreferenceLike; } /** @@ -49,7 +52,13 @@ export abstract class OperationBase< constructor(options: T = {} as T) { this.options = Object.assign({}, options); - this.readPreference = ReadPreference.primary; + + this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION) + ? ReadPreference.primary + : ReadPreference.fromOptions(options) ?? ReadPreference.primary; + // TODO: A lot of our code depends on having the read preference in the options. This should + // go away, but also requires massive test rewrites. + this.options.readPreference = this.readPreference; // Pull the BSON serialize options from the already-resolved options this.bsonOptions = resolveBSONOptions(options); From 3e7b7d110c529444923ffe5c36f422a632999c36 Mon Sep 17 00:00:00 2001 From: Hana Pearlman Date: Wed, 11 Nov 2020 12:01:19 -0500 Subject: [PATCH 6/6] lint --- src/bulk/common.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index e3de8acc750..8d29804b07d 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -21,7 +21,6 @@ import type { Topology } from '../sdam/topology'; import type { CommandOperationOptions } from '../operations/command'; import type { CollationOptions } from '../cmap/wire_protocol/write_command'; import type { Hint } from '../operations/operation'; -import { ReadPreference } from '../read_preference'; // Error codes const WRITE_CONCERN_ERROR = 64;