Skip to content

Commit 0bde921

Browse files
committed
fix(NODE-4788)!: correctly implement gridfs stream underscore methods
1 parent 51a573f commit 0bde921

File tree

3 files changed

+143
-452
lines changed

3 files changed

+143
-452
lines changed

src/gridfs/upload.ts

Lines changed: 50 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export interface GridFSBucketWriteStreamOptions extends WriteConcernOptions {
3838
* Do not instantiate this class directly. Use `openUploadStream()` instead.
3939
* @public
4040
*/
41-
export class GridFSBucketWriteStream extends Writable implements NodeJS.WritableStream {
41+
export class GridFSBucketWriteStream extends Writable {
4242
bucket: GridFSBucket;
4343
chunks: Collection<GridFSChunk>;
4444
filename: string;
@@ -115,6 +115,14 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
115115
}
116116
}
117117

118+
/** The stream is considered constructed when the indexes are done being created */
119+
override _construct(callback: (error?: Error | null) => void): void {
120+
if (this.bucket.s.checkedIndexes) {
121+
return process.nextTick(callback);
122+
}
123+
this.bucket.once('index', callback);
124+
}
125+
118126
/**
119127
* Write a buffer to the stream.
120128
*
@@ -123,22 +131,18 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
123131
* @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
124132
* @returns False if this write required flushing a chunk to MongoDB. True otherwise.
125133
*/
126-
override write(chunk: Buffer | string): boolean;
127-
override write(chunk: Buffer | string, callback: Callback<void>): boolean;
128-
override write(chunk: Buffer | string, encoding: BufferEncoding | undefined): boolean;
129-
override write(
134+
override _write(
130135
chunk: Buffer | string,
131-
encoding: BufferEncoding | undefined,
136+
encoding: BufferEncoding,
132137
callback: Callback<void>
133-
): boolean;
134-
override write(
135-
chunk: Buffer | string,
136-
encodingOrCallback?: Callback<void> | BufferEncoding,
137-
callback?: Callback<void>
138-
): boolean {
139-
const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
140-
callback = typeof encodingOrCallback === 'function' ? encodingOrCallback : callback;
141-
return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback));
138+
): void {
139+
doWrite(this, chunk, encoding, callback);
140+
}
141+
142+
override _final(callback: (error?: Error | null) => void): void {
143+
if (this.state.streamEnd) process.nextTick(callback);
144+
this.state.streamEnd = true;
145+
writeRemnant(this, callback);
142146
}
143147

144148
/**
@@ -159,76 +163,14 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
159163
this.state.aborted = true;
160164
await this.chunks.deleteMany({ files_id: this.id });
161165
}
162-
163-
/**
164-
* Tells the stream that no more data will be coming in. The stream will
165-
* persist the remaining data to MongoDB, write the files document, and
166-
* then emit a 'finish' event.
167-
*
168-
* @param chunk - Buffer to write
169-
* @param encoding - Optional encoding for the buffer
170-
* @param callback - Function to call when all files and chunks have been persisted to MongoDB
171-
*/
172-
override end(): this;
173-
override end(chunk: Buffer): this;
174-
override end(callback: Callback<GridFSFile | void>): this;
175-
override end(chunk: Buffer, callback: Callback<GridFSFile | void>): this;
176-
override end(chunk: Buffer, encoding: BufferEncoding): this;
177-
override end(
178-
chunk: Buffer,
179-
encoding: BufferEncoding | undefined,
180-
callback: Callback<GridFSFile | void>
181-
): this;
182-
override end(
183-
chunkOrCallback?: Buffer | Callback<GridFSFile | void>,
184-
encodingOrCallback?: BufferEncoding | Callback<GridFSFile | void>,
185-
callback?: Callback<GridFSFile | void>
186-
): this {
187-
const chunk = typeof chunkOrCallback === 'function' ? undefined : chunkOrCallback;
188-
const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
189-
callback =
190-
typeof chunkOrCallback === 'function'
191-
? chunkOrCallback
192-
: typeof encodingOrCallback === 'function'
193-
? encodingOrCallback
194-
: callback;
195-
196-
if (this.state.streamEnd || checkAborted(this, callback)) return this;
197-
198-
this.state.streamEnd = true;
199-
200-
if (callback) {
201-
this.once(GridFSBucketWriteStream.FINISH, (result: GridFSFile) => {
202-
if (callback) callback(undefined, result);
203-
});
204-
}
205-
206-
if (!chunk) {
207-
waitForIndexes(this, () => !!writeRemnant(this));
208-
return this;
209-
}
210-
211-
this.write(chunk, encoding, () => {
212-
writeRemnant(this);
213-
});
214-
215-
return this;
216-
}
217166
}
218167

219-
function __handleError(
220-
stream: GridFSBucketWriteStream,
221-
error: AnyError,
222-
callback?: Callback
223-
): void {
168+
function handleError(stream: GridFSBucketWriteStream, error: AnyError, callback: Callback): void {
224169
if (stream.state.errored) {
225170
return;
226171
}
227172
stream.state.errored = true;
228-
if (callback) {
229-
return callback(error);
230-
}
231-
stream.emit(GridFSBucketWriteStream.ERROR, error);
173+
process.nextTick(callback, error);
232174
}
233175

234176
function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
@@ -271,8 +213,11 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
271213
}
272214
}
273215

