From 287965ab9e4a88ac5c8275d89cd9e70d577a4674 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 24 Aug 2023 16:56:30 +0200 Subject: [PATCH 1/6] fix: use fanout --- .../content/hamt-sharded-directory.ts | 21 ++++++++++++++++++- packages/ipfs-unixfs/src/index.ts | 3 ++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts index 2f2126ce..2b5309cf 100644 --- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts +++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts @@ -1,3 +1,5 @@ +import { UnixFS } from 'ipfs-unixfs' +import errCode from 'err-code' import { decode, type PBNode } from '@ipld/dag-pb' import map from 'it-map' import parallel from 'it-parallel' @@ -20,11 +22,28 @@ const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, async function * listDirectory (node: PBNode, path: string, resolve: Resolve, depth: number, blockstore: ReadableStorage, options: ExporterOptions): UnixfsV1DirectoryContent { const links = node.Links + if (node.Data == null) { + throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS') + } + + let dir: UnixFS + try { + dir = UnixFS.unmarshal(node.Data) + } catch (err: any) { + throw errCode(err, 'ERR_NOT_UNIXFS') + } + + if (!dir.fanout) { + throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS') + } + + const padLength = (dir.fanout - 1n).toString(16).length + const results = pipe( links, source => map(source, link => { return async () => { - const name = link.Name != null ? link.Name.substring(2) : null + const name = link.Name != null ? link.Name.substring(padLength) : null if (name != null && name !== '') { const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options) diff --git a/packages/ipfs-unixfs/src/index.ts b/packages/ipfs-unixfs/src/index.ts index 4df0f6e2..2026d806 100644 --- a/packages/ipfs-unixfs/src/index.ts +++ b/packages/ipfs-unixfs/src/index.ts @@ -52,7 +52,8 @@ class UnixFS { secs: message.mtime.Seconds ?? 0n, nsecs: message.mtime.FractionalNanoseconds } - : undefined + : undefined, + fanout: message.fanout }) // make sure we honour the original mode From ee82abb5327ff09fd149316423478f741a9d0575 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 24 Aug 2023 17:07:09 +0200 Subject: [PATCH 2/6] chore: omg import order is failing the build --- .../src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts index 2b5309cf..fb35d04a 100644 --- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts +++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts @@ -1,6 +1,6 @@ -import { UnixFS } from 'ipfs-unixfs' -import errCode from 'err-code' import { decode, type PBNode } from '@ipld/dag-pb' +import errCode from 'err-code' +import { UnixFS } from 'ipfs-unixfs' import map from 'it-map' import parallel from 'it-parallel' import { pipe } from 'it-pipe' From f42a40fb8ceeb1b2ab0baae9f118db56c045c01d Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 25 Aug 2023 07:19:21 +0100 Subject: [PATCH 3/6] feat: add config option to control fanout size (#356) * feat: add config option to control fanout size Adds a `shardFanoutBytes` option to the importer to allow configuring the number of bytes used for the HAMT prefix, also a test. * fix: use fanout "bits" (#357) --------- Co-authored-by: Rod Vagg --- .../content/hamt-sharded-directory.ts | 2 +- .../test/exporter-sharded.spec.ts | 38 ++++++++++++++++++- .../ipfs-unixfs-importer/src/dir-sharded.ts | 15 ++++++-- .../ipfs-unixfs-importer/src/flat-to-shard.ts | 7 ++-- packages/ipfs-unixfs-importer/src/index.ts | 9 +++++ .../ipfs-unixfs-importer/src/tree-builder.ts | 1 + 6 files changed, 62 insertions(+), 10 deletions(-) diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts index fb35d04a..9e59d7c9 100644 --- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts +++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts @@ -33,7 +33,7 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de throw errCode(err, 'ERR_NOT_UNIXFS') } - if (!dir.fanout) { + if (dir.fanout == null) { throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS') } diff --git a/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts index 7d1b1be6..95a8c50e 100644 --- a/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts +++ b/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts @@ -4,7 +4,7 @@ import * as dagPb from '@ipld/dag-pb' import { expect } from 'aegir/chai' import { MemoryBlockstore } from 'blockstore-core' import { UnixFS } from 'ipfs-unixfs' -import { importer } from 'ipfs-unixfs-importer' +import { importer, type ImportCandidate } from 'ipfs-unixfs-importer' import all from 'it-all' import randomBytes from 'it-buffer-stream' import last from 'it-last' @@ -255,4 +255,40 @@ describe('exporter sharded', function () { expect(exported.name).to.deep.equal('file-1') }) + + it('exports a shard with a different fanout size', async () => { + const files: ImportCandidate[] = [{ + path: '/baz.txt', + content: Uint8Array.from([0, 1, 2, 3, 4]) + }, { + path: '/foo.txt', + content: Uint8Array.from([0, 1, 2, 3, 4]) + }, { + path: '/bar.txt', + content: Uint8Array.from([0, 1, 2, 3, 4]) + }] + + const result = await last(importer(files, block, { + shardSplitThresholdBytes: 0, + shardFanoutBits: 4, // 2**4 = 16 children max + wrapWithDirectory: true + })) + + if (result == null) { + throw new Error('Import failed') + } + + const { cid } = result + const dir = await exporter(cid, block) + + expect(dir).to.have.nested.property('unixfs.fanout', 16n) + + const contents = await all(dir.content()) + + expect(contents.map(entry => ({ + path: `/${entry.name}`, + content: entry.node + }))) + .to.deep.equal(files) + }) }) diff --git a/packages/ipfs-unixfs-importer/src/dir-sharded.ts b/packages/ipfs-unixfs-importer/src/dir-sharded.ts index c30fbfbb..7bee99b0 100644 --- a/packages/ipfs-unixfs-importer/src/dir-sharded.ts +++ b/packages/ipfs-unixfs-importer/src/dir-sharded.ts @@ -18,16 +18,21 @@ async function hamtHashFn (buf: Uint8Array): Promise { } const HAMT_HASH_CODE = BigInt(0x22) +const DEFAULT_FANOUT_BITS = 8 + +export interface DirShardedOptions extends PersistOptions { + shardFanoutBits: number +} class DirSharded extends Dir { private readonly _bucket: Bucket - constructor (props: DirProps, options: PersistOptions) { + constructor (props: DirProps, options: DirShardedOptions) { super(props, options) this._bucket = createHAMT({ hashFn: hamtHashFn, - bits: 8 + bits: options.shardFanoutBits ?? DEFAULT_FANOUT_BITS }) } @@ -88,6 +93,7 @@ export default DirSharded async function * flush (bucket: Bucket, blockstore: Blockstore, shardRoot: DirSharded | null, options: PersistOptions): AsyncIterable { const children = bucket._children + const padLength = (bucket.tableSize() - 1).toString(16).length const links: PBLink[] = [] let childrenSize = 0n @@ -98,7 +104,7 @@ async function * flush (bucket: Bucket, blockstore continue } - const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0') + const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0') if (child instanceof Bucket) { let shard @@ -191,6 +197,7 @@ function isDir (obj: any): obj is Dir { function calculateSize (bucket: Bucket, shardRoot: DirSharded | null, options: PersistOptions): number { const children = bucket._children + const padLength = (bucket.tableSize() - 1).toString(16).length const links: PBLink[] = [] for (let i = 0; i < children.length; i++) { @@ -200,7 +207,7 @@ function calculateSize (bucket: Bucket, shardRoot: DirSharded | null, optio continue } - const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0') + const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0') if (child instanceof Bucket) { const size = calculateSize(child, null, options) diff --git a/packages/ipfs-unixfs-importer/src/flat-to-shard.ts b/packages/ipfs-unixfs-importer/src/flat-to-shard.ts index a5b4d419..f7e58959 100644 --- a/packages/ipfs-unixfs-importer/src/flat-to-shard.ts +++ b/packages/ipfs-unixfs-importer/src/flat-to-shard.ts @@ -1,9 +1,8 @@ import { DirFlat } from './dir-flat.js' -import DirSharded from './dir-sharded.js' +import DirSharded, { type DirShardedOptions } from './dir-sharded.js' import type { Dir } from './dir.js' -import type { PersistOptions } from './utils/persist.js' -export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: PersistOptions): Promise { +export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: DirShardedOptions): Promise { let newDir = dir as DirSharded if (dir instanceof DirFlat && dir.estimateNodeSize() > threshold) { @@ -31,7 +30,7 @@ export async function flatToShard (child: Dir | null, dir: Dir, threshold: numbe return newDir } -async function convertToShard (oldDir: DirFlat, options: PersistOptions): Promise { +async function convertToShard (oldDir: DirFlat, options: DirShardedOptions): Promise { const newDir = new DirSharded({ root: oldDir.root, dir: true, diff --git a/packages/ipfs-unixfs-importer/src/index.ts b/packages/ipfs-unixfs-importer/src/index.ts index 88270154..d0dc4a7a 100644 --- a/packages/ipfs-unixfs-importer/src/index.ts +++ b/packages/ipfs-unixfs-importer/src/index.ts @@ -123,6 +123,13 @@ export interface ImporterOptions extends ProgressOptions */ shardSplitThresholdBytes?: number + /** + * The number of bits of a hash digest used at each level of sharding to + * the child index. 2**shardFanoutBits will dictate the maximum number of + * children for any shard in the HAMT. Default: 8 + */ + shardFanoutBits?: number + /** * How many files to import concurrently. For large numbers of small files this * should be high (e.g. 50). Default: 10 @@ -241,6 +248,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri const wrapWithDirectory = options.wrapWithDirectory ?? false const shardSplitThresholdBytes = options.shardSplitThresholdBytes ?? 262144 + const shardFanoutBits = options.shardFanoutBits ?? 8 const cidVersion = options.cidVersion ?? 1 const rawLeaves = options.rawLeaves ?? true const leafType = options.leafType ?? 'file' @@ -269,6 +277,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri const buildTree: TreeBuilder = options.treeBuilder ?? defaultTreeBuilder({ wrapWithDirectory, shardSplitThresholdBytes, + shardFanoutBits, cidVersion, onProgress: options.onProgress }) diff --git a/packages/ipfs-unixfs-importer/src/tree-builder.ts b/packages/ipfs-unixfs-importer/src/tree-builder.ts index 1c2e457d..f84d6fb5 100644 --- a/packages/ipfs-unixfs-importer/src/tree-builder.ts +++ b/packages/ipfs-unixfs-importer/src/tree-builder.ts @@ -7,6 +7,7 @@ import type { PersistOptions } from './utils/persist.js' export interface AddToTreeOptions extends PersistOptions { shardSplitThresholdBytes: number + shardFanoutBits: number } async function addToTree (elem: InProgressImportResult, tree: Dir, options: AddToTreeOptions): Promise { From b30685381396900132c527a51a59c21bcbb3c90c Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 25 Aug 2023 09:40:44 +0200 Subject: [PATCH 4/6] fix: walkPath HAMT with non-default fanout --- .../src/utils/find-cid-in-shard.ts | 44 ++++++++++++++----- .../test/exporter-sharded.spec.ts | 26 ++++++++++- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts b/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts index a02912e3..98886e14 100644 --- a/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts +++ b/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts @@ -1,6 +1,8 @@ import { decode, type PBLink, type PBNode } from '@ipld/dag-pb' import { murmur3128 } from '@multiformats/murmur3' import { Bucket, type BucketPosition, createHAMT } from 'hamt-sharding' +import errCode from 'err-code' +import { UnixFS } from 'ipfs-unixfs' import type { ExporterOptions, ShardTraversalContext, ReadableStorage } from '../index.js' import type { CID } from 'multiformats/cid' @@ -16,13 +18,14 @@ const hashFn = async function (buf: Uint8Array): Promise { } const addLinksToHamtBucket = async (links: PBLink[], bucket: Bucket, rootBucket: Bucket): Promise => { + const padLength = (bucket.tableSize() - 1).toString(16).length await Promise.all( links.map(async link => { if (link.Name == null) { // TODO(@rvagg): what do? this is technically possible throw new Error('Unexpected Link without a Name') } - if (link.Name.length === 2) { + if (link.Name.length === padLength) { const pos = parseInt(link.Name, 16) bucket._putObjectAt(pos, new Bucket({ @@ -37,12 +40,12 @@ const addLinksToHamtBucket = async (links: PBLink[], bucket: Bucket, ro ) } -const toPrefix = (position: number): string => { +const toPrefix = (position: number, padLength: number): string => { return position .toString(16) .toUpperCase() - .padStart(2, '0') - .substring(0, 2) + .padStart(padLength, '0') + .substring(0, padLength) } const toBucketPath = (position: BucketPosition): Array> => { @@ -62,8 +65,27 @@ const toBucketPath = (position: BucketPosition): Array> const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStorage, context?: ShardTraversalContext, options?: ExporterOptions): Promise => { if (context == null) { + if (node.Data == null) { + throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS') + } + + let dir: UnixFS + try { + dir = UnixFS.unmarshal(node.Data) + } catch (err: any) { + throw errCode(err, 'ERR_NOT_UNIXFS') + } + + if (dir.type !== 'hamt-sharded-directory') { + throw errCode(new Error('not a HAMT'), 'ERR_NOT_UNIXFS') + } + if (dir.fanout == null) { + throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS') + } + const rootBucket = createHAMT({ - hashFn + hashFn, + bits: Math.log2(Number(dir.fanout)) }) context = { @@ -73,16 +95,18 @@ const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStor } } + const padLength = (context.lastBucket.tableSize() - 1).toString(16).length + await addLinksToHamtBucket(node.Links, context.lastBucket, context.rootBucket) const position = await context.rootBucket._findNewBucketAndPos(name) - let prefix = toPrefix(position.pos) + let prefix = toPrefix(position.pos, padLength) const bucketPath = toBucketPath(position) if (bucketPath.length > context.hamtDepth) { context.lastBucket = bucketPath[context.hamtDepth] - prefix = toPrefix(context.lastBucket._posAtParent) + prefix = toPrefix(context.lastBucket._posAtParent, padLength) } const link = node.Links.find(link => { @@ -90,8 +114,8 @@ const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStor return false } - const entryPrefix = link.Name.substring(0, 2) - const entryName = link.Name.substring(2) + const entryPrefix = link.Name.substring(0, padLength) + const entryName = link.Name.substring(padLength) if (entryPrefix !== prefix) { // not the entry or subshard we're looking for @@ -110,7 +134,7 @@ const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStor return } - if (link.Name != null && link.Name.substring(2) === name) { + if (link.Name != null && link.Name.substring(padLength) === name) { return link.Hash } diff --git a/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts index 95a8c50e..671b97c5 100644 --- a/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts +++ b/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts @@ -241,7 +241,7 @@ describe('exporter sharded', function () { await block.put(nodeBlockCid, nodeBlockBuf) const shardNodeBuf = dagPb.encode({ - Data: new UnixFS({ type: 'hamt-sharded-directory' }).marshal(), + Data: new UnixFS({ type: 'hamt-sharded-directory', fanout: 2n**8n }).marshal(), Links: [{ Name: '75normal-dir', Tsize: nodeBlockBuf.length, @@ -291,4 +291,28 @@ describe('exporter sharded', function () { }))) .to.deep.equal(files) }) + + it('walks path of a HAMT with a different fanout size', async () => { + const files: ImportCandidate[] = [{ + path: '/foo/bar/baz.txt', + content: Uint8Array.from([0, 1, 2, 3, 4]) + }] + + const result = await last(importer(files, block, { + shardSplitThresholdBytes: 0, + shardFanoutBits: 4, // 2**4 = 16 children max + wrapWithDirectory: true + })) + + if (result == null) { + throw new Error('Import failed') + } + + const { cid } = result + const file = await last(walkPath(`${cid}/foo/bar/baz.txt`, block)) + expect([{ + path: file?.path.replace(`${cid}`, ''), + content: file?.node + }]).to.deep.equal(files) + }) }) From 4b6336f07815b7a3edd84707a73a5660b8973328 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 25 Aug 2023 09:45:02 +0200 Subject: [PATCH 5/6] chore: appease linter --- packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts b/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts index 98886e14..3f1ad035 100644 --- a/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts +++ b/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts @@ -1,7 +1,7 @@ import { decode, type PBLink, type PBNode } from '@ipld/dag-pb' import { murmur3128 } from '@multiformats/murmur3' -import { Bucket, type BucketPosition, createHAMT } from 'hamt-sharding' import errCode from 'err-code' +import { Bucket, type BucketPosition, createHAMT } from 'hamt-sharding' import { UnixFS } from 'ipfs-unixfs' import type { ExporterOptions, ShardTraversalContext, ReadableStorage } from '../index.js' import type { CID } from 'multiformats/cid' From 34b57caebf5eeeddb0d4f551468c8c96ebf72220 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 25 Aug 2023 09:49:04 +0200 Subject: [PATCH 6/6] chore: appease linter --- packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts | 2 +- packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts b/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts index 3f1ad035..13346bf7 100644 --- a/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts +++ b/packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts @@ -68,7 +68,7 @@ const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStor if (node.Data == null) { throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS') } - + let dir: UnixFS try { dir = UnixFS.unmarshal(node.Data) diff --git a/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts index 671b97c5..17baf0a0 100644 --- a/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts +++ b/packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts @@ -241,7 +241,7 @@ describe('exporter sharded', function () { await block.put(nodeBlockCid, nodeBlockBuf) const shardNodeBuf = dagPb.encode({ - Data: new UnixFS({ type: 'hamt-sharded-directory', fanout: 2n**8n }).marshal(), + Data: new UnixFS({ type: 'hamt-sharded-directory', fanout: 2n ** 8n }).marshal(), Links: [{ Name: '75normal-dir', Tsize: nodeBlockBuf.length,