fix: reduce required number of blockstore methods #298

Merged 1 commit on Mar 15, 2023.

packages/ipfs-unixfs-exporter/src/index.ts (1 addition & 5 deletions)

```diff
@@ -79,11 +79,7 @@ export interface ShardTraversalContext {
   lastBucket: Bucket<boolean>
 }
 
-export interface BlockstoreOptions {
-  signal?: AbortSignal
-}
-
-export type Blockstore = Pick<InterfaceBlockstore, 'has' | 'put' | 'get'>
+export type Blockstore = Pick<InterfaceBlockstore, 'get'>
 
 const toPathComponents = (path: string = ''): string[] => {
   // split on / unless escaped with \
```
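
Since the exporter now only ever reads blocks, any object with a `get` method satisfies its `Blockstore` type. A minimal sketch of a read-only adapter (the in-memory `blocks` map is illustrative, not part of the library):

```ts
import { exporter } from 'ipfs-unixfs-exporter'
import type { CID } from 'multiformats/cid'

// hypothetical in-memory block source
const blocks = new Map<string, Uint8Array>()

// only `get` is required by the exporter's narrowed Blockstore type
const readOnlyBlockstore = {
  async get (cid: CID): Promise<Uint8Array> {
    const block = blocks.get(cid.toString())

    if (block == null) {
      throw new Error(`block ${cid.toString()} not found`)
    }

    return block
  }
}

// usage, e.g.: const entry = await exporter(cid, readOnlyBlockstore)
```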

packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts (8 additions & 7 deletions)

```diff
@@ -32,10 +32,10 @@ export interface BufferImporterOptions extends ProgressOptions<BufferImportProgr
 }
 
 export function defaultBufferImporter (options: BufferImporterOptions): BufferImporter {
-  return async function * bufferImporter (file, block) {
-    for await (let buffer of file.content) {
+  return async function * bufferImporter (file, blockstore) {
+    for await (let block of file.content) {
       yield async () => {
-        options.onProgress?.(new CustomProgressEvent<ImportProgressData>('unixfs:importer:progress', { bytes: buffer.length, path: file.path }))
+        options.onProgress?.(new CustomProgressEvent<ImportProgressData>('unixfs:importer:progress', { bytes: block.byteLength, path: file.path }))
         let unixfs
 
         const opts: PersistOptions = {
@@ -50,19 +50,20 @@ export function defaultBufferImporter (options: BufferImporterOptions): BufferIm
         } else {
           unixfs = new UnixFS({
             type: options.leafType,
-            data: buffer
+            data: block
           })
 
-          buffer = dagPb.encode({
+          block = dagPb.encode({
             Data: unixfs.marshal(),
             Links: []
           })
         }
 
         return {
-          cid: await persist(buffer, block, opts),
+          cid: await persist(block, blockstore, opts),
           unixfs,
-          size: BigInt(buffer.length)
+          size: BigInt(block.length),
+          block
         }
       }
     }
```
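
The buffer importer now returns the encoded bytes alongside the CID, and `persist` only ever writes. Conceptually `persist` hashes the bytes, builds a CID and calls `blockstore.put`, which is why `put` becomes the importer's only required method. A rough sketch of that idea (not the actual `src/utils/persist.ts`; CID version, codec and hasher handling are simplified):

```ts
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import * as dagPb from '@ipld/dag-pb'

interface WritableBlockstore {
  put: (cid: CID, block: Uint8Array) => Promise<unknown>
}

// simplified: the real persist also honours cidVersion, codec and hasher options
async function persistSketch (block: Uint8Array, blockstore: WritableBlockstore): Promise<CID> {
  const multihash = await sha256.digest(block)
  const cid = CID.create(1, dagPb.code, multihash)

  await blockstore.put(cid, block)

  return cid
}
```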

packages/ipfs-unixfs-importer/src/dag-builder/dir.ts (5 additions & 4 deletions)

```diff
@@ -17,15 +17,16 @@ export const dirBuilder = async (dir: Directory, blockstore: Blockstore, options
     mode: dir.mode
   })
 
-  const buffer = encode(prepare({ Data: unixfs.marshal() }))
-  const cid = await persist(buffer, blockstore, options)
+  const block = encode(prepare({ Data: unixfs.marshal() }))
+  const cid = await persist(block, blockstore, options)
   const path = dir.path
 
   return {
     cid,
     path,
     unixfs,
-    size: BigInt(buffer.length),
-    originalPath: dir.originalPath
+    size: BigInt(block.length),
+    originalPath: dir.originalPath,
+    block
   }
 }
```

packages/ipfs-unixfs-importer/src/dag-builder/file.ts (35 additions & 38 deletions)

```diff
@@ -3,7 +3,7 @@ import { persist } from '../utils/persist.js'
 import { encode, PBLink, prepare } from '@ipld/dag-pb'
 import parallelBatch from 'it-parallel-batch'
 import * as rawCodec from 'multiformats/codecs/raw'
-import type { BufferImporter, File, InProgressImportResult, Blockstore } from '../index.js'
+import type { BufferImporter, File, InProgressImportResult, Blockstore, SingleBlockImportResult } from '../index.js'
 import type { FileLayout, Reducer } from '../layout/index.js'
 import type { Version } from 'multiformats/cid'
 import type { ProgressOptions } from 'progress-events'
@@ -15,24 +15,37 @@ interface BuildFileBatchOptions {
 
 async function * buildFileBatch (file: File, blockstore: Blockstore, options: BuildFileBatchOptions): AsyncGenerator<InProgressImportResult> {
   let count = -1
-  let previous: InProgressImportResult | undefined
+  let previous: SingleBlockImportResult | undefined
 
   for await (const entry of parallelBatch(options.bufferImporter(file, blockstore), options.blockWriteConcurrency)) {
     count++
 
     if (count === 0) {
-      previous = entry
+      // cache the first entry in case there aren't any more
+      previous = {
+        ...entry,
+        single: true
+      }
+
       continue
     } else if (count === 1 && (previous != null)) {
-      yield previous
+      // we have the second block of a multiple block import so yield the first
+      yield {
+        ...previous,
+        block: undefined,
+        single: undefined
+      }
       previous = undefined
     }
 
-    yield entry
+    // yield the second or later block of a multiple block import
+    yield {
+      ...entry,
+      block: undefined
+    }
   }
 
   if (previous != null) {
-    previous.single = true
     yield previous
   }
 }
@@ -43,49 +56,32 @@ interface ReduceOptions extends ProgressOptions {
   signal?: AbortSignal
 }
 
+function isSingleBlockImport (result: any): result is SingleBlockImportResult {
+  return result.single === true
+}
+
 const reduce = (file: File, blockstore: Blockstore, options: ReduceOptions): Reducer => {
   const reducer: Reducer = async function (leaves) {
-    if (leaves.length === 1 && leaves[0]?.single === true && options.reduceSingleLeafToSelf) {
+    if (leaves.length === 1 && isSingleBlockImport(leaves[0]) && options.reduceSingleLeafToSelf) {
       const leaf = leaves[0]
 
-      if (file.mtime !== undefined || file.mode !== undefined) {
+      if (isSingleBlockImport(leaf) && (file.mtime !== undefined || file.mode !== undefined)) {
         // only one leaf node which is a raw leaf - we have metadata so convert it into a
         // UnixFS entry otherwise we'll have nowhere to store the metadata
-        let buffer = await blockstore.get(leaf.cid, options)
-
         leaf.unixfs = new UnixFS({
           type: 'file',
           mtime: file.mtime,
           mode: file.mode,
-          data: buffer
+          data: leaf.block
         })
 
-        buffer = encode(prepare({ Data: leaf.unixfs.marshal() }))
-
-        // // TODO vmx 2021-03-26: This is what the original code does, it checks
-        // // the multihash of the original leaf node and uses then the same
-        // // hasher. i wonder if that's really needed or if we could just use
-        // // the hasher from `options.hasher` instead.
-        // const multihash = mh.decode(leaf.cid.multihash.bytes)
-        // let hasher
-        // switch multihash {
-        //   case sha256.code {
-        //     hasher = sha256
-        //     break;
-        //   }
-        //   //case identity.code {
-        //   //  hasher = identity
-        //   //  break;
-        //   //}
-        //   default: {
-        //     throw new Error(`Unsupported hasher "${multihash}"`)
-        //   }
-        // }
-        leaf.cid = await persist(buffer, blockstore, {
+        leaf.block = encode(prepare({ Data: leaf.unixfs.marshal() }))
+
+        leaf.cid = await persist(leaf.block, blockstore, {
          ...options,
          cidVersion: options.cidVersion
        })
-        leaf.size = BigInt(buffer.length)
+        leaf.size = BigInt(leaf.block.length)
      }
 
       return {
@@ -147,15 +143,16 @@ const reduce = (file: File, blockstore: Blockstore, options: ReduceOptions): Red
       Data: f.marshal(),
       Links: links
     }
-    const buffer = encode(prepare(node))
-    const cid = await persist(buffer, blockstore, options)
+    const block = encode(prepare(node))
+    const cid = await persist(block, blockstore, options)
 
     return {
       cid,
       path: file.path,
       unixfs: f,
-      size: BigInt(buffer.length + node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 0), 0)),
-      originalPath: file.originalPath
+      size: BigInt(block.length + node.Links.reduce((acc, curr) => acc + (curr.Tsize ?? 0), 0)),
+      originalPath: file.originalPath,
+      block
     }
   }
```
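
The `single: true` discriminant is what lets `reduce` read `leaf.block` instead of round-tripping through `blockstore.get`: narrowing the `InProgressImportResult` union to `SingleBlockImportResult` proves the bytes are present. A standalone illustration of the narrowing (the guard mirrors the one in `file.ts` above):

```ts
import type { InProgressImportResult, SingleBlockImportResult } from 'ipfs-unixfs-importer'

function isSingleBlockImport (result: any): result is SingleBlockImportResult {
  return result.single === true
}

function leafBytes (leaf: InProgressImportResult): Uint8Array | undefined {
  if (isSingleBlockImport(leaf)) {
    // narrowed: leaf.block is a Uint8Array here, no blockstore.get required
    return leaf.block
  }

  // multi-block results no longer carry their bytes - they are yielded
  // with `block: undefined` in buildFileBatch
  return undefined
}
```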

packages/ipfs-unixfs-importer/src/dir-flat.ts (9 additions & 6 deletions)

```diff
@@ -1,6 +1,7 @@
 import { encode, PBNode, prepare } from '@ipld/dag-pb'
 import type { Blockstore } from 'interface-blockstore'
 import { UnixFS } from 'ipfs-unixfs'
+import type { CID } from 'multiformats/cid'
 import { Dir, CID_V0, CID_V1, DirProps } from './dir.js'
 import type { ImportResult, InProgressImportResult } from './index.js'
 import { persist, PersistOptions } from './utils/persist.js'
@@ -68,20 +69,22 @@ export class DirFlat extends Dir {
   async * flush (block: Blockstore): AsyncGenerator<ImportResult> {
     const links = []
 
-    for (let [name, child] of this._children.entries()) {
+    for (const [name, child] of this._children.entries()) {
+      let result: { size?: bigint | number, cid?: CID } = child
+
       if (child instanceof Dir) {
         for await (const entry of child.flush(block)) {
-          child = entry
+          result = entry
 
-          yield child
+          yield entry
         }
       }
 
-      if (child.size != null && (child.cid != null)) {
+      if (result.size != null && (result.cid != null)) {
         links.push({
           Name: name,
-          Tsize: Number(child.size),
-          Hash: child.cid
+          Tsize: Number(result.size),
+          Hash: result.cid
         })
       }
     }
```

packages/ipfs-unixfs-importer/src/index.ts (15 additions & 8 deletions)

```diff
@@ -17,11 +17,7 @@ import type { ProgressOptions } from 'progress-events'
 export type ByteStream = AwaitIterable<Uint8Array>
 export type ImportContent = ByteStream | Uint8Array
 
-export interface BlockstoreOptions {
-  signal?: AbortSignal
-}
-
-export type Blockstore = Pick<InterfaceBlockstore, 'has' | 'put' | 'get'>
+export type Blockstore = Pick<InterfaceBlockstore, 'put'>
 
 export interface FileCandidate {
   path?: string
@@ -60,14 +56,25 @@ export interface ImportResult {
   unixfs?: UnixFS
 }
 
-export interface InProgressImportResult extends ImportResult {
-  single?: boolean
+export interface MultipleBlockImportResult extends ImportResult {
   originalPath?: string
 }
 
+export interface SingleBlockImportResult extends ImportResult {
+  single: true
+  originalPath?: string
+  block: Uint8Array
+}
+
+export type InProgressImportResult = SingleBlockImportResult | MultipleBlockImportResult
+
+export interface BufferImporterResult extends ImportResult {
+  block: Uint8Array
+}
+
 export interface HamtHashFn { (value: Uint8Array): Promise<Uint8Array> }
 export interface TreeBuilder { (source: AsyncIterable<InProgressImportResult>, blockstore: Blockstore): AsyncIterable<ImportResult> }
-export interface BufferImporter { (file: File, blockstore: Blockstore): AsyncIterable<() => Promise<InProgressImportResult>> }
+export interface BufferImporter { (file: File, blockstore: Blockstore): AsyncIterable<() => Promise<BufferImporterResult>> }
 
 export type ImportProgressEvents =
   BufferImportProgressEvents
```
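
Symmetrically, the importer now only writes blocks, so a `put`-only sink satisfies its narrowed `Blockstore` type. A minimal sketch (the `blocks` map and the file candidate are illustrative):

```ts
import { importer } from 'ipfs-unixfs-importer'
import type { CID } from 'multiformats/cid'

// hypothetical in-memory block sink
const blocks = new Map<string, Uint8Array>()

// only `put` is required by the importer's narrowed Blockstore type
const writeOnlyBlockstore = {
  async put (cid: CID, block: Uint8Array): Promise<CID> {
    blocks.set(cid.toString(), block)

    return cid
  }
}

// usage, e.g.:
// for await (const entry of importer([{
//   path: 'hello.txt',
//   content: Uint8Array.from([0, 1, 2])
// }], writeOnlyBlockstore)) {
//   console.info(entry.cid)
// }
```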

packages/ipfs-unixfs-importer/test/builder-balanced.spec.ts (8 additions & 4 deletions)

```diff
@@ -24,7 +24,8 @@ describe('builder: balanced', () => {
   it('reduces one value into itself', async () => {
     const source = [{
       cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'),
-      size: 0n
+      size: 0n,
+      block: Uint8Array.from([])
     }]
 
     const result = await balanced(options)((async function * () {
@@ -37,13 +38,16 @@
   it('reduces 3 values into parent', async () => {
     const source = [{
       cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'),
-      size: 0n
+      size: 0n,
+      block: Uint8Array.from([])
     }, {
       cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'),
-      size: 0n
+      size: 0n,
+      block: Uint8Array.from([])
     }, {
       cid: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn'),
-      size: 0n
+      size: 0n,
+      block: Uint8Array.from([])
     }]
 
     const result = await balanced(options)((async function * () {
```

packages/ipfs-unixfs-importer/test/chunker-custom.spec.ts (3 additions & 2 deletions)

```diff
@@ -19,7 +19,7 @@ describe('custom chunker', function () {
   const block = new MemoryBlockstore()
 
   const fromPartsTest = (content: AsyncIterable<Uint8Array>, size: bigint) => async () => {
-    const put = async (buf: Uint8Array): Promise<{ cid: CID, size: bigint, unixfs: UnixFS }> => {
+    const put = async (buf: Uint8Array): Promise<{ cid: CID, size: bigint, unixfs: UnixFS, block: Uint8Array }> => {
      const encodedBlock = await Block.encode({
        value: buf,
        codec: rawCodec,
@@ -29,7 +29,8 @@
       return {
         cid: encodedBlock.cid,
         size: BigInt(buf.length),
-        unixfs: new UnixFS()
+        unixfs: new UnixFS(),
+        block: buf
       }
     }
 
```