Skip to content

Commit 44e7792

Browse files
authored
fix: add tests for unbalanced dags and very deep dags (#299)
To ensure that blocks are emitted in the correct order while also not overflowing the stack, when we loop over the children of a node, use a separate queue for each child and wait for that queue to be idle before moving on to the next child.
1 parent d579997 commit 44e7792

File tree

3 files changed

+238
-16
lines changed

3 files changed

+238
-16
lines changed

packages/ipfs-unixfs-exporter/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,13 @@
159159
"blockstore-core": "^4.0.1",
160160
"delay": "^5.0.0",
161161
"ipfs-unixfs-importer": "^15.0.0",
162+
"iso-random-stream": "^2.0.2",
162163
"it-all": "^2.0.0",
163164
"it-buffer-stream": "^3.0.0",
164165
"it-first": "^2.0.0",
165166
"merge-options": "^3.0.4",
166-
"sinon": "^15.0.0"
167+
"sinon": "^15.0.0",
168+
"wherearewe": "^2.0.1"
167169
},
168170
"browser": {
169171
"fs": false

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import map from 'it-map'
1111
import PQueue from 'p-queue'
1212
import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, Blockstore } from '../../../index.js'
1313

14-
async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, walkQueue: PQueue, options: ExporterOptions): Promise<void> {
14+
async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
1515
// a `raw` node
1616
if (node instanceof Uint8Array) {
1717
queue.push(extractDataFromBlock(node, streamPosition, start, end))
@@ -98,9 +98,23 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array,
9898
return
9999
}
100100

101-
void walkQueue.add(async () => {
102-
await walkDAG(blockstore, child, queue, blockStart, start, end, walkQueue, options)
101+
// create a queue for this child - we use a queue instead of recursion
102+
// to avoid overflowing the stack
103+
const childQueue = new PQueue({
104+
concurrency: 1
103105
})
106+
// if any of the child jobs error, end the read queue with the error
107+
childQueue.on('error', error => {
108+
queue.end(error)
109+
})
110+
111+
// if the job rejects the 'error' event will be emitted on the child queue
112+
void childQueue.add(async () => {
113+
await walkDAG(blockstore, child, queue, blockStart, start, end, options)
114+
})
115+
116+
// wait for this child to complete before moving on to the next
117+
await childQueue.onIdle()
104118
}
105119
}
106120
)
@@ -123,20 +137,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
123137
return
124138
}
125139

126-
// use a queue to walk the DAG instead of recursion to ensure very deep DAGs
127-
// don't overflow the stack
128-
const walkQueue = new PQueue({
129-
concurrency: 1
130-
})
131140
const queue = pushable()
132141

133-
void walkQueue.add(async () => {
134-
await walkDAG(blockstore, node, queue, 0n, offset, offset + length, walkQueue, options)
135-
})
136-
137-
walkQueue.on('error', error => {
138-
queue.end(error)
139-
})
142+
void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
143+
.catch(err => {
144+
queue.end(err)
145+
})
140146

