Skip to content

Commit 1c3defe

Browse files
committed
feat(NODE-3083): support aggregate writes on secondaries
1 parent 699b6c8 commit 1c3defe

File tree

8 files changed

+176
-8
lines changed

8 files changed

+176
-8
lines changed

src/operations/aggregate.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { CommandOperation, CommandOperationOptions, CollationOptions } from './command';
2-
import { ReadPreference } from '../read_preference';
32
import { MongoInvalidArgumentError } from '../error';
43
import { maxWireVersion, MongoDBNamespace } from '../utils';
54
import { Aspect, defineAspects, Hint } from './operation';
@@ -65,7 +64,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
6564
}
6665

6766
if (this.hasWriteStage) {
68-
this.readPreference = ReadPreference.primary;
67+
this.trySecondaryWrite = true;
6968
}
7069

7170
if (this.explain && this.writeConcern) {

src/operations/execute_operation.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type { Topology } from '../sdam/topology';
1717
import type { ClientSession } from '../sessions';
1818
import type { Document } from '../bson';
1919
import { supportsRetryableWrites } from '../utils';
20+
import { secondaryWritableServerSelector } from '../sdam/server_selection';
2021

2122
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
2223
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -181,8 +182,18 @@ function executeWithServerSelection(
181182
return;
182183
}
183184

185+
let selector;
186+
187+
// If operation should try to write to secondary use the custom server selector
188+
// otherwise provide the read preference.
189+
if (operation.trySecondaryWrite) {
190+
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
191+
} else {
192+
selector = readPreference;
193+
}
194+
184195
// select a new server, and attempt to retry the operation
185-
topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => {
196+
topology.selectServer(selector, serverSelectionOptions, (e?: any, server?: any) => {
186197
if (
187198
e ||
188199
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||

src/operations/operation.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export abstract class AbstractOperation<TResult = any> {
4949
readPreference: ReadPreference;
5050
server!: Server;
5151
bypassPinningCheck: boolean;
52+
trySecondaryWrite = false;
5253

5354
// BSON serialization options
5455
bsonOptions?: BSONSerializeOptions;

src/sdam/server_selection.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import type { ServerDescription, TagSet } from './server_description';
88
const IDLE_WRITE_PERIOD = 10000;
99
const SMALLEST_MAX_STALENESS_SECONDS = 90;
1010

11+
// Minimum version to try writes on secondaries.
12+
export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
13+
1114
/** @public */
1215
export type ServerSelector = (
1316
topologyDescription: TopologyDescription,
@@ -28,6 +31,24 @@ export function writableServerSelector(): ServerSelector {
2831
);
2932
}
3033

34+
/**
35+
* Returns a server selector that uses a read preference to select a
36+
* server potentially for a write on a secondary.
37+
*/
38+
export function secondaryWritableServerSelector(
39+
wireVersion?: number,
40+
readPreference?: ReadPreference
41+
): ServerSelector {
42+
// If server version < 5.0, read preference always primary.
43+
// If server version >= 5.0...
44+
// - If read preference is supplied, use that.
45+
// - If no read preference is supplied, use primary.
46+
if (!readPreference || (wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)) {
47+
return readPreferenceServerSelector(ReadPreference.primary);
48+
}
49+
return readPreferenceServerSelector(readPreference);
50+
}
51+
3152
/**
3253
* Reduces the passed in array of servers by the rules of the "Max Staleness" specification
3354
* found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst

src/sdam/topology.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
797797
return result;
798798
}
799799

800+
get commonWireVersion(): number | undefined {
801+
return this.description.commonWireVersion;
802+
}
803+
800804
get logicalSessionTimeoutMinutes(): number | undefined {
801805
return this.description.logicalSessionTimeoutMinutes;
802806
}

test/functional/crud_spec.test.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,15 +424,11 @@ describe('CRUD spec v1', function () {
424424
}
425425
});
426426

427-
// TODO: Unskip when implementing NODE-3083.
428-
const SKIP = ['aggregate-write-readPreference', 'db-aggregate-write-readPreference'];
429-
430427
describe('CRUD unified', function () {
431428
for (const crudSpecTest of loadSpecTests('crud/unified')) {
432429
expect(crudSpecTest).to.exist;
433430
const testDescription = String(crudSpecTest.description);
434-
const spec = SKIP.includes(testDescription) ? context.skip : context;
435-
spec(testDescription, function () {
431+
context(testDescription, function () {
436432
for (const test of crudSpecTest.tests) {
437433
it(String(test.description), {
438434
metadata: { sessions: { skipLeakTests: true } },
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
'use strict';
2+
3+
const { expect } = require('chai');
4+
const { AggregateOperation } = require('../../../src/operations/aggregate');
5+
6+
describe('AggregateOperation', function () {
7+
const db = 'test';
8+
9+
describe('#constructor', function () {
10+
context('when out is in the options', function () {
11+
const operation = new AggregateOperation(db, [], { out: 'test', dbName: db });
12+
13+
it('sets trySecondaryWrite to true', function () {
14+
expect(operation.trySecondaryWrite).to.be.true;
15+
});
16+
});
17+
18+
context('when $out is the last stage', function () {
19+
const operation = new AggregateOperation(db, [{ $out: 'test' }], { dbName: db });
20+
21+
it('sets trySecondaryWrite to true', function () {
22+
expect(operation.trySecondaryWrite).to.be.true;
23+
});
24+
});
25+
26+
context('when $merge is the last stage', function () {
27+
const operation = new AggregateOperation(db, [{ $merge: { into: 'test' } }], { dbName: db });
28+
29+
it('sets trySecondaryWrite to true', function () {
30+
expect(operation.trySecondaryWrite).to.be.true;
31+
});
32+
});
33+
34+
context('when no writable stages', function () {
35+
const operation = new AggregateOperation(db, [], { dbName: db });
36+
37+
it('sets trySecondaryWrite to false', function () {
38+
expect(operation.trySecondaryWrite).to.be.false;
39+
});
40+
});
41+
});
42+
});
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
'use strict';
2+
3+
const { expect } = require('chai');
4+
const { ObjectId } = require('../../../src/bson');
5+
const { ReadPreference } = require('../../../src/read_preference');
6+
const {
7+
secondaryWritableServerSelector,
8+
MIN_SECONDARY_WRITE_WIRE_VERSION
9+
} = require('../../../src/sdam/server_selection');
10+
const { ServerDescription } = require('../../../src/sdam/server_description');
11+
const { TopologyDescription } = require('../../../src/sdam/topology_description');
12+
const { TopologyType } = require('../../../src/sdam/common');
13+
14+
describe('ServerSelector', function () {
15+
describe('#secondaryWritableServerSelector', function () {
16+
const primary = new ServerDescription('127.0.0.1:27017', {
17+
setName: 'test',
18+
isWritablePrimary: true,
19+
ok: 1
20+
});
21+
const secondary = new ServerDescription('127.0.0.1:27018', {
22+
setName: 'test',
23+
secondary: true,
24+
ok: 1
25+
});
26+
const serverDescriptions = new Map();
27+
serverDescriptions.set('127.0.0.1:27017', primary);
28+
serverDescriptions.set('127.0.0.1:27018', secondary);
29+
30+
context('when the common server version is >= 5.0', function () {
31+
const topologyDescription = new TopologyDescription(
32+
TopologyType.ReplicaSetWithPrimary,
33+
serverDescriptions,
34+
'test',
35+
MIN_SECONDARY_WRITE_WIRE_VERSION,
36+
new ObjectId(),
37+
MIN_SECONDARY_WRITE_WIRE_VERSION
38+
);
39+
40+
context('when a read preference is provided', function () {
41+
const selector = secondaryWritableServerSelector(
42+
MIN_SECONDARY_WRITE_WIRE_VERSION,
43+
ReadPreference.secondary
44+
);
45+
const server = selector(topologyDescription, Array.from(serverDescriptions.values()));
46+
47+
it('uses the provided read preference', function () {
48+
expect(server).to.deep.equal([secondary]);
49+
});
50+
});
51+
52+
context('when a read preference is not provided', function () {
53+
const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION);
54+
const server = selector(topologyDescription, Array.from(serverDescriptions.values()));
55+
56+
it('selects a primary', function () {
57+
expect(server).to.deep.equal([primary]);
58+
});
59+
});
60+
});
61+
62+
context('when the common server version is < 5.0', function () {
63+
const topologyDescription = new TopologyDescription(
64+
TopologyType.ReplicaSetWithPrimary,
65+
serverDescriptions,
66+
'test',
67+
MIN_SECONDARY_WRITE_WIRE_VERSION - 1,
68+
new ObjectId(),
69+
MIN_SECONDARY_WRITE_WIRE_VERSION - 1
70+
);
71+
72+
context('when a read preference is provided', function () {
73+
const selector = secondaryWritableServerSelector(
74+
MIN_SECONDARY_WRITE_WIRE_VERSION - 1,
75+
ReadPreference.secondary
76+
);
77+
const server = selector(topologyDescription, Array.from(serverDescriptions.values()));
78+
79+
it('selects a primary', function () {
80+
expect(server).to.deep.equal([primary]);
81+
});
82+
});
83+
84+
context('when read preference is not provided', function () {
85+
const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1);
86+
const server = selector(topologyDescription, Array.from(serverDescriptions.values()));
87+
88+
it('selects a primary', function () {
89+
expect(server).to.deep.equal([primary]);
90+
});
91+
});
92+
});
93+
});
94+
});

0 commit comments

Comments
 (0)