Skip to content

Commit 9cfa2af

Browse files
nbbeekendurran
andauthored
refactor(NODE-4904): change internal gridfs logic to use promises (#3493)
Co-authored-by: Durran Jordan <durran@gmail.com>
1 parent b9e4c7c commit 9cfa2af

File tree

5 files changed

+244
-142
lines changed

5 files changed

+244
-142
lines changed

src/gridfs/download.ts

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,16 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
189189
this.push(null);
190190
this.destroyed = true;
191191
if (this.s.cursor) {
192-
this.s.cursor.close(error => {
193-
this.emit(GridFSBucketReadStream.CLOSE);
194-
callback && callback(error);
195-
});
192+
this.s.cursor.close().then(
193+
() => {
194+
this.emit(GridFSBucketReadStream.CLOSE);
195+
callback?.();
196+
},
197+
error => {
198+
this.emit(GridFSBucketReadStream.CLOSE);
199+
callback?.(error);
200+
}
201+
);
196202
} else {
197203
if (!this.s.init) {
198204
// If not initialized, fire close event because we will never
@@ -215,7 +221,10 @@ function doRead(stream: GridFSBucketReadStream): void {
215221
if (!stream.s.cursor) return;
216222
if (!stream.s.file) return;
217223

218-
stream.s.cursor.next((error, doc) => {
224+
const handleReadResult = ({
225+
error,
226+
doc
227+
}: { error: Error; doc: null } | { error: null; doc: any }) => {
219228
if (stream.destroyed) {
220229
return;
221230
}
@@ -226,16 +235,14 @@ function doRead(stream: GridFSBucketReadStream): void {
226235
if (!doc) {
227236
stream.push(null);
228237

229-
if (!stream.s.cursor) return;
230-
stream.s.cursor.close(error => {
231-
if (error) {
238+
stream.s.cursor?.close().then(
239+
() => {
240+
stream.emit(GridFSBucketReadStream.CLOSE);
241+
},
242+
error => {
232243
stream.emit(GridFSBucketReadStream.ERROR, error);
233-
return;
234244
}
235-
236-
stream.emit(GridFSBucketReadStream.CLOSE);
237-
});
238-
245+
);
239246
return;
240247
}
241248

@@ -308,7 +315,12 @@ function doRead(stream: GridFSBucketReadStream): void {
308315

309316
stream.push(buf);
310317
return;
311-
});
318+
};
319+
320+
stream.s.cursor.next().then(
321+
doc => handleReadResult({ error: null, doc }),
322+
error => handleReadResult({ error, doc: null })
323+
);
312324
}
313325

314326
function init(stream: GridFSBucketReadStream): void {
@@ -323,7 +335,10 @@ function init(stream: GridFSBucketReadStream): void {
323335
findOneOptions.skip = stream.s.options.skip;
324336
}
325337

326-
stream.s.files.findOne(stream.s.filter, findOneOptions, (error, doc) => {
338+
const handleReadResult = ({
339+
error,
340+
doc
341+
}: { error: Error; doc: null } | { error: null; doc: any }) => {
327342
if (error) {
328343
return stream.emit(GridFSBucketReadStream.ERROR, error);
329344
}
@@ -388,7 +403,12 @@ function init(stream: GridFSBucketReadStream): void {
388403

389404
stream.emit(GridFSBucketReadStream.FILE, doc);
390405
return;
391-
});
406+
};
407+
408+
stream.s.files.findOne(stream.s.filter, findOneOptions).then(
409+
doc => handleReadResult({ error: null, doc }),
410+
error => handleReadResult({ error, doc: null })
411+
);
392412
}
393413

394414
function waitForFile(stream: GridFSBucketReadStream, callback: Callback): void {

src/gridfs/upload.ts

Lines changed: 88 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,13 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
105105
if (!this.bucket.s.calledOpenUploadStream) {
106106
this.bucket.s.calledOpenUploadStream = true;
107107

108-
checkIndexes(this, () => {
109-
this.bucket.s.checkedIndexes = true;
110-
this.bucket.emit('index');
111-
});
108+
checkIndexes(this).then(
109+
() => {
110+
this.bucket.s.checkedIndexes = true;
111+
this.bucket.emit('index');
112+
},
113+
() => null
114+
);
112115
}
113116
}
114117

@@ -244,54 +247,36 @@ function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk
244247
};
245248
}
246249

247-
function checkChunksIndex(stream: GridFSBucketWriteStream, callback: Callback): void {
248-
stream.chunks.listIndexes().toArray((error?: AnyError, indexes?: Document[]) => {
249-
let index: { files_id: number; n: number };
250-
if (error) {
251-
// Collection doesn't exist so create index
252-
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
253-
index = { files_id: 1, n: 1 };
254-
stream.chunks.createIndex(index, { background: false, unique: true }, error => {
255-
if (error) {
256-
return callback(error);
257-
}
258-
259-
callback();
260-
});
261-
return;
262-
}
263-
return callback(error);
264-
}
250+
async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void> {
251+
const index = { files_id: 1, n: 1 };
265252

266-
let hasChunksIndex = false;
267-
if (indexes) {
268-
indexes.forEach((index: Document) => {
269-
if (index.key) {
270-
const keys = Object.keys(index.key);
271-
if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
272-
hasChunksIndex = true;
273-
}
274-
}
275-
});
253+
let indexes;
254+
try {
255+
indexes = await stream.chunks.listIndexes().toArray();
256+
} catch (error) {
257+
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
258+
indexes = [];
259+
} else {
260+
throw error;
276261
}
262+
}
277263

278-
if (hasChunksIndex) {
279-
callback();
280-
} else {
281-
index = { files_id: 1, n: 1 };
282-
const writeConcernOptions = getWriteOptions(stream);
283-
284-
stream.chunks.createIndex(
285-
index,
286-
{
287-
...writeConcernOptions,
288-
background: true,
289-
unique: true
290-
},
291-
callback
292-
);
264+
const hasChunksIndex = !!indexes.find(index => {
265+
const keys = Object.keys(index.key);
266+
if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
267+
return true;
293268
}
269+
return false;
294270
});
271+
272+
if (!hasChunksIndex) {
273+
const writeConcernOptions = getWriteOptions(stream);
274+
await stream.chunks.createIndex(index, {
275+
...writeConcernOptions,
276+
background: true,
277+
unique: true
278+
});
279+
}
295280
}
296281

297282
function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
@@ -314,81 +299,55 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
314299
return false;
315300
}
316301

317-
stream.files.insertOne(filesDoc, getWriteOptions(stream), (error?: AnyError) => {
318-
if (error) {
302+
stream.files.insertOne(filesDoc, getWriteOptions(stream)).then(
303+
() => {
304+
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
305+
stream.emit(GridFSBucketWriteStream.CLOSE);
306+
},
307+
error => {
319308
return __handleError(stream, error, callback);
320309
}
321-
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
322-
stream.emit(GridFSBucketWriteStream.CLOSE);
323-
});
310+
);
324311

325312
return true;
326313
}
327314

328315
return false;
329316
}
330317

331-
function checkIndexes(stream: GridFSBucketWriteStream, callback: Callback): void {
332-
stream.files.findOne({}, { projection: { _id: 1 } }, (error, doc) => {
333-
if (error) {
334-
return callback(error);
335-
}
336-
if (doc) {
337-
return callback();
338-
}
318+
async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
319+
const doc = await stream.files.findOne({}, { projection: { _id: 1 } });
320+
if (doc != null) {
321+
// If at least one document exists assume the collection has the required index
322+
return;
323+
}
339324

340-
stream.files.listIndexes().toArray((error?: AnyError, indexes?: Document) => {
341-
let index: { filename: number; uploadDate: number };
342-
if (error) {
343-
// Collection doesn't exist so create index
344-
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
345-
index = { filename: 1, uploadDate: 1 };
346-
stream.files.createIndex(index, { background: false }, (error?: AnyError) => {
347-
if (error) {
348-
return callback(error);
349-
}
350-
351-
checkChunksIndex(stream, callback);
352-
});
353-
return;
354-
}
355-
return callback(error);
356-
}
325+
const index = { filename: 1, uploadDate: 1 };
357326

358-
let hasFileIndex = false;
359-
if (indexes) {
360-
indexes.forEach((index: Document) => {
361-
const keys = Object.keys(index.key);
362-
if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
363-
hasFileIndex = true;
364-
}
365-
});
366-
}
327+
let indexes;
328+
try {
329+
indexes = await stream.files.listIndexes().toArray();
330+
} catch (error) {
331+
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
332+
indexes = [];
333+
} else {
334+
throw error;
335+
}
336+
}
367337

368-
if (hasFileIndex) {
369-
checkChunksIndex(stream, callback);
370-
} else {
371-
index = { filename: 1, uploadDate: 1 };
372-
373-
const writeConcernOptions = getWriteOptions(stream);
374-
375-
stream.files.createIndex(
376-
index,
377-
{
378-
...writeConcernOptions,
379-
background: false
380-
},
381-
(error?: AnyError) => {
382-
if (error) {
383-
return callback(error);
384-
}
385-
386-
checkChunksIndex(stream, callback);
387-
}
388-
);
389-
}
390-
});
338+
const hasFileIndex = !!indexes.find(index => {
339+
const keys = Object.keys(index.key);
340+
if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
341+
return true;
342+
}
343+
return false;
391344
});
345+
346+
if (!hasFileIndex) {
347+
await stream.files.createIndex(index, { background: false });
348+
}
349+
350+
await checkChunksIndex(stream);
392351
}
393352

394353
function createFilesDoc(
@@ -471,19 +430,21 @@ function doWrite(
471430
return false;
472431
}
473432

474-
stream.chunks.insertOne(doc, getWriteOptions(stream), (error?: AnyError) => {
475-
if (error) {
476-
return __handleError(stream, error);
477-
}
478-
--stream.state.outstandingRequests;
479-
--outstandingRequests;
433+
stream.chunks.insertOne(doc, getWriteOptions(stream)).then(
434+
() => {
435+
--stream.state.outstandingRequests;
436+
--outstandingRequests;
480437

481-
if (!outstandingRequests) {
482-
stream.emit('drain', doc);
483-
callback && callback();
484-
checkDone(stream);
438+
if (!outstandingRequests) {
439+
stream.emit('drain', doc);
440+
callback && callback();
441+
checkDone(stream);
442+
}
443+
},
444+
error => {
445+
return __handleError(stream, error);
485446
}
486-
});
447+
);
487448

488449
spaceRemaining = stream.chunkSizeBytes;
489450
stream.pos = 0;
@@ -545,13 +506,15 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
545506
return false;
546507
}
547508

548-
stream.chunks.insertOne(doc, getWriteOptions(stream), (error?: AnyError) => {
549-
if (error) {
509+
stream.chunks.insertOne(doc, getWriteOptions(stream)).then(
510+
() => {
511+
--stream.state.outstandingRequests;
512+
checkDone(stream);
513+
},
514+
error => {
550515
return __handleError(stream, error);
551516
}
552-
--stream.state.outstandingRequests;
553-
checkDone(stream);
554-
});
517+
);
555518
return true;
556519
}
557520

test/integration/gridfs/.gitkeep

Whitespace-only changes.

test/integration/gridfs/gridfs.spec.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const { setupDatabase } = require('./../shared');
55
const { expect } = require('chai');
66
const { GridFSBucket } = require('../../../src');
77

8-
describe('GridFS', function () {
8+
describe('GridFS spec', function () {
99
before(function () {
1010
return setupDatabase(this.configuration);
1111
});

0 commit comments

Comments
 (0)