Skip to content

refactor(NODE-4904): change internal gridfs logic to use promises #3493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions src/gridfs/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,16 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
this.push(null);
this.destroyed = true;
if (this.s.cursor) {
this.s.cursor.close(error => {
this.emit(GridFSBucketReadStream.CLOSE);
callback && callback(error);
});
this.s.cursor.close().then(
() => {
this.emit(GridFSBucketReadStream.CLOSE);
callback?.();
},
error => {
this.emit(GridFSBucketReadStream.CLOSE);
callback?.(error);
}
);
} else {
if (!this.s.init) {
// If not initialized, fire close event because we will never
Expand All @@ -215,7 +221,10 @@ function doRead(stream: GridFSBucketReadStream): void {
if (!stream.s.cursor) return;
if (!stream.s.file) return;

stream.s.cursor.next((error, doc) => {
const handleReadResult = ({
error,
doc
}: { error: Error; doc: null } | { error: null; doc: any }) => {
if (stream.destroyed) {
return;
}
Expand All @@ -226,16 +235,14 @@ function doRead(stream: GridFSBucketReadStream): void {
if (!doc) {
stream.push(null);

if (!stream.s.cursor) return;
stream.s.cursor.close(error => {
if (error) {
stream.s.cursor?.close().then(
() => {
stream.emit(GridFSBucketReadStream.CLOSE);
},
error => {
stream.emit(GridFSBucketReadStream.ERROR, error);
return;
}

stream.emit(GridFSBucketReadStream.CLOSE);
});

);
return;
}

Expand Down Expand Up @@ -308,7 +315,12 @@ function doRead(stream: GridFSBucketReadStream): void {

stream.push(buf);
return;
});
};

stream.s.cursor.next().then(
doc => handleReadResult({ error: null, doc }),
error => handleReadResult({ error, doc: null })
);
}

function init(stream: GridFSBucketReadStream): void {
Expand All @@ -323,7 +335,10 @@ function init(stream: GridFSBucketReadStream): void {
findOneOptions.skip = stream.s.options.skip;
}

stream.s.files.findOne(stream.s.filter, findOneOptions, (error, doc) => {
const handleReadResult = ({
error,
doc
}: { error: Error; doc: null } | { error: null; doc: any }) => {
if (error) {
return stream.emit(GridFSBucketReadStream.ERROR, error);
}
Expand Down Expand Up @@ -388,7 +403,12 @@ function init(stream: GridFSBucketReadStream): void {

stream.emit(GridFSBucketReadStream.FILE, doc);
return;
});
};

stream.s.files.findOne(stream.s.filter, findOneOptions).then(
doc => handleReadResult({ error: null, doc }),
error => handleReadResult({ error, doc: null })
);
}

function waitForFile(stream: GridFSBucketReadStream, callback: Callback): void {
Expand Down
213 changes: 88 additions & 125 deletions src/gridfs/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,13 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
if (!this.bucket.s.calledOpenUploadStream) {
this.bucket.s.calledOpenUploadStream = true;

checkIndexes(this, () => {
this.bucket.s.checkedIndexes = true;
this.bucket.emit('index');
});
checkIndexes(this).then(
() => {
this.bucket.s.checkedIndexes = true;
this.bucket.emit('index');
},
() => null
);
}
}

Expand Down Expand Up @@ -244,54 +247,36 @@ function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk
};
}

function checkChunksIndex(stream: GridFSBucketWriteStream, callback: Callback): void {
stream.chunks.listIndexes().toArray((error?: AnyError, indexes?: Document[]) => {
let index: { files_id: number; n: number };
if (error) {
// Collection doesn't exist so create index
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
index = { files_id: 1, n: 1 };
stream.chunks.createIndex(index, { background: false, unique: true }, error => {
if (error) {
return callback(error);
}

callback();
});
return;
}
return callback(error);
}
async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void> {
const index = { files_id: 1, n: 1 };

let hasChunksIndex = false;
if (indexes) {
indexes.forEach((index: Document) => {
if (index.key) {
const keys = Object.keys(index.key);
if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
hasChunksIndex = true;
}
}
});
let indexes;
try {
indexes = await stream.chunks.listIndexes().toArray();
} catch (error) {
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
indexes = [];
} else {
throw error;
}
}

if (hasChunksIndex) {
callback();
} else {
index = { files_id: 1, n: 1 };
const writeConcernOptions = getWriteOptions(stream);

stream.chunks.createIndex(
index,
{
...writeConcernOptions,
background: true,
unique: true
},
callback
);
const hasChunksIndex = !!indexes.find(index => {
const keys = Object.keys(index.key);
if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
return true;
}
return false;
});

if (!hasChunksIndex) {
const writeConcernOptions = getWriteOptions(stream);
await stream.chunks.createIndex(index, {
...writeConcernOptions,
background: true,
unique: true
});
}
}

function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
Expand All @@ -314,81 +299,55 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
return false;
}

stream.files.insertOne(filesDoc, getWriteOptions(stream), (error?: AnyError) => {
if (error) {
stream.files.insertOne(filesDoc, getWriteOptions(stream)).then(
() => {
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
stream.emit(GridFSBucketWriteStream.CLOSE);
},
error => {
return __handleError(stream, error, callback);
}
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
stream.emit(GridFSBucketWriteStream.CLOSE);
});
);

return true;
}

return false;
}

function checkIndexes(stream: GridFSBucketWriteStream, callback: Callback): void {
stream.files.findOne({}, { projection: { _id: 1 } }, (error, doc) => {
if (error) {
return callback(error);
}
if (doc) {
return callback();
}
async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
const doc = await stream.files.findOne({}, { projection: { _id: 1 } });
if (doc != null) {
// If at least one document exists assume the collection has the required index
return;
}

stream.files.listIndexes().toArray((error?: AnyError, indexes?: Document) => {
let index: { filename: number; uploadDate: number };
if (error) {
// Collection doesn't exist so create index
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
index = { filename: 1, uploadDate: 1 };
stream.files.createIndex(index, { background: false }, (error?: AnyError) => {
if (error) {
return callback(error);
}

checkChunksIndex(stream, callback);
});
return;
}
return callback(error);
}
const index = { filename: 1, uploadDate: 1 };

let hasFileIndex = false;
if (indexes) {
indexes.forEach((index: Document) => {
const keys = Object.keys(index.key);
if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
hasFileIndex = true;
}
});
}
let indexes;
try {
indexes = await stream.files.listIndexes().toArray();
} catch (error) {
if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) {
indexes = [];
} else {
throw error;
}
}

if (hasFileIndex) {
checkChunksIndex(stream, callback);
} else {
index = { filename: 1, uploadDate: 1 };

const writeConcernOptions = getWriteOptions(stream);

stream.files.createIndex(
index,
{
...writeConcernOptions,
background: false
},
(error?: AnyError) => {
if (error) {
return callback(error);
}

checkChunksIndex(stream, callback);
}
);
}
});
const hasFileIndex = !!indexes.find(index => {
const keys = Object.keys(index.key);
if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
return true;
}
return false;
});

if (!hasFileIndex) {
await stream.files.createIndex(index, { background: false });
}

await checkChunksIndex(stream);
}

function createFilesDoc(
Expand Down Expand Up @@ -471,19 +430,21 @@ function doWrite(
return false;
}

stream.chunks.insertOne(doc, getWriteOptions(stream), (error?: AnyError) => {
if (error) {
return __handleError(stream, error);
}
--stream.state.outstandingRequests;
--outstandingRequests;
stream.chunks.insertOne(doc, getWriteOptions(stream)).then(
() => {
--stream.state.outstandingRequests;
--outstandingRequests;

if (!outstandingRequests) {
stream.emit('drain', doc);
callback && callback();
checkDone(stream);
if (!outstandingRequests) {
stream.emit('drain', doc);
callback && callback();
checkDone(stream);
}
},
error => {
return __handleError(stream, error);
}
});
);

spaceRemaining = stream.chunkSizeBytes;
stream.pos = 0;
Expand Down Expand Up @@ -545,13 +506,15 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
return false;
}

stream.chunks.insertOne(doc, getWriteOptions(stream), (error?: AnyError) => {
if (error) {
stream.chunks.insertOne(doc, getWriteOptions(stream)).then(
() => {
--stream.state.outstandingRequests;
checkDone(stream);
},
error => {
return __handleError(stream, error);
}
--stream.state.outstandingRequests;
checkDone(stream);
});
);
return true;
}

Expand Down
Empty file removed test/integration/gridfs/.gitkeep
Empty file.
2 changes: 1 addition & 1 deletion test/integration/gridfs/gridfs.spec.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { setupDatabase } = require('./../shared');
const { expect } = require('chai');
const { GridFSBucket } = require('../../../src');

describe('GridFS', function () {
describe('GridFS spec', function () {
before(function () {
return setupDatabase(this.configuration);
});
Expand Down
Loading