
Commit 295077e

achingbrain and rvagg authored
feat: add blockReadConcurrency option to exporter (#361)
By default we attempt to load all siblings in a given layer of a DAG at once to allow slow/async loading routines extra time to fetch data before it is needed. Some blockstores (e.g. CAR files) require the exporter to only request the next sequential CID in a DAG.

Add a `blockReadConcurrency` option (named similarly to the importer's `blockWriteConcurrency` option) to control this behaviour.

Fixes #359

---------

Co-authored-by: Rod Vagg <rod@vagg.org>
1 parent eb358b2 commit 295077e
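
A minimal usage sketch of the new option — `cid` and `blockstore` are placeholders standing in for, e.g., a CAR-backed store that can only serve blocks sequentially; they are not part of this commit:

```ts
import { exporter } from 'ipfs-unixfs-exporter'

// `cid` and `blockstore` are assumed to exist - e.g. a blockstore backed by a
// CAR file that can only yield blocks in the order they were written
const entry = await exporter(cid, blockstore)

if (entry.type === 'file') {
  for await (const chunk of entry.content({
    // request one block at a time, in strict depth-first traversal order,
    // instead of fetching a whole layer of siblings in parallel
    blockReadConcurrency: 1
  })) {
    // consume the file bytes
  }
}
```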

File tree

6 files changed: +272 −4 lines changed

packages/ipfs-unixfs-exporter/package.json

Lines changed: 1 addition & 0 deletions
```diff
@@ -78,6 +78,7 @@
   "iso-random-stream": "^2.0.2",
   "it-all": "^3.0.2",
   "it-buffer-stream": "^3.0.0",
+  "it-drain": "^3.0.5",
   "it-first": "^3.0.2",
   "it-to-buffer": "^4.0.2",
   "merge-options": "^3.0.4",
```

packages/ipfs-unixfs-exporter/src/index.ts

Lines changed: 35 additions & 1 deletion
```diff
@@ -94,9 +94,40 @@ export type ExporterProgressEvents =
   ProgressEvent<'unixfs:exporter:walk:raw', ExportWalk>

 export interface ExporterOptions extends ProgressOptions<ExporterProgressEvents> {
+  /**
+   * An optional offset to start reading at.
+   *
+   * If the CID resolves to a file this will be a byte offset within that file,
+   * otherwise if it's a directory it will be a directory entry offset within
+   * the directory listing. (default: undefined)
+   */
   offset?: number
+
+  /**
+   * An optional length to read.
+   *
+   * If the CID resolves to a file this will be the number of bytes read from
+   * the file, otherwise if it's a directory it will be the number of directory
+   * entries read from the directory listing. (default: undefined)
+   */
   length?: number
+
+  /**
+   * This signal can be used to abort any long-lived operations such as fetching
+   * blocks from the network. (default: undefined)
+   */
   signal?: AbortSignal
+
+  /**
+   * When a DAG layer is encountered, all child nodes are loaded in parallel but
+   * processed as they arrive. This allows us to load sibling nodes in advance
+   * of yielding their bytes. Pass a value here to control the number of blocks
+   * loaded in parallel. If a strict depth-first traversal is required, this
+   * value should be set to `1`, otherwise the traversal order will tend to
+   * resemble a breadth-first fan-out and will not have a stable ordering.
+   * (default: undefined)
+   */
+  blockReadConcurrency?: number
 }

 export interface Exportable<T> {
@@ -143,6 +174,8 @@ export interface Exportable<T> {
   size: bigint

   /**
+   * @example File content
+   *
    * When `entry` is a file or a `raw` node, `offset` and/or `length` arguments can be passed to `entry.content()` to return slices of data:
    *
    * ```javascript
@@ -162,6 +195,8 @@ export interface Exportable<T> {
    * return data
    * ```
    *
+   * @example Directory content
+   *
    * If `entry` is a directory, passing `offset` and/or `length` to `entry.content()` will limit the number of files returned from the directory.
    *
    * ```javascript
@@ -176,7 +211,6 @@ export interface Exportable<T> {
    *
    * // `entries` contains the first 5 files/directories in the directory
    * ```
-   *
    */
   content(options?: ExporterOptions): AsyncGenerator<T, void, unknown>
 }
```
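
One way to observe the effect of `blockReadConcurrency` is to wrap a blockstore's `get` method and record the CIDs requested, much as the tests below do. A sketch, assuming `blockstore` is a mutable object with an async `get (cid, options?)` method:

```ts
// record the order in which the exporter requests blocks
const reads: string[] = []
const originalGet = blockstore.get.bind(blockstore)

blockstore.get = async (cid, options) => {
  reads.push(cid.toString())
  return originalGet(cid, options)
}

// with the defaults `reads` will contain whole layers of sibling CIDs at a
// time; with `blockReadConcurrency: 1` it will follow depth-first order
```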

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts

Lines changed: 4 additions & 1 deletion
```diff
@@ -25,7 +25,10 @@ const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, de
           return result.entry
         }
       }),
-      source => parallel(source, { ordered: true }),
+      source => parallel(source, {
+        ordered: true,
+        concurrency: options.blockReadConcurrency
+      }),
       source => filter(source, entry => entry != null)
     )
   }
```
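
For context, `parallel` here is the default export of the `it-parallel` package: it consumes a source of async thunks, runs up to `concurrency` of them at once, and with `ordered: true` yields results in source order. A small stand-alone sketch (the timings are illustrative):

```ts
import parallel from 'it-parallel'

// three jobs that would finish out of order if run concurrently
const jobs = [30, 20, 10].map((ms, index) => async (): Promise<number> => {
  await new Promise(resolve => setTimeout(resolve, ms))
  return index
})

for await (const index of parallel(jobs, { ordered: true, concurrency: 1 })) {
  // with concurrency 1 each job only starts after the previous one settles,
  // so this logs 0, 1, 2 and the jobs never overlap
  console.log(index)
}
```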

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts

Lines changed: 2 additions & 1 deletion
```diff
@@ -84,7 +84,8 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
         }
       }),
       (source) => parallel(source, {
-        ordered: true
+        ordered: true,
+        concurrency: options.blockReadConcurrency
       }),
       async (source) => {
         for await (const { link, block, blockStart } of source) {
```

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts

Lines changed: 4 additions & 1 deletion
```diff
@@ -62,7 +62,10 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de
         }
       }
     }),
-    source => parallel(source, { ordered: true })
+    source => parallel(source, {
+      ordered: true,
+      concurrency: options.blockReadConcurrency
+    })
   )

   for await (const { entries } of results) {
```

packages/ipfs-unixfs-exporter/test/exporter.spec.ts

Lines changed: 226 additions & 0 deletions
```diff
@@ -12,6 +12,7 @@ import { fixedSize } from 'ipfs-unixfs-importer/chunker'
 import { balanced, type FileLayout, flat, trickle } from 'ipfs-unixfs-importer/layout'
 import all from 'it-all'
 import randomBytes from 'it-buffer-stream'
+import drain from 'it-drain'
 import first from 'it-first'
 import last from 'it-last'
 import toBuffer from 'it-to-buffer'
@@ -20,6 +21,7 @@ import * as raw from 'multiformats/codecs/raw'
 import { identity } from 'multiformats/hashes/identity'
 import { sha256 } from 'multiformats/hashes/sha2'
 import { Readable } from 'readable-stream'
+import Sinon from 'sinon'
 import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
 import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
 import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
@@ -1343,4 +1345,228 @@ describe('exporter', () => {
       dataSizeInBytes *= 10
     }
   })
+
+  it('should allow control of block read concurrency for files', async () => {
+    // create a multi-layered DAG of a manageable size
+    const imported = await first(importer([{
+      path: '1.2MiB.txt',
+      content: asAsyncIterable(smallFile)
+    }], block, {
+      rawLeaves: true,
+      chunker: fixedSize({ chunkSize: 50 }),
+      layout: balanced({ maxChildrenPerNode: 2 })
+    }))
+
+    if (imported == null) {
+      throw new Error('Nothing imported')
+    }
+
+    const node = dagPb.decode(await block.get(imported.cid))
+    expect(node.Links).to.have.lengthOf(2, 'imported node had too many children')
+
+    const child1 = dagPb.decode(await block.get(node.Links[0].Hash))
+    expect(child1.Links).to.have.lengthOf(2, 'layer 1 node had too many children')
+
+    const child2 = dagPb.decode(await block.get(node.Links[1].Hash))
+    expect(child2.Links).to.have.lengthOf(2, 'layer 1 node had too many children')
+
+    // should be raw nodes
+    expect(child1.Links[0].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+    expect(child1.Links[1].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+    expect(child2.Links[0].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+    expect(child2.Links[1].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+
+    // export file
+    const file = await exporter(imported.cid, block)
+
+    // export file data with default settings
+    const blockReadSpy = Sinon.spy(block, 'get')
+    const contentWithDefaultBlockConcurrency = await toBuffer(file.content())
+
+    // blocks should be loaded in default order - a whole level of sibling nodes at a time
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal([
+      node.Links[0].Hash.toString(),
+      node.Links[1].Hash.toString(),
+      child1.Links[0].Hash.toString(),
+      child1.Links[1].Hash.toString(),
+      child2.Links[0].Hash.toString(),
+      child2.Links[1].Hash.toString()
+    ])
+
+    // export file data overriding read concurrency
+    blockReadSpy.resetHistory()
+    const contentWithSmallBlockConcurrency = await toBuffer(file.content({
+      blockReadConcurrency: 1
+    }))
+
+    // blocks should be loaded in depth-first traversal order
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal([
+      node.Links[0].Hash.toString(),
+      child1.Links[0].Hash.toString(),
+      child1.Links[1].Hash.toString(),
+      node.Links[1].Hash.toString(),
+      child2.Links[0].Hash.toString(),
+      child2.Links[1].Hash.toString()
+    ])
+
+    // ensure exported bytes are the same
+    expect(contentWithDefaultBlockConcurrency).to.equalBytes(contentWithSmallBlockConcurrency)
+  })
+
+  it('should allow control of block read concurrency for directories', async () => {
+    const entries = 1024
+
+    // create a largeish directory
+    const imported = await last(importer((async function * () {
+      for (let i = 0; i < entries; i++) {
+        yield {
+          path: `file-${i}.txt`,
+          content: Uint8Array.from([i])
+        }
+      }
+    })(), block, {
+      wrapWithDirectory: true
+    }))
+
+    if (imported == null) {
+      throw new Error('Nothing imported')
+    }
+
+    const node = dagPb.decode(await block.get(imported.cid))
+    expect(node.Links).to.have.lengthOf(entries, 'imported node had too many children')
+
+    for (const link of node.Links) {
+      // should be raw nodes
+      expect(link.Hash.code).to.equal(raw.code, 'child node had wrong codec')
+    }
+
+    // export directory
+    const directory = await exporter(imported.cid, block)
+
+    // wrap the blockstore to record the order and interleaving of block reads
+    const originalGet = block.get.bind(block)
+
+    const expectedInvocations: string[] = []
+
+    for (const link of node.Links) {
+      expectedInvocations.push(`${link.Hash.toString()}-start`)
+      expectedInvocations.push(`${link.Hash.toString()}-end`)
+    }
+
+    const actualInvocations: string[] = []
+
+    block.get = async (cid) => {
+      actualInvocations.push(`${cid.toString()}-start`)
+
+      // introduce a small delay - if running in parallel actualInvocations will
+      // be:
+      // `foo-start`, `bar-start`, `baz-start`, `foo-end`, `bar-end`, `baz-end`
+      // if in series it will be:
+      // `foo-start`, `foo-end`, `bar-start`, `bar-end`, `baz-start`, `baz-end`
+      await delay(1)
+
+      actualInvocations.push(`${cid.toString()}-end`)
+
+      return originalGet(cid)
+    }
+
+    const blockReadSpy = Sinon.spy(block, 'get')
+    await drain(directory.content({
+      blockReadConcurrency: 1
+    }))
+
+    // blocks should be loaded in series, in link order
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal(
+      node.Links.map(link => link.Hash.toString())
+    )
+
+    expect(actualInvocations).to.deep.equal(expectedInvocations)
+  })
+
+  it('should allow control of block read concurrency for HAMT sharded directories', async () => {
+    const entries = 1024
+
+    // create a sharded directory
+    const imported = await last(importer((async function * () {
+      for (let i = 0; i < entries; i++) {
+        yield {
+          path: `file-${i}.txt`,
+          content: Uint8Array.from([i])
+        }
+      }
+    })(), block, {
+      wrapWithDirectory: true,
+      shardSplitThresholdBytes: 10
+    }))
+
+    if (imported == null) {
+      throw new Error('Nothing imported')
+    }
+
+    const node = dagPb.decode(await block.get(imported.cid))
+    const data = UnixFS.unmarshal(node.Data ?? new Uint8Array(0))
+    expect(data.type).to.equal('hamt-sharded-directory')
+
+    // traverse the shard, collect all the CIDs
+    async function collectCIDs (node: PBNode): Promise<CID[]> {
+      const children: CID[] = []
+
+      for (const link of node.Links) {
+        children.push(link.Hash)
+
+        if (link.Hash.code === dagPb.code) {
+          const buf = await block.get(link.Hash)
+          const childNode = dagPb.decode(buf)
+
+          children.push(...(await collectCIDs(childNode)))
+        }
+      }
+
+      return children
+    }
+
+    const children: CID[] = await collectCIDs(node)
+
+    // export directory
+    const directory = await exporter(imported.cid, block)
+
+    // wrap the blockstore to record the order and interleaving of block reads
+    const originalGet = block.get.bind(block)
+
+    const expectedInvocations: string[] = []
+
+    for (const cid of children) {
+      expectedInvocations.push(`${cid.toString()}-start`)
+      expectedInvocations.push(`${cid.toString()}-end`)
+    }
+
+    const actualInvocations: string[] = []
+
+    block.get = async (cid) => {
+      actualInvocations.push(`${cid.toString()}-start`)
+
+      // introduce a small delay - if running in parallel actualInvocations will
+      // be:
+      // `foo-start`, `bar-start`, `baz-start`, `foo-end`, `bar-end`, `baz-end`
+      // if in series it will be:
+      // `foo-start`, `foo-end`, `bar-start`, `bar-end`, `baz-start`, `baz-end`
+      await delay(1)
+
+      actualInvocations.push(`${cid.toString()}-end`)
+
+      return originalGet(cid)
+    }
+
+    const blockReadSpy = Sinon.spy(block, 'get')
+    await drain(directory.content({
+      blockReadConcurrency: 1
+    }))
+
+    // blocks should be loaded in series, in depth-first traversal order
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal(
+      children.map(link => link.toString())
+    )
+
+    expect(actualInvocations).to.deep.equal(expectedInvocations)
+  })
 })
```