274-
function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
275-
if (stream.done) return true;
216+
function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
217+
if (stream.done) {
218+
process.nextTick(callback);
219+
return;
220+
}
276221
if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
277222
// Set done so we do not trigger duplicate createFilesDoc
278223
stream.done = true;
@@ -287,24 +232,18 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
287232
stream.options.metadata
288233
);
289234

290-
if (checkAborted(stream, callback)) {
291-
return false;
235+
if (isAborted(stream, callback)) {
236+
return;
292237
}
293238

294239
stream.files.insertOne(filesDoc, { writeConcern: stream.writeConcern }).then(
295-
() => {
296-
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
297-
stream.emit(GridFSBucketWriteStream.CLOSE);
298-
},
240+
() => callback(),
299241
error => {
300-
return __handleError(stream, error, callback);
242+
return handleError(stream, error, callback);
301243
}
302244
);
303-
304-
return true;
305245
}
306-
307-
return false;
246+
process.nextTick(callback);
308247
}
309248

310249
async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
@@ -377,11 +316,11 @@ function createFilesDoc(
377316
function doWrite(
378317
stream: GridFSBucketWriteStream,
379318
chunk: Buffer | string,
380-
encoding?: BufferEncoding,
381-
callback?: Callback<void>
382-
): boolean {
383-
if (checkAborted(stream, callback)) {
384-
return false;
319+
encoding: BufferEncoding,
320+
callback: Callback<void>
321+
): void {
322+
if (isAborted(stream, callback)) {
323+
return;
385324
}
386325

387326
const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
@@ -392,13 +331,8 @@ function doWrite(
392331
if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
393332
inputBuf.copy(stream.bufToStore, stream.pos);
394333
stream.pos += inputBuf.length;
395-
396-
callback && callback();
397-
398-
// Note that we reverse the typical semantics of write's return value
399-
// to be compatible with node's `.pipe()` function.
400-
// True means client can keep writing.
401-
return true;
334+
process.nextTick(callback);
335+
return;
402336
}
403337

404338
// Otherwise, buffer is too big for current chunk, so we need to flush
@@ -418,8 +352,8 @@ function doWrite(
418352
++stream.state.outstandingRequests;
419353
++outstandingRequests;
420354

421-
if (checkAborted(stream, callback)) {
422-
return false;
355+
if (isAborted(stream, callback)) {
356+
return;
423357
}
424358

425359
stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
@@ -429,12 +363,11 @@ function doWrite(
429363

430364
if (!outstandingRequests) {
431365
stream.emit('drain', doc);
432-
callback && callback();
433-
checkDone(stream);
366+
checkDone(stream, callback);
434367
}
435368
},
436369
error => {
437-
return __handleError(stream, error);
370+
return handleError(stream, error, callback);
438371
}
439372
);
440373

@@ -445,29 +378,9 @@ function doWrite(
445378
inputBufRemaining -= numToCopy;
446379
numToCopy = Math.min(spaceRemaining, inputBufRemaining);
447380
}
448-
449-
// Note that we reverse the typical semantics of write's return value
450-
// to be compatible with node's `.pipe()` function.
451-
// False means the client should wait for the 'drain' event.
452-
return false;
453-
}
454-
455-
function waitForIndexes(
456-
stream: GridFSBucketWriteStream,
457-
callback: (res: boolean) => boolean
458-
): boolean {
459-
if (stream.bucket.s.checkedIndexes) {
460-
return callback(false);
461-
}
462-
463-
stream.bucket.once('index', () => {
464-
callback(true);
465-
});
466-
467-
return true;
468381
}
469382

470-
function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
383+
function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void {
471384
// Buffer is empty, so don't bother to insert
472385
if (stream.pos === 0) {
473386
return checkDone(stream, callback);
@@ -482,28 +395,25 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
482395
const doc = createChunkDoc(stream.id, stream.n, remnant);
483396

484397
// If the stream was aborted, do not write remnant
485-
if (checkAborted(stream, callback)) {
486-
return false;
398+
if (isAborted(stream, callback)) {
399+
return;
487400
}
488401

489402
stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
490403
() => {
491404
--stream.state.outstandingRequests;
492-
checkDone(stream);
405+
checkDone(stream, callback);
493406
},
494407
error => {
495-
return __handleError(stream, error);
408+
return handleError(stream, error, callback);
496409
}
497410
);
498-
return true;
411+
return;
499412
}
500413

501-
function checkAborted(stream: GridFSBucketWriteStream, callback?: Callback<void>): boolean {
414+
function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean {
502415
if (stream.state.aborted) {
503-
if (typeof callback === 'function') {
504-
// TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
505-
callback(new MongoAPIError('Stream has been aborted'));
506-
}
416+
process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
507417
return true;
508418
}
509419
return false;

0 commit comments

Comments
 (0)