Skip to content

Strict DFS traversal #359

Closed
Closed
@guanzo

Description

@guanzo

The Trustless Gateway spec is being productionized by Saturn, Daghaus and possibly others. It's important for DAG traversal clients to be able to consume CARs with blocks in DFS order as this is optimal for streaming and incremental verification. While DFS isn't required by the spec, it seems to be the preferred traversal order as it's the only explicitly mentioned order, the other being "unknown".

This is the section that performs BFS, there may be more though.

for (let i = 0; i < node.Links.length; i++) {
const childLink = node.Links[i]
const childStart = streamPosition // inclusive
const childEnd = childStart + file.blockSizes[i] // exclusive
if ((start >= childStart && start < childEnd) || // child has offset byte
(end >= childStart && end <= childEnd) || // child has end byte
(start < childStart && end > childEnd)) { // child is between offset and end bytes
childOps.push({
link: childLink,
blockStart: streamPosition
})
}
streamPosition = childEnd
if (streamPosition > end) {
break
}
}
await pipe(
childOps,
(source) => map(source, (op) => {
return async () => {
const block = await blockstore.get(op.link.Hash, options)
return {
...op,
block
}
}
}),
(source) => parallel(source, {
ordered: true
}),
async (source) => {
for await (const { link, block, blockStart } of source) {
let child: dagPb.PBNode | Uint8Array
switch (link.Hash.code) {
case dagPb.code:
child = dagPb.decode(block)
break
case raw.code:
child = block
break
default:
queue.end(errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS'))
return
}
// create a queue for this child - we use a queue instead of recursion
// to avoid overflowing the stack
const childQueue = new PQueue({
concurrency: 1
})
// if any of the child jobs error, end the read queue with the error
childQueue.on('error', error => {
queue.end(error)
})
// if the job rejects the 'error' event will be emitted on the child queue
void childQueue.add(async () => {
options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
cid: link.Hash
}))
await walkDAG(blockstore, child, queue, blockStart, start, end, options)
})
// wait for this child to complete before moving on to the next
await childQueue.onIdle()
}
}
)

Metadata

Metadata

Assignees

No one assigned

    Labels

    need/triageNeeds initial labeling and prioritization

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions