diff --git a/src/gridfs/download.ts b/src/gridfs/download.ts index 32946f67376..9e96151478b 100644 --- a/src/gridfs/download.ts +++ b/src/gridfs/download.ts @@ -189,10 +189,16 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS this.push(null); this.destroyed = true; if (this.s.cursor) { - this.s.cursor.close(error => { - this.emit(GridFSBucketReadStream.CLOSE); - callback && callback(error); - }); + this.s.cursor.close().then( + () => { + this.emit(GridFSBucketReadStream.CLOSE); + callback?.(); + }, + error => { + this.emit(GridFSBucketReadStream.CLOSE); + callback?.(error); + } + ); } else { if (!this.s.init) { // If not initialized, fire close event because we will never @@ -215,7 +221,10 @@ function doRead(stream: GridFSBucketReadStream): void { if (!stream.s.cursor) return; if (!stream.s.file) return; - stream.s.cursor.next((error, doc) => { + const handleReadResult = ({ + error, + doc + }: { error: Error; doc: null } | { error: null; doc: any }) => { if (stream.destroyed) { return; } @@ -226,16 +235,14 @@ function doRead(stream: GridFSBucketReadStream): void { if (!doc) { stream.push(null); - if (!stream.s.cursor) return; - stream.s.cursor.close(error => { - if (error) { + stream.s.cursor?.close().then( + () => { + stream.emit(GridFSBucketReadStream.CLOSE); + }, + error => { stream.emit(GridFSBucketReadStream.ERROR, error); - return; } - - stream.emit(GridFSBucketReadStream.CLOSE); - }); - + ); return; } @@ -308,7 +315,12 @@ function doRead(stream: GridFSBucketReadStream): void { stream.push(buf); return; - }); + }; + + stream.s.cursor.next().then( + doc => handleReadResult({ error: null, doc }), + error => handleReadResult({ error, doc: null }) + ); } function init(stream: GridFSBucketReadStream): void { @@ -323,7 +335,10 @@ function init(stream: GridFSBucketReadStream): void { findOneOptions.skip = stream.s.options.skip; } - stream.s.files.findOne(stream.s.filter, findOneOptions, (error, doc) => { + const handleReadResult = ({ + error, + doc + }: { error: Error; doc: null } | { error: null; doc: any }) => { if (error) { return stream.emit(GridFSBucketReadStream.ERROR, error); } @@ -388,7 +403,12 @@ function init(stream: GridFSBucketReadStream): void { stream.emit(GridFSBucketReadStream.FILE, doc); return; - }); + }; + + stream.s.files.findOne(stream.s.filter, findOneOptions).then( + doc => handleReadResult({ error: null, doc }), + error => handleReadResult({ error, doc: null }) + ); } function waitForFile(stream: GridFSBucketReadStream, callback: Callback): void { diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts index f2597b0798b..e36c026d51a 100644 --- a/src/gridfs/upload.ts +++ b/src/gridfs/upload.ts @@ -105,10 +105,13 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable if (!this.bucket.s.calledOpenUploadStream) { this.bucket.s.calledOpenUploadStream = true; - checkIndexes(this, () => { - this.bucket.s.checkedIndexes = true; - this.bucket.emit('index'); - }); + checkIndexes(this).then( + () => { + this.bucket.s.checkedIndexes = true; + this.bucket.emit('index'); + }, + () => null + ); } } @@ -244,54 +247,36 @@ function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk }; } -function checkChunksIndex(stream: GridFSBucketWriteStream, callback: Callback): void { - stream.chunks.listIndexes().toArray((error?: AnyError, indexes?: Document[]) => { - let index: { files_id: number; n: number }; - if (error) { - // Collection doesn't exist so create index - if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { - index = { files_id: 1, n: 1 }; - stream.chunks.createIndex(index, { background: false, unique: true }, error => { - if (error) { - return callback(error); - } - - callback(); - }); - return; - } - return callback(error); - } +async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise { + const index = { files_id: 1, n: 1 }; - let hasChunksIndex = false; - if (indexes) { - indexes.forEach((index: Document) => { - if (index.key) { - const keys = Object.keys(index.key); - if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) { - hasChunksIndex = true; - } - } - }); + let indexes; + try { + indexes = await stream.chunks.listIndexes().toArray(); + } catch (error) { + if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { + indexes = []; + } else { + throw error; } + } - if (hasChunksIndex) { - callback(); - } else { - index = { files_id: 1, n: 1 }; - const writeConcernOptions = getWriteOptions(stream); - - stream.chunks.createIndex( - index, - { - ...writeConcernOptions, - background: true, - unique: true - }, - callback - ); + const hasChunksIndex = !!indexes.find(index => { + const keys = Object.keys(index.key); + if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) { + return true; } + return false; }); + + if (!hasChunksIndex) { + const writeConcernOptions = getWriteOptions(stream); + await stream.chunks.createIndex(index, { + ...writeConcernOptions, + background: true, + unique: true + }); + } } function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolean { @@ -314,13 +299,15 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea return false; } - stream.files.insertOne(filesDoc, getWriteOptions(stream), (error?: AnyError) => { - if (error) { + stream.files.insertOne(filesDoc, getWriteOptions(stream)).then( + () => { + stream.emit(GridFSBucketWriteStream.FINISH, filesDoc); + stream.emit(GridFSBucketWriteStream.CLOSE); + }, + error => { return __handleError(stream, error, callback); } - stream.emit(GridFSBucketWriteStream.FINISH, filesDoc); - stream.emit(GridFSBucketWriteStream.CLOSE); - }); + ); return true; } @@ -328,67 +315,39 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea return false; } -function checkIndexes(stream: GridFSBucketWriteStream, callback: Callback): void { - stream.files.findOne({}, { projection: { _id: 1 } }, (error, doc) => { - if (error) { - return callback(error); - } - if (doc) { - return callback(); - } +async function checkIndexes(stream: GridFSBucketWriteStream): Promise { + const doc = await stream.files.findOne({}, { projection: { _id: 1 } }); + if (doc != null) { + // If at least one document exists assume the collection has the required index + return; + } - stream.files.listIndexes().toArray((error?: AnyError, indexes?: Document) => { - let index: { filename: number; uploadDate: number }; - if (error) { - // Collection doesn't exist so create index - if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { - index = { filename: 1, uploadDate: 1 }; - stream.files.createIndex(index, { background: false }, (error?: AnyError) => { - if (error) { - return callback(error); - } - - checkChunksIndex(stream, callback); - }); - return; - } - return callback(error); - } + const index = { filename: 1, uploadDate: 1 }; - let hasFileIndex = false; - if (indexes) { - indexes.forEach((index: Document) => { - const keys = Object.keys(index.key); - if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) { - hasFileIndex = true; - } - }); - } + let indexes; + try { + indexes = await stream.files.listIndexes().toArray(); + } catch (error) { + if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { + indexes = []; + } else { + throw error; + } + } - if (hasFileIndex) { - checkChunksIndex(stream, callback); - } else { - index = { filename: 1, uploadDate: 1 }; - - const writeConcernOptions = getWriteOptions(stream); - - stream.files.createIndex( - index, - { - ...writeConcernOptions, - background: false - }, - (error?: AnyError) => { - if (error) { - return callback(error); - } - - checkChunksIndex(stream, callback); - } - ); - } - }); + const hasFileIndex = !!indexes.find(index => { + const keys = Object.keys(index.key); + if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) { + return true; + } + return false; }); + + if (!hasFileIndex) { + await stream.files.createIndex(index, { background: false }); + } + + await checkChunksIndex(stream); } function createFilesDoc( @@ -471,19 +430,21 @@ function doWrite( return false; } - stream.chunks.insertOne(doc, getWriteOptions(stream), (error?: AnyError) => { - if (error) { - return __handleError(stream, error); - } - --stream.state.outstandingRequests; - --outstandingRequests; + stream.chunks.insertOne(doc, getWriteOptions(stream)).then( + () => { + --stream.state.outstandingRequests; + --outstandingRequests; - if (!outstandingRequests) { - stream.emit('drain', doc); - callback && callback(); - checkDone(stream); + if (!outstandingRequests) { + stream.emit('drain', doc); + callback && callback(); + checkDone(stream); + } + }, + error => { + return __handleError(stream, error); } - }); + ); spaceRemaining = stream.chunkSizeBytes; stream.pos = 0; @@ -545,13 +506,15 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo return false; } - stream.chunks.insertOne(doc, getWriteOptions(stream), (error?: AnyError) => { - if (error) { + stream.chunks.insertOne(doc, getWriteOptions(stream)).then( + () => { + --stream.state.outstandingRequests; + checkDone(stream); + }, + error => { return __handleError(stream, error); } - --stream.state.outstandingRequests; - checkDone(stream); - }); + ); return true; } diff --git a/test/integration/gridfs/.gitkeep b/test/integration/gridfs/.gitkeep deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/test/integration/gridfs/gridfs.spec.test.js b/test/integration/gridfs/gridfs.spec.test.js index fdc2a02597f..924596fc623 100644 --- a/test/integration/gridfs/gridfs.spec.test.js +++ b/test/integration/gridfs/gridfs.spec.test.js @@ -5,7 +5,7 @@ const { setupDatabase } = require('./../shared'); const { expect } = require('chai'); const { GridFSBucket } = require('../../../src'); -describe('GridFS', function () { +describe('GridFS spec', function () { before(function () { return setupDatabase(this.configuration); }); diff --git a/test/integration/gridfs/gridfs.test.ts b/test/integration/gridfs/gridfs.test.ts new file mode 100644 index 00000000000..e90aa98cf76 --- /dev/null +++ b/test/integration/gridfs/gridfs.test.ts @@ -0,0 +1,119 @@ +import { expect } from 'chai'; +import { once } from 'events'; + +import { type Db, type MongoClient, CommandStartedEvent, GridFSBucket } from '../../../src'; +import { sleep } from '../../tools/utils'; + +describe('GridFS', () => { + let client: MongoClient; + let db: Db; + let bucket: GridFSBucket; + let commandStartedEvents: CommandStartedEvent[]; + + beforeEach(async function () { + client = this.configuration.newClient({ monitorCommands: true }); + db = client.db('gridfsTest'); + + // Reset namespace + await db.dropCollection('fs.files').catch(() => null); + await db.dropCollection('fs.chunks').catch(() => null); + await db.dropDatabase().catch(() => null); + await sleep(100); + + commandStartedEvents = []; + client.on('commandStarted', e => commandStartedEvents.push(e)); + bucket = new GridFSBucket(db); + }); + + afterEach(async function () { + commandStartedEvents = []; + await client.close(); + }); + + describe('class GridFSBucket', () => { + const assertIndexesExist = () => { + expect(bucket.s).to.have.property('checkedIndexes', true); + + const listIndexes = commandStartedEvents.filter(e => e.commandName === 'listIndexes'); + expect(listIndexes).to.have.lengthOf(2); + + const createIndexes = commandStartedEvents.filter(e => e.commandName === 'createIndexes'); + expect(createIndexes).to.have.lengthOf(2); + expect(createIndexes[0]).to.have.deep.nested.property('command.createIndexes', 'fs.files'); + expect(createIndexes[0]).to.have.deep.nested.property( + 'command.indexes[0].key', + new Map([ + ['filename', 1], + ['uploadDate', 1] + ]) + ); + expect(createIndexes[1]).to.have.deep.nested.property('command.createIndexes', 'fs.chunks'); + expect(createIndexes[1]).to.have.deep.nested.property( + 'command.indexes[0].key', + new Map([ + ['files_id', 1], + ['n', 1] + ]) + ); + }; + + it('ensures chunks and files collection have required indexes when namespace does not exist', async () => { + // Ensure the namespace does not exist; beforeEach should drop the Db, keeping this true + expect( + (await db.collections()).filter(({ namespace }) => namespace.startsWith('fs')) + ).to.have.lengthOf(0); + + const upload = bucket.openUploadStream('test.txt'); + await once(bucket, 'index'); + await upload.abort(); + + assertIndexesExist(); + }); + + it('ensures chunks and files collection have required indexes when namespace does', async () => { + // Ensure the namespace does exist + await db.createCollection('fs.files'); + await db.createCollection('fs.chunks'); + + const upload = bucket.openUploadStream('test.txt'); + await once(bucket, 'index'); + await upload.abort(); + + assertIndexesExist(); + }); + + it('skips creating required indexes if they already exist', async () => { + const files = await db.createCollection('fs.files'); + const chunks = await db.createCollection('fs.chunks'); + + await files.createIndex( + new Map([ + ['filename', 1], + ['uploadDate', 1] + ]) + ); + + await chunks.createIndex( + new Map([ + ['files_id', 1], + ['n', 1] + ]) + ); + + // reset events array + commandStartedEvents = []; + + const upload = bucket.openUploadStream('test.txt'); + await once(bucket, 'index'); + await upload.abort(); + + // Still listed indexes + const listIndexes = commandStartedEvents.filter(e => e.commandName === 'listIndexes'); + expect(listIndexes).to.have.lengthOf(2); + + // But since it found them, we didn't attempt creation + const createIndexes = commandStartedEvents.filter(e => e.commandName === 'createIndexes'); + expect(createIndexes).to.have.lengthOf(0); + }); + }); +});