Skip to content

Commit 7955610

Browse files
nbbeekendurran
andauthored
fix(NODE-4788)!: use implementer Writable methods for GridFSBucketWriteStream (#3808)
Co-authored-by: Durran Jordan <durran@gmail.com>
1 parent 2fbb715 commit 7955610

File tree

5 files changed

+215
-575
lines changed

5 files changed

+215
-575
lines changed

src/gridfs/download.ts

Lines changed: 16 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -78,35 +78,15 @@ export interface GridFSBucketReadStreamPrivate {
7878
* Do not instantiate this class directly. Use `openDownloadStream()` instead.
7979
* @public
8080
*/
81-
export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableStream {
81+
export class GridFSBucketReadStream extends Readable {
8282
/** @internal */
8383
s: GridFSBucketReadStreamPrivate;
8484

85-
/**
86-
* An error occurred
87-
* @event
88-
*/
89-
static readonly ERROR = 'error' as const;
9085
/**
9186
* Fires when the stream loaded the file document corresponding to the provided id.
9287
* @event
9388
*/
9489
static readonly FILE = 'file' as const;
95-
/**
96-
* Emitted when a chunk of data is available to be consumed.
97-
* @event
98-
*/
99-
static readonly DATA = 'data' as const;
100-
/**
101-
* Fired when the stream is exhausted (no more data events).
102-
* @event
103-
*/
104-
static readonly END = 'end' as const;
105-
/**
106-
* Fired when the stream is exhausted and the underlying cursor is killed
107-
* @event
108-
*/
109-
static readonly CLOSE = 'close' as const;
11090

11191
/**
11292
* @param chunks - Handle for chunks collection
@@ -122,7 +102,7 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
122102
filter: Document,
123103
options?: GridFSBucketReadStreamOptions
124104
) {
125-
super();
105+
super({ emitClose: true });
126106
this.s = {
127107
bytesToTrim: 0,
128108
bytesToSkip: 0,
@@ -185,20 +165,8 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
185165
*/
186166
async abort(): Promise<void> {
187167
this.push(null);
188-
this.destroyed = true;
189-
if (this.s.cursor) {
190-
try {
191-
await this.s.cursor.close();
192-
} finally {
193-
this.emit(GridFSBucketReadStream.CLOSE);
194-
}
195-
} else {
196-
if (!this.s.init) {
197-
// If not initialized, fire close event because we will never
198-
// get a cursor
199-
this.emit(GridFSBucketReadStream.CLOSE);
200-
}
201-
}
168+
this.destroy();
169+
await this.s.cursor?.close();
202170
}
203171
}
204172

@@ -221,19 +189,15 @@ function doRead(stream: GridFSBucketReadStream): void {
221189
return;
222190
}
223191
if (error) {
224-
stream.emit(GridFSBucketReadStream.ERROR, error);
192+
stream.destroy(error);
225193
return;
226194
}
227195
if (!doc) {
228196
stream.push(null);
229197

230198
stream.s.cursor?.close().then(
231-
() => {
232-
stream.emit(GridFSBucketReadStream.CLOSE);
233-
},
234-
error => {
235-
stream.emit(GridFSBucketReadStream.ERROR, error);
236-
}
199+
() => null,
200+
error => stream.destroy(error)
237201
);
238202
return;
239203
}
@@ -244,17 +208,15 @@ function doRead(stream: GridFSBucketReadStream): void {
244208
const expectedN = stream.s.expected++;
245209
const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
246210
if (doc.n > expectedN) {
247-
return stream.emit(
248-
GridFSBucketReadStream.ERROR,
211+
return stream.destroy(
249212
new MongoGridFSChunkError(
250213
`ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`
251214
)
252215
);
253216
}
254217

255218
if (doc.n < expectedN) {
256-
return stream.emit(
257-
GridFSBucketReadStream.ERROR,
219+
return stream.destroy(
258220
new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`)
259221
);
260222
}
@@ -263,16 +225,14 @@ function doRead(stream: GridFSBucketReadStream): void {
263225

264226
if (buf.byteLength !== expectedLength) {
265227
if (bytesRemaining <= 0) {
266-
return stream.emit(
267-
GridFSBucketReadStream.ERROR,
228+
return stream.destroy(
268229
new MongoGridFSChunkError(
269230
`ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`
270231
)
271232
);
272233
}
273234

274-
return stream.emit(
275-
GridFSBucketReadStream.ERROR,
235+
return stream.destroy(
276236
new MongoGridFSChunkError(
277237
`ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`
278238
)
@@ -332,7 +292,7 @@ function init(stream: GridFSBucketReadStream): void {
332292
doc
333293
}: { error: Error; doc: null } | { error: null; doc: any }) => {
334294
if (error) {
335-
return stream.emit(GridFSBucketReadStream.ERROR, error);
295+
return stream.destroy(error);
336296
}
337297

338298
if (!doc) {
@@ -343,7 +303,7 @@ function init(stream: GridFSBucketReadStream): void {
343303
// TODO(NODE-3483)
344304
const err = new MongoRuntimeError(errmsg);
345305
err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
346-
return stream.emit(GridFSBucketReadStream.ERROR, err);
306+
return stream.destroy(err);
347307
}
348308

349309
// If document is empty, kill the stream immediately and don't
@@ -357,14 +317,14 @@ function init(stream: GridFSBucketReadStream): void {
357317
// If user destroys the stream before we have a cursor, wait
358318
// until the query is done to say we're 'closed' because we can't
359319
// cancel a query.
360-
stream.emit(GridFSBucketReadStream.CLOSE);
320+
stream.destroy();
361321
return;
362322
}
363323

364324
try {
365325
stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
366326
} catch (error) {
367-
return stream.emit(GridFSBucketReadStream.ERROR, error);
327+
return stream.destroy(error);
368328
}
369329

370330
const filter: Document = { files_id: doc._id };
@@ -390,7 +350,7 @@ function init(stream: GridFSBucketReadStream): void {
390350
try {
391351
stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
392352
} catch (error) {
393-
return stream.emit(GridFSBucketReadStream.ERROR, error);
353+
return stream.destroy(error);
394354
}
395355

396356
stream.emit(GridFSBucketReadStream.FILE, doc);

0 commit comments

Comments
 (0)