Skip to content

Commit 1f855a4

Browse files
committed
fix: use async interruptable interval for server monitoring
The existing implementation of monitoring check scheduling is prone to failure during high load due to an error recording the last check time. Refactoring to use the newly introduced async interruptable interval timer eliminates this bug, as well as optimizes in cases where multiple simultaneous requests were not debounced. NODE-2643
1 parent 9e12cd5 commit 1f855a4

File tree

3 files changed

+51
-60
lines changed

3 files changed

+51
-60
lines changed

lib/core/sdam/monitor.js

Lines changed: 48 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const Connection = require('../../cmap/connection').Connection;
88
const common = require('./common');
99
const makeStateMachine = require('../utils').makeStateMachine;
1010
const MongoError = require('../error').MongoError;
11+
const makeInterruptableAsyncInterval = require('../../utils').makeInterruptableAsyncInterval;
1112

1213
const sdamEvents = require('./events');
1314
const ServerHeartbeatStartedEvent = sdamEvents.ServerHeartbeatStartedEvent;
@@ -18,7 +19,6 @@ const kServer = Symbol('server');
1819
const kMonitorId = Symbol('monitorId');
1920
const kConnection = Symbol('connection');
2021
const kCancellationToken = Symbol('cancellationToken');
21-
const kLastCheckTime = Symbol('lastCheckTime');
2222

