Skip to content

Commit e6aebb4

Browse files
committed
fix(NODE-4788)!: correctly implement gridfs stream underscore methods
1 parent 51a573f commit e6aebb4

File tree

4 files changed

+167
-477
lines changed

4 files changed

+167
-477
lines changed

src/gridfs/upload.ts

Lines changed: 60 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Writable } from 'stream';
33
import type { Document } from '../bson';
44
import { ObjectId } from '../bson';
55
import type { Collection } from '../collection';
6-
import { type AnyError, MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
6+
import { MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
77
import type { Callback } from '../utils';
88
import type { WriteConcernOptions } from '../write_concern';
99
import { WriteConcern } from './../write_concern';
@@ -38,7 +38,7 @@ export interface GridFSBucketWriteStreamOptions extends WriteConcernOptions {
3838
* Do not instantiate this class directly. Use `openUploadStream()` instead.
3939
* @public
4040
*/
41-
export class GridFSBucketWriteStream extends Writable implements NodeJS.WritableStream {
41+
export class GridFSBucketWriteStream extends Writable {
4242
bucket: GridFSBucket;
4343
chunks: Collection<GridFSChunk>;
4444
filename: string;
@@ -59,15 +59,7 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
5959
};
6060
writeConcern?: WriteConcern;
6161

62-
/** @event */
63-
static readonly CLOSE = 'close';
64-
/** @event */
65-
static readonly ERROR = 'error';
66-
/**
67-
* `end()` was called and the write stream successfully wrote the file metadata and all the chunks to MongoDB.
68-
* @event
69-
*/
70-
static readonly FINISH = 'finish';
62+
fileMetadata: GridFSFile | null = null;
7163

7264
/**
7365
* @param bucket - Handle for this stream's corresponding bucket
@@ -115,6 +107,16 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
115107
}
116108
}
117109

110+
/**
111+
* The stream is considered constructed when the indexes ßare done being created
112+
*/
113+
override _construct(callback: (error?: Error | null) => void): void {
114+
if (this.bucket.s.checkedIndexes) {
115+
return process.nextTick(callback);
116+
}
117+
this.bucket.once('index', callback);
118+
}
119+
118120
/**
119121
* Write a buffer to the stream.
120122
*
@@ -123,22 +125,20 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
123125
* @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
124126
* @returns False if this write required flushing a chunk to MongoDB. True otherwise.
125127
*/
126-
override write(chunk: Buffer | string): boolean;
127-
override write(chunk: Buffer | string, callback: Callback<void>): boolean;
128-
override write(chunk: Buffer | string, encoding: BufferEncoding | undefined): boolean;
129-
override write(
128+
override _write(
130129
chunk: Buffer | string,
131-
encoding: BufferEncoding | undefined,
130+
encoding: BufferEncoding,
132131
callback: Callback<void>
133-
): boolean;
134-
override write(
135-
chunk: Buffer | string,
136-
encodingOrCallback?: Callback<void> | BufferEncoding,
137-
callback?: Callback<void>
138-
): boolean {
139-
const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
140-
callback = typeof encodingOrCallback === 'function' ? encodingOrCallback : callback;
141-
return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback));
132+
): void {
133+
doWrite(this, chunk, encoding, callback);
134+
}
135+
136+
override _final(callback: (error?: Error | null) => void): void {
137+
if (this.state.streamEnd) {
138+
return process.nextTick(callback);
139+
}
140+
this.state.streamEnd = true;
141+
writeRemnant(this, callback);
142142
}
143143

144144
/**
@@ -159,76 +159,15 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
159159
this.state.aborted = true;
160160
await this.chunks.deleteMany({ files_id: this.id });
161161
}
162-
163-
/**
164-
* Tells the stream that no more data will be coming in. The stream will
165-
* persist the remaining data to MongoDB, write the files document, and
166-
* then emit a 'finish' event.
167-
*
168-
* @param chunk - Buffer to write
169-
* @param encoding - Optional encoding for the buffer
170-
* @param callback - Function to call when all files and chunks have been persisted to MongoDB
171-
*/
172-
override end(): this;
173-
override end(chunk: Buffer): this;
174-
override end(callback: Callback<GridFSFile | void>): this;
175-
override end(chunk: Buffer, callback: Callback<GridFSFile | void>): this;
176-
override end(chunk: Buffer, encoding: BufferEncoding): this;
177-
override end(
178-
chunk: Buffer,
179-
encoding: BufferEncoding | undefined,
180-
callback: Callback<GridFSFile | void>
181-
): this;
182-
override end(
183-
chunkOrCallback?: Buffer | Callback<GridFSFile | void>,
184-
encodingOrCallback?: BufferEncoding | Callback<GridFSFile | void>,
185-
callback?: Callback<GridFSFile | void>
186-
): this {
187-
const chunk = typeof chunkOrCallback === 'function' ? undefined : chunkOrCallback;
188-
const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
189-
callback =
190-
typeof chunkOrCallback === 'function'
191-
? chunkOrCallback
192-
: typeof encodingOrCallback === 'function'
193-
? encodingOrCallback
194-
: callback;
195-
196-
if (this.state.streamEnd || checkAborted(this, callback)) return this;
197-
198-
this.state.streamEnd = true;
199-
200-
if (callback) {
201-
this.once(GridFSBucketWriteStream.FINISH, (result: GridFSFile) => {
202-
if (callback) callback(undefined, result);
203-
});
204-
}
205-
206-
if (!chunk) {
207-
waitForIndexes(this, () => !!writeRemnant(this));
208-
return this;
209-
}
210-
211-
this.write(chunk, encoding, () => {
212-
writeRemnant(this);
213-
});
214-
215-
return this;
216-
}
217162
}
218163

219-
function __handleError(
220-
stream: GridFSBucketWriteStream,
221-
error: AnyError,
222-
callback?: Callback
223-
): void {
164+
function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void {
224165
if (stream.state.errored) {
166+
process.nextTick(callback);
225167
return;
226168
}
227169
stream.state.errored = true;
228-
if (callback) {
229-
return callback(error);
230-
}
231-
stream.emit(GridFSBucketWriteStream.ERROR, error);
170+
process.nextTick(callback, error);
232171
}
233172

234173
function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
@@ -271,13 +210,16 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
271210
}
272211
}
273212

274-
function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
275-
if (stream.done) return true;
213+
function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
214+
if (stream.done) {
215+
return process.nextTick(callback);
216+
}
217+
276218
if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
277219
// Set done so we do not trigger duplicate createFilesDoc
278220
stream.done = true;
279221
// Create a new files doc
280-
const filesDoc = createFilesDoc(
222+
const fileMetadata = createFilesDoc(
281223
stream.id,
282224
stream.length,
283225
stream.chunkSizeBytes,
@@ -287,24 +229,21 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
287229
stream.options.metadata
288230
);
289231

290-
if (checkAborted(stream, callback)) {
291-
return false;
232+
if (isAborted(stream, callback)) {
233+
return;
292234
}
293235

294-
stream.files.insertOne(filesDoc, { writeConcern: stream.writeConcern }).then(
236+
stream.files.insertOne(fileMetadata, { writeConcern: stream.writeConcern }).then(
295237
() => {
296-
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
297-
stream.emit(GridFSBucketWriteStream.CLOSE);
238+
stream.fileMetadata = fileMetadata;
239+
callback();
298240
},
299-
error => {
300-
return __handleError(stream, error, callback);
301-
}
241+
error => handleError(stream, error, callback)
302242
);
303-
304-
return true;
243+
return;
305244
}
306245

307-
return false;
246+
process.nextTick(callback);
308247
}
309248

310249
async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
@@ -377,11 +316,11 @@ function createFilesDoc(
377316
function doWrite(
378317
stream: GridFSBucketWriteStream,
379318
chunk: Buffer | string,
380-
encoding?: BufferEncoding,
381-
callback?: Callback<void>
382-
): boolean {
383-
if (checkAborted(stream, callback)) {
384-
return false;
319+
encoding: BufferEncoding,
320+
callback: Callback<void>
321+
): void {
322+
if (isAborted(stream, callback)) {
323+
return;
385324
}
386325

387326
const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
@@ -392,13 +331,8 @@ function doWrite(
392331
if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
393332
inputBuf.copy(stream.bufToStore, stream.pos);
394333
stream.pos += inputBuf.length;
395-
396-
callback && callback();
397-
398-
// Note that we reverse the typical semantics of write's return value
399-
// to be compatible with node's `.pipe()` function.
400-
// True means client can keep writing.
401-
return true;
334+
process.nextTick(callback);
335+
return;
402336
}
403337

404338
// Otherwise, buffer is too big for current chunk, so we need to flush
@@ -418,8 +352,8 @@ function doWrite(
418352
++stream.state.outstandingRequests;
419353
++outstandingRequests;
420354

421-
if (checkAborted(stream, callback)) {
422-
return false;
355+
if (isAborted(stream, callback)) {
356+
return;
423357
}
424358

425359
stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
@@ -429,13 +363,10 @@ function doWrite(
429363

430364
if (!outstandingRequests) {
431365
stream.emit('drain', doc);
432-
callback && callback();
433-
checkDone(stream);
366+
checkDone(stream, callback);
434367
}
435368
},
436-
error => {
437-
return __handleError(stream, error);
438-
}
369+
error => handleError(stream, error, callback)
439370
);
440371

441372
spaceRemaining = stream.chunkSizeBytes;
@@ -445,29 +376,9 @@ function doWrite(
445376
inputBufRemaining -= numToCopy;
446377
numToCopy = Math.min(spaceRemaining, inputBufRemaining);
447378
}
448-
449-
// Note that we reverse the typical semantics of write's return value
450-
// to be compatible with node's `.pipe()` function.
451-
// False means the client should wait for the 'drain' event.
452-
return false;
453-
}
454-
455-
function waitForIndexes(
456-
stream: GridFSBucketWriteStream,
457-
callback: (res: boolean) => boolean
458-
): boolean {
459-
if (stream.bucket.s.checkedIndexes) {
460-
return callback(false);
461-
}
462-
463-
stream.bucket.once('index', () => {
464-
callback(true);
465-
});
466-
467-
return true;
468379
}
469380

470-
function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
381+
function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void {
471382
// Buffer is empty, so don't bother to insert
472383
if (stream.pos === 0) {
473384
return checkDone(stream, callback);
@@ -482,28 +393,22 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
482393
const doc = createChunkDoc(stream.id, stream.n, remnant);
483394

484395
// If the stream was aborted, do not write remnant
485-
if (checkAborted(stream, callback)) {
486-
return false;
396+
if (isAborted(stream, callback)) {
397+
return;
487398
}
488399

489400
stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
490401
() => {
491402
--stream.state.outstandingRequests;
492-
checkDone(stream);
403+
checkDone(stream, callback);
493404
},
494-
error => {
495-
return __handleError(stream, error);
496-
}
405+
error => handleError(stream, error, callback)
497406
);
498-
return true;
499407
}
500408

501-
function checkAborted(stream: GridFSBucketWriteStream, callback?: Callback<void>): boolean {
409+
function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean {
502410
if (stream.state.aborted) {
503-
if (typeof callback === 'function') {
504-
// TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
505-
callback(new MongoAPIError('Stream has been aborted'));
506-
}
411+
process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
507412
return true;
508413
}
509414
return false;

0 commit comments

Comments
 (0)