Commit 9b6203f

fix: usage with readable-stream (#333)
The readable-stream module has different timings from plain generators - it turns out that detection of the end of the walkDAG method was being done incorrectly: instead of waiting for its promise to resolve, we should wait for the output queue to finish.
1 parent 5cc0ff2 commit 9b6203f

8 files changed: +137 -48 lines
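The fix relies on treating the output queue itself as the end-of-output signal. Below is a minimal sketch of that pattern using it-pushable (the queue primitive the exporter already uses); the chunk values are made up and this is an illustration, not the exporter code:

import { pushable } from 'it-pushable'

const queue = pushable<Uint8Array>()

// producer: push chunks as they become available, then end the queue
void (async () => {
  for (const chunk of [Uint8Array.of(1), Uint8Array.of(2, 3)]) {
    queue.push(chunk)
  }

  queue.end()
})()

// consumer: iteration finishes only when the queue has been ended,
// regardless of when the producer's promise settles
for await (const chunk of queue) {
  console.info('read', chunk.byteLength, 'bytes')
}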

packages/ipfs-unixfs-exporter/package.json

Lines changed: 5 additions & 1 deletion

@@ -154,6 +154,7 @@
     "uint8arrays": "^4.0.2"
   },
   "devDependencies": {
+    "@types/readable-stream": "^2.3.15",
     "@types/sinon": "^10.0.0",
     "aegir": "^38.1.2",
     "blockstore-core": "^4.0.1",
@@ -163,12 +164,15 @@
     "it-all": "^3.0.2",
     "it-buffer-stream": "^3.0.0",
     "it-first": "^3.0.2",
+    "it-to-buffer": "^4.0.2",
     "merge-options": "^3.0.4",
+    "readable-stream": "^4.4.0",
     "sinon": "^15.0.0",
     "wherearewe": "^2.0.1"
   },
   "browser": {
-    "fs": false
+    "fs": false,
+    "readable-stream": false
   },
   "typedoc": {
     "entryPoint": "./src/index.ts"

packages/ipfs-unixfs-exporter/src/index.ts

Lines changed: 1 addition & 1 deletion

@@ -59,7 +59,7 @@ export interface Exportable<T> {
   cid: CID
   depth: number
   size: bigint
-  content: (options?: ExporterOptions) => AsyncIterable<T>
+  content: (options?: ExporterOptions) => AsyncGenerator<T, void, unknown>
 }

 export interface UnixFSFile extends Exportable<Uint8Array> {
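For reference, a minimal consumer of this interface (a hedged sketch: cid and blockstore are placeholders assumed to exist, not part of this diff):

import { exporter } from 'ipfs-unixfs-exporter'

const entry = await exporter(cid, blockstore)

if (entry.type === 'file') {
  // content() now returns an AsyncGenerator, so it can be consumed with
  // for await...of or driven manually via .next(), as the test below does
  for await (const chunk of entry.content()) {
    console.info('received', chunk.byteLength, 'bytes')
  }
}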

packages/ipfs-unixfs-exporter/src/resolvers/identity.ts

Lines changed: 4 additions & 4 deletions

@@ -8,15 +8,15 @@ import { CustomProgressEvent } from 'progress-events'
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(node.length, options.offset, options.length)

-    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, start, end)

     options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:identity', {
       bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
       fileSize: BigInt(node.byteLength)
     }))

packages/ipfs-unixfs-exporter/src/resolvers/raw.ts

Lines changed: 4 additions & 4 deletions

@@ -7,15 +7,15 @@ import { CustomProgressEvent } from 'progress-events'
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(node.length, options.offset, options.length)

-    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, start, end)

     options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:raw', {
       bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
       fileSize: BigInt(node.byteLength)
     }))

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts

Lines changed: 22 additions & 18 deletions

@@ -15,7 +15,9 @@ import { CustomProgressEvent } from 'progress-events'
 async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
   // a `raw` node
   if (node instanceof Uint8Array) {
-    queue.push(extractDataFromBlock(node, streamPosition, start, end))
+    const buf = extractDataFromBlock(node, streamPosition, start, end)
+
+    queue.push(buf)

     return
   }
@@ -123,6 +125,10 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
       }
     }
   )
+
+  if (streamPosition >= end) {
+    queue.end()
+  }
 }

 const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
@@ -134,34 +140,23 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
     }

     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(fileSize, options.offset, options.length)

-    if (length === 0n) {
+    if (end === 0n) {
       return
     }

     let read = 0n
-    const wanted = length - offset
+    const wanted = end - start
     const queue = pushable()

     options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
       cid
     }))

-    void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
-      .then(() => {
-        if (read < wanted) {
-          throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
-        }
-
-        if (read > wanted) {
-          throw errCode(new Error('Read too many bytes - the file size reported by the UnixFS data in the root node may be incorrect'), 'ERR_OVER_READ')
-        }
-
-        queue.end()
-      })
+    void walkDAG(blockstore, node, queue, 0n, start, end, options)
       .catch(err => {
         queue.end(err)
       })
@@ -173,7 +168,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,

       read += BigInt(buf.byteLength)

-      if (read === length) {
+      if (read > wanted) {
+        queue.end()
+        throw errCode(new Error('Read too many bytes - the file size reported by the UnixFS data in the root node may be incorrect'), 'ERR_OVER_READ')
+      }
+
+      if (read === wanted) {
         queue.end()
       }

@@ -185,6 +185,10 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,

       yield buf
     }
+
+    if (read < wanted) {
+      throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
+    }
   }

   return yieldFileContent
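A simplified sketch of the byte accounting the hunks above move into the consuming generator (the function name and error handling here are illustrative, not the exporter's exact code): the generator counts the bytes it yields and raises the over-read/under-read errors itself, rather than relying on the walkDAG promise.

async function * checkedRead (source: AsyncIterable<Uint8Array>, wanted: bigint): AsyncGenerator<Uint8Array, void, undefined> {
  let read = 0n

  for await (const buf of source) {
    read += BigInt(buf.byteLength)

    if (read > wanted) {
      // corresponds to ERR_OVER_READ in the diff above
      throw new Error('Read too many bytes')
    }

    yield buf
  }

  if (read < wanted) {
    // corresponds to ERR_UNDER_READ in the diff above
    throw new Error('Traversed entire DAG but did not read enough bytes')
  }
}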

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts

Lines changed: 4 additions & 4 deletions

@@ -12,19 +12,19 @@ const rawContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, b
     const size = unixfs.data.length

     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(size, options.offset, options.length)

     options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:raw', {
       cid
     }))

-    const buf = extractDataFromBlock(unixfs.data, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(unixfs.data, 0n, start, end)

     options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:raw', {
       bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
       fileSize: BigInt(unixfs.data.byteLength)
     }))

packages/ipfs-unixfs-exporter/src/utils/validate-offset-and-length.ts

Lines changed: 17 additions & 16 deletions

@@ -1,36 +1,37 @@
 import errCode from 'err-code'

-const validateOffsetAndLength = (size: number | bigint, offset: number | bigint = 0, length: number | bigint = size): { offset: bigint, length: bigint } => {
-  offset = BigInt(offset ?? 0)
-  length = BigInt(length ?? size)
+const validateOffsetAndLength = (size: number | bigint, offset: number | bigint = 0, length: number | bigint = size): { start: bigint, end: bigint } => {
+  const fileSize = BigInt(size)
+  const start = BigInt(offset ?? 0)
+  let end = BigInt(length)

-  if (offset == null) {
-    offset = 0n
+  if (end !== fileSize) {
+    end = start + end
   }

-  if (offset < 0n) {
-    throw errCode(new Error('Offset must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
+  if (end > fileSize) {
+    end = fileSize
   }

-  if (offset > size) {
-    throw errCode(new Error('Offset must be less than the file size'), 'ERR_INVALID_PARAMS')
+  if (start < 0n) {
+    throw errCode(new Error('Offset must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
   }

-  if (length == null) {
-    length = BigInt(size) - offset
+  if (start > fileSize) {
+    throw errCode(new Error('Offset must be less than the file size'), 'ERR_INVALID_PARAMS')
   }

-  if (length < 0n) {
+  if (end < 0n) {
     throw errCode(new Error('Length must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
   }

-  if (offset + length > size) {
-    length = BigInt(size) - offset
+  if (end > fileSize) {
+    throw errCode(new Error('Length must be less than the file size'), 'ERR_INVALID_PARAMS')
   }

   return {
-    offset,
-    length
+    start,
+    end
   }
 }
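To make the new semantics concrete, a short worked example (this helper is internal to the exporter; the calls below only illustrate the behaviour visible in the diff): offset/length are converted into an absolute [start, end) byte range, clamped to the file size.

// read 20 bytes starting at byte 10 of a 100-byte file
validateOffsetAndLength(100, 10, 20) // => { start: 10n, end: 30n }

// a range running past the end of the file is clamped to the file size
validateOffsetAndLength(100, 90, 50) // => { start: 90n, end: 100n }

// omitting the length reads to the end of the file
validateOffsetAndLength(100, 10) // => { start: 10n, end: 100n }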

packages/ipfs-unixfs-exporter/test/exporter.spec.ts

Lines changed: 80 additions & 0 deletions

@@ -25,6 +25,9 @@ import type { Blockstore } from 'interface-blockstore'
 import { balanced, FileLayout, flat, trickle } from 'ipfs-unixfs-importer/layout'
 import type { Chunker } from 'ipfs-unixfs-importer/chunker'
 import { fixedSize } from 'ipfs-unixfs-importer/chunker'
+import toBuffer from 'it-to-buffer'
+import { Readable } from 'readable-stream'
+import { isNode } from 'wherearewe'

 const ONE_MEG = Math.pow(1024, 2)

@@ -1229,4 +1232,81 @@ describe('exporter', () => {
       signal: abortController.signal
     })).to.eventually.be.rejectedWith(message)
   })
+
+  it('should support being used with readable-stream', async () => {
+    if (!isNode) {
+      // node-only test
+      return
+    }
+
+    let dataSizeInBytes = 10
+
+    // iterate through order of magnitude in size until hitting 10MB
+    while (dataSizeInBytes <= 10_000_000) {
+      const bytes = await toBuffer(randomBytes(dataSizeInBytes))
+
+      // chunk up the bytes to simulate a more real-world like behavior
+      const chunkLength = 100_000
+      let currentIndex = 0
+
+      const readableStream = new Readable({
+        read (): void {
+          // if this is the last chunk
+          if (currentIndex + chunkLength > bytes.length) {
+            this.push(bytes.subarray(currentIndex))
+            this.push(null)
+          } else {
+            this.push(bytes.subarray(currentIndex, currentIndex + chunkLength))
+
+            currentIndex = currentIndex + chunkLength
+          }
+        }
+      })
+
+      const result = await last(importer([{
+        content: readableStream
+      }], block))
+
+      if (result == null) {
+        throw new Error('Import failed')
+      }
+
+      const file = await exporter(result.cid, block)
+      const contentIterator = file.content()
+
+      const readableStreamToBytes = async (readableStream: Readable): Promise<Uint8Array> => {
+        return await new Promise((resolve, reject) => {
+          const chunks: any[] = []
+          readableStream.on('data', chunk => {
+            chunks.push(chunk)
+          })
+
+          readableStream.on('end', () => {
+            const uint8Array = uint8ArrayConcat(chunks)
+            resolve(uint8Array)
+          })
+
+          readableStream.on('error', reject)
+        })
+      }
+
+      const dataStream = new Readable({
+        async read (): Promise<void> {
+          const result = await contentIterator.next()
+          if (result.done === true) {
+            this.push(null) // end the stream
+          } else {
+            this.push(result.value)
+          }
+        }
+      })
+
+      const data = await readableStreamToBytes(dataStream)
+
+      expect(data.byteLength).to.equal(dataSizeInBytes)
+      expect(data).to.equalBytes(bytes)
+
+      dataSizeInBytes *= 10
+    }
+  })
 })
