diff --git a/src/mongo_logger.ts b/src/mongo_logger.ts index 23f270bd4a3..390179bf4da 100644 --- a/src/mongo_logger.ts +++ b/src/mongo_logger.ts @@ -1,5 +1,4 @@ -import type { Writable } from 'stream'; -import { inspect } from 'util'; +import { inspect, promisify } from 'util'; import { type Document, EJSON, type EJSONOptions, type ObjectId } from './bson'; import type { CommandStartedEvent } from './cmap/command_monitoring_events'; @@ -58,7 +57,7 @@ import type { ServerSelectionSucceededEvent, WaitingForSuitableServerEvent } from './sdam/server_selection_events'; -import { HostAddress, parseUnsignedInteger } from './utils'; +import { HostAddress, isPromiseLike, parseUnsignedInteger } from './utils'; /** @internal */ export const SeverityLevel = Object.freeze({ @@ -192,16 +191,19 @@ export interface MongoLoggerOptions { /** Max length of embedded EJSON docs. Setting to 0 disables truncation. Defaults to 1000. */ maxDocumentLength: number; /** Destination for log messages. */ - logDestination: Writable | MongoDBLogWritable; + logDestination: MongoDBLogWritable; + /** For internal check to see if error should stop logging. */ + logDestinationIsStdErr: boolean; } /** * Parses a string as one of SeverityLevel + * @internal * * @param s - the value to be parsed * @returns one of SeverityLevel if value can be parsed as such, otherwise null */ -function parseSeverityFromString(s?: string): SeverityLevel | null { +export function parseSeverityFromString(s?: string): SeverityLevel | null { const validSeverities: string[] = Object.values(SeverityLevel); const lowerSeverity = s?.toLowerCase(); @@ -217,10 +219,10 @@ export function createStdioLogger(stream: { write: NodeJS.WriteStream['write']; }): MongoDBLogWritable { return { - write: (log: Log): unknown => { - stream.write(inspect(log, { compact: true, breakLength: Infinity }), 'utf-8'); + write: promisify((log: Log, cb: (error?: Error) => void): unknown => { + stream.write(inspect(log, { compact: true, breakLength: Infinity }), 'utf-8', cb); return; - } + }) }; } @@ -237,26 +239,26 @@ export function createStdioLogger(stream: { function resolveLogPath( { MONGODB_LOG_PATH }: MongoLoggerEnvOptions, { mongodbLogPath }: MongoLoggerMongoClientOptions -): MongoDBLogWritable { +): { mongodbLogPath: MongoDBLogWritable; mongodbLogPathIsStdErr: boolean } { if (typeof mongodbLogPath === 'string' && /^stderr$/i.test(mongodbLogPath)) { - return createStdioLogger(process.stderr); + return { mongodbLogPath: createStdioLogger(process.stderr), mongodbLogPathIsStdErr: true }; } if (typeof mongodbLogPath === 'string' && /^stdout$/i.test(mongodbLogPath)) { - return createStdioLogger(process.stdout); + return { mongodbLogPath: createStdioLogger(process.stdout), mongodbLogPathIsStdErr: false }; } if (typeof mongodbLogPath === 'object' && typeof mongodbLogPath?.write === 'function') { - return mongodbLogPath; + return { mongodbLogPath: mongodbLogPath, mongodbLogPathIsStdErr: false }; } if (MONGODB_LOG_PATH && /^stderr$/i.test(MONGODB_LOG_PATH)) { - return createStdioLogger(process.stderr); + return { mongodbLogPath: createStdioLogger(process.stderr), mongodbLogPathIsStdErr: true }; } if (MONGODB_LOG_PATH && /^stdout$/i.test(MONGODB_LOG_PATH)) { - return createStdioLogger(process.stdout); + return { mongodbLogPath: createStdioLogger(process.stdout), mongodbLogPathIsStdErr: false }; } - return createStdioLogger(process.stderr); + return { mongodbLogPath: createStdioLogger(process.stderr), mongodbLogPathIsStdErr: true }; } function resolveSeverityConfiguration( @@ -281,7 +283,7 @@ export interface Log extends Record { /** @internal */ export interface MongoDBLogWritable { - write(log: Log): void; + write(log: Log): PromiseLike | unknown; } function compareSeverity(s0: SeverityLevel, s1: SeverityLevel): 1 | 0 | -1 { @@ -415,10 +417,10 @@ export function stringifyWithMaxLen( ): string { let strToTruncate = ''; - if (typeof value === 'function') { - strToTruncate = value.toString(); - } else { - strToTruncate = EJSON.stringify(value, options); + try { + strToTruncate = typeof value !== 'function' ? EJSON.stringify(value, options) : value.name; + } catch (e) { + strToTruncate = `Extended JSON serialization failed with: ${e.message}`; } return maxDocumentLength !== 0 && strToTruncate.length > maxDocumentLength @@ -455,7 +457,7 @@ function attachCommandFields( ) { log.commandName = commandEvent.commandName; log.requestId = commandEvent.requestId; - log.driverConnectionId = commandEvent?.connectionId; + log.driverConnectionId = commandEvent.connectionId; const { host, port } = HostAddress.fromString(commandEvent.address).toHostPort(); log.serverHost = host; log.serverPort = port; @@ -463,7 +465,7 @@ function attachCommandFields( log.serviceId = commandEvent.serviceId.toHexString(); } log.databaseName = commandEvent.databaseName; - log.serverConnectionId = commandEvent?.serverConnectionId; + log.serverConnectionId = commandEvent.serverConnectionId; return log; } @@ -497,7 +499,8 @@ function attachServerHeartbeatFields( return log; } -function defaultLogTransform( +/** @internal */ +export function defaultLogTransform( logObject: LoggableEvent | Record, maxDocumentLength: number = DEFAULT_MAX_DOCUMENT_LENGTH ): Omit { @@ -509,7 +512,7 @@ function defaultLogTransform( return log; case SERVER_SELECTION_FAILED: log = attachServerSelectionFields(log, logObject, maxDocumentLength); - log.failure = logObject.failure.message; + log.failure = logObject.failure?.message; return log; case SERVER_SELECTION_SUCCEEDED: log = attachServerSelectionFields(log, logObject, maxDocumentLength); @@ -536,7 +539,7 @@ function defaultLogTransform( log = attachCommandFields(log, logObject); log.message = 'Command failed'; log.durationMS = logObject.duration; - log.failure = logObject.failure.message ?? '(redacted)'; + log.failure = logObject.failure?.message ?? '(redacted)'; return log; case CONNECTION_POOL_CREATED: log = attachConnectionFields(log, logObject); @@ -562,7 +565,7 @@ function defaultLogTransform( log = attachConnectionFields(log, logObject); log.message = 'Connection pool cleared'; if (logObject.serviceId?._bsontype === 'ObjectId') { - log.serviceId = logObject.serviceId.toHexString(); + log.serviceId = logObject.serviceId?.toHexString(); } return log; case CONNECTION_POOL_CLOSED: @@ -666,7 +669,7 @@ function defaultLogTransform( log = attachServerHeartbeatFields(log, logObject); log.message = 'Server heartbeat failed'; log.durationMS = logObject.duration; - log.failure = logObject.failure.message; + log.failure = logObject.failure?.message; return log; case TOPOLOGY_OPENING: log = attachSDAMFields(log, logObject); @@ -700,7 +703,9 @@ function defaultLogTransform( export class MongoLogger { componentSeverities: Record; maxDocumentLength: number; - logDestination: MongoDBLogWritable | Writable; + logDestination: MongoDBLogWritable; + logDestinationIsStdErr: boolean; + pendingLog: PromiseLike | unknown = null; /** * This method should be used when logging errors that do not have a public driver API for @@ -732,12 +737,44 @@ export class MongoLogger { this.componentSeverities = options.componentSeverities; this.maxDocumentLength = options.maxDocumentLength; this.logDestination = options.logDestination; + this.logDestinationIsStdErr = options.logDestinationIsStdErr; } willLog(severity: SeverityLevel, component: MongoLoggableComponent): boolean { return compareSeverity(severity, this.componentSeverities[component]) <= 0; } + turnOffSeverities() { + for (const key of Object.values(MongoLoggableComponent)) { + this.componentSeverities[key as MongoLoggableComponent] = SeverityLevel.OFF; + } + } + + private logWriteFailureHandler(error: Error) { + if (this.logDestinationIsStdErr) { + this.turnOffSeverities(); + this.clearPendingLog(); + return; + } + this.logDestination = createStdioLogger(process.stderr); + this.logDestinationIsStdErr = true; + this.clearPendingLog(); + this.error(MongoLoggableComponent.CLIENT, { + toLog: function () { + return { + message: 'User input for mongodbLogPath is now invalid. Logging is halted.', + error: error.message + }; + } + }); + this.turnOffSeverities(); + this.clearPendingLog(); + } + + private clearPendingLog() { + this.pendingLog = null; + } + private log( severity: SeverityLevel, component: MongoLoggableComponent, @@ -755,7 +792,25 @@ export class MongoLogger { logMessage = { ...logMessage, ...defaultLogTransform(message, this.maxDocumentLength) }; } } - this.logDestination.write(logMessage); + + if (isPromiseLike(this.pendingLog)) { + this.pendingLog = this.pendingLog + .then(() => this.logDestination.write(logMessage)) + .then(this.clearPendingLog.bind(this), this.logWriteFailureHandler.bind(this)); + return; + } + + try { + const logResult = this.logDestination.write(logMessage); + if (isPromiseLike(logResult)) { + this.pendingLog = logResult.then( + this.clearPendingLog.bind(this), + this.logWriteFailureHandler.bind(this) + ); + } + } catch (error) { + this.logWriteFailureHandler(error); + } } /** @@ -776,10 +831,12 @@ export class MongoLogger { clientOptions: MongoLoggerMongoClientOptions ): MongoLoggerOptions { // client options take precedence over env options + const resolvedLogPath = resolveLogPath(envOptions, clientOptions); const combinedOptions = { ...envOptions, ...clientOptions, - mongodbLogPath: resolveLogPath(envOptions, clientOptions) + mongodbLogPath: resolvedLogPath.mongodbLogPath, + mongodbLogPathIsStdErr: resolvedLogPath.mongodbLogPathIsStdErr }; const defaultSeverity = resolveSeverityConfiguration( combinedOptions.mongodbLogComponentSeverities?.default, @@ -820,7 +877,8 @@ export class MongoLogger { combinedOptions.mongodbLogMaxDocumentLength ?? parseUnsignedInteger(combinedOptions.MONGODB_LOG_MAX_DOCUMENT_LENGTH) ?? 1000, - logDestination: combinedOptions.mongodbLogPath + logDestination: combinedOptions.mongodbLogPath, + logDestinationIsStdErr: combinedOptions.mongodbLogPathIsStdErr }; } } diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 2ffae442415..8c92f08b625 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -22,14 +22,15 @@ export type ServerSelector = ( * Returns a server selector that selects for writable servers */ export function writableServerSelector(): ServerSelector { - return ( + return function writableServer( topologyDescription: TopologyDescription, servers: ServerDescription[] - ): ServerDescription[] => - latencyWindowReducer( + ): ServerDescription[] { + return latencyWindowReducer( topologyDescription, servers.filter((s: ServerDescription) => s.isWritable) ); + }; } /** @@ -37,10 +38,10 @@ export function writableServerSelector(): ServerSelector { * if it is in a state that it can have commands sent to it. */ export function sameServerSelector(description?: ServerDescription): ServerSelector { - return ( + return function sameServerSelector( topologyDescription: TopologyDescription, servers: ServerDescription[] - ): ServerDescription[] => { + ): ServerDescription[] { if (!description) return []; // Filter the servers to match the provided description only if // the type is not unknown. @@ -265,11 +266,11 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se throw new MongoInvalidArgumentError('Invalid read preference specified'); } - return ( + return function readPreferenceServers( topologyDescription: TopologyDescription, servers: ServerDescription[], deprioritized: ServerDescription[] = [] - ): ServerDescription[] => { + ): ServerDescription[] { const commonWireVersion = topologyDescription.commonWireVersion; if ( commonWireVersion && diff --git a/src/utils.ts b/src/utils.ts index 25a1cc422c4..677c681396e 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -171,8 +171,13 @@ export function applyRetryableWrites(target: T, db * @param value - An object that could be a promise * @returns true if the provided value is a Promise */ -export function isPromiseLike(value?: PromiseLike | void): value is Promise { - return !!value && typeof value.then === 'function'; +export function isPromiseLike(value?: unknown): value is PromiseLike { + return ( + value != null && + typeof value === 'object' && + 'then' in value && + typeof value.then === 'function' + ); } /** diff --git a/test/unit/mongo_logger.test.ts b/test/unit/mongo_logger.test.ts index 90a36437494..7a6e1b2da9b 100644 --- a/test/unit/mongo_logger.test.ts +++ b/test/unit/mongo_logger.test.ts @@ -19,14 +19,17 @@ import { CONNECTION_POOL_CREATED, CONNECTION_POOL_READY, CONNECTION_READY, + createStdioLogger, DEFAULT_MAX_DOCUMENT_LENGTH, type Log, type MongoDBLogWritable, MongoLogger, type MongoLoggerOptions, + parseSeverityFromString, SeverityLevel, stringifyWithMaxLen } from '../mongodb'; +import { sleep } from '../tools/utils'; class BufferingStream extends Writable { buffer: any[] = []; @@ -53,7 +56,7 @@ describe('meta tests for BufferingStream', function () { }); }); -describe('class MongoLogger', function () { +describe('class MongoLogger', async function () { describe('#constructor()', function () { it('assigns each property from the options object onto the logging class', function () { const componentSeverities: MongoLoggerOptions['componentSeverities'] = { @@ -63,7 +66,8 @@ describe('class MongoLogger', function () { const logger = new MongoLogger({ componentSeverities, maxDocumentLength: 10, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false }); expect(logger).to.have.property('componentSeverities', componentSeverities); @@ -81,7 +85,8 @@ describe('class MongoLogger', function () { } as { buffer: any[]; write: (log: Log) => void }; const logger = new MongoLogger({ componentSeverities: { command: 'error' } as any, - logDestination + logDestination, + logDestinationIsStdErr: false } as any); logger.error('command', 'Hello world!'); @@ -101,7 +106,8 @@ describe('class MongoLogger', function () { const logger = new MongoLogger({ componentSeverities: { command: 'error' } as any, - logDestination + logDestination, + logDestinationIsStdErr: false } as any); logger.error('command', 'Hello world!'); @@ -644,7 +650,8 @@ describe('class MongoLogger', function () { componentSeverities: { topology: 'off' } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); logger[severityLevel]('topology', 'message'); @@ -658,7 +665,8 @@ describe('class MongoLogger', function () { componentSeverities: { command: severityLevel } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); for (let i = index + 1; i < severities.length; i++) { @@ -677,7 +685,8 @@ describe('class MongoLogger', function () { componentSeverities: { command: severityLevel } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); // Calls all severity logging methods with a level less than or equal to what severityLevel @@ -702,7 +711,8 @@ describe('class MongoLogger', function () { const stream = new BufferingStream(); const logger = new MongoLogger({ componentSeverities: { command: severityLevel } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); logger[severityLevel]('command', obj); @@ -718,7 +728,8 @@ describe('class MongoLogger', function () { const stream = new BufferingStream(); const logger = new MongoLogger({ componentSeverities: { command: severityLevel } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); logger[severityLevel]('command', obj); @@ -738,7 +749,8 @@ describe('class MongoLogger', function () { const stream = new BufferingStream(); const logger = new MongoLogger({ componentSeverities: { command: severityLevel } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); logger[severityLevel]('command', obj); @@ -756,7 +768,8 @@ describe('class MongoLogger', function () { const stream = new BufferingStream(); const logger = new MongoLogger({ componentSeverities: { command: severityLevel } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); logger[severityLevel]('command', message); @@ -776,7 +789,8 @@ describe('class MongoLogger', function () { command: 'trace', connection: 'trace' } as any, - logDestination: stream + logDestination: stream, + logDestinationIsStdErr: false } as any); }); @@ -1227,6 +1241,12 @@ describe('class MongoLogger', function () { }); }); }); + + context('when invalid severity is passed into parseSeverityFromString', function () { + it('should not throw', function () { + expect(parseSeverityFromString('notARealSeverityLevel')).to.equal(null); + }); + }); }); } }); @@ -1276,6 +1296,235 @@ describe('class MongoLogger', function () { }).to.not.throw(); }); }); + + context('EJSON stringify invalid inputs', function () { + const errorInputs = [ + { + name: 'Map with non-string keys', + input: new Map([ + [1, 'one'], + [2, 'two'], + [3, 'three'] + ]) + }, + { + name: 'Object with invalid _bsontype', + input: { _bsontype: 'i will never be a real bson type' } + } + ]; + for (const errorInput of errorInputs) { + context(`when value is ${errorInput.name}`, function () { + it('should output default error message, with no error thrown', function () { + expect(stringifyWithMaxLen(errorInput.input, 40)).to.equal( + 'Extended JSON serialization failed with:...' + ); + }); + }); + } + }); + + context('when given function as input', function () { + it('should output function.name', function () { + expect( + stringifyWithMaxLen(function randomFunc() { + return 1; + }, DEFAULT_MAX_DOCUMENT_LENGTH) + ).to.equal('randomFunc'); + }); + }); + }); + }); + + describe('log', async function () { + let componentSeverities: MongoLoggerOptions['componentSeverities']; + + beforeEach(function () { + componentSeverities = { + command: 'trace', + topology: 'trace', + serverSelection: 'trace', + connection: 'trace', + client: 'trace' + } as any; + }); + + describe('sync stream failure handling', function () { + context('when stream is not stderr', function () { + let stderrStub; + + beforeEach(function () { + stderrStub = sinon.stub(process.stderr); + }); + + afterEach(function () { + sinon.restore(); + }); + + context('when stream is user defined and stream.write throws', function () { + it('should catch error, not crash application, warn user, and start writing to stderr', function () { + const stream = { + write(_log) { + throw Error('This writable always throws'); + } + }; + const logger = new MongoLogger({ + componentSeverities, + maxDocumentLength: 1000, + logDestination: stream, + logDestinationIsStdErr: false + }); + // print random message at the debug level + logger.debug('client', 'random message'); + let stderrStubCall = stderrStub.write.getCall(0).args[0]; + stderrStubCall = stderrStubCall.slice(stderrStubCall.search('c:')); + expect(stderrStubCall).to.equal( + `c: 'client', s: 'error', message: 'User input for mongodbLogPath is now invalid. Logging is halted.', error: 'This writable always throws' }` + ); + + // logging is halted + logger.debug('client', 'random message 2'); + const stderrStubCall2 = stderrStub.write.getCall(1); + expect(stderrStubCall2).to.be.null; + expect(Object.keys(logger.componentSeverities).every(key => key === SeverityLevel.OFF)); + }); + }); + }); + }); + describe('async stream failure handling', async function () { + context('when stream is not stderr', function () { + let stderrStub; + + beforeEach(function () { + stderrStub = sinon.stub(process.stderr); + }); + + afterEach(function () { + sinon.restore(); + }); + + context('when stream user defined stream and stream.write throws async', async function () { + it('should catch error, not crash application, warn user, and start writing to stderr', async function () { + const stream = { + async write(_log) { + await sleep(500); + throw Error('This writable always throws, but only after at least 500ms'); + } + }; + const logger = new MongoLogger({ + componentSeverities, + maxDocumentLength: 1000, + logDestination: stream, + logDestinationIsStdErr: false + }); + // print random message at the debug level + logger.debug('client', 'random message'); + + // before timeout resolves, no error + expect(stderrStub.write.getCall(0)).to.be.null; + + // manually wait for timeout to end + await sleep(600); + + // stderr now contains the error message + let stderrStubCall = stderrStub.write.getCall(0).args[0]; + stderrStubCall = stderrStubCall.slice(stderrStubCall.search('c:')); + expect(stderrStubCall).to.equal( + `c: 'client', s: 'error', message: 'User input for mongodbLogPath is now invalid. Logging is halted.', error: 'This writable always throws, but only after at least 500ms' }` + ); + + // no more logging in the future + logger.debug('client', 'random message 2'); + const stderrStubCall2 = stderrStub.write.getCall(1); + expect(stderrStubCall2).to.be.null; + expect(Object.keys(logger.componentSeverities).every(key => key === SeverityLevel.OFF)); + }); + }); + + context('when stream is stdout and stdout.write throws', async function () { + it('should catch error, not crash application, warn user, and start writing to stderr', async function () { + sinon.stub(process.stdout, 'write').throws(new Error('I am stdout and do not work')); + // print random message at the debug level + const logger = new MongoLogger({ + componentSeverities, + maxDocumentLength: 1000, + logDestination: createStdioLogger(process.stdout), + logDestinationIsStdErr: false + }); + logger.debug('client', 'random message'); + + // manually wait for promise to resolve (takes extra time with promisify) + await sleep(600); + + let stderrStubCall = stderrStub.write.getCall(0).args[0]; + stderrStubCall = stderrStubCall.slice(stderrStubCall.search('c:')); + expect(stderrStubCall).to.equal( + `c: 'client', s: 'error', message: 'User input for mongodbLogPath is now invalid. Logging is halted.', error: 'I am stdout and do not work' }` + ); + + // logging is halted + logger.debug('client', 'random message 2'); + const stderrStubCall2 = stderrStub.write.getCall(1); + expect(stderrStubCall2).to.be.null; + expect(Object.keys(logger.componentSeverities).every(key => key === SeverityLevel.OFF)); + }); + }); + }); + + context('when stream is stderr', function () { + context('when stderr.write throws', function () { + beforeEach(function () { + sinon.stub(process.stderr, 'write').throws(new Error('fake stderr failure')); + }); + afterEach(function () { + sinon.restore(); + }); + + it('should not throw error and turn off severities', function () { + // print random message at the debug level + const logger = new MongoLogger({ + componentSeverities, + maxDocumentLength: 1000, + logDestination: createStdioLogger(process.stderr), + logDestinationIsStdErr: true + }); + expect(() => logger.debug('client', 'random message')).to.not.throw(Error); + expect(Object.keys(logger.componentSeverities).every(key => key === SeverityLevel.OFF)); + }); + }); + }); + }); + context('when async stream has multiple logs with different timeouts', async function () { + it('should preserve their order', async function () { + const stream = { + buffer: [], + async write(log) { + if (log.message === 'longer timeout') { + await sleep(2000); + } else if (log.message === 'shorter timeout') { + await sleep(500); + } + this.buffer.push(log.message); + } + }; + const logger = new MongoLogger({ + componentSeverities, + maxDocumentLength: 1000, + logDestination: stream, + logDestinationIsStdErr: false + }); + + logger.debug('client', 'longer timeout'); + logger.debug('client', 'shorter timeout'); + logger.debug('client', 'no timeout'); + + expect(stream.buffer.length).to.equal(0); + + await sleep(2100); + expect(stream.buffer).to.deep.equal(['longer timeout']); + + await sleep(600); + expect(stream.buffer).to.deep.equal(['longer timeout', 'shorter timeout', 'no timeout']); + }); }); }); });