Skip to content

Commit 9e01878

Browse files
author
Alan Shaw
authored
fix: parallelise loading of dag-pb links in directories when exporting (#286)
This PR extends the work done in #249 to also include directories and sharded directories.
1 parent 999a539 commit 9e01878

File tree

3 files changed

+41
-19
lines changed

3 files changed

+41
-19
lines changed

packages/ipfs-unixfs-exporter/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
"hamt-sharding": "^3.0.0",
143143
"interface-blockstore": "^4.0.0",
144144
"ipfs-unixfs": "^11.0.0",
145+
"it-filter": "^2.0.0",
145146
"it-last": "^2.0.0",
146147
"it-map": "^2.0.0",
147148
"it-parallel": "^3.0.0",

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import parallel from 'it-parallel'
2+
import { pipe } from 'it-pipe'
3+
import map from 'it-map'
4+
import filter from 'it-filter'
15
import type { ExporterOptions, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
26

37
const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
@@ -6,13 +10,19 @@ const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, de
610
const length = options.length ?? node.Links.length
711
const links = node.Links.slice(offset, length)
812

9-
for (const link of links) {
10-
const result = await resolve(link.Hash, link.Name ?? '', `${path}/${link.Name ?? ''}`, [], depth + 1, blockstore, options)
11-
12-
if (result.entry != null) {
13-
yield result.entry
14-
}
15-
}
13+
yield * pipe(
14+
links,
15+
source => map(source, link => {
16+
return async () => {
17+
const linkName = link.Name ?? ''
18+
const linkPath = `${path}/${linkName}`
19+
const result = await resolve(link.Hash, linkName, linkPath, [], depth + 1, blockstore, options)
20+
return result.entry
21+
}
22+
}),
23+
source => parallel(source, { ordered: true }),
24+
source => filter(source, entry => entry != null)
25+
)
1626
}
1727

1828
return yieldDirectoryContent

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import parallel from 'it-parallel'
2+
import { pipe } from 'it-pipe'
3+
import map from 'it-map'
14
import { decode, PBNode } from '@ipld/dag-pb'
25
import type { Blockstore } from 'interface-blockstore'
36
import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
@@ -22,22 +25,30 @@ const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path,
2225
async function * listDirectory (node: PBNode, path: string, resolve: Resolve, depth: number, blockstore: Blockstore, options: ExporterOptions): UnixfsV1DirectoryContent {
2326
const links = node.Links
2427

25-
for (const link of links) {
26-
const name = link.Name != null ? link.Name.substring(2) : null
28+
const results = pipe(
29+
links,
30+
source => map(source, link => {
31+
return async () => {
32+
const name = link.Name != null ? link.Name.substring(2) : null
2733

28-
if (name != null && name !== '') {
29-
const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)
34+
if (name != null && name !== '') {
35+
const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)
3036

31-
yield result.entry
32-
} else {
33-
// descend into subshard
34-
const block = await blockstore.get(link.Hash)
35-
node = decode(block)
37+
return { entries: result.entry == null ? [] : [result.entry] }
38+
} else {
39+
// descend into subshard
40+
const block = await blockstore.get(link.Hash)
41+
node = decode(block)
3642

37-
for await (const file of listDirectory(node, path, resolve, depth, blockstore, options)) {
38-
yield file
43+
return { entries: listDirectory(node, path, resolve, depth, blockstore, options) }
44+
}
3945
}
40-
}
46+
}),
47+
source => parallel(source, { ordered: true })
48+
)
49+
50+
for await (const { entries } of results) {
51+
yield * entries
4152
}
4253
}
4354

0 commit comments

Comments
 (0)