diff --git a/benchmarks/import/README.md b/benchmarks/import/README.md
new file mode 100644
index 00000000..51ffd7e6
--- /dev/null
+++ b/benchmarks/import/README.md
@@ -0,0 +1,24 @@
+# Import Benchmark
+
+How much memory does the importer use while importing files?
+
+Memory usage should stay relatively flat so that files larger than physical memory can be imported.
+
+## Usage
+
+```console
+$ npm i
+$ npm start
+
+> ipfs-unixfs-memory-benchmark@0.0.0 start
+> npm run build && node --expose-gc ./dist/test/index.spec.js
+
+
+> ipfs-unixfs-memory-benchmark@0.0.0 build
+> aegir build --bundle false
+
+[14:51:28] tsc [started]
+[14:51:33] tsc [completed]
+Percent	ms
+//... results here
+```
diff --git a/benchmarks/import/package.json b/benchmarks/import/package.json
new file mode 100644
index 00000000..56f5618e
--- /dev/null
+++ b/benchmarks/import/package.json
@@ -0,0 +1,42 @@
+{
+  "name": "ipfs-unixfs-memory-benchmark",
+  "version": "0.0.0",
+  "description": "Memory benchmarks for ipfs-unixfs-importer",
+  "license": "Apache-2.0 OR MIT",
+  "private": true,
+  "type": "module",
+  "types": "./dist/src/index.d.ts",
+  "files": [
+    "src",
+    "dist",
+    "!dist/test",
+    "!**/*.tsbuildinfo"
+  ],
+  "exports": {
+    ".": {
+      "types": "./dist/src/index.d.ts",
+      "import": "./dist/src/index.js"
+    }
+  },
+  "eslintConfig": {
+    "extends": "ipfs",
+    "parserOptions": {
+      "sourceType": "module"
+    }
+  },
+  "scripts": {
+    "build": "aegir build --bundle false",
+    "clean": "aegir clean",
+    "lint": "aegir lint",
+    "dep-check": "aegir dep-check",
+    "start": "npm run build && node --expose-gc ./dist/test/index.spec.js"
+  },
+  "devDependencies": {
+    "aegir": "^38.1.2",
+    "blockstore-core": "^4.0.1",
+    "blockstore-fs": "^1.0.0",
+    "ipfs-unixfs-importer": "../../packages/ipfs-unixfs-importer",
+    "it-buffer-stream": "^3.0.1",
+    "it-drain": "^2.0.1"
+  }
+}
diff --git a/benchmarks/import/src/index.ts b/benchmarks/import/src/index.ts
new file mode 100644
index 00000000..336ce12b
--- /dev/null
+++ b/benchmarks/import/src/index.ts
@@ -0,0 +1 @@
+export {}
diff --git a/benchmarks/import/test/index.spec.ts b/benchmarks/import/test/index.spec.ts
new file mode 100644
index 00000000..dbd65ced
--- /dev/null
+++ b/benchmarks/import/test/index.spec.ts
@@ -0,0 +1,61 @@
+/* eslint-env mocha */
+
+import { importer, ImporterOptions } from 'ipfs-unixfs-importer'
+import bufferStream from 'it-buffer-stream'
+import { MemoryBlockstore } from 'blockstore-core'
+import drain from 'it-drain'
+
+const REPEATS = 10
+const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
+const CHUNK_SIZE = 65536
+
+async function main (): Promise<void> {
+  const block = new MemoryBlockstore()
+  const times: number[] = []
+
+  for (let i = 0; i < REPEATS; i++) {
+    const size = FILE_SIZE
+    let read = 0
+    let lastDate = Date.now()
+    let lastPercent = 0
+
+    const options: Partial<ImporterOptions> = {
+      onProgress: (evt) => {
+        if (evt.type === 'unixfs:importer:progress:file:read') {
+          read += Number(evt.detail.bytesRead)
+
+          const percent = Math.round((read / size) * 100)
+
+          if (percent > lastPercent) {
+            times[percent] = (times[percent] ?? 0) + (Date.now() - lastDate)
+
+            lastDate = Date.now()
+            lastPercent = percent
+          }
+        }
+      }
+    }
+
+    const buf = new Uint8Array(CHUNK_SIZE).fill(0)
+
+    await drain(importer([{
+      path: '200Bytes.txt',
+      content: bufferStream(size, {
+        chunkSize: CHUNK_SIZE,
+        generator: () => {
+          return buf
+        }
+      })
+    }], block, options))
+  }
+
+  console.info('Percent\tms') // eslint-disable-line no-console
+  times.forEach((time, index) => {
+    console.info(`${index}\t${Math.round(time / REPEATS)}`) // eslint-disable-line no-console
+  })
+}
+
+main().catch(err => {
+  console.error(err) // eslint-disable-line no-console
+  process.exit(1)
+})
diff --git a/benchmarks/import/tsconfig.json b/benchmarks/import/tsconfig.json
new file mode 100644
index 00000000..13a35996
--- /dev/null
+++ b/benchmarks/import/tsconfig.json
@@ -0,0 +1,10 @@
+{
+  "extends": "aegir/src/config/tsconfig.aegir.json",
+  "compilerOptions": {
+    "outDir": "dist"
+  },
+  "include": [
+    "src",
+    "test"
+  ]
+}
diff --git a/packages/ipfs-unixfs-exporter/src/index.ts b/packages/ipfs-unixfs-exporter/src/index.ts
index 37b38c88..d155bdbd 100644
--- a/packages/ipfs-unixfs-exporter/src/index.ts
+++ b/packages/ipfs-unixfs-exporter/src/index.ts
@@ -6,9 +6,47 @@ import type { UnixFS } from 'ipfs-unixfs'
 import type { PBNode } from '@ipld/dag-pb'
 import type { Blockstore } from 'interface-blockstore'
 import type { Bucket } from 'hamt-sharding'
-import type { ProgressOptions } from 'progress-events'
+import type { ProgressOptions, ProgressEvent } from 'progress-events'
+
+export interface ExportProgress {
+  /**
+   * How many bytes of the file have been read
+   */
+  bytesRead: bigint
+
+  /**
+   * How many bytes of the file will be read - n.b. this may be
+   * smaller than `fileSize` if `offset`/`length` have been
+   * specified
+   */
+  totalBytes: bigint
+
+  /**
+   * The size of the file being read - n.b. this may be
+   * larger than `totalBytes` if `offset`/`length` have been
+   * specified
+   */
+  fileSize: bigint
+}
+
+export interface ExportWalk {
+  cid: CID
+}
 
-export interface ExporterOptions extends ProgressOptions {
+/**
+ * Progress events emitted by the exporter
+ */
+export type ExporterProgressEvents =
+  ProgressEvent<'unixfs:exporter:progress:unixfs:file', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:progress:unixfs:raw', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:progress:raw', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:progress:identity', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:walk:file', ExportWalk> |
+  ProgressEvent<'unixfs:exporter:walk:directory', ExportWalk> |
+  ProgressEvent<'unixfs:exporter:walk:hamt-sharded-directory', ExportWalk> |
+  ProgressEvent<'unixfs:exporter:walk:raw', ExportWalk>
+
+export interface ExporterOptions extends ProgressOptions<ExporterProgressEvents> {
   offset?: number
   length?: number
   signal?: AbortSignal
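Reviewer note: the `ExporterProgressEvents` union above is new API surface, so a consumption sketch may help. This is a minimal, hypothetical example (the helper name and setup are assumptions, not part of this PR); it reads a UnixFS file entry and logs byte-level progress:

```ts
import { exporter } from 'ipfs-unixfs-exporter'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

// hypothetical helper - not part of this PR
async function readWithProgress (cid: CID, blockstore: Blockstore): Promise<void> {
  // assumes `cid` resolves to a UnixFS file in `blockstore`
  const entry = await exporter(cid, blockstore)

  for await (const chunk of entry.content({
    onProgress: (evt) => {
      if (evt.type === 'unixfs:exporter:progress:unixfs:file') {
        // evt.detail is narrowed to ExportProgress here
        console.info(`read ${evt.detail.bytesRead} of ${evt.detail.totalBytes} bytes`)
      }
    }
  })) {
    void chunk // the stream must be consumed for progress events to fire
  }
}
```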
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts b/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
index f226dbf3..d63a6786 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
@@ -2,7 +2,8 @@ import errCode from 'err-code'
 import extractDataFromBlock from '../utils/extract-data-from-block.js'
 import validateOffsetAndLength from '../utils/validate-offset-and-length.js'
 import * as mh from 'multiformats/hashes/digest'
-import type { ExporterOptions, Resolver } from '../index.js'
+import type { ExporterOptions, Resolver, ExportProgress } from '../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
@@ -11,7 +12,15 @@ const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGene
       length
     } = validateOffsetAndLength(node.length, options.offset, options.length)
 
-    yield extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+
+    options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:identity', {
+      bytesRead: BigInt(buf.byteLength),
+      totalBytes: length - offset,
+      fileSize: BigInt(node.byteLength)
+    }))
+
+    yield buf
   }
 
   return contentGenerator
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts b/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
index f567cdcb..502dba9f 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
@@ -1,7 +1,8 @@
 import errCode from 'err-code'
-import type { ExporterOptions, Resolver } from '../index.js'
+import type { ExporterOptions, Resolver, ExportProgress } from '../index.js'
 import extractDataFromBlock from '../utils/extract-data-from-block.js'
 import validateOffsetAndLength from '../utils/validate-offset-and-length.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
@@ -10,7 +11,15 @@ const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGene
       length
     } = validateOffsetAndLength(node.length, options.offset, options.length)
 
-    yield extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+
+    options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:raw', {
+      bytesRead: BigInt(buf.byteLength),
+      totalBytes: length - offset,
+      fileSize: BigInt(node.byteLength)
+    }))
+
+    yield buf
   }
 
   return contentGenerator
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
index 903b0f63..8db82377 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
@@ -2,7 +2,8 @@ import parallel from 'it-parallel'
 import { pipe } from 'it-pipe'
 import map from 'it-map'
 import filter from 'it-filter'
-import type { ExporterOptions, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
+import type { ExporterOptions, ExportWalk, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
   async function * yieldDirectoryContent (options: ExporterOptions = {}): UnixfsV1DirectoryContent {
@@ -10,6 +11,10 @@ const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, de
     const length = options.length ?? node.Links.length
     const links = node.Links.slice(offset, length)
 
+    options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:directory', {
+      cid
+    }))
+
     yield * pipe(
       links,
       source => map(source, link => {
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
index 51f4bd4e..4a0abf63 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -9,7 +9,8 @@ import parallel from 'it-parallel'
 import { pipe } from 'it-pipe'
 import map from 'it-map'
 import PQueue from 'p-queue'
-import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, ReadableStorage } from '../../../index.js'
+import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, ReadableStorage, ExportProgress, ExportWalk } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
   // a `raw` node
@@ -110,6 +111,10 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
       // if the job rejects the 'error' event will be emitted on the child queue
       void childQueue.add(async () => {
+        options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
+          cid: link.Hash
+        }))
+
         await walkDAG(blockstore, child, queue, blockStart, start, end, options)
       })
 
@@ -138,12 +143,15 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
   }
 
   let read = 0n
+  const wanted = length - offset
   const queue = pushable()
 
+  options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
+    cid
+  }))
+
   void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
     .then(() => {
-      const wanted = length - offset
-
       if (read < wanted) {
        throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
      }
@@ -169,6 +177,12 @@
       queue.end()
     }
 
+    options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:file', {
+      bytesRead: read,
+      totalBytes: wanted,
+      fileSize
+    }))
+
     yield buf
   }
 }
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
index 3dbb94d4..ec070b6c 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
@@ -2,10 +2,15 @@ import parallel from 'it-parallel'
 import { pipe } from 'it-pipe'
 import map from 'it-map'
 import { decode, PBNode } from '@ipld/dag-pb'
-import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver, ReadableStorage } from '../../../index.js'
+import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver, ReadableStorage, ExportWalk } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
   function yieldHamtDirectoryContent (options: ExporterOptions = {}): UnixfsV1DirectoryContent {
+    options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:hamt-sharded-directory', {
+      cid
+    }))
+
     return listDirectory(node, path, resolve, depth, blockstore, options)
   }
 
@@ -30,6 +35,10 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de
       const block = await blockstore.get(link.Hash, options)
       node = decode(block)
 
+      options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:hamt-sharded-directory', {
+        cid: link.Hash
+      }))
+
       return { entries: listDirectory(node, path, resolve, depth, blockstore, options) }
     }
   }
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
index e9f2a37e..e85471b0 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
@@ -1,4 +1,5 @@
-import type { ExporterOptions, UnixfsV1Resolver } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
+import type { ExporterOptions, ExportProgress, ExportWalk, UnixfsV1Resolver } from '../../../index.js'
 import extractDataFromBlock from '../../../utils/extract-data-from-block.js'
 import validateOffsetAndLength from '../../../utils/validate-offset-and-length.js'
 
@@ -15,7 +16,19 @@ const rawContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, b
       length
     } = validateOffsetAndLength(size, options.offset, options.length)
 
-    yield extractDataFromBlock(unixfs.data, 0n, offset, offset + length)
+    options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:raw', {
+      cid
+    }))
+
+    const buf = extractDataFromBlock(unixfs.data, 0n, offset, offset + length)
+
+    options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:raw', {
+      bytesRead: BigInt(buf.byteLength),
+      totalBytes: length - offset,
+      fileSize: BigInt(unixfs.data.byteLength)
+    }))
+
+    yield buf
   }
 
   return yieldRawContent
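Reviewer note: the `walk:*` events above track DAG traversal rather than byte progress. A sketch of collecting every block the exporter visits (same hypothetical setup as the earlier example; helper name is an assumption):

```ts
import { exporter } from 'ipfs-unixfs-exporter'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

// hypothetical helper - not part of this PR
async function collectWalkedBlocks (cid: CID, blockstore: Blockstore): Promise<CID[]> {
  const visited: CID[] = []
  const entry = await exporter(cid, blockstore)

  for await (const chunk of entry.content({
    onProgress: (evt) => {
      if (evt.type === 'unixfs:exporter:walk:file' || evt.type === 'unixfs:exporter:walk:raw') {
        // every walk event detail is an ExportWalk carrying the block's CID
        visited.push(evt.detail.cid)
      }
    }
  })) {
    void chunk // drain the stream to drive the traversal
  }

  return visited
}
```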
diff --git a/packages/ipfs-unixfs-exporter/test/importer.spec.ts b/packages/ipfs-unixfs-exporter/test/importer.spec.ts
index afdb6a91..4043072c 100644
--- a/packages/ipfs-unixfs-exporter/test/importer.spec.ts
+++ b/packages/ipfs-unixfs-exporter/test/importer.spec.ts
@@ -652,8 +652,8 @@ strategies.forEach((strategy) => {
     }], block, options))
 
     expect(onProgress.called).to.equal(true)
-    expect(onProgress.getCall(0).args[0]).to.have.property('type', 'unixfs:importer:progress')
-    expect(onProgress.getCall(0).args[0]).to.have.deep.property('detail', { bytes: chunkSize, path })
+    expect(onProgress.getCall(0).args[0]).to.have.property('type', 'unixfs:importer:progress:file:read')
+    expect(onProgress.getCall(0).args[0]).to.have.deep.property('detail', { bytesRead: BigInt(chunkSize), chunkSize: BigInt(chunkSize), path })
   })
 
   it('will import files with CID version 1', async () => {
diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts b/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts
index e5fd5895..66c8e9db 100644
--- a/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts
+++ b/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts
@@ -3,18 +3,24 @@ import { persist, PersistOptions } from '../utils/persist.js'
 import * as dagPb from '@ipld/dag-pb'
 import * as raw from 'multiformats/codecs/raw'
 import type { BufferImporter } from '../index.js'
-import type { Version } from 'multiformats/cid'
+import type { CID, Version } from 'multiformats/cid'
 import { CustomProgressEvent } from 'progress-events'
 import type { ProgressOptions, ProgressEvent } from 'progress-events'
 
 /**
  * Passed to the onProgress callback while importing files
  */
-export interface ImportProgressData {
+export interface ImportWriteProgress {
   /**
-   * The size of the current chunk
+   * How many bytes we have written for this source so far - this may be
+   * bigger than the file size due to the DAG-PB wrappers of each block
   */
-  bytes: number
+  bytesWritten: bigint
+
+  /**
+   * The CID of the block that has been written
+   */
+  cid: CID
 
   /**
    * The path of the file being imported, if one was specified
@@ -23,7 +29,7 @@ export interface ImportProgressData {
 }
 
 export type BufferImportProgressEvents =
-  ProgressEvent<'unixfs:importer:progress', ImportProgressData>
+  ProgressEvent<'unixfs:importer:progress:file:write', ImportWriteProgress>
 
 export interface BufferImporterOptions extends ProgressOptions<BufferImportProgressEvents> {
   cidVersion: Version
@@ -33,9 +39,10 @@ export function defaultBufferImporter (options: BufferImporterOptions): BufferIm
+    let bytesWritten = 0n
+
     for await (const block of file.content) {
-      options.onProgress?.(new CustomProgressEvent('unixfs:importer:progress', { bytes: block.byteLength, path: file.path }))
-
       yield async () => { // eslint-disable-line no-loop-func
         let unixfs
 
         const opts: PersistOptions = {
@@ -59,8 +66,18 @@
         })
       }
 
+      const cid = await persist(block, blockstore, opts)
+
+      bytesWritten += BigInt(block.byteLength)
+
+      options.onProgress?.(new CustomProgressEvent<ImportWriteProgress>('unixfs:importer:progress:file:write', {
+        bytesWritten,
+        cid,
+        path: file.path
+      }))
+
       return {
-        cid: await persist(block, blockstore, opts),
+        cid,
         unixfs,
         size: BigInt(block.length),
         block
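Reviewer note: on the import side the old `unixfs:importer:progress` event becomes `unixfs:importer:progress:file:write`, now carrying the written block's CID and a cumulative byte count. A hypothetical usage sketch (assumes an ESM context with top-level `await`; the payload is made up):

```ts
import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'

const blockstore = new MemoryBlockstore()
const data = new Uint8Array(1024 * 1024) // hypothetical 1MiB payload

for await (const entry of importer([{ path: 'file.txt', content: data }], blockstore, {
  onProgress: (evt) => {
    if (evt.type === 'unixfs:importer:progress:file:write') {
      // bytesWritten is cumulative and includes DAG-PB framing,
      // so it can exceed the raw file size
      console.info(`wrote ${evt.detail.bytesWritten} bytes, last block ${evt.detail.cid}`)
    }
  }
})) {
  console.info(`imported ${entry.path ?? ''} as ${entry.cid}`)
}
```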
diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts b/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts
index 43a7c437..c86f4a5f 100644
--- a/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts
+++ b/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts
@@ -3,9 +3,8 @@ import { persist } from '../utils/persist.js'
 import { encode, prepare } from '@ipld/dag-pb'
 import type { Directory, InProgressImportResult, WritableStorage } from '../index.js'
 import type { Version } from 'multiformats/cid'
-import type { ProgressOptions } from 'progress-events'
 
-export interface DirBuilderOptions extends ProgressOptions {
+export interface DirBuilderOptions {
   cidVersion: Version
   signal?: AbortSignal
 }
diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/file.ts b/packages/ipfs-unixfs-importer/src/dag-builder/file.ts
index a74a22ef..27a59b64 100644
--- a/packages/ipfs-unixfs-importer/src/dag-builder/file.ts
+++ b/packages/ipfs-unixfs-importer/src/dag-builder/file.ts
@@ -1,12 +1,13 @@
 import { UnixFS } from 'ipfs-unixfs'
 import { persist } from '../utils/persist.js'
-import { encode, PBLink, prepare } from '@ipld/dag-pb'
+import { encode, PBLink, PBNode, prepare } from '@ipld/dag-pb'
 import parallelBatch from 'it-parallel-batch'
 import * as rawCodec from 'multiformats/codecs/raw'
-import type { BufferImporter, File, InProgressImportResult, WritableStorage, SingleBlockImportResult } from '../index.js'
+import type { BufferImporter, File, InProgressImportResult, WritableStorage, SingleBlockImportResult, ImporterProgressEvents } from '../index.js'
 import type { FileLayout, Reducer } from '../layout/index.js'
-import type { Version } from 'multiformats/cid'
-import type { ProgressOptions } from 'progress-events'
+import type { CID, Version } from 'multiformats/cid'
+import { CustomProgressEvent } from 'progress-events'
+import type { ProgressOptions, ProgressEvent } from 'progress-events'
 
 interface BuildFileBatchOptions {
   bufferImporter: BufferImporter
@@ -50,7 +51,22 @@ async function * buildFileBatch (file: File, blockstore: WritableStorage, option
     }
   }
 }
 
-interface ReduceOptions extends ProgressOptions {
+export interface LayoutLeafProgress {
+  /**
+   * The CID of the leaf being written
+   */
+  cid: CID
+
+  /**
+   * The path of the file being imported, if one was specified
+   */
+  path?: string
+}
+
+export type ReducerProgressEvents =
+  ProgressEvent<'unixfs:importer:progress:file:layout', LayoutLeafProgress>
+
+interface ReduceOptions extends ProgressOptions<ImporterProgressEvents> {
   reduceSingleLeafToSelf: boolean
   cidVersion: Version
   signal?: AbortSignal
@@ -64,6 +80,7 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
   const reducer: Reducer = async function (leaves) {
     if (leaves.length === 1 && isSingleBlockImport(leaves[0]) && options.reduceSingleLeafToSelf) {
       const leaf = leaves[0]
+      let node: Uint8Array | PBNode = leaf.block
 
       if (isSingleBlockImport(leaf) && (file.mtime !== undefined || file.mode !== undefined)) {
         // only one leaf node which is a raw leaf - we have metadata so convert it into a
@@ -75,7 +92,9 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
           data: leaf.block
         })
 
-        leaf.block = encode(prepare({ Data: leaf.unixfs.marshal() }))
+        node = { Data: leaf.unixfs.marshal(), Links: [] }
+
+        leaf.block = encode(prepare(node))
 
         leaf.cid = await persist(leaf.block, blockstore, {
           ...options,
@@ -84,6 +103,11 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
         leaf.size = BigInt(leaf.block.length)
       }
 
+      options.onProgress?.(new CustomProgressEvent<LayoutLeafProgress>('unixfs:importer:progress:file:layout', {
+        cid: leaf.cid,
+        path: leaf.originalPath
+      }))
+
       return {
         cid: leaf.cid,
         path: file.path,
@@ -146,6 +170,11 @@ const reduce = (file: File, blockstore: WritableStorage, options: ReduceOptions)
     const block = encode(prepare(node))
     const cid = await persist(block, blockstore, options)
 
+    options.onProgress?.(new CustomProgressEvent<LayoutLeafProgress>('unixfs:importer:progress:file:layout', {
+      cid,
+      path: file.originalPath
+    }))
+
     return {
       cid,
       path: file.path,
diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/index.ts b/packages/ipfs-unixfs-importer/src/dag-builder/index.ts
index 70e88b56..294f3b1f 100644
--- a/packages/ipfs-unixfs-importer/src/dag-builder/index.ts
+++ b/packages/ipfs-unixfs-importer/src/dag-builder/index.ts
@@ -1,9 +1,34 @@
 import { dirBuilder, DirBuilderOptions } from './dir.js'
 import { fileBuilder, FileBuilderOptions } from './file.js'
 import errCode from 'err-code'
-import type { Directory, File, FileCandidate, ImportCandidate, InProgressImportResult, WritableStorage } from '../index.js'
+import type { Directory, File, FileCandidate, ImportCandidate, ImporterProgressEvents, InProgressImportResult, WritableStorage } from '../index.js'
 import type { ChunkValidator } from './validate-chunks.js'
 import type { Chunker } from '../chunker/index.js'
+import type { ProgressEvent, ProgressOptions } from 'progress-events'
+import { CustomProgressEvent } from 'progress-events'
+
+/**
+ * Passed to the onProgress callback while importing files
+ */
+export interface ImportReadProgress {
+  /**
+   * How many bytes we have read from this source so far
+   */
+  bytesRead: bigint
+
+  /**
+   * The size of the current chunk
+   */
+  chunkSize: bigint
+
+  /**
+   * The path of the file being imported, if one was specified
+   */
+  path?: string
+}
+
+export type DagBuilderProgressEvents =
+  ProgressEvent<'unixfs:importer:progress:file:read', ImportReadProgress>
 
 function isIterable (thing: any): thing is Iterable<any> {
   return Symbol.iterator in thing
 }
@@ -33,7 +58,7 @@ function contentAsAsyncIterable (content: Uint8Array | AsyncIterable<Uint8Array>
   throw errCode(new Error('Content was invalid'), 'ERR_INVALID_CONTENT')
 }
 
-export interface DagBuilderOptions extends FileBuilderOptions, DirBuilderOptions {
+export interface DagBuilderOptions extends FileBuilderOptions, DirBuilderOptions, ProgressOptions<ImporterProgressEvents> {
   chunker: Chunker
   chunkValidator: ChunkValidator
   wrapWithDirectory: boolean
@@ -63,7 +88,22 @@ export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
         path: entry.path,
         mtime: entry.mtime,
         mode: entry.mode,
-        content: options.chunker(options.chunkValidator(contentAsAsyncIterable(entry.content))),
+        content: (async function * () {
+          let bytesRead = 0n
+
+          for await (const chunk of options.chunker(options.chunkValidator(contentAsAsyncIterable(entry.content)))) {
+            const currentChunkSize = BigInt(chunk.byteLength)
+            bytesRead += currentChunkSize
+
+            options.onProgress?.(new CustomProgressEvent<ImportReadProgress>('unixfs:importer:progress:file:read', {
+              bytesRead,
+              chunkSize: currentChunkSize,
+              path: entry.path
+            }))
+
+            yield chunk
+          }
+        })(),
         originalPath
       }
diff --git a/packages/ipfs-unixfs-importer/src/index.ts b/packages/ipfs-unixfs-importer/src/index.ts
index 3c4283cd..7c73743b 100644
--- a/packages/ipfs-unixfs-importer/src/index.ts
+++ b/packages/ipfs-unixfs-importer/src/index.ts
@@ -1,5 +1,5 @@
 import parallelBatch from 'it-parallel-batch'
-import { DAGBuilder, defaultDagBuilder } from './dag-builder/index.js'
+import { DAGBuilder, DagBuilderProgressEvents, defaultDagBuilder } from './dag-builder/index.js'
 import { defaultTreeBuilder } from './tree-builder.js'
 import type { UnixFS, Mtime } from 'ipfs-unixfs'
 import type { CID, Version as CIDVersion } from 'multiformats/cid'
@@ -13,6 +13,7 @@ import first from 'it-first'
 import errcode from 'err-code'
 import type { AwaitIterable } from 'interface-store'
 import type { ProgressOptions } from 'progress-events'
+import type { ReducerProgressEvents } from './dag-builder/file.js'
 
 export type ByteStream = AwaitIterable<Uint8Array>
 export type ImportContent = ByteStream | Uint8Array
@@ -76,13 +77,15 @@ export interface HamtHashFn { (value: Uint8Array): Promise<bigint> }
 export interface TreeBuilder { (source: AsyncIterable<InProgressImportResult>, blockstore: WritableStorage): AsyncIterable<ImportResult> }
 export interface BufferImporter { (file: File, blockstore: WritableStorage): AsyncIterable<() => Promise<InProgressImportResult>> }
 
-export type ImportProgressEvents =
-  BufferImportProgressEvents
+export type ImporterProgressEvents =
+  BufferImportProgressEvents |
+  DagBuilderProgressEvents |
+  ReducerProgressEvents
 
 /**
  * Options to control the importer's behaviour
 */
-export interface ImporterOptions extends ProgressOptions {
+export interface ImporterOptions extends ProgressOptions<ImporterProgressEvents> {
   /**
    * When a file would span multiple DAGNodes, if this is true the leaf nodes
    * will not be wrapped in `UnixFS` protobufs and will instead contain the
diff --git a/packages/ipfs-unixfs-importer/test/benchmark.spec.ts b/packages/ipfs-unixfs-importer/test/benchmark.spec.ts
deleted file mode 100644
index 5c843fdc..00000000
--- a/packages/ipfs-unixfs-importer/test/benchmark.spec.ts
+++ /dev/null
@@ -1,63 +0,0 @@
-/* eslint-env mocha */
-
-import { importer, ImporterOptions } from '../src/index.js'
-import bufferStream from 'it-buffer-stream'
-import { MemoryBlockstore } from 'blockstore-core'
-import drain from 'it-drain'
-
-const REPEATS = 10
-const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
-const CHUNK_SIZE = 65536
-
-describe.skip('benchmark', function () {
-  this.timeout(30 * 1000)
-
-  const block = new MemoryBlockstore()
-
-  const times: number[] = []
-
-  after(() => {
-    console.info('Percent\tms') // eslint-disable-line no-console
-    times.forEach((time, index) => {
-      console.info(`${index}\t${Math.round(time / REPEATS)}`) // eslint-disable-line no-console
-    })
-  })
-
-  for (let i = 0; i < REPEATS; i++) {
-    it(`run ${i}`, async () => { // eslint-disable-line no-loop-func
-      this.timeout(0)
-
-      const size = FILE_SIZE
-      let read = 0
-      let lastDate = Date.now()
-      let lastPercent = 0
-
-      const options: Partial<ImporterOptions> = {
-        onProgress: (evt) => {
-          read += evt.detail.bytes
-
-          const percent = Math.round((read / size) * 100)
-
-          if (percent > lastPercent) {
-            times[percent] = (times[percent] ?? 0) + (Date.now() - lastDate)
-
-            lastDate = Date.now()
-            lastPercent = percent
-          }
-        }
-      }
-
-      const buf = new Uint8Array(CHUNK_SIZE).fill(0)
-
-      await drain(importer([{
-        path: '200Bytes.txt',
-        content: bufferStream(size, {
-          chunkSize: CHUNK_SIZE,
-          generator: () => {
-            return buf
-          }
-        })
-      }], block, options))
-    })
-  }
-})
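Reviewer note: the deleted spec above is superseded by the `benchmarks/import` package at the top of this diff. For completeness, a sketch of the reducer's new `file:layout` events, which fire as DAG nodes are persisted while the file's tree is laid out (hypothetical setup; ESM top-level `await` assumed):

```ts
import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'
import drain from 'it-drain'
import type { CID } from 'multiformats/cid'

const blockstore = new MemoryBlockstore()
const layoutCids: CID[] = []

await drain(importer([{ content: new Uint8Array(1024 * 1024) }], blockstore, {
  onProgress: (evt) => {
    if (evt.type === 'unixfs:importer:progress:file:layout') {
      // one event per node written by the layout reducer
      layoutCids.push(evt.detail.cid)
    }
  }
}))

console.info(`layout persisted ${layoutCids.length} node(s)`)
```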