From 024c1d57d220e77877fd6e45167b2347f27b56ac Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Mon, 13 Jan 2020 16:04:46 +0000
Subject: [PATCH 1/2] feat: allow overriding of internal functions

The interesting bits of this module are the various ways of building
DAGs for files and folders (flat, sharded, etc).

Sometimes we want to utilise bits of this logic to build DAGs without
having to reimplement big chunks of this module.

This PR allows the user to pass in functions to replace key parts of
this import pipeline.

Enables:

* https://github.com/ipfs/js-ipfs-mfs/pull/73
* https://github.com/ipfs/js-ipfs-unixfs-importer/pull/46
---
 README.md                               | 21 +++++++
 src/dag-builder/dir.js                  |  2 +-
 src/dag-builder/file/buffer-importer.js | 50 +++++++++++++++++
 src/dag-builder/file/index.js           | 59 +++++---------------
 src/dag-builder/index.js                | 22 ++++++--
 src/dir-flat.js                         |  4 +-
 src/dir-sharded.js                      |  8 +--
 src/index.js                            | 22 +++++++-
 test/chunker-custom.spec.js             | 73 +++++++++++++++++++++++++
 test/importer.spec.js                   | 61 +++++++++++++++++++++
 10 files changed, 262 insertions(+), 60 deletions(-)
 create mode 100644 src/dag-builder/file/buffer-importer.js
 create mode 100644 test/chunker-custom.spec.js

diff --git a/README.md b/README.md
index 674f3e4..3c020c0 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,7 @@
 - [Example](#example)
 - [API](#api)
 - [const import = importer(source, ipld [, options])](#const-import--importersource-ipld--options)
+- [Overriding internals](#overriding-internals)
 - [Contribute](#contribute)
 - [License](#license)
@@ -145,8 +146,28 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
 - `blockWriteConcurrency` (positive integer, defaults to 10) How many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
 - `fileImportConcurrency` (number, defaults to 50) How many files to import concurrently. For large numbers of small files this should be high (e.g. 50).
 
+## Overriding internals
+
+Several aspects of the importer are overridable by specifying functions as part of the options object with these keys:
+
+- `chunkValidator` (function): Optional function that supports the signature `async function * (source, options)`
+  - It should yield `Buffer` objects constructed from the `source` or throw an `Error`
+- `chunker` (function): Optional function that supports the signature `async function * (source, options)` where `source` is an async generator and `options` is an options object
+  - It should yield `Buffer` objects.
+- `bufferImporter` (function): Optional function that supports the signature `async function * (entry, source, ipld, options)`
+  - `entry` is the `{ path, content }` entry, `source` is an async generator that yields Buffers
+  - It should yield functions that return a Promise that resolves to an object with the properties `{ cid, unixfs, size }` where `cid` is a [CID], `unixfs` is a [UnixFS] entry and `size` is a `Number` that represents the serialized size of the [IPLD] node that holds the buffer data.
+  - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `blockWriteConcurrency` option (default: 10)
+- `dagBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
+  - It should yield a `function` that returns a `Promise` that resolves to `{ cid, path, unixfs, node }` where `cid` is a `CID`, `path` is a string, `unixfs` is a UnixFS entry and `node` is a `DAGNode`.
+  - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `fileImportConcurrency` option (default: 50)
+- `treeBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
+  - It should yield an object with the properties `{ cid, path, unixfs, size }` where `cid` is a `CID`, `path` is a string, `unixfs` is a UnixFS entry and `size` is a `Number`.
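+
+For example, a minimal sketch of a custom `chunker` that forwards each incoming buffer as its own chunk (`files` and `ipld` below are placeholders for your own input and IPLD instance):
+
+```js
+const importer = require('ipfs-unixfs-importer')
+
+// Treat every buffer yielded by `content` as a ready-made chunk
+const chunker = async function * (source, options) {
+  yield * source
+}
+
+for await (const entry of importer(files, ipld, { chunker })) {
+  console.log(entry.cid, entry.path, entry.size)
+}
+```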
+
 [ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
 [UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
+[IPLD]: https://github.com/ipld/js-ipld
+[CID]: https://github.com/multiformats/js-cid
 
 ## Contribute
diff --git a/src/dag-builder/dir.js b/src/dag-builder/dir.js
index c3f8381..42cce15 100644
--- a/src/dag-builder/dir.js
+++ b/src/dag-builder/dir.js
@@ -21,7 +21,7 @@ const dirBuilder = async (item, ipld, options) => {
     cid,
     path,
     unixfs,
-    node
+    size: node.size
   }
 }
diff --git a/src/dag-builder/file/buffer-importer.js b/src/dag-builder/file/buffer-importer.js
new file mode 100644
index 0000000..88d89bd
--- /dev/null
+++ b/src/dag-builder/file/buffer-importer.js
@@ -0,0 +1,50 @@
+'use strict'
+
+const UnixFS = require('ipfs-unixfs')
+const persist = require('../../utils/persist')
+const {
+  DAGNode
+} = require('ipld-dag-pb')
+
+async function * bufferImporter (file, source, ipld, options) {
+  for await (const buffer of source) {
+    yield async () => {
+      options.progress(buffer.length)
+      let node
+      let unixfs
+      let size
+
+      const opts = {
+        ...options
+      }
+
+      if (options.rawLeaves) {
+        node = buffer
+        size = buffer.length
+
+        opts.codec = 'raw'
+        opts.cidVersion = 1
+      } else {
+        unixfs = new UnixFS({
+          type: options.leafType,
+          data: buffer,
+          mtime: file.mtime,
+          mode: file.mode
+        })
+
+        node = new DAGNode(unixfs.marshal())
+        size = node.size
+      }
+
+      const cid = await persist(node, ipld, opts)
+
+      return {
+        cid: cid,
+        unixfs,
+        size
+      }
+    }
+  }
+}
+
+module.exports = bufferImporter
diff --git a/src/dag-builder/file/index.js b/src/dag-builder/file/index.js
index f9df4fd..0f32c33 100644
--- a/src/dag-builder/file/index.js
+++ b/src/dag-builder/file/index.js
@@ -16,49 +16,18 @@ const dagBuilders = {
   trickle: require('./trickle')
 }
 
-async function * importBuffer (file, source, ipld, options) {
-  for await (const buffer of source) {
-    yield async () => {
-      options.progress(buffer.length)
-      let node
-      let unixfs
-
-      const opts = {
-        ...options
-      }
-
-      if (options.rawLeaves) {
-        node = buffer
-
-        opts.codec = 'raw'
-        opts.cidVersion = 1
-      } else {
-        unixfs = new UnixFS({
-          type: options.leafType,
-          data: buffer,
-          mtime: file.mtime,
-          mode: file.mode
-        })
-
-        node = new DAGNode(unixfs.marshal())
-      }
-
-      const cid = await persist(node, ipld, opts)
-
-      return {
-        cid: cid,
-        unixfs,
-        node
-      }
-    }
-  }
-}
-
 async function * buildFileBatch (file, source, ipld, options) {
   let count = -1
   let previous
+  let bufferImporter
+
+  if (typeof options.bufferImporter === 'function') {
+    bufferImporter = options.bufferImporter
+  } else {
+    bufferImporter = require('./buffer-importer')
+  }
 
-  for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
+  for await (const entry of parallelBatch(bufferImporter(file, source, ipld, options), options.blockWriteConcurrency)) {
     count++
 
     if (count === 0) {
@@ -86,9 +55,8 @@ const reduce = (file, ipld, options) => {
       return {
         cid: leaf.cid,
         path: file.path,
-        name: (file.path || '').split('/').pop(),
         unixfs: leaf.unixfs,
-        node: leaf.node
+        size: leaf.size
       }
     }
 
@@ -101,7 +69,7 @@ const reduce = (file, ipld, options) => {
   const links = leaves
     .filter(leaf => {
-      if (leaf.cid.codec === 'raw' && leaf.node.length) {
+      if (leaf.cid.codec === 'raw' && leaf.size) {
         return true
       }
 
@@ -114,9 +82,9 @@ const reduce = (file, ipld, options) => {
     .map((leaf) => {
       if (leaf.cid.codec === 'raw') {
         // node is a leaf buffer
-        f.addBlockSize(leaf.node.length)
+        f.addBlockSize(leaf.size)
 
-        return new DAGLink(leaf.name, leaf.node.length, leaf.cid)
+        return new DAGLink(leaf.name, leaf.size, leaf.cid)
       }
 
       if (!leaf.unixfs.data) {
@@ -127,7 +95,7 @@ const reduce = (file, ipld, options) => {
         f.addBlockSize(leaf.unixfs.data.length)
       }
 
-      return new DAGLink(leaf.name, leaf.node.size, leaf.cid)
+      return new DAGLink(leaf.name, leaf.size, leaf.cid)
     })
 
   const node = new DAGNode(f.marshal(), links)
@@ -137,7 +105,6 @@ const reduce = (file, ipld, options) => {
     cid,
     path: file.path,
    unixfs: f,
-    node,
     size: node.size
   }
 }
diff --git a/src/dag-builder/index.js b/src/dag-builder/index.js
index bc2f4b2..a55888d 100644
--- a/src/dag-builder/index.js
+++ b/src/dag-builder/index.js
@@ -2,8 +2,6 @@
 
 const dirBuilder = require('./dir')
 const fileBuilder = require('./file')
-const createChunker = require('../chunker')
-const validateChunks = require('./validate-chunks')
 
 async function * dagBuilder (source, ipld, options) {
   for await (const entry of source) {
@@ -30,10 +28,26 @@ async function * dagBuilder (source, ipld, options) {
       }
     }
 
-      const chunker = createChunker(options.chunker, validateChunks(source), options)
+      let chunker
+
+      if (typeof options.chunker === 'function') {
+        chunker = options.chunker
+      } else if (options.chunker === 'rabin') {
+        chunker = require('../chunker/rabin')
+      } else {
+        chunker = require('../chunker/fixed-size')
+      }
+
+      let chunkValidator
+
+      if (typeof options.chunkValidator === 'function') {
+        chunkValidator = options.chunkValidator
+      } else {
+        chunkValidator = require('./validate-chunks')
+      }
 
       // item is a file
-      yield () => fileBuilder(entry, chunker, ipld, options)
+      yield () => fileBuilder(entry, chunker(chunkValidator(source, options), options), ipld, options)
     } else {
       // item is a directory
       yield () => dirBuilder(entry, ipld, options)
diff --git a/src/dir-flat.js b/src/dir-flat.js
index 266901b..5086604 100644
--- a/src/dir-flat.js
+++ b/src/dir-flat.js
@@ -65,7 +65,7 @@ class DirFlat extends Dir {
        }
      }
 
-      links.push(new DAGLink(children[i], child.node.length || child.node.size, child.cid))
+      links.push(new DAGLink(children[i], child.size, child.cid))
    }
 
    const unixfs = new UnixFS({
@@ -84,7 +84,7 @@ class DirFlat extends Dir {
      cid,
      unixfs,
      path,
-      node
+      size: node.size
    }
  }
}
diff --git a/src/dir-sharded.js b/src/dir-sharded.js
index 26065c7..e295984 100644
--- a/src/dir-sharded.js
+++ b/src/dir-sharded.js
@@ -107,7 +107,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
        shard = subShard
      }
 
-      links.push(new DAGLink(labelPrefix, shard.node.size, shard.cid))
+      links.push(new DAGLink(labelPrefix, shard.size, shard.cid))
    } else if (typeof child.value.flush === 'function') {
      const dir = child.value
      let flushedDir
@@ -119,7 +119,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
      }
 
      const label = labelPrefix + child.key
-      links.push(new DAGLink(label, flushedDir.node.size, flushedDir.cid))
+      links.push(new DAGLink(label, flushedDir.size, flushedDir.cid))
    } else {
      const value = child.value
 
@@ -155,8 +155,8 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
 
  yield {
    cid,
-    node,
    unixfs: dir,
-    path
+    path,
+    size: node.size
  }
}
diff --git a/src/index.js b/src/index.js
index e1240e9..052acff 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,7 +1,5 @@
 'use strict'
 
-const dagBuilder = require('./dag-builder')
-const treeBuilder = require('./tree-builder')
 const parallelBatch = require('it-parallel-batch')
 const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })
@@ -30,7 +28,9 @@
   pin: true,
   recursive: false,
   hidden: false,
-  preload: true
+  preload: true,
+  chunkValidator: null,
+  bufferImporter: null
 }
 
 module.exports = async function * (source, ipld, options = {}) {
@@ -58,6 +58,22 @@ module.exports = async function * (source, ipld, options = {}) {
     opts.codec = options.format
   }
 
+  let dagBuilder
+
+  if (typeof options.dagBuilder === 'function') {
+    dagBuilder = options.dagBuilder
+  } else {
+    dagBuilder = require('./dag-builder')
+  }
+
+  let treeBuilder
+
+  if (typeof options.treeBuilder === 'function') {
+    treeBuilder = options.treeBuilder
+  } else {
+    treeBuilder = require('./tree-builder')
+  }
+
   for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
     yield {
       cid: entry.cid,
diff --git a/test/chunker-custom.spec.js b/test/chunker-custom.spec.js
new file mode 100644
index 0000000..2dd00f0
--- /dev/null
+++ b/test/chunker-custom.spec.js
@@ -0,0 +1,73 @@
+/* eslint-env mocha */
+'use strict'
+
+const importer = require('../src')
+
+const chai = require('chai')
+chai.use(require('dirty-chai'))
+const expect = chai.expect
+const IPLD = require('ipld')
+const inMemory = require('ipld-in-memory')
+const mc = require('multicodec')
+
+// eslint bug https://github.com/eslint/eslint/issues/12459
+// eslint-disable-next-line require-await
+const iter = async function * () {
+  yield Buffer.from('one')
+  yield Buffer.from('two')
+}
+
+describe('custom chunker', function () {
+  let inmem
+
+  const fromPartsTest = (iter, size) => async () => {
+    for await (const part of importer([{
+      content: iter()
+    }], inmem, {
+      chunkValidator: source => source,
+      chunker: source => source,
+      bufferImporter: async function * (file, source, ipld, options) {
+        for await (const item of source) {
+          yield () => Promise.resolve(item)
+        }
+      }
+    })) {
+      expect(part.size).to.equal(size)
+    }
+  }
+
+  before(async () => {
+    inmem = await inMemory(IPLD)
+  })
+
+  it('keeps custom chunking', async () => {
+    const chunker = source => source
+    const content = iter()
+    for await (const part of importer([{ path: 'test', content }], inmem, {
+      chunker
+    })) {
+      expect(part.size).to.equal(116)
+    }
+  })
+
+  // eslint bug https://github.com/eslint/eslint/issues/12459
+  const multi = async function * () {
+    yield {
+      size: 11,
+      cid: await inmem.put(Buffer.from('hello world'), mc.RAW)
+    }
+    yield {
+      size: 11,
+      cid: await inmem.put(Buffer.from('hello world'), mc.RAW)
+    }
+  }
+  it('works with multiple parts', fromPartsTest(multi, 120))
+
+  const single = async function * () {
+    yield {
+      size: 11,
+      cid: await inmem.put(Buffer.from('hello world'), mc.RAW)
+    }
+  }
+  it('works with single part', fromPartsTest(single, 11))
+})
diff --git a/test/importer.spec.js b/test/importer.spec.js
index 50bef67..cf6784e 100644
--- a/test/importer.spec.js
+++ b/test/importer.spec.js
@@ -924,3 +924,64 @@ strategies.forEach((strategy) => {
     })
   })
 })
+
+describe('configuration', () => {
+  it('allows configuring with custom dag and tree builder', async () => {
+    let builtTree = false
+    const ipld = 'ipld'
+    const entries = await all(importer([{
+      path: 'path',
+      content: 'content'
+    }], ipld, {
+      dagBuilder: async function * (source, ipld, opts) { // eslint-disable-line require-await
+        yield function () {
+          return Promise.resolve({
+            cid: 'cid',
+            path: 'path',
+            unixfs: 'unixfs'
+          })
+        }
+      },
+      treeBuilder: async function * (source, ipld, opts) { // eslint-disable-line require-await
+        builtTree = true
+        yield * source
+      }
+    }))
+
+    expect(entries).to.have.lengthOf(1)
+    expect(entries).to.have.nested.property('[0].cid', 'cid')
+    expect(entries).to.have.nested.property('[0].path', 'path')
+    expect(entries).to.have.nested.property('[0].unixfs', 'unixfs')
+
+    expect(builtTree).to.be.true()
+  })
+
+  it('allows configuring with custom chunker', async () => {
+    let validated = false
+    let chunked = false
+    const ipld = {
+      put: () => 'cid'
+    }
+    const entries = await all(importer([{
+      path: 'path',
+      content: 'content'
+    }], ipld, {
+      chunkValidator: async function * (source, opts) { // eslint-disable-line require-await
+        validated = true
+        yield * source
+      },
+      chunker: async function * (source, opts) { // eslint-disable-line require-await
+        chunked = true
+        yield * source
+      }
+    }))
+
+    expect(entries).to.have.lengthOf(1)
+    expect(entries).to.have.nested.property('[0].cid', 'cid')
+    expect(entries).to.have.nested.property('[0].path', 'path')
+    expect(entries).to.have.nested.property('[0].unixfs')
+
+    expect(validated).to.be.true()
+    expect(chunked).to.be.true()
+  })
+})

From 3086865e5536f0ae6e45d327a7ad0d055d95fd53 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 15 Jan 2020 10:41:24 +0000
Subject: [PATCH 2/2] docs: add description about internal funcs

---
 README.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/README.md b/README.md
index 3c020c0..da32517 100644
--- a/README.md
+++ b/README.md
@@ -151,17 +151,21 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
 Several aspects of the importer are overridable by specifying functions as part of the options object with these keys:
 
 - `chunkValidator` (function): Optional function that supports the signature `async function * (source, options)`
+  - This function takes input from the `content` field of imported entries. It should transform it into `Buffer`s, throwing an `Error` if it cannot
   - It should yield `Buffer` objects constructed from the `source` or throw an `Error`
 - `chunker` (function): Optional function that supports the signature `async function * (source, options)` where `source` is an async generator and `options` is an options object
   - It should yield `Buffer` objects.
 - `bufferImporter` (function): Optional function that supports the signature `async function * (entry, source, ipld, options)`
+  - This function should read `Buffer`s from `source` and persist them using `ipld.put` or similar
   - `entry` is the `{ path, content }` entry, `source` is an async generator that yields Buffers
   - It should yield functions that return a Promise that resolves to an object with the properties `{ cid, unixfs, size }` where `cid` is a [CID], `unixfs` is a [UnixFS] entry and `size` is a `Number` that represents the serialized size of the [IPLD] node that holds the buffer data.
   - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `blockWriteConcurrency` option (default: 10)
 - `dagBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
+  - This function should read `{ path, content }` entries from `source` and turn them into DAGs
   - It should yield a `function` that returns a `Promise` that resolves to `{ cid, path, unixfs, node }` where `cid` is a `CID`, `path` is a string, `unixfs` is a UnixFS entry and `node` is a `DAGNode`.
   - Values will be pulled from this generator in parallel - the amount of parallelisation is controlled by the `fileImportConcurrency` option (default: 50)
 - `treeBuilder` (function): Optional function that supports the signature `async function * (source, ipld, options)`
+  - This function should read `{ cid, path, unixfs, node }` entries from `source` and place them in a directory structure
   - It should yield an object with the properties `{ cid, path, unixfs, size }` where `cid` is a `CID`, `path` is a string, `unixfs` is a UnixFS entry and `size` is a `Number`.
 
 [ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
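+
+As an illustration of the `bufferImporter` contract, a minimal sketch that persists every chunk as a raw block (the `multicodec` usage mirrors this repo's tests; a complete implementation would also handle UnixFS leaves, metadata and the `progress` callback):
+
+```js
+const mc = require('multicodec')
+
+async function * bufferImporter (entry, source, ipld, options) {
+  for await (const buffer of source) {
+    // Yield a function so values can be pulled in parallel, bounded
+    // by the `blockWriteConcurrency` option. `unixfs` is omitted for
+    // raw leaves, as in the built-in importer.
+    yield async () => ({
+      cid: await ipld.put(buffer, mc.RAW),
+      size: buffer.length
+    })
+  }
+}
+```
+
+It can then be supplied as `importer(source, ipld, { bufferImporter })`.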