Commit d0df723

feat: adds progress events to the importer and exporter (#302)
Adds progress events for DAG creation and export.
1 parent 7deec9c commit d0df723

File tree

19 files changed: +364 −101 lines changed
benchmarks/import/README.md

Lines changed: 27 additions & 0 deletions (new file)

# Import Benchmark

How much memory does the importer use while importing files?

It should be relatively flat to enable importing files larger than physical memory.

## Usage

```console
$ npm i
$ npm start

> benchmarks-gc@1.0.0 start
> npm run build && node dist/src/index.js


> benchmarks-gc@1.0.0 build
> aegir build --bundle false

[14:51:28] tsc [started]
[14:51:33] tsc [completed]
generating Ed25519 keypair...
┌─────────┬────────────────┬─────────┬───────────┬──────┐
│ (index) │ Implementation │ ops/s │ ms/op │ runs │
├─────────┼────────────────┼─────────┼───────────┼──────┤
//... results here
```
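
The flat-memory claim can be spot-checked by sampling the heap while an import is in flight. A minimal sketch, not part of this commit, using Node's built-in `process.memoryUsage()`:

```ts
// Sample heap usage once per second during an import; for a flat memory
// profile the readings should plateau rather than grow with file size.
const timer = setInterval(() => {
  const { heapUsed } = process.memoryUsage()
  console.info(`heap: ${Math.round(heapUsed / 1024 / 1024)} MB`)
}, 1000)

// ... run the import here, e.g. await drain(importer(...)) ...

clearInterval(timer)
```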

benchmarks/import/package.json

Lines changed: 42 additions & 0 deletions (new file)

{
  "name": "ipfs-unixfs-memory-benchmark",
  "version": "0.0.0",
  "description": "Memory benchmarks for ipfs-unixfs-importer",
  "license": "Apache-2.0 OR MIT",
  "private": true,
  "type": "module",
  "types": "./dist/src/index.d.ts",
  "files": [
    "src",
    "dist",
    "!dist/test",
    "!**/*.tsbuildinfo"
  ],
  "exports": {
    ".": {
      "types": "./dist/src/index.d.ts",
      "import": "./dist/src/index.js"
    }
  },
  "eslintConfig": {
    "extends": "ipfs",
    "parserOptions": {
      "sourceType": "module"
    }
  },
  "scripts": {
    "build": "aegir build --bundle false",
    "clean": "aegir clean",
    "lint": "aegir lint",
    "dep-check": "aegir dep-check",
    "start": "npm run build && node --expose-gc ./dist/test/index.spec.js"
  },
  "devDependencies": {
    "aegir": "^38.1.2",
    "blockstore-core": "^4.0.1",
    "blockstore-fs": "^1.0.0",
    "ipfs-unixfs-importer": "../../packages/ipfs-unixfs-importer",
    "it-buffer-stream": "^3.0.1",
    "it-drain": "^2.0.1"
  }
}
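
Note the `start` script runs the compiled spec straight through `node --expose-gc` rather than via a test runner, presumably so the benchmark can trigger garbage collection manually and avoid harness overhead.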

benchmarks/import/src/index.ts

Lines changed: 1 addition & 0 deletions (new file)

export {}

benchmarks/import/test/index.spec.ts

Lines changed: 61 additions & 0 deletions (new file)

/* eslint-env mocha */

import { importer, ImporterOptions } from 'ipfs-unixfs-importer'
import bufferStream from 'it-buffer-stream'
import { MemoryBlockstore } from 'blockstore-core'
import drain from 'it-drain'

const REPEATS = 10
const FILE_SIZE = Math.pow(2, 20) * 500 // 500MB
const CHUNK_SIZE = 65536

async function main (): Promise<void> {
  const block = new MemoryBlockstore()
  const times: number[] = []

  for (let i = 0; i < REPEATS; i++) {
    const size = FILE_SIZE
    let read = 0
    let lastDate = Date.now()
    let lastPercent = 0

    const options: Partial<ImporterOptions> = {
      onProgress: (evt) => {
        if (evt.type === 'unixfs:importer:progress:file:read') {
          read += Number(evt.detail.bytesRead)

          const percent = Math.round((read / size) * 100)

          if (percent > lastPercent) {
            times[percent] = (times[percent] ?? 0) + (Date.now() - lastDate)

            lastDate = Date.now()
            lastPercent = percent
          }
        }
      }
    }

    const buf = new Uint8Array(CHUNK_SIZE).fill(0)

    await drain(importer([{
      path: '200Bytes.txt',
      content: bufferStream(size, {
        chunkSize: CHUNK_SIZE,
        generator: () => {
          return buf
        }
      })
    }], block, options))
  }

  console.info('Percent\tms') // eslint-disable-line no-console
  times.forEach((time, index) => {
    console.info(`${index}\t${Math.round(time / REPEATS)}`) // eslint-disable-line no-console
  })
}

main().catch(err => {
  console.error(err) // eslint-disable-line no-console
  process.exit(1)
})
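
For each whole percentage point of progress the harness adds the milliseconds elapsed since the previous point into `times[percent]`, accumulated across all `REPEATS` runs; the table printed at the end is therefore the average time per percent, and a roughly constant column indicates import speed does not degrade as more of the file is processed.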

benchmarks/import/tsconfig.json

Lines changed: 10 additions & 0 deletions (new file)

{
  "extends": "aegir/src/config/tsconfig.aegir.json",
  "compilerOptions": {
    "outDir": "dist"
  },
  "include": [
    "src",
    "test"
  ]
}

packages/ipfs-unixfs-exporter/src/index.ts

Lines changed: 40 additions & 2 deletions

@@ -6,9 +6,47 @@ import type { UnixFS } from 'ipfs-unixfs'
 import type { PBNode } from '@ipld/dag-pb'
 import type { Blockstore } from 'interface-blockstore'
 import type { Bucket } from 'hamt-sharding'
-import type { ProgressOptions } from 'progress-events'
+import type { ProgressOptions, ProgressEvent } from 'progress-events'
+
+export interface ExportProgress {
+  /**
+   * How many bytes of the file have been read
+   */
+  bytesRead: bigint
+
+  /**
+   * How many bytes of the file will be read - n.b. this may be
+   * smaller than `fileSize` if `offset`/`length` have been
+   * specified
+   */
+  totalBytes: bigint
+
+  /**
+   * The size of the file being read - n.b. this may be
+   * larger than `totalBytes` if `offset`/`length` have been
+   * specified
+   */
+  fileSize: bigint
+}
+
+export interface ExportWalk {
+  cid: CID
+}
 
-export interface ExporterOptions extends ProgressOptions {
+/**
+ * Progress events emitted by the exporter
+ */
+export type ExporterProgressEvents =
+  ProgressEvent<'unixfs:exporter:progress:unixfs:file', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:progress:unixfs:raw', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:progress:raw', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:progress:identity', ExportProgress> |
+  ProgressEvent<'unixfs:exporter:walk:file', ExportWalk> |
+  ProgressEvent<'unixfs:exporter:walk:directory', ExportWalk> |
+  ProgressEvent<'unixfs:exporter:walk:hamt-sharded-directory', ExportWalk> |
+  ProgressEvent<'unixfs:exporter:walk:raw', ExportWalk>
+
+export interface ExporterOptions extends ProgressOptions<ExporterProgressEvents> {
   offset?: number
   length?: number
   signal?: AbortSignal
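
On the read side these events arrive through the same `onProgress` option, which `entry.content()` accepts as part of `ExporterOptions`. A minimal sketch, assuming `cid` and `blockstore` reference an existing UnixFS file:

```ts
import { exporter } from 'ipfs-unixfs-exporter'
import drain from 'it-drain'

const entry = await exporter(cid, blockstore)

if (entry.type === 'file') {
  await drain(entry.content({
    onProgress: (evt) => {
      if (evt.type === 'unixfs:exporter:progress:unixfs:file') {
        const { bytesRead, totalBytes, fileSize } = evt.detail
        console.info(`read ${bytesRead} of ${totalBytes} bytes (file is ${fileSize} bytes)`)
      }
    }
  }))
}
```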

packages/ipfs-unixfs-exporter/src/resolvers/identity.ts

Lines changed: 11 additions & 2 deletions

@@ -2,7 +2,8 @@ import errCode from 'err-code'
 import extractDataFromBlock from '../utils/extract-data-from-block.js'
 import validateOffsetAndLength from '../utils/validate-offset-and-length.js'
 import * as mh from 'multiformats/hashes/digest'
-import type { ExporterOptions, Resolver } from '../index.js'
+import type { ExporterOptions, Resolver, ExportProgress } from '../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
@@ -11,7 +12,15 @@ const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGene
       length
     } = validateOffsetAndLength(node.length, options.offset, options.length)
 
-    yield extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+
+    options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:identity', {
+      bytesRead: BigInt(buf.byteLength),
+      totalBytes: length - offset,
+      fileSize: BigInt(node.byteLength)
+    }))
+
+    yield buf
   }
 
   return contentGenerator

packages/ipfs-unixfs-exporter/src/resolvers/raw.ts

Lines changed: 11 additions & 2 deletions

@@ -1,7 +1,8 @@
 import errCode from 'err-code'
-import type { ExporterOptions, Resolver } from '../index.js'
+import type { ExporterOptions, Resolver, ExportProgress } from '../index.js'
 import extractDataFromBlock from '../utils/extract-data-from-block.js'
 import validateOffsetAndLength from '../utils/validate-offset-and-length.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
@@ -10,7 +11,15 @@ const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGene
       length
     } = validateOffsetAndLength(node.length, options.offset, options.length)
 
-    yield extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+
+    options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:raw', {
+      bytesRead: BigInt(buf.byteLength),
+      totalBytes: length - offset,
+      fileSize: BigInt(node.byteLength)
+    }))
+
+    yield buf
   }
 
   return contentGenerator

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts

Lines changed: 6 additions & 1 deletion

@@ -2,14 +2,19 @@ import parallel from 'it-parallel'
 import { pipe } from 'it-pipe'
 import map from 'it-map'
 import filter from 'it-filter'
-import type { ExporterOptions, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
+import type { ExporterOptions, ExportWalk, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
   async function * yieldDirectoryContent (options: ExporterOptions = {}): UnixfsV1DirectoryContent {
     const offset = options.offset ?? 0
     const length = options.length ?? node.Links.length
     const links = node.Links.slice(offset, length)
 
+    options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:directory', {
+      cid
+    }))
+
     yield * pipe(
       links,
       source => map(source, link => {

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts

Lines changed: 17 additions & 3 deletions

@@ -9,7 +9,8 @@ import parallel from 'it-parallel'
 import { pipe } from 'it-pipe'
 import map from 'it-map'
 import PQueue from 'p-queue'
-import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, ReadableStorage } from '../../../index.js'
+import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, ReadableStorage, ExportProgress, ExportWalk } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
   // a `raw` node
@@ -110,6 +111,10 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
 
         // if the job rejects the 'error' event will be emitted on the child queue
         void childQueue.add(async () => {
+          options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
+            cid: link.Hash
+          }))
+
           await walkDAG(blockstore, child, queue, blockStart, start, end, options)
         })
 
@@ -138,12 +143,15 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
     }
 
     let read = 0n
+    const wanted = length - offset
    const queue = pushable()
 
+    options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
+      cid
+    }))
+
     void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
       .then(() => {
-        const wanted = length - offset
-
         if (read < wanted) {
           throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
         }
@@ -169,6 +177,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
         queue.end()
       }
 
+      options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:file', {
+        bytesRead: read,
+        totalBytes: wanted,
+        fileSize
+      }))
+
       yield buf
     }
   }

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts

Lines changed: 10 additions & 1 deletion

@@ -2,10 +2,15 @@ import parallel from 'it-parallel'
 import { pipe } from 'it-pipe'
 import map from 'it-map'
 import { decode, PBNode } from '@ipld/dag-pb'
-import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver, ReadableStorage } from '../../../index.js'
+import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver, ReadableStorage, ExportWalk } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
 
 const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
   function yieldHamtDirectoryContent (options: ExporterOptions = {}): UnixfsV1DirectoryContent {
+    options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:hamt-sharded-directory', {
+      cid
+    }))
+
     return listDirectory(node, path, resolve, depth, blockstore, options)
   }
 
@@ -30,6 +35,10 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de
       const block = await blockstore.get(link.Hash, options)
       node = decode(block)
 
+      options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:hamt-sharded-directory', {
+        cid: link.Hash
+      }))
+
       return { entries: listDirectory(node, path, resolve, depth, blockstore, options) }
     }
   }

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts

Lines changed: 15 additions & 2 deletions

@@ -1,4 +1,5 @@
-import type { ExporterOptions, UnixfsV1Resolver } from '../../../index.js'
+import { CustomProgressEvent } from 'progress-events'
+import type { ExporterOptions, ExportProgress, ExportWalk, UnixfsV1Resolver } from '../../../index.js'
 import extractDataFromBlock from '../../../utils/extract-data-from-block.js'
 import validateOffsetAndLength from '../../../utils/validate-offset-and-length.js'
 
@@ -15,7 +16,19 @@ const rawContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, b
       length
     } = validateOffsetAndLength(size, options.offset, options.length)
 
-    yield extractDataFromBlock(unixfs.data, 0n, offset, offset + length)
+    options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:raw', {
+      cid
+    }))
+
+    const buf = extractDataFromBlock(unixfs.data, 0n, offset, offset + length)
+
+    options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:raw', {
+      bytesRead: BigInt(buf.byteLength),
+      totalBytes: length - offset,
+      fileSize: BigInt(unixfs.data.byteLength)
+    }))
+
+    yield buf
   }
 
   return yieldRawContent

packages/ipfs-unixfs-exporter/test/importer.spec.ts

Lines changed: 2 additions & 2 deletions

@@ -652,8 +652,8 @@ strategies.forEach((strategy) => {
       }], block, options))
 
       expect(onProgress.called).to.equal(true)
-      expect(onProgress.getCall(0).args[0]).to.have.property('type', 'unixfs:importer:progress')
-      expect(onProgress.getCall(0).args[0]).to.have.deep.property('detail', { bytes: chunkSize, path })
+      expect(onProgress.getCall(0).args[0]).to.have.property('type', 'unixfs:importer:progress:file:read')
+      expect(onProgress.getCall(0).args[0]).to.have.deep.property('detail', { bytesRead: BigInt(chunkSize), chunkSize: BigInt(chunkSize), path })
     })
 
     it('will import files with CID version 1', async () => {
