From 8caeb786396891c4f17f7942bb773a33aaad3638 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 15 Mar 2023 18:52:27 +0100 Subject: [PATCH] fix: reduce required number of blockstore methods The importer only needs `.put`, the exporter only needs `.get` --- packages/ipfs-unixfs-exporter/src/index.ts | 6 +- .../src/dag-builder/buffer-importer.ts | 15 ++-- .../src/dag-builder/dir.ts | 9 ++- .../src/dag-builder/file.ts | 73 +++++++++---------- packages/ipfs-unixfs-importer/src/dir-flat.ts | 15 ++-- packages/ipfs-unixfs-importer/src/index.ts | 23 ++++-- .../test/builder-balanced.spec.ts | 12 ++- .../test/chunker-custom.spec.ts | 5 +- 8 files changed, 84 insertions(+), 74 deletions(-) diff --git a/packages/ipfs-unixfs-exporter/src/index.ts b/packages/ipfs-unixfs-exporter/src/index.ts index 935a745d..fcbbc2cf 100644 --- a/packages/ipfs-unixfs-exporter/src/index.ts +++ b/packages/ipfs-unixfs-exporter/src/index.ts @@ -79,11 +79,7 @@ export interface ShardTraversalContext { lastBucket: Bucket } -export interface BlockstoreOptions { - signal?: AbortSignal -} - -export type Blockstore = Pick +export type Blockstore = Pick const toPathComponents = (path: string = ''): string[] => { // split on / unless escaped with \ diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts b/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts index f2a561f9..e5fd5895 100644 --- a/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts +++ b/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts @@ -32,10 +32,10 @@ export interface BufferImporterOptions extends ProgressOptions { - options.onProgress?.(new CustomProgressEvent('unixfs:importer:progress', { bytes: buffer.length, path: file.path })) + options.onProgress?.(new CustomProgressEvent('unixfs:importer:progress', { bytes: block.byteLength, path: file.path })) let unixfs const opts: PersistOptions = { @@ -50,19 +50,20 @@ export function defaultBufferImporter (options: 
BufferImporterOptions): BufferIm } else { unixfs = new UnixFS({ type: options.leafType, - data: buffer + data: block }) - buffer = dagPb.encode({ + block = dagPb.encode({ Data: unixfs.marshal(), Links: [] }) } return { - cid: await persist(buffer, block, opts), + cid: await persist(block, blockstore, opts), unixfs, - size: BigInt(buffer.length) + size: BigInt(block.length), + block } } } diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts b/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts index 57a54e6e..e789fb68 100644 --- a/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts +++ b/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts @@ -17,15 +17,16 @@ export const dirBuilder = async (dir: Directory, blockstore: Blockstore, options mode: dir.mode }) - const buffer = encode(prepare({ Data: unixfs.marshal() })) - const cid = await persist(buffer, blockstore, options) + const block = encode(prepare({ Data: unixfs.marshal() })) + const cid = await persist(block, blockstore, options) const path = dir.path return { cid, path, unixfs, - size: BigInt(buffer.length), - originalPath: dir.originalPath + size: BigInt(block.length), + originalPath: dir.originalPath, + block } } diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/file.ts b/packages/ipfs-unixfs-importer/src/dag-builder/file.ts index 89be051b..6ded9e18 100644 --- a/packages/ipfs-unixfs-importer/src/dag-builder/file.ts +++ b/packages/ipfs-unixfs-importer/src/dag-builder/file.ts @@ -3,7 +3,7 @@ import { persist } from '../utils/persist.js' import { encode, PBLink, prepare } from '@ipld/dag-pb' import parallelBatch from 'it-parallel-batch' import * as rawCodec from 'multiformats/codecs/raw' -import type { BufferImporter, File, InProgressImportResult, Blockstore } from '../index.js' +import type { BufferImporter, File, InProgressImportResult, Blockstore, SingleBlockImportResult } from '../index.js' import type { FileLayout, Reducer } from '../layout/index.js' import type { Version } from 
'multiformats/cid' import type { ProgressOptions } from 'progress-events' @@ -15,24 +15,37 @@ interface BuildFileBatchOptions { async function * buildFileBatch (file: File, blockstore: Blockstore, options: BuildFileBatchOptions): AsyncGenerator { let count = -1 - let previous: InProgressImportResult | undefined + let previous: SingleBlockImportResult | undefined for await (const entry of parallelBatch(options.bufferImporter(file, blockstore), options.blockWriteConcurrency)) { count++ if (count === 0) { - previous = entry + // cache the first entry if case there aren't any more + previous = { + ...entry, + single: true + } + continue } else if (count === 1 && (previous != null)) { - yield previous + // we have the second block of a multiple block import so yield the first + yield { + ...previous, + block: undefined, + single: undefined + } previous = undefined } - yield entry + // yield the second or later block of a multiple block import + yield { + ...entry, + block: undefined + } } if (previous != null) { - previous.single = true yield previous } } @@ -43,49 +56,32 @@ interface ReduceOptions extends ProgressOptions { signal?: AbortSignal } +function isSingleBlockImport (result: any): result is SingleBlockImportResult { + return result.single === true +} + const reduce = (file: File, blockstore: Blockstore, options: ReduceOptions): Reducer => { const reducer: Reducer = async function (leaves) { - if (leaves.length === 1 && leaves[0]?.single === true && options.reduceSingleLeafToSelf) { + if (leaves.length === 1 && isSingleBlockImport(leaves[0]) && options.reduceSingleLeafToSelf) { const leaf = leaves[0] - if (file.mtime !== undefined || file.mode !== undefined) { + if (isSingleBlockImport(leaf) && (file.mtime !== undefined || file.mode !== undefined)) { // only one leaf node which is a raw leaf - we have metadata so convert it into a // UnixFS entry otherwise we'll have nowhere to store the metadata - let buffer = await blockstore.get(leaf.cid, options) - 
leaf.unixfs = new UnixFS({ type: 'file', mtime: file.mtime, mode: file.mode, - data: buffer + data: leaf.block }) - buffer = encode(prepare({ Data: leaf.unixfs.marshal() })) - - // // TODO vmx 2021-03-26: This is what the original code does, it checks - // // the multihash of the original leaf node and uses then the same - // // hasher. i wonder if that's really needed or if we could just use - // // the hasher from `options.hasher` instead. - // const multihash = mh.decode(leaf.cid.multihash.bytes) - // let hasher - // switch multihash { - // case sha256.code { - // hasher = sha256 - // break; - // } - // //case identity.code { - // // hasher = identity - // // break; - // //} - // default: { - // throw new Error(`Unsupported hasher "${multihash}"`) - // } - // } - leaf.cid = await persist(buffer, blockstore, { + leaf.block = encode(prepare({ Data: leaf.unixfs.marshal() })) + + leaf.cid = await persist(leaf.block, blockstore, { ...options, cidVersion: options.cidVersion }) - leaf.size = BigInt(buffer.length) + leaf.size = BigInt(leaf.block.length) } return { @@ -147,15 +143,16 @@ const reduce = (file: File, blockstore: Blockstore, options: ReduceOptions): Red Data: f.marshal(), Links: links } - const buffer = encode(prepare(node)) - const cid = await persist(buffer, blockstore, options) + const block = encode(prepare(node)) + const cid = await persist(block, blockstore, options) return { cid, path: file.path, unixfs: f, - size: BigInt(buffer.length + node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 0), 0)), - originalPath: file.originalPath + size: BigInt(block.length + node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 
0), 0)), + originalPath: file.originalPath, + block } } diff --git a/packages/ipfs-unixfs-importer/src/dir-flat.ts b/packages/ipfs-unixfs-importer/src/dir-flat.ts index 2597a5f7..7d62a926 100644 --- a/packages/ipfs-unixfs-importer/src/dir-flat.ts +++ b/packages/ipfs-unixfs-importer/src/dir-flat.ts @@ -1,6 +1,7 @@ import { encode, PBNode, prepare } from '@ipld/dag-pb' import type { Blockstore } from 'interface-blockstore' import { UnixFS } from 'ipfs-unixfs' +import type { CID } from 'multiformats/cid' import { Dir, CID_V0, CID_V1, DirProps } from './dir.js' import type { ImportResult, InProgressImportResult } from './index.js' import { persist, PersistOptions } from './utils/persist.js' @@ -68,20 +69,22 @@ export class DirFlat extends Dir { async * flush (block: Blockstore): AsyncGenerator { const links = [] - for (let [name, child] of this._children.entries()) { + for (const [name, child] of this._children.entries()) { + let result: { size?: bigint | number, cid?: CID } = child + if (child instanceof Dir) { for await (const entry of child.flush(block)) { - child = entry + result = entry - yield child + yield entry } } - if (child.size != null && (child.cid != null)) { + if (result.size != null && (result.cid != null)) { links.push({ Name: name, - Tsize: Number(child.size), - Hash: child.cid + Tsize: Number(result.size), + Hash: result.cid }) } } diff --git a/packages/ipfs-unixfs-importer/src/index.ts b/packages/ipfs-unixfs-importer/src/index.ts index 47d960d3..82d69c29 100644 --- a/packages/ipfs-unixfs-importer/src/index.ts +++ b/packages/ipfs-unixfs-importer/src/index.ts @@ -17,11 +17,7 @@ import type { ProgressOptions } from 'progress-events' export type ByteStream = AwaitIterable export type ImportContent = ByteStream | Uint8Array -export interface BlockstoreOptions { - signal?: AbortSignal -} - -export type Blockstore = Pick +export type Blockstore = Pick export interface FileCandidate { path?: string @@ -60,14 +56,25 @@ export interface ImportResult { 
unixfs?: UnixFS } -export interface InProgressImportResult extends ImportResult { - single?: boolean +export interface MultipleBlockImportResult extends ImportResult { + originalPath?: string +} + +export interface SingleBlockImportResult extends ImportResult { + single: true originalPath?: string + block: Uint8Array +} + +export type InProgressImportResult = SingleBlockImportResult | MultipleBlockImportResult + +export interface BufferImporterResult extends ImportResult { + block: Uint8Array } export interface HamtHashFn { (value: Uint8Array): Promise } export interface TreeBuilder { (source: AsyncIterable, blockstore: Blockstore): AsyncIterable } -export interface BufferImporter { (file: File, blockstore: Blockstore): AsyncIterable<() => Promise> } +export interface BufferImporter { (file: File, blockstore: Blockstore): AsyncIterable<() => Promise> } export type ImportProgressEvents = BufferImportProgressEvents diff --git a/packages/ipfs-unixfs-importer/test/builder-balanced.spec.ts b/packages/ipfs-unixfs-importer/test/builder-balanced.spec.ts index f40f4272..850defcc 100644 --- a/packages/ipfs-unixfs-importer/test/builder-balanced.spec.ts +++ b/packages/ipfs-unixfs-importer/test/builder-balanced.spec.ts @@ -24,7 +24,8 @@ describe('builder: balanced', () => { it('reduces one value into itself', async () => { const source = [{ cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'), - size: 0n + size: 0n, + block: Uint8Array.from([]) }] const result = await balanced(options)((async function * () { @@ -37,13 +38,16 @@ describe('builder: balanced', () => { it('reduces 3 values into parent', async () => { const source = [{ cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'), - size: 0n + size: 0n, + block: Uint8Array.from([]) }, { cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'), - size: 0n + size: 0n, + block: Uint8Array.from([]) }, { cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'), - size: 0n + size: 0n, + 
block: Uint8Array.from([]) }] const result = await balanced(options)((async function * () { diff --git a/packages/ipfs-unixfs-importer/test/chunker-custom.spec.ts b/packages/ipfs-unixfs-importer/test/chunker-custom.spec.ts index 9d5cae8f..7fb112bd 100644 --- a/packages/ipfs-unixfs-importer/test/chunker-custom.spec.ts +++ b/packages/ipfs-unixfs-importer/test/chunker-custom.spec.ts @@ -19,7 +19,7 @@ describe('custom chunker', function () { const block = new MemoryBlockstore() const fromPartsTest = (content: AsyncIterable, size: bigint) => async () => { - const put = async (buf: Uint8Array): Promise<{ cid: CID, size: bigint, unixfs: UnixFS }> => { + const put = async (buf: Uint8Array): Promise<{ cid: CID, size: bigint, unixfs: UnixFS, block: Uint8Array }> => { const encodedBlock = await Block.encode({ value: buf, codec: rawCodec, @@ -29,7 +29,8 @@ describe('custom chunker', function () { return { cid: encodedBlock.cid, size: BigInt(buf.length), - unixfs: new UnixFS() + unixfs: new UnixFS(), + block: buf } }