From 7ded672c3d417ebabe705bda2a9218442378c529 Mon Sep 17 00:00:00 2001
From: Alan Shaw
Date: Tue, 28 Feb 2023 13:42:59 +0000
Subject: [PATCH] fix: parallelise loading of dag-pb links in directories when exporting

---
 packages/ipfs-unixfs-exporter/package.json   |  1 +
 .../resolvers/unixfs-v1/content/directory.ts | 24 +++++++++----
 .../content/hamt-sharded-directory.ts        | 35 ++++++++++++-------
 3 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json
index 1ab0f60c..6a0cb1be 100644
--- a/packages/ipfs-unixfs-exporter/package.json
+++ b/packages/ipfs-unixfs-exporter/package.json
@@ -142,6 +142,7 @@
     "hamt-sharding": "^3.0.0",
     "interface-blockstore": "^4.0.0",
     "ipfs-unixfs": "^11.0.0",
+    "it-filter": "^2.0.0",
     "it-last": "^2.0.0",
     "it-map": "^2.0.0",
     "it-parallel": "^3.0.0",
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
index dedcc8a2..903b0f63 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
@@ -1,3 +1,7 @@
+import parallel from 'it-parallel'
+import { pipe } from 'it-pipe'
+import map from 'it-map'
+import filter from 'it-filter'
 import type { ExporterOptions, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
 
 const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
@@ -6,13 +10,19 @@ const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, de
     const length = options.length ?? node.Links.length
     const links = node.Links.slice(offset, length)
 
-    for (const link of links) {
-      const result = await resolve(link.Hash, link.Name ?? '', `${path}/${link.Name ?? ''}`, [], depth + 1, blockstore, options)
-
-      if (result.entry != null) {
-        yield result.entry
-      }
-    }
+    yield * pipe(
+      links,
+      source => map(source, link => {
+        return async () => {
+          const linkName = link.Name ?? ''
+          const linkPath = `${path}/${linkName}`
+          const result = await resolve(link.Hash, linkName, linkPath, [], depth + 1, blockstore, options)
+          return result.entry
+        }
+      }),
+      source => parallel(source, { ordered: true }),
+      source => filter(source, entry => entry != null)
+    )
   }
 
   return yieldDirectoryContent
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
index 54b67382..203a3448 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
@@ -1,3 +1,6 @@
+import parallel from 'it-parallel'
+import { pipe } from 'it-pipe'
+import map from 'it-map'
 import { decode, PBNode } from '@ipld/dag-pb'
 import type { Blockstore } from 'interface-blockstore'
 import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
@@ -22,22 +25,30 @@ const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path,
 async function * listDirectory (node: PBNode, path: string, resolve: Resolve, depth: number, blockstore: Blockstore, options: ExporterOptions): UnixfsV1DirectoryContent {
   const links = node.Links
 
-  for (const link of links) {
-    const name = link.Name != null ? link.Name.substring(2) : null
+  const results = pipe(
+    links,
+    source => map(source, link => {
+      return async () => {
+        const name = link.Name != null ? link.Name.substring(2) : null
 
-    if (name != null && name !== '') {
-      const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)
+        if (name != null && name !== '') {
+          const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)
 
-      yield result.entry
-    } else {
-      // descend into subshard
-      const block = await blockstore.get(link.Hash)
-      node = decode(block)
+          return { entries: result.entry == null ? [] : [result.entry] }
+        } else {
+          // descend into subshard
+          const block = await blockstore.get(link.Hash)
+          node = decode(block)
 
-      for await (const file of listDirectory(node, path, resolve, depth, blockstore, options)) {
-        yield file
+          return { entries: listDirectory(node, path, resolve, depth, blockstore, options) }
+        }
       }
-    }
-  }
+    }),
+    source => parallel(source, { ordered: true })
+  )
+
+  for await (const { entries } of results) {
+    yield * entries
+  }
 }
 
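
Reviewer note: the shape introduced in directory.ts - map each link to a
zero-argument async function, run those thunks through it-parallel in
ordered mode, then filter out missing entries - can be exercised on its
own. Below is a minimal sketch of that pattern, not part of the patch;
fetchEntry() is a hypothetical stand-in for the exporter's resolve():

import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import map from 'it-map'
import filter from 'it-filter'

// hypothetical async lookup standing in for resolve()
async function fetchEntry (name: string): Promise<string | undefined> {
  await new Promise(resolve => setTimeout(resolve, Math.random() * 100))
  return name === 'missing' ? undefined : `entry:${name}`
}

const entries = pipe(
  ['a', 'b', 'missing', 'c'],
  // wrap each item in a thunk so it-parallel controls when work starts
  source => map(source, name => async () => fetchEntry(name)),
  // run the thunks concurrently; ordered: true re-emits results in
  // input order, so listings stay deterministic
  source => parallel(source, { ordered: true }),
  // as in the patch, drop entries that failed to resolve
  source => filter(source, entry => entry != null)
)

for await (const entry of entries) {
  console.log(entry) // entry:a, entry:b, entry:c - in input order
}

Because it-parallel buffers results that complete early, a slow block
fetch for one link no longer serialises the rest of the listing.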
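
The HAMT resolver cannot simply filter: a link may point at a subshard,
whose listing is itself an async iterator rather than a single entry.
Each thunk therefore returns an { entries } wrapper holding either an
array (zero or one entries) or the nested listing, and the consumer
flattens both with yield *. A minimal sketch of that wrapper, again not
part of the patch and using a hypothetical Node tree type:

import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import map from 'it-map'

// hypothetical tree: a leaf entry or a subtree to descend into
type Node = { name: string } | { children: Node[] }

async function * listEntries (nodes: Node[]): AsyncGenerator<string> {
  const results = pipe(
    nodes,
    source => map(source, node => {
      return async () => {
        if ('name' in node) {
          // leaf: zero-or-one entries, wrapped in an array
          return { entries: [node.name] }
        }

        // subtree: hand back the nested async iterable itself; it is
        // only consumed when this result's turn comes up, so output
        // order is preserved even though thunks run concurrently
        return { entries: listEntries(node.children) }
      }
    }),
    source => parallel(source, { ordered: true })
  )

  for await (const { entries } of results) {
    yield * entries // flattens arrays and async iterables alike
  }
}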