Skip to content

refactor: consolidate options inheritance for bulk ops #2617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { PromiseProvider } from '../promise_provider';
import { Long, ObjectId, Document, BSONSerializeOptions, resolveBSONOptions } from '../bson';
import { MongoError, MongoWriteConcernError, AnyError } from '../error';
import {
applyWriteConcern,
applyRetryableWrites,
executeLegacyOperation,
hasAtomicOperators,
Callback,
MongoDBNamespace,
maxWireVersion,
getTopology
getTopology,
resolveOptions
} from '../utils';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
Expand Down Expand Up @@ -571,14 +571,10 @@ function executeCommands(
executeCommands(bulkOperation, options, callback);
}

const finalOptions = Object.assign(
{ ordered: bulkOperation.isOrdered },
bulkOperation.bsonOptions,
options
);
if (bulkOperation.s.writeConcern != null) {
finalOptions.writeConcern = bulkOperation.s.writeConcern;
}
const finalOptions = resolveOptions(bulkOperation, {
...options,
ordered: bulkOperation.isOrdered
});

if (finalOptions.bypassDocumentValidation !== true) {
delete finalOptions.bypassDocumentValidation;
Expand Down Expand Up @@ -935,10 +931,9 @@ export abstract class BulkOperationBase {
// + 1 bytes for null terminator
const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2;

// Final options for retryable writes and write concern
// Final options for retryable writes
let finalOptions = Object.assign({}, options);
finalOptions = applyRetryableWrites(finalOptions, collection.s.db);
finalOptions = applyWriteConcern(finalOptions, { collection: collection }, options);

// Final results
const bulkResult: BulkResult = {
Expand Down Expand Up @@ -983,7 +978,7 @@ export abstract class BulkOperationBase {
// Options
options: finalOptions,
// BSON options
bsonOptions: resolveBSONOptions(options, collection),
bsonOptions: resolveBSONOptions(options),
// Current operation
currentOp,
// Executed
Expand Down Expand Up @@ -1169,19 +1164,18 @@ export abstract class BulkOperationBase {
return this.s.bsonOptions;
}

get writeConcern(): WriteConcern | undefined {
return this.s.writeConcern;
}

/** An internal helper method. Do not invoke directly. Will be going away in the future */
execute(
_writeConcern?: WriteConcern,
options?: BulkWriteOptions,
callback?: Callback<BulkWriteResult>
): Promise<void> | void {
execute(options?: BulkWriteOptions, callback?: Callback<BulkWriteResult>): Promise<void> | void {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

if (typeof _writeConcern === 'function') {
callback = _writeConcern as Callback;
} else if (_writeConcern && typeof _writeConcern === 'object') {
this.s.writeConcern = _writeConcern;
const writeConcern = WriteConcern.fromOptions(options);
if (writeConcern) {
this.s.writeConcern = writeConcern;
}

if (this.s.executed) {
Expand Down
8 changes: 4 additions & 4 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ export class Collection implements OperationParent {

return executeOperation(
getTopology(this),
new InsertManyOperation(this, docs, options),
new InsertManyOperation(this, docs, resolveOptions(this, options)),
callback
);
}
Expand Down Expand Up @@ -375,7 +375,7 @@ export class Collection implements OperationParent {

return executeOperation(
getTopology(this),
new BulkWriteOperation(this, operations, options),
new BulkWriteOperation(this, operations, resolveOptions(this, options)),
callback
);
}
Expand Down Expand Up @@ -1312,12 +1312,12 @@ export class Collection implements OperationParent {

/** Initiate an Out of order batch write operation. All operations will be buffered into insert/update/remove commands executed out of order. */
initializeUnorderedBulkOp(options?: BulkWriteOptions): any {
return new UnorderedBulkOperation(this, options ?? {});
return new UnorderedBulkOperation(this, resolveOptions(this, options));
}

/** Initiate an In order bulk write operation. Operations will be serially executed in the order they are added, creating a new operation for each switch in types. */
initializeOrderedBulkOp(options?: BulkWriteOptions): any {
return new OrderedBulkOperation(this, options ?? {});
return new OrderedBulkOperation(this, resolveOptions(this, options));
}

/** Get the db scoped logger */
Expand Down
17 changes: 5 additions & 12 deletions src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { applyRetryableWrites, applyWriteConcern, Callback } from '../utils';
import { OperationBase } from './operation';
import { resolveBSONOptions } from '../bson';
import { WriteConcern } from '../write_concern';
import { applyRetryableWrites, Callback } from '../utils';
import { Aspect, defineAspects, OperationBase } from './operation';
import type { Collection } from '../collection';
import type {
BulkOperationBase,
Expand All @@ -25,9 +23,6 @@ export class BulkWriteOperation extends OperationBase<BulkWriteOptions, BulkWrit

this.collection = collection;
this.operations = operations;

// Assign BSON serialize options to OperationBase, preferring options over collection options
this.bsonOptions = resolveBSONOptions(options, collection);
}

execute(server: Server, callback: Callback<BulkWriteResult>): void {
Expand All @@ -50,15 +45,11 @@ export class BulkWriteOperation extends OperationBase<BulkWriteOptions, BulkWrit
return callback(err);
}

// Final options for retryable writes and write concern
let finalOptions = Object.assign({}, options);
finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);

const writeCon = WriteConcern.fromOptions(finalOptions);

// Execute the bulk
bulk.execute(writeCon, finalOptions, (err, r) => {
bulk.execute(finalOptions, (err, r) => {
// We have connection level error
if (!r && err) {
return callback(err);
Expand All @@ -69,3 +60,5 @@ export class BulkWriteOperation extends OperationBase<BulkWriteOptions, BulkWrit
});
}
}

defineAspects(BulkWriteOperation, [Aspect.WRITE_OPERATION]);
15 changes: 2 additions & 13 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { Aspect, OperationBase, OperationOptions } from './operation';
import { ReadConcern } from '../read_concern';
import { WriteConcern, WriteConcernOptions } from '../write_concern';
import { maxWireVersion, MongoDBNamespace, Callback } from '../utils';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { ReadPreference } from '../read_preference';
import { commandSupportsReadConcern } from '../sessions';
import { MongoError } from '../error';
import type { Logger } from '../logger';
import type { Server } from '../sdam/server';
import { BSONSerializeOptions, Document, resolveBSONOptions } from '../bson';
import type { BSONSerializeOptions, Document } from '../bson';
import type { CollationOptions } from '../cmap/wire_protocol/write_command';
import type { ReadConcernLike } from './../read_concern';

Expand All @@ -19,8 +19,6 @@ export interface CommandOperationOptions extends OperationOptions, WriteConcernO
fullResponse?: boolean;
/** Specify a read concern and level for the collection. (only MongoDB 3.2 or higher supported) */
readConcern?: ReadConcernLike;
/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
readPreference?: ReadPreferenceLike;
/** Collation */
collation?: CollationOptions;
maxTimeMS?: number;
Expand Down Expand Up @@ -51,7 +49,6 @@ export abstract class CommandOperation<
TResult = Document
> extends OperationBase<T> {
ns: MongoDBNamespace;
readPreference: ReadPreference;
readConcern?: ReadConcern;
writeConcern?: WriteConcern;
explain: boolean;
Expand All @@ -73,21 +70,13 @@ export abstract class CommandOperation<
: new MongoDBNamespace('admin', '$cmd');
}

this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION)
? ReadPreference.primary
: ReadPreference.fromOptions(options) ?? ReadPreference.primary;
this.readConcern = ReadConcern.fromOptions(options);
this.writeConcern = WriteConcern.fromOptions(options);
this.bsonOptions = resolveBSONOptions(options);

this.explain = false;
this.fullResponse =
options && typeof options.fullResponse === 'boolean' ? options.fullResponse : false;

// TODO: A lot of our code depends on having the read preference in the options. This should
// go away, but also requires massive test rewrites.
this.options.readPreference = this.readPreference;

// TODO(NODE-2056): make logger another "inheritable" property
if (parent && parent.logger) {
this.logger = parent.logger;
Expand Down
20 changes: 3 additions & 17 deletions src/operations/common_functions.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import { MongoError } from '../error';
import {
applyRetryableWrites,
applyWriteConcern,
decorateWithCollation,
Callback,
getTopology
} from '../utils';
import { applyRetryableWrites, decorateWithCollation, Callback, getTopology } from '../utils';
import type { Document } from '../bson';
import type { Db } from '../db';
import type { ClientSession } from '../sessions';
Expand Down Expand Up @@ -136,10 +130,9 @@ export function removeDocuments(
// Create an empty options object if the provided one is null
options = options || {};

// Final options for retryable writes and write concern
// Final options for retryable writes
let finalOptions = Object.assign({}, options);
finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);

// If selector is null set empty
if (selector == null) selector = {};
Expand Down Expand Up @@ -218,16 +211,9 @@ export function updateDocuments(
if (document == null || typeof document !== 'object')
return callback(new TypeError('document must be a valid JavaScript object'));

// Final options for retryable writes and write concern
// Final options for retryable writes
let finalOptions = Object.assign({}, options);
finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);

// Do we return the actual result document
// Either use override on the function, or go back to default on either the collection
// level or db
finalOptions.serializeFunctions =
options.serializeFunctions || coll.bsonOptions.serializeFunctions;

// Execute the operation
const op: Document = { q: selector, u: document };
Expand Down
4 changes: 1 addition & 3 deletions src/operations/find_and_modify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
maxWireVersion,
applyRetryableWrites,
decorateWithCollation,
applyWriteConcern,
hasAtomicOperators,
Callback
} from '../utils';
Expand Down Expand Up @@ -107,9 +106,8 @@ export class FindAndModifyOperation extends CommandOperation<FindAndModifyOption
// No check on the documents
options.checkKeys = false;

// Final options for retryable writes and write concern
// Final options for retryable writes
options = applyRetryableWrites(options, coll.s.db);
options = applyWriteConcern(options, { db: coll.s.db, collection: coll }, options);

// Decorate the findAndModify command with the write Concern
if (options.writeConcern) {
Expand Down
5 changes: 2 additions & 3 deletions src/operations/insert.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { MongoError } from '../error';
import { defineAspects, Aspect, OperationBase } from './operation';
import { CommandOperation } from './command';
import { applyRetryableWrites, applyWriteConcern, Callback, MongoDBNamespace } from '../utils';
import { applyRetryableWrites, Callback, MongoDBNamespace } from '../utils';
import { prepareDocs } from './common_functions';
import type { Server } from '../sdam/server';
import type { Collection } from '../collection';
Expand Down Expand Up @@ -98,10 +98,9 @@ function insertDocuments(
// Ensure we are operating on an array op docs
docs = Array.isArray(docs) ? docs : [docs];

// Final options for retryable writes and write concern
// Final options for retryable writes
let finalOptions = Object.assign({}, options);
finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);

// If keep going set unordered
if (finalOptions.keepGoing === true) finalOptions.ordered = false;
Expand Down
9 changes: 4 additions & 5 deletions src/operations/insert_many.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { OperationBase } from './operation';
import { Aspect, defineAspects, OperationBase } from './operation';
import { BulkWriteOperation } from './bulk_write';
import { MongoError } from '../error';
import { prepareDocs } from './common_functions';
import type { Callback } from '../utils';
import type { Collection } from '../collection';
import { ObjectId, Document, resolveBSONOptions } from '../bson';
import type { ObjectId, Document } from '../bson';
import type { BulkWriteResult, BulkWriteOptions } from '../bulk/common';
import type { Server } from '../sdam/server';

Expand All @@ -30,9 +30,6 @@ export class InsertManyOperation extends OperationBase<BulkWriteOptions, InsertM

this.collection = collection;
this.docs = docs;

// Assign BSON serialize options to OperationBase, preferring options over collection options
this.bsonOptions = resolveBSONOptions(options, collection);
}

execute(server: Server, callback: Callback<InsertManyResult>): void {
Expand Down Expand Up @@ -73,3 +70,5 @@ function mapInsertManyResults(docs: Document[], r: BulkWriteResult): InsertManyR

return finalResult;
}

defineAspects(InsertManyOperation, [Aspect.WRITE_OPERATION]);
18 changes: 15 additions & 3 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ReadPreference } from '../read_preference';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import type { Document, BSONSerializeOptions } from '../bson';
import { Document, BSONSerializeOptions, resolveBSONOptions } from '../bson';
import type { MongoDBNamespace, Callback } from '../utils';
import type { Server } from '../sdam/server';

Expand All @@ -24,6 +24,9 @@ export interface OperationOptions extends BSONSerializeOptions {

explain?: boolean;
willRetryWrites?: boolean;

/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
readPreference?: ReadPreferenceLike;
}

/**
Expand All @@ -49,7 +52,16 @@ export abstract class OperationBase<

constructor(options: T = {} as T) {
this.options = Object.assign({}, options);
this.readPreference = ReadPreference.primary;

this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION)
? ReadPreference.primary
: ReadPreference.fromOptions(options) ?? ReadPreference.primary;
// TODO: A lot of our code depends on having the read preference in the options. This should
// go away, but also requires massive test rewrites.
this.options.readPreference = this.readPreference;

// Pull the BSON serialize options from the already-resolved options
this.bsonOptions = resolveBSONOptions(options);
}

abstract execute(server: Server, callback: Callback<TResult>): void;
Expand Down