From 47ba6541e53885d80c92ea0da835d3b5dc3b60a5 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Nov 2016 12:53:48 +0000 Subject: [PATCH 1/4] feat: upgrade to latest dag-pb API --- src/importer/flush-tree.js | 87 ++++++++++++--------- src/importer/index.js | 151 +++++++++++++++++++------------------ test/test-importer.js | 4 +- 3 files changed, 130 insertions(+), 112 deletions(-) diff --git a/src/importer/flush-tree.js b/src/importer/flush-tree.js index e71395e7..f74b31b3 100644 --- a/src/importer/flush-tree.js +++ b/src/importer/flush-tree.js @@ -5,7 +5,8 @@ const UnixFS = require('ipfs-unixfs') const CID = require('cids') const dagPB = require('ipld-dag-pb') const mapValues = require('async/mapValues') -const parallel = require('async/parallel') +const series = require('async/series') +const each = require('async/each') const DAGLink = dagPB.DAGLink const DAGNode = dagPB.DAGNode @@ -121,43 +122,59 @@ function traverse (tree, sizeIndex, path, ipldResolver, source, done) { const keys = Object.keys(tree) - const ufsDir = new UnixFS('directory') - const node = new DAGNode(ufsDir.marshal()) - - keys.forEach((key) => { - const b58mh = mh.toB58String(tree[key]) - const link = new DAGLink(key, sizeIndex[b58mh], tree[key]) - node.addRawLink(link) - }) - - parallel([ - (cb) => node.multihash(cb), - (cb) => node.size(cb) - ], (err, res) => { + let n + + series([ + (cb) => { + const d = new UnixFS('directory') + DAGNode.create(d.marshal(), (err, node) => { + if (err) { + return cb(err) + } + n = node + cb() + }) + }, + (cb) => { + each(keys, (key, next) => { + const b58mh = mh.toB58String(tree[key]) + const link = new DAGLink(key, sizeIndex[b58mh], tree[key]) + + DAGNode.addLink(n, link, (err, node) => { + if (err) { + return next(err) + } + n = node + next() + }) + }, cb) + }, + (cb) => { + sizeIndex[mh.toB58String(n.multihash)] = n.size + + ipldResolver.put({ + node: n, + cid: new CID(n.multihash) + }, (err) => { + if (err) { + source.push(new Error('failed to store dirNode')) + return cb(err) + } + if (path) { + source.push({ + path: path, + multihash: n.multihash, + size: n.size + }) + } + cb() + }) + } + ], (err) => { if (err) { return done(err) } - - const multihash = res[0] - const size = res[1] - - sizeIndex[mh.toB58String(multihash)] = size - ipldResolver.put({ - node: node, - cid: new CID(multihash) - }, (err) => { - if (err) { - source.push(new Error('failed to store dirNode')) - } else if (path) { - source.push({ - path: path, - multihash: multihash, - size: size - }) - } - - done(null, multihash) - }) + done(null, n.multihash) }) }) } diff --git a/src/importer/index.js b/src/importer/index.js index a26ea662..dbee29a6 100644 --- a/src/importer/index.js +++ b/src/importer/index.js @@ -8,12 +8,13 @@ const pullWrite = require('pull-write') const parallel = require('async/parallel') const dagPB = require('ipld-dag-pb') const CID = require('cids') +const series = require('async/series') +const each = require('async/each') const fsc = require('./../chunker/fixed-size') const createAndStoreTree = require('./flush-tree') const DAGNode = dagPB.DAGNode -const DAGLink = dagPB.DAGLink const CHUNK_SIZE = 262144 @@ -71,49 +72,48 @@ function makeWriter (source, files, ipldResolver) { } } -function createAndStoreDir (item, ipldResolver, cb) { +function createAndStoreDir (item, ipldResolver, callback) { // 1. create the empty dir dag node // 2. write it to the dag store const d = new UnixFS('directory') - const n = new DAGNode() - n.data = d.marshal() + let n - n.multihash((err, multihash) => { - if (err) { - return cb(err) - } - - ipldResolver.put({ - node: n, - cid: new CID(multihash) - }, (err) => { - if (err) { - return cb(err) - } - - n.size((err, size) => { + series([ + (cb) => { + DAGNode.create(d.marshal(), (err, node) => { if (err) { return cb(err) } - - cb(null, { - path: item.path, - multihash: multihash, - size: size - }) + n = node + cb() }) + }, + (cb) => { + ipldResolver.put({ + node: n, + cid: new CID(n.multihash) + }, cb) + } + ], (err) => { + if (err) { + return callback(err) + } + callback(null, { + path: item.path, + multihash: n.multihash, + size: n.size }) }) } -function createAndStoreFile (file, ipldResolver, cb) { +function createAndStoreFile (file, ipldResolver, callback) { if (Buffer.isBuffer(file.content)) { file.content = pull.values([file.content]) } if (typeof file.content !== 'function') { - return cb(new Error('invalid content')) + return callback(new Error('invalid content')) } // 1. create the unixfs merkledag node @@ -128,44 +128,37 @@ function createAndStoreFile (file, ipldResolver, cb) { file.content, fsc(CHUNK_SIZE), pull.asyncMap((chunk, cb) => { - const l = new UnixFS('file', Buffer(chunk)) - const n = new DAGNode(l.marshal()) + const l = new UnixFS('file', new Buffer(chunk)) - n.multihash((err, multihash) => { + DAGNode.create(l.marshal(), (err, node) => { if (err) { return cb(err) } ipldResolver.put({ - node: n, - cid: new CID(multihash) + node: node, + cid: new CID(node.multihash) }, (err) => { if (err) { - return cb(new Error('Failed to store chunk')) + return cb(err) } - n.size((err, size) => { - if (err) { - return cb(err) - } - - cb(null, { - Hash: multihash, - Size: size, - leafSize: l.fileSize(), - Name: '' - }) + cb(null, { + Hash: node.multihash, + Size: node.size, + leafSize: l.fileSize(), + Name: '' }) }) }) }), pull.collect((err, leaves) => { if (err) { - return cb(err) + return callback(err) } if (leaves.length === 1) { - return cb(null, { + return callback(null, { path: file.path, multihash: leaves[0].Hash, size: leaves[0].Size @@ -175,41 +168,49 @@ function createAndStoreFile (file, ipldResolver, cb) { // create a parent node and add all the leafs const f = new UnixFS('file') - const n = new DAGNode() - - for (let leaf of leaves) { - f.addBlockSize(leaf.leafSize) - n.addRawLink( - new DAGLink(leaf.Name, leaf.Size, leaf.Hash) - ) - } + let n - n.data = f.marshal() - - n.multihash((err, multihash) => { - if (err) { - return cb(err) - } - - ipldResolver.put({ - node: n, - cid: new CID(multihash) - }, (err) => { - if (err) { - return cb(err) - } - - n.size((err, size) => { + series([ + (cb) => { + DAGNode.create(f.marshal(), (err, node) => { if (err) { return cb(err) } - - cb(null, { - path: file.path, - multihash: multihash, - size: size - }) + n = node + cb() }) + }, + (cb) => { + each(leaves, (leaf, next) => { + f.addBlockSize(leaf.leafSize) + DAGNode.addLink(n, { + name: leaf.Name, + size: leaf.Size, + multihash: leaf.Hash + }, (err, node) => { + if (err) { + return next(err) + } + n = node + next() + }) + }, cb) + }, + (cb) => { + ipldResolver.put({ + node: n, + cid: new CID(n.multihash) + }, cb) + } + ], (err) => { + if (err) { + return callback(err) + } + + callback(null, { + path: file.path, + multihash: n.multihash, + size: n.size }) }) }) diff --git a/test/test-importer.js b/test/test-importer.js index 98a1f6cf..34f5fe1d 100644 --- a/test/test-importer.js +++ b/test/test-importer.js @@ -19,8 +19,8 @@ function stringifyMh (files) { const bigFile = loadFixture(__dirname, 'fixtures/1.2MiB.txt') const smallFile = loadFixture(__dirname, 'fixtures/200Bytes.txt') -module.exports = function (repo) { - describe('importer', function () { +module.exports = (repo) => { + describe('importer', () => { let ipldResolver before(() => { From 48fa481ca6b198349c671156ba7f1b598cb814be Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 24 Nov 2016 11:29:14 +0000 Subject: [PATCH 2/4] fix: marshal unixfs file after adding the leaf sizes --- src/importer/index.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/importer/index.js b/src/importer/index.js index dbee29a6..3fcc1c46 100644 --- a/src/importer/index.js +++ b/src/importer/index.js @@ -172,7 +172,7 @@ function createAndStoreFile (file, ipldResolver, callback) { series([ (cb) => { - DAGNode.create(f.marshal(), (err, node) => { + DAGNode.create(new Buffer(0), (err, node) => { if (err) { return cb(err) } @@ -196,6 +196,15 @@ function createAndStoreFile (file, ipldResolver, callback) { }) }, cb) }, + (cb) => { + DAGNode.create(f.marshal(), n.links, (err, node) => { + if (err) { + return cb(err) + } + n = node + cb() + }) + }, (cb) => { ipldResolver.put({ node: n, From b219d4edc081f491c8595416aa2bebc98ea03d0f Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 24 Nov 2016 13:19:22 +0100 Subject: [PATCH 3/4] cleanup and exporter is passing --- src/exporter/dir.js | 2 +- src/exporter/file.js | 2 +- src/importer/flush-tree.js | 75 ++++++++++-------------------- src/importer/index.js | 93 +++++++++++--------------------------- 4 files changed, 54 insertions(+), 118 deletions(-) diff --git a/src/exporter/dir.js b/src/exporter/dir.js index dbb4d361..5a53c19a 100644 --- a/src/exporter/dir.js +++ b/src/exporter/dir.js @@ -24,7 +24,7 @@ module.exports = (node, name, ipldResolver) => { pull.values(node.links), pull.map((link) => ({ path: path.join(name, link.name), - hash: link.hash + hash: link.multihash })), paramap((item, cb) => ipldResolver.get(new CID(item.hash), (err, n) => { if (err) { diff --git a/src/exporter/file.js b/src/exporter/file.js index 0595efa7..9ec72cbb 100644 --- a/src/exporter/file.js +++ b/src/exporter/file.js @@ -20,7 +20,7 @@ module.exports = (node, name, ipldResolver) => { function visitor (node) { return pull( pull.values(node.links), - paramap((link, cb) => ipldResolver.get(new CID(link.hash), cb)) + paramap((link, cb) => ipldResolver.get(new CID(link.multihash), cb)) ) } diff --git a/src/importer/flush-tree.js b/src/importer/flush-tree.js index f74b31b3..4d3e89a7 100644 --- a/src/importer/flush-tree.js +++ b/src/importer/flush-tree.js @@ -5,9 +5,7 @@ const UnixFS = require('ipfs-unixfs') const CID = require('cids') const dagPB = require('ipld-dag-pb') const mapValues = require('async/mapValues') -const series = require('async/series') -const each = require('async/each') - +const waterfall = require('async/waterfall') const DAGLink = dagPB.DAGLink const DAGNode = dagPB.DAGNode @@ -121,60 +119,37 @@ function traverse (tree, sizeIndex, path, ipldResolver, source, done) { // return this new node multihash const keys = Object.keys(tree) + const dir = new UnixFS('directory') + const links = keys.map((key) => { + const b58mh = mh.toB58String(tree[key]) + return new DAGLink(key, sizeIndex[b58mh], tree[key]) + }) - let n - - series([ - (cb) => { - const d = new UnixFS('directory') - DAGNode.create(d.marshal(), (err, node) => { - if (err) { - return cb(err) - } - n = node - cb() - }) - }, - (cb) => { - each(keys, (key, next) => { - const b58mh = mh.toB58String(tree[key]) - const link = new DAGLink(key, sizeIndex[b58mh], tree[key]) - - DAGNode.addLink(n, link, (err, node) => { - if (err) { - return next(err) - } - n = node - next() - }) - }, cb) - }, - (cb) => { - sizeIndex[mh.toB58String(n.multihash)] = n.size + waterfall([ + (cb) => DAGNode.create(dir.marshal(), links, cb), + (node, cb) => { + sizeIndex[mh.toB58String(node.multihash)] = node.size ipldResolver.put({ - node: n, - cid: new CID(n.multihash) - }, (err) => { - if (err) { - source.push(new Error('failed to store dirNode')) - return cb(err) - } - if (path) { - source.push({ - path: path, - multihash: n.multihash, - size: n.size - }) - } - cb() - }) + node: node, + cid: new CID(node.multihash) + }, (err) => cb(err, node)) } - ], (err) => { + ], (err, node) => { if (err) { + source.push(new Error('failed to store dirNode')) return done(err) } - done(null, n.multihash) + + if (path) { + source.push({ + path: path, + multihash: node.multihash, + size: node.size + }) + } + + done(null, node.multihash) }) }) } diff --git a/src/importer/index.js b/src/importer/index.js index 3fcc1c46..7a3c5631 100644 --- a/src/importer/index.js +++ b/src/importer/index.js @@ -8,13 +8,13 @@ const pullWrite = require('pull-write') const parallel = require('async/parallel') const dagPB = require('ipld-dag-pb') const CID = require('cids') -const series = require('async/series') -const each = require('async/each') +const waterfall = require('async/waterfall') const fsc = require('./../chunker/fixed-size') const createAndStoreTree = require('./flush-tree') const DAGNode = dagPB.DAGNode +const DAGLink = dagPB.DAGLink const CHUNK_SIZE = 262144 @@ -77,32 +77,22 @@ function createAndStoreDir (item, ipldResolver, callback) { // 2. write it to the dag store const d = new UnixFS('directory') - let n - - series([ - (cb) => { - DAGNode.create(d.marshal(), (err, node) => { - if (err) { - return cb(err) - } - n = node - cb() - }) - }, - (cb) => { + waterfall([ + (cb) => DAGNode.create(d.marshal(), cb), + (node, cb) => { ipldResolver.put({ - node: n, - cid: new CID(n.multihash) - }, cb) + node: node, + cid: new CID(node.multihash) + }, (err) => cb(err, node)) } - ], (err) => { + ], (err, node) => { if (err) { return callback(err) } callback(null, { path: item.path, - multihash: n.multihash, - size: n.size + multihash: node.multihash, + size: node.size }) }) } @@ -166,60 +156,31 @@ function createAndStoreFile (file, ipldResolver, callback) { } // create a parent node and add all the leafs - const f = new UnixFS('file') - let n - - series([ - (cb) => { - DAGNode.create(new Buffer(0), (err, node) => { - if (err) { - return cb(err) - } - n = node - cb() - }) - }, - (cb) => { - each(leaves, (leaf, next) => { - f.addBlockSize(leaf.leafSize) - DAGNode.addLink(n, { - name: leaf.Name, - size: leaf.Size, - multihash: leaf.Hash - }, (err, node) => { - if (err) { - return next(err) - } - n = node - next() - }) - }, cb) - }, - (cb) => { - DAGNode.create(f.marshal(), n.links, (err, node) => { - if (err) { - return cb(err) - } - n = node - cb() - }) - }, - (cb) => { + + const links = leaves.map((leaf) => { + f.addBlockSize(leaf.leafSize) + + return new DAGLink(leaf.Name, leaf.Size, leaf.Hash) + }) + + waterfall([ + (cb) => DAGNode.create(f.marshal(), links, cb), + (node, cb) => { ipldResolver.put({ - node: n, - cid: new CID(n.multihash) - }, cb) + node: node, + cid: new CID(node.multihash) + }, (err) => cb(err, node)) } - ], (err) => { + ], (err, node) => { if (err) { return callback(err) } callback(null, { path: file.path, - multihash: n.multihash, - size: n.size + multihash: node.multihash, + size: node.size }) }) }) From d8db2fc2af2959805c3474b578f06ebf899cd2d6 Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 24 Nov 2016 14:55:22 +0000 Subject: [PATCH 4/4] chore: update deps --- package.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index e0d80e0c..8f88b9d4 100644 --- a/package.json +++ b/package.json @@ -52,15 +52,15 @@ "rimraf": "^2.5.4" }, "dependencies": { - "async": "^2.1.2", + "async": "^2.1.4", "cids": "^0.2.0", "ipfs-unixfs": "^0.1.5", - "ipld-dag-pb": "^0.8.0", - "ipld-resolver": "^0.2.0", + "ipld-dag-pb": "^0.9.1", + "ipld-resolver": "^0.3.0", "is-ipfs": "^0.2.1", "multihashes": "^0.2.2", "pull-block": "^1.0.2", - "pull-paramap": "^1.2.0", + "pull-paramap": "^1.2.1", "pull-pushable": "^2.0.1", "pull-stream": "^3.5.0", "pull-traverse": "^1.0.3",