Skip to content

Commit 1e3faa1

Browse files
authored
fix(hash-stream-node): support file stream in readableStreamHasher (#3338)
1 parent 15e0512 commit 1e3faa1

File tree

7 files changed

+126
-3
lines changed

7 files changed

+126
-3
lines changed

packages/hash-stream-node/src/fileStreamHasher.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { Readable } from "stream";
44

55
import { HashCalculator } from "./HashCalculator";
66

7+
// ToDo: deprecate in favor of readableStreamHasher
78
export const fileStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, fileStream: Readable) =>
89
new Promise((resolve, reject) => {
910
if (!isReadStream(fileStream)) {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import * as fs from "fs";
2+
3+
import { fsCreateReadStream } from "./fsCreateReadStream";
4+
5+
jest.setTimeout(30000);
6+
7+
describe(fsCreateReadStream.name, () => {
8+
const mockFileContents = fs.readFileSync(__filename, "utf8");
9+
10+
it("uses file descriptor if defined", (done) => {
11+
fs.promises.open(__filename, "r").then((fd) => {
12+
if ((fd as any).createReadStream) {
13+
const readStream = (fd as any).createReadStream();
14+
const readStreamCopy = fsCreateReadStream(readStream);
15+
16+
const chunks: Array<Buffer> = [];
17+
readStreamCopy.on("data", (chunk) => {
18+
chunks.push(chunk);
19+
});
20+
readStreamCopy.on("end", () => {
21+
const outputFileContents = Buffer.concat(chunks).toString();
22+
expect(outputFileContents).toEqual(mockFileContents);
23+
done();
24+
});
25+
} else {
26+
console.log(`Skipping createReadStream test as it's not available.`);
27+
done();
28+
}
29+
});
30+
});
31+
32+
it("uses start and end if file descriptor is not defined", (done) => {
33+
const readStream = fs.createReadStream(__filename);
34+
const readStreamCopy = fsCreateReadStream(readStream);
35+
36+
const chunks: Array<Buffer> = [];
37+
readStreamCopy.on("data", (chunk) => {
38+
chunks.push(chunk);
39+
});
40+
readStreamCopy.on("end", () => {
41+
const outputFileContents = Buffer.concat(chunks).toString();
42+
expect(outputFileContents).toEqual(mockFileContents);
43+
done();
44+
});
45+
});
46+
});
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import { createReadStream, ReadStream } from "fs";
2+
3+
export const fsCreateReadStream = (readStream: ReadStream) =>
4+
createReadStream(readStream.path, {
5+
fd: (readStream as any).fd,
6+
start: (readStream as any).start,
7+
end: (readStream as any).end,
8+
});
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { createReadStream } from "fs";
2+
import { Readable } from "stream";
3+
4+
import { isFileStream } from "./isFileStream";
5+
6+
describe(isFileStream.name, () => {
7+
describe("returns true if readablestream is fs.ReadStream", () => {
8+
it("with string path", () => {
9+
const readStream = createReadStream(__filename);
10+
expect(isFileStream(readStream)).toStrictEqual(true);
11+
});
12+
13+
it("with buffer path", () => {
14+
const readStream = createReadStream(Buffer.from(__filename, "utf-8"));
15+
expect(isFileStream(readStream)).toStrictEqual(true);
16+
});
17+
18+
it("with filehandle", async () => {
19+
const { promises } = await import("fs");
20+
const fd = await promises.open(__filename, "r");
21+
// @ts-expect-error createReadStream is added in v16.11.0
22+
if (fd.createReadStream) {
23+
// @ts-expect-error createReadStream is added in v16.11.0
24+
const readStream = fd.createReadStream();
25+
expect(isFileStream(readStream)).toStrictEqual(true);
26+
}
27+
});
28+
});
29+
30+
it("returns false if readablestream is not an fs.ReadStream", () => {
31+
const readableStream = new Readable({ read: (size) => {} });
32+
expect(isFileStream(readableStream)).toStrictEqual(false);
33+
});
34+
});
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { ReadStream } from "fs";
2+
import { Readable } from "stream";
3+
4+
export const isFileStream = (stream: Readable): stream is ReadStream =>
5+
typeof (stream as ReadStream).path === "string" ||
6+
Buffer.isBuffer((stream as ReadStream).path) ||
7+
typeof (stream as any).fd === "number";

packages/hash-stream-node/src/readableStreamHasher.spec.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
import { Hash } from "@aws-sdk/types";
2-
import { Readable, Writable, WritableOptions } from "stream";
2+
import { createReadStream } from "fs";
3+
import { Readable, Writable } from "stream";
34

5+
import { fsCreateReadStream } from "./fsCreateReadStream";
46
import { HashCalculator } from "./HashCalculator";
7+
import { isFileStream } from "./isFileStream";
58
import { readableStreamHasher } from "./readableStreamHasher";
69

10+
jest.mock("./fsCreateReadStream");
711
jest.mock("./HashCalculator");
12+
jest.mock("./isFileStream");
13+
jest.mock("fs");
814

915
describe(readableStreamHasher.name, () => {
1016
const mockDigest = jest.fn();
@@ -38,13 +44,29 @@ describe(readableStreamHasher.name, () => {
3844
(HashCalculator as unknown as jest.Mock).mockImplementation(
3945
(hash) => new MockHashCalculator(hash, mockHashCalculatorWrite, mockHashCalculatorEnd)
4046
);
47+
(isFileStream as unknown as jest.Mock).mockReturnValue(false);
4148
mockDigest.mockResolvedValue(mockHash);
4249
});
4350

4451
afterEach(() => {
4552
jest.clearAllMocks();
4653
});
4754

55+
it("creates a copy in case of fileStream", () => {
56+
(fsCreateReadStream as jest.Mock).mockReturnValue(
57+
new Readable({
58+
read: (size) => {},
59+
})
60+
);
61+
(isFileStream as unknown as jest.Mock).mockReturnValue(true);
62+
63+
const fsReadStream = createReadStream(__filename);
64+
readableStreamHasher(mockHashCtor, fsReadStream);
65+
66+
expect(isFileStream).toHaveBeenCalledWith(fsReadStream);
67+
expect(fsCreateReadStream).toHaveBeenCalledWith(fsReadStream);
68+
});
69+
4870
it("computes hash for a readable stream", async () => {
4971
const readableStream = new Readable({
5072
read: (size) => {},

packages/hash-stream-node/src/readableStreamHasher.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
import { HashConstructor, StreamHasher } from "@aws-sdk/types";
22
import { Readable } from "stream";
33

4+
import { fsCreateReadStream } from "./fsCreateReadStream";
45
import { HashCalculator } from "./HashCalculator";
6+
import { isFileStream } from "./isFileStream";
57

68
export const readableStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, readableStream: Readable) => {
9+
// ToDo: throw if readableStream is already flowing and it's copy can't be created.
10+
const streamToPipe = isFileStream(readableStream) ? fsCreateReadStream(readableStream) : readableStream;
11+
712
const hash = new hashCtor();
813
const hashCalculator = new HashCalculator(hash);
9-
readableStream.pipe(hashCalculator);
14+
streamToPipe.pipe(hashCalculator);
1015

1116
return new Promise((resolve, reject) => {
12-
readableStream.on("error", (err: Error) => {
17+
streamToPipe.on("error", (err: Error) => {
1318
// if the source errors, the destination stream needs to manually end
1419
hashCalculator.end();
1520
reject(err);

0 commit comments

Comments
 (0)