2323
const STATE_CLOSED = common.STATE_CLOSED;
2424
const STATE_CLOSING = common.STATE_CLOSING;
@@ -33,6 +33,10 @@ const stateTransition = makeStateMachine({
3333

3434
const INVALID_REQUEST_CHECK_STATES = new Set([STATE_CLOSING, STATE_CLOSED, STATE_MONITORING]);
3535

36+
function isInCloseState(monitor) {
37+
return monitor.s.state === STATE_CLOSED || monitor.s.state === STATE_CLOSING;
38+
}
39+
3640
class Monitor extends EventEmitter {
3741
constructor(server, options) {
3842
super(options);
@@ -41,6 +45,7 @@ class Monitor extends EventEmitter {
4145
this[kConnection] = undefined;
4246
this[kCancellationToken] = new EventEmitter();
4347
this[kCancellationToken].setMaxListeners(Infinity);
48+
this[kMonitorId] = null;
4449
this.s = {
4550
state: STATE_CLOSED
4651
};
@@ -89,39 +94,34 @@ class Monitor extends EventEmitter {
8994
return;
9095
}
9196

92-
monitorServer(this);
97+
// start
98+
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
99+
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
100+
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
101+
interval: heartbeatFrequencyMS,
102+
minInterval: minHeartbeatFrequencyMS,
103+
immediate: true
104+
});
93105
}
94106

95107
requestCheck() {
96108
if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
97109
return;
98110
}
99111

100-
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
101-
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
102-
const remainingTime = heartbeatFrequencyMS - calculateDurationInMs(this[kLastCheckTime]);
103-
if (remainingTime > minHeartbeatFrequencyMS && this[kMonitorId]) {
104-
clearTimeout(this[kMonitorId]);
105-
rescheduleMonitoring(this, minHeartbeatFrequencyMS);
106-
return;
107-
}
108-
109-
if (this[kMonitorId]) {
110-
clearTimeout(this[kMonitorId]);
111-
}
112-
113-
monitorServer(this);
112+
this[kMonitorId].wake();
114113
}
115114

116115
close() {
117-
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
116+
if (isInCloseState(this)) {
118117
return;
119118
}
120119

121120
stateTransition(this, STATE_CLOSING);
122121
this[kCancellationToken].emit('cancel');
123122
if (this[kMonitorId]) {
124-
clearTimeout(this[kMonitorId]);
123+
this[kMonitorId].stop();
124+
this[kMonitorId] = null;
125125
}
126126

127127
if (this[kConnection]) {
@@ -186,7 +186,7 @@ function checkServer(monitor, callback) {
186186
return;
187187
}
188188

189-
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
189+
if (isInCloseState(monitor)) {
190190
conn.destroy({ force: true });
191191
failureHandler(new MongoError('monitor was destroyed'));
192192
return;
@@ -198,52 +198,44 @@ function checkServer(monitor, callback) {
198198
}
199199

200200
function monitorServer(monitor) {
201-
stateTransition(monitor, STATE_MONITORING);
202-
203-
// TODO: the next line is a legacy event, remove in v4
204-
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));
201+
return callback => {
202+
stateTransition(monitor, STATE_MONITORING);
203+
function done() {
204+
if (!isInCloseState(monitor)) {
205+
stateTransition(monitor, STATE_IDLE);
206+
}
205207

206-
checkServer(monitor, e0 => {
207-
if (e0 == null) {
208-
rescheduleMonitoring(monitor);
209-
return;
208+
callback();
210209
}
211210

212-
// otherwise an error occured on initial discovery, also bail
213-
if (monitor[kServer].description.type === ServerType.Unknown) {
214-
monitor.emit('resetServer', e0);
215-
rescheduleMonitoring(monitor);
216-
return;
217-
}
211+
// TODO: the next line is a legacy event, remove in v4
212+
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));
218213

219-
// According to the SDAM specification's "Network error during server check" section, if
220-
// an ismaster call fails we reset the server's pool. If a server was once connected,
221-
// change its type to `Unknown` only after retrying once.
222-
monitor.emit('resetConnectionPool');
223-
224-
checkServer(monitor, e1 => {
225-
if (e1) {
226-
monitor.emit('resetServer', e1);
214+
checkServer(monitor, e0 => {
215+
if (e0 == null) {
216+
return done();
227217
}
228218

229-
rescheduleMonitoring(monitor);
230-
});
231-
});
232-
}
219+
// otherwise an error occured on initial discovery, also bail
220+
if (monitor[kServer].description.type === ServerType.Unknown) {
221+
monitor.emit('resetServer', e0);
222+
return done();
223+
}
233224

234-
function rescheduleMonitoring(monitor, ms) {
235-
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
236-
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
237-
return;
238-
}
225+
// According to the SDAM specification's "Network error during server check" section, if
226+
// an ismaster call fails we reset the server's pool. If a server was once connected,
227+
// change its type to `Unknown` only after retrying once.
228+
monitor.emit('resetConnectionPool');
239229

240-
stateTransition(monitor, STATE_IDLE);
230+
checkServer(monitor, e1 => {
231+
if (e1) {
232+
monitor.emit('resetServer', e1);
233+
}
241234

242-
monitor[kLastCheckTime] = process.hrtime();
243-
monitor[kMonitorId] = setTimeout(() => {
244-
monitor[kMonitorId] = undefined;
245-
monitor.requestCheck();
246-
}, ms || heartbeatFrequencyMS);
235+
done();
236+
});
237+
});
238+
};
247239
}
248240

249241
module.exports = {

test/unit/sdam/server_selection/select_servers.test.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ describe('selectServer', function() {
8585
let completed = 0;
8686
function finish() {
8787
completed++;
88-
console.log(completed);
8988
if (completed === toSelect) done();
9089
}
9190

test/unit/utils.test.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ describe('utils', function() {
3737
});
3838

3939
context('makeInterruptableAsyncInterval', function() {
40-
const roundToNearestMultipleOfTen = x => Math.floor(x / 10) * 10;
40+
const roundToNearestMultipleOfTen = x => Math.round(x / 10) * 10;
4141

4242
it('should execute a method in an repeating interval', function(done) {
4343
let lastTime = now();
@@ -76,7 +76,7 @@ describe('utils', function() {
7676

7777
setTimeout(() => {
7878
const roundedMarks = marks.map(roundToNearestMultipleOfTen);
79-
expect(roundedMarks[0]).to.equal(10);
79+
expect(roundedMarks[0]).to.be.lessThan(50);
8080
executor.stop();
8181
done();
8282
}, 50);
@@ -100,7 +100,7 @@ describe('utils', function() {
100100

101101
setTimeout(() => {
102102
const roundedMarks = marks.map(roundToNearestMultipleOfTen);
103-
expect(roundedMarks[0]).to.equal(10);
103+
expect(roundedMarks[0]).to.be.lessThan(50);
104104
expect(roundedMarks.slice(1).every(mark => mark === 50)).to.be.true;
105105
executor.stop();
106106
done();

0 commit comments

Comments
 (0)