Skip to content

Commit 1181991

Browse files
pr again
1 parent 299e8a1 commit 1181991

File tree

2 files changed

+221
-1
lines changed

2 files changed

+221
-1
lines changed

src/cmap/message_stream.ts

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
import { Duplex, type DuplexOptions } from 'stream';
2+
3+
import type { BSONSerializeOptions, Document } from '../bson';
4+
import { MongoDecompressionError, MongoParseError } from '../error';
5+
import type { ClientSession } from '../sessions';
6+
import { BufferPool, type Callback } from '../utils';
7+
import {
8+
type MessageHeader,
9+
OpCompressedRequest,
10+
OpMsgResponse,
11+
OpQueryResponse,
12+
type WriteProtocolMessageType
13+
} from './commands';
14+
import { compress, Compressor, type CompressorName, decompress } from './wire_protocol/compression';
15+
import { OP_COMPRESSED, OP_MSG } from './wire_protocol/constants';
16+
17+
const MESSAGE_HEADER_SIZE = 16;
18+
const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID
19+
20+
const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
21+
/** @internal */
22+
const kBuffer = Symbol('buffer');
23+
24+
/** @internal */
25+
export interface MessageStreamOptions extends DuplexOptions {
26+
maxBsonMessageSize?: number;
27+
}
28+
29+
/** @internal */
30+
export interface OperationDescription extends BSONSerializeOptions {
31+
started: number;
32+
cb: Callback<Document>;
33+
documentsReturnedIn?: string;
34+
noResponse: boolean;
35+
raw: boolean;
36+
requestId: number;
37+
session?: ClientSession;
38+
agreedCompressor?: CompressorName;
39+
zlibCompressionLevel?: number;
40+
$clusterTime?: Document;
41+
}
42+
43+
/**
44+
* A duplex stream that is capable of reading and writing raw wire protocol messages, with
45+
* support for optional compression
46+
* @internal
47+
*/
48+
export class MessageStream extends Duplex {
49+
/** @internal */
50+
maxBsonMessageSize: number;
51+
/** @internal */
52+
[kBuffer]: BufferPool;
53+
/** @internal */
54+
isMonitoringConnection = false;
55+
56+
constructor(options: MessageStreamOptions = {}) {
57+
super(options);
58+
this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
59+
this[kBuffer] = new BufferPool();
60+
}
61+
62+
get buffer(): BufferPool {
63+
return this[kBuffer];
64+
}
65+
66+
override _write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void {
67+
this[kBuffer].append(chunk);
68+
processIncomingData(this, callback);
69+
}
70+
71+
override _read(/* size */): void {
72+
// NOTE: This implementation is empty because we explicitly push data to be read
73+
// when `writeMessage` is called.
74+
return;
75+
}
76+
77+
writeCommand(
78+
command: WriteProtocolMessageType,
79+
operationDescription: OperationDescription
80+
): void {
81+
const agreedCompressor = operationDescription.agreedCompressor ?? 'none';
82+
if (agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)) {
83+
const data = command.toBin();
84+
this.push(Array.isArray(data) ? Buffer.concat(data) : data);
85+
return;
86+
}
87+
// otherwise, compress the message
88+
const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
89+
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
90+
91+
// Extract information needed for OP_COMPRESSED from the uncompressed message
92+
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
93+
94+
const options = {
95+
agreedCompressor,
96+
zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0
97+
};
98+
// Compress the message body
99+
compress(options, messageToBeCompressed).then(
100+
compressedMessage => {
101+
// Create the msgHeader of OP_COMPRESSED
102+
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
103+
msgHeader.writeInt32LE(
104+
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
105+
0
106+
); // messageLength
107+
msgHeader.writeInt32LE(command.requestId, 4); // requestID
108+
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
109+
msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode
110+
111+
// Create the compression details of OP_COMPRESSED
112+
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
113+
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
114+
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
115+
compressionDetails.writeUInt8(Compressor[agreedCompressor], 8); // compressorID
116+
this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
117+
},
118+
error => {
119+
operationDescription.cb(error);
120+
}
121+
);
122+
}
123+
}
124+
125+
function processIncomingData(stream: MessageStream, callback: Callback<Buffer>): void {
126+
const buffer = stream[kBuffer];
127+
const sizeOfMessage = buffer.getInt32();
128+
129+
if (sizeOfMessage == null) {
130+
return callback();
131+
}
132+
133+
if (sizeOfMessage < 0) {
134+
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
135+
}
136+
137+
if (sizeOfMessage > stream.maxBsonMessageSize) {
138+
return callback(
139+
new MongoParseError(
140+
`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
141+
)
142+
);
143+
}
144+
145+
if (sizeOfMessage > buffer.length) {
146+
return callback();
147+
}
148+
149+
const message = buffer.read(sizeOfMessage);
150+
const messageHeader: MessageHeader = {
151+
length: message.readInt32LE(0),
152+
requestId: message.readInt32LE(4),
153+
responseTo: message.readInt32LE(8),
154+
opCode: message.readInt32LE(12)
155+
};
156+
157+
const monitorHasAnotherHello = () => {
158+
if (stream.isMonitoringConnection) {
159+
// Can we read the next message size?
160+
const sizeOfMessage = buffer.getInt32();
161+
if (sizeOfMessage != null && sizeOfMessage <= buffer.length) {
162+
return true;
163+
}
164+
}
165+
return false;
166+
};
167+
168+
let ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse;
169+
if (messageHeader.opCode !== OP_COMPRESSED) {
170+
const messageBody = message.subarray(MESSAGE_HEADER_SIZE);
171+
172+
// If we are a monitoring connection message stream and
173+
// there is more in the buffer that can be read, skip processing since we
174+
// want the last hello command response that is in the buffer.
175+
if (monitorHasAnotherHello()) {
176+
return processIncomingData(stream, callback);
177+
}
178+
179+
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
180+
181+
if (buffer.length >= 4) {
182+
return processIncomingData(stream, callback);
183+
}
184+
return callback();
185+
}
186+
187+
messageHeader.fromCompressed = true;
188+
messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
189+
messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
190+
const compressorID = message[MESSAGE_HEADER_SIZE + 8];
191+
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
192+
193+
// recalculate based on wrapped opcode
194+
ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse;
195+
decompress(compressorID, compressedBuffer).then(
196+
messageBody => {
197+
if (messageBody.length !== messageHeader.length) {
198+
return callback(
199+
new MongoDecompressionError('Message body and message header must be the same length')
200+
);
201+
}
202+
203+
// If we are a monitoring connection message stream and
204+
// there is more in the buffer that can be read, skip processing since we
205+
// want the last hello command response that is in the buffer.
206+
if (monitorHasAnotherHello()) {
207+
return processIncomingData(stream, callback);
208+
}
209+
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
210+
211+
if (buffer.length >= 4) {
212+
return processIncomingData(stream, callback);
213+
}
214+
return callback();
215+
},
216+
error => {
217+
return callback(error);
218+
}
219+
);
220+
}

test/integration/command-logging-and-monitoring/command_logging_and_monitoring.prose.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { expect } from 'chai';
22

33
import { DEFAULT_MAX_DOCUMENT_LENGTH, type Document } from '../../mongodb';
44

5-
describe('Command Logging and Monitoring Prose Tests', function () {
5+
describe.only('Command Logging and Monitoring Prose Tests', function () {
66
const loggerFeatureFlag = Symbol.for('@@mdb.enableMongoLogger');
77
const ELLIPSES_LENGTH = 3;
88
let client;

0 commit comments

Comments
 (0)