141147
let read = 0n
142148

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/* eslint-env mocha */
2+
3+
import { expect } from 'aegir/chai'
4+
import all from 'it-all'
5+
import { MemoryBlockstore } from 'blockstore-core'
6+
import { concat, concat as uint8ArrayConcat } from 'uint8arrays/concat'
7+
import { exporter } from './../src/index.js'
8+
import randomBytes from 'iso-random-stream/src/random.js'
9+
import { sha256 } from 'multiformats/hashes/sha2'
10+
import { CID } from 'multiformats/cid'
11+
import * as raw from 'multiformats/codecs/raw'
12+
import { UnixFS } from 'ipfs-unixfs'
13+
import * as dagPb from '@ipld/dag-pb'
14+
import type { Blockstore } from 'interface-blockstore'
15+
import { isNode } from 'wherearewe'
16+
17+
describe('exporter esoteric DAGs', () => {
18+
let block: Blockstore
19+
20+
beforeEach(() => {
21+
block = new MemoryBlockstore()
22+
})
23+
24+
async function storeBlock (buf: Uint8Array, codec: number): Promise<CID> {
25+
const mh = await sha256.digest(buf)
26+
const cid = CID.createV1(codec, mh)
27+
28+
await block.put(cid, buf)
29+
30+
return cid
31+
}
32+
33+
it('exports an unbalanced DAG', async () => {
34+
const leaves = await Promise.all([
35+
randomBytes(5),
36+
randomBytes(3),
37+
randomBytes(6),
38+
randomBytes(10),
39+
randomBytes(4),
40+
randomBytes(7),
41+
randomBytes(8)
42+
].map(async buf => {
43+
return {
44+
cid: await storeBlock(buf, raw.code),
45+
buf
46+
}
47+
}))
48+
49+
// create an unbalanced DAG:
50+
//
51+
// root
52+
// / | | \
53+
// 0 * 5 6
54+
// / | \
55+
// 1 * 4
56+
// / \
57+
// 2 3
58+
59+
const intermediateNode1 = {
60+
Data: new UnixFS({
61+
type: 'file',
62+
blockSizes: [
63+
BigInt(leaves[2].buf.byteLength),
64+
BigInt(leaves[3].buf.byteLength)
65+
]
66+
}).marshal(),
67+
Links: [{
68+
Name: '',
69+
Hash: leaves[2].cid,
70+
Tsize: leaves[2].buf.byteLength
71+
}, {
72+
Name: '',
73+
Hash: leaves[3].cid,
74+
Tsize: leaves[3].buf.byteLength
75+
}]
76+
}
77+
const intermediateNode1Buf = dagPb.encode(intermediateNode1)
78+
const intermediateNode1Cid = await storeBlock(intermediateNode1Buf, dagPb.code)
79+
80+
const intermediateNode2 = {
81+
Data: new UnixFS({
82+
type: 'file',
83+
blockSizes: [
84+
BigInt(leaves[1].buf.byteLength),
85+
BigInt(leaves[2].buf.byteLength + leaves[3].buf.byteLength),
86+
BigInt(leaves[4].buf.byteLength)
87+
]
88+
}).marshal(),
89+
Links: [{
90+
Name: '',
91+
Hash: leaves[1].cid,
92+
Tsize: leaves[1].buf.byteLength
93+
}, {
94+
Name: '',
95+
Hash: intermediateNode1Cid,
96+
Tsize: intermediateNode1Buf.length
97+
}, {
98+
Name: '',
99+
Hash: leaves[4].cid,
100+
Tsize: leaves[4].buf.byteLength
101+
}]
102+
}
103+
104+
const intermediateNode2Buf = dagPb.encode(intermediateNode2)
105+
const intermediateNode2Cid = await storeBlock(intermediateNode2Buf, dagPb.code)
106+
107+
const unixfs = new UnixFS({
108+
type: 'file',
109+
blockSizes: [
110+
BigInt(leaves[0].buf.byteLength),
111+
BigInt(leaves[1].buf.byteLength + leaves[2].buf.byteLength + leaves[3].buf.byteLength + leaves[4].buf.byteLength),
112+
BigInt(leaves[5].buf.byteLength),
113+
BigInt(leaves[6].buf.byteLength)
114+
]
115+
})
116+
117+
const rootNode = {
118+
Data: unixfs.marshal(),
119+
Links: [{
120+
Name: '',
121+
Hash: leaves[0].cid,
122+
Tsize: leaves[0].buf.byteLength
123+
}, {
124+
Name: '',
125+
Hash: intermediateNode2Cid,
126+
Tsize: intermediateNode2Buf.byteLength
127+
}, {
128+
Name: '',
129+
Hash: leaves[5].cid,
130+
Tsize: leaves[5].buf.byteLength
131+
}, {
132+
Name: '',
133+
Hash: leaves[6].cid,
134+
Tsize: leaves[6].buf.byteLength
135+
}]
136+
}
137+
138+
const rootBuf = dagPb.encode(rootNode)
139+
const rootCid = await storeBlock(rootBuf, dagPb.code)
140+
const exported = await exporter(rootCid, block)
141+
142+
if (exported.type !== 'file') {
143+
throw new Error('Unexpected type')
144+
}
145+
146+
const data = uint8ArrayConcat(await all(exported.content()))
147+
expect(data).to.deep.equal(concat(
148+
leaves.map(l => l.buf)
149+
))
150+
})
151+
152+
it('exports a very deep DAG', async () => {
153+
if (!isNode) {
154+
// browsers are quite slow so only run on node
155+
return
156+
}
157+
158+
const buf: Uint8Array = randomBytes(5)
159+
let child = {
160+
cid: await storeBlock(buf, raw.code),
161+
buf
162+
}
163+
164+
// create a very deep DAG:
165+
//
166+
// root
167+
// \
168+
// *
169+
// \
170+
// *
171+
// \
172+
// ... many nodes here
173+
// \
174+
// 0
175+
let rootCid: CID | undefined
176+
177+
for (let i = 0; i < 100000; i++) {
178+
const parent = {
179+
Data: new UnixFS({
180+
type: 'file',
181+
blockSizes: [
182+
BigInt(buf.byteLength)
183+
]
184+
}).marshal(),
185+
Links: [{
186+
Name: '',
187+
Hash: child.cid,
188+
Tsize: child.buf.byteLength
189+
}]
190+
}
191+
192+
const parentBuf = dagPb.encode(parent)
193+
rootCid = await storeBlock(parentBuf, dagPb.code)
194+
195+
child = {
196+
cid: rootCid,
197+
buf: parentBuf
198+
}
199+
}
200+
201+
if (rootCid == null) {
202+
throw new Error('Root CID not set')
203+
}
204+
205+
const exported = await exporter(rootCid, block)
206+
207+
if (exported.type !== 'file') {
208+
throw new Error('Unexpected type')
209+
}
210+
211+
const data = uint8ArrayConcat(await all(exported.content()))
212+
expect(data).to.deep.equal(buf)
213+
})
214+
})

0 commit comments

Comments
 (0)