@@ -8,6 +8,7 @@ import { pushable } from 'it-pushable'
8
8
import parallel from 'it-parallel'
9
9
import { pipe } from 'it-pipe'
10
10
import map from 'it-map'
11
+ import PQueue from 'p-queue'
11
12
12
13
/**
13
14
* @typedef {import('../../../types').ExporterOptions } ExporterOptions
@@ -19,14 +20,15 @@ import map from 'it-map'
19
20
/**
20
21
* @param {Blockstore } blockstore
21
22
* @param {PBNode | Uint8Array } node
22
- * @param {import('it-pushable').Pushable<Uint8Array | undefined > } queue
23
+ * @param {import('it-pushable').Pushable<Uint8Array> } queue
23
24
* @param {number } streamPosition
24
25
* @param {number } start
25
26
* @param {number } end
27
+ * @param {PQueue } walkQueue
26
28
* @param {ExporterOptions } options
27
29
* @returns {Promise<void> }
28
30
*/
29
- async function walkDAG ( blockstore , node , queue , streamPosition , start , end , options ) {
31
+ async function walkDAG ( blockstore , node , queue , streamPosition , start , end , walkQueue , options ) {
30
32
// a `raw` node
31
33
if ( node instanceof Uint8Array ) {
32
34
queue . push ( extractDataFromBlock ( node , streamPosition , start , end ) )
@@ -100,19 +102,23 @@ async function walkDAG (blockstore, node, queue, streamPosition, start, end, opt
100
102
} ) ,
101
103
async ( source ) => {
102
104
for await ( const { link, block, blockStart } of source ) {
105
+ /** @type {PBNode | Uint8Array } */
103
106
let child
104
107
switch ( link . Hash . code ) {
105
108
case dagPb . code :
106
- child = await dagPb . decode ( block )
109
+ child = dagPb . decode ( block )
107
110
break
108
111
case raw . code :
109
112
child = block
110
113
break
111
114
default :
112
- throw errCode ( new Error ( `Unsupported codec: ${ link . Hash . code } ` ) , 'ERR_NOT_UNIXFS' )
115
+ queue . end ( errCode ( new Error ( `Unsupported codec: ${ link . Hash . code } ` ) , 'ERR_NOT_UNIXFS' ) )
116
+ return
113
117
}
114
118
115
- await walkDAG ( blockstore , child , queue , blockStart , start , end , options )
119
+ walkQueue . add ( async ( ) => {
120
+ await walkDAG ( blockstore , child , queue , blockStart , start , end , walkQueue , options )
121
+ } )
116
122
}
117
123
}
118
124
)
@@ -141,14 +147,20 @@ const fileContent = (cid, node, unixfs, path, resolve, depth, blockstore) => {
141
147
return
142
148
}
143
149
144
- const queue = pushable ( {
145
- objectMode : true
150
+ // use a queue to walk the DAG instead of recursion to ensure very deep DAGs
151
+ // don't overflow the stack
152
+ const walkQueue = new PQueue ( {
153
+ concurrency : 1
146
154
} )
155
+ const queue = pushable ( )
147
156
148
- walkDAG ( blockstore , node , queue , 0 , offset , offset + length , options )
149
- . catch ( err => {
150
- queue . end ( err )
151
- } )
157
+ walkQueue . add ( async ( ) => {
158
+ await walkDAG ( blockstore , node , queue , 0 , offset , offset + length , walkQueue , options )
159
+ } )
160
+
161
+ walkQueue . on ( 'error' , error => {
162
+ queue . end ( error )
163
+ } )
152
164
153
165
let read = 0
154
166
0 commit comments