From 311551a9a20596c76172c7f60d2afbeb4ffadb9a Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 30 Sep 2016 21:59:30 +0100 Subject: [PATCH 1/2] feat: no optional extension + simplify some of blockstore code --- src/stores/blockstore.js | 155 +++++++++++++++++++++------------------ src/stores/locks.js | 34 ++++++--- test/blockstore-test.js | 61 ++++----------- 3 files changed, 119 insertions(+), 131 deletions(-) diff --git a/src/stores/blockstore.js b/src/stores/blockstore.js index 21312a9e..54ce0df9 100644 --- a/src/stores/blockstore.js +++ b/src/stores/blockstore.js @@ -5,16 +5,16 @@ const pull = require('pull-stream') const Lock = require('lock') const base32 = require('base32.js') const path = require('path') -const write = require('pull-write') +const pullWrite = require('pull-write') const parallel = require('run-parallel') -const defer = require('pull-defer/source') +const pullDefer = require('pull-defer/source') const PREFIX_LENGTH = 5 exports = module.exports -function multihashToPath (multihash, extension) { - extension = extension || 'data' +function multihashToPath (multihash) { + const extension = 'data' const encoder = new base32.Encoder() const hash = encoder.write(multihash).finalize() const filename = `${hash}.${extension}` @@ -27,82 +27,105 @@ exports.setUp = (basePath, BlobStore, locks) => { const store = new BlobStore(basePath + '/blocks') const lock = new Lock() - function writeBlock (block, cb) { + function writeBlock (block, callback) { if (!block || !block.data) { - return cb(new Error('Invalid block')) + return callback(new Error('Invalid block')) } - const key = multihashToPath(block.key, block.extension) - - lock(key, (release) => pull( - pull.values([block.data]), - store.write(key, release((err) => { - if (err) { - return cb(err) - } - cb(null, {key}) - })) - )) + const key = multihashToPath(block.key()) + + lock(key, (release) => { + pull( + pull.values([ + block.data + ]), + store.write(key, release(released)) + ) + }) + + // called 
once the lock is released + function released (err) { + if (err) { + return callback(err) + } + callback(null, { key: key }) + } } return { - getStream (key, extension) { + // returns a pull-stream of one block being read + getStream (key) { if (!key) { return pull.error(new Error('Invalid key')) } - const p = multihashToPath(key, extension) - const deferred = defer() + const blockPath = multihashToPath(key) + const deferred = pullDefer() - lock(p, (release) => { - const ext = extension === 'data' ? 'protobuf' : extension + lock(blockPath, (release) => { pull( - store.read(p), - pull.collect(release((err, data) => { - if (err) { - return deferred.abort(err) - } - - deferred.resolve(pull.values([ - new Block(Buffer.concat(data), ext) - ])) - })) + store.read(blockPath), + pull.collect(release(released)) ) }) + function released (err, data) { + if (err) { + return deferred.abort(err) + } + + deferred.resolve( + pull.values([ + new Block(Buffer.concat(data)) + ]) + ) + } + return deferred }, + // returns a pull-stream to write blocks into + // TODO use a more explicit name, given that getStream is just for + // one block, multiple blocks should have different naming putStream () { let ended = false let written = [] let push = null - const sink = write((blocks, cb) => { - parallel(blocks.map((block) => (cb) => { - writeBlock(block, (err, meta) => { - if (err) { - return cb(err) - } - - if (push) { - const read = push - push = null - read(null, meta) - return cb() - } - - written.push(meta) - cb() - }) - }), cb) + const sink = pullWrite((blocks, cb) => { + const tasks = blocks.map((block) => { + return (cb) => { + writeBlock(block, (err, meta) => { + if (err) { + return cb(err) + } + + if (push) { + const read = push + push = null + read(null, meta) + return cb() + } + + written.push(meta) + cb() + }) + } + }) + + parallel(tasks, cb) }, null, 100, (err) => { ended = err || true - if (push) push(ended) + if (push) { + push(ended) + } }) + // TODO ??Why does a putStream 
need to be a source as well?? const source = (end, cb) => { - if (end) ended = end + if (end) { + ended = end + } if (ended) { return cb(ended) } @@ -114,35 +137,25 @@ exports.setUp = (basePath, BlobStore, locks) => { push = cb } - return {source, sink} + return { source: source, sink: sink } }, - has (key, extension, cb) { - if (typeof extension === 'function') { - cb = extension - extension = undefined - } - + has (key, callback) { if (!key) { - return cb(new Error('Invalid key')) + return callback(new Error('Invalid key')) } - const p = multihashToPath(key, extension) - store.exists(p, cb) + const blockPath = multihashToPath(key) + store.exists(blockPath, callback) }, - delete (key, extension, cb) { - if (typeof extension === 'function') { - cb = extension - extension = undefined - } - + delete (key, callback) { if (!key) { - return cb(new Error('Invalid key')) + return callback(new Error('Invalid key')) } - const p = multihashToPath(key, extension) - store.remove(p, cb) + const blockPath = multihashToPath(key) + store.remove(blockPath, callback) } } } diff --git a/src/stores/locks.js b/src/stores/locks.js index 03f6c345..f9bf3244 100644 --- a/src/stores/locks.js +++ b/src/stores/locks.js @@ -13,13 +13,17 @@ exports.setUp = (basePath, BlobStore) => { lock (callback) { function createLock () { pull( - pull.values([new Buffer('LOCK')]), + pull.values([ + new Buffer('LOCK') + ]), store.write(lockFile, callback) ) } function doesExist (err, exists) { - if (err) return callback(err) + if (err) { + return callback(err) + } if (exists) { // default 100ms @@ -37,16 +41,22 @@ exports.setUp = (basePath, BlobStore) => { unlock (callback) { series([ - (cb) => store.remove(lockFile, cb), - (cb) => store.exists(lockFile, (err, exists) => { - if (err) return cb(err) - - if (exists) { - return cb(new Error('failed to remove lock')) - } - - cb() - }) + (cb) => { + store.remove(lockFile, cb) + }, + (cb) => { + store.exists(lockFile, (err, exists) => { + if (err) { + return 
cb(err) + } + + if (exists) { + return cb(new Error('failed to remove lock')) + } + + cb() + }) + } ], callback) } } diff --git a/test/blockstore-test.js b/test/blockstore-test.js index 2814610b..7252e7b8 100644 --- a/test/blockstore-test.js +++ b/test/blockstore-test.js @@ -11,7 +11,9 @@ const _ = require('lodash') module.exports = (repo) => { describe('blockstore', () => { const helloKey = 'CIQLS/CIQLSTJHXGJU2PQIUUXFFV62PWV7VREE57RXUU4A52IIR55M4LX432I.data' - const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.ipld' + + const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.data' + const blockCollection = _.range(100).map((i) => new Block(new Buffer(`hello-${i}-${Math.random()}`))) describe('.putStream', () => { @@ -68,7 +70,7 @@ module.exports = (repo) => { }) it('custom extension', function (done) { - const b = new Block('hello world 2', 'ipld') + const b = new Block('hello world 2') pull( pull.values([b]), repo.blockstore.putStream(), @@ -97,10 +99,10 @@ module.exports = (repo) => { const b = new Block('hello world') pull( - repo.blockstore.getStream(b.key), + repo.blockstore.getStream(b.key()), pull.collect((err, data) => { expect(err).to.not.exist - expect(data[0]).to.be.eql(b) + expect(data[0].key()).to.be.eql(b.key()) done() }) @@ -111,30 +113,17 @@ module.exports = (repo) => { parallel(_.range(20 * 100).map((i) => (cb) => { const j = i % blockCollection.length pull( - repo.blockstore.getStream(blockCollection[j].key), + repo.blockstore.getStream(blockCollection[j].key()), pull.collect((err, meta) => { expect(err).to.not.exist - expect(meta).to.be.eql([blockCollection[j]]) + expect(meta[0].key()) + .to.be.eql(blockCollection[j].key()) cb() }) ) }), done) }) - it('custom extension', (done) => { - const b = new Block('hello world 2', 'ipld') - - pull( - repo.blockstore.getStream(b.key, b.extension), - pull.collect((err, data) => { - expect(err).to.not.exist - expect(data[0]).to.be.eql(b) - - done() 
- }) - ) - }) - it('returns an error on invalid block', (done) => { pull( repo.blockstore.getStream(), @@ -150,17 +139,7 @@ module.exports = (repo) => { it('existing block', (done) => { const b = new Block('hello world') - repo.blockstore.has(b.key, (err, exists) => { - expect(err).to.not.exist - expect(exists).to.equal(true) - done() - }) - }) - - it('with extension', (done) => { - const b = new Block('hello world') - - repo.blockstore.has(b.key, 'data', (err, exists) => { + repo.blockstore.has(b.key(), (err, exists) => { expect(err).to.not.exist expect(exists).to.equal(true) done() @@ -170,7 +149,7 @@ module.exports = (repo) => { it('non existent block', (done) => { const b = new Block('wooot') - repo.blockstore.has(b.key, (err, exists) => { + repo.blockstore.has(b.key(), (err, exists) => { expect(err).to.not.exist expect(exists).to.equal(false) done() @@ -182,24 +161,10 @@ module.exports = (repo) => { it('simple', (done) => { const b = new Block('hello world') - repo.blockstore.delete(b.key, (err) => { - expect(err).to.not.exist - - repo.blockstore.has(b.key, (err, exists) => { - expect(err).to.not.exist - expect(exists).to.equal(false) - done() - }) - }) - }) - - it('custom extension', (done) => { - const b = new Block('hello world', 'ipld') - - repo.blockstore.delete(b.key, b.extension, (err) => { + repo.blockstore.delete(b.key(), (err) => { expect(err).to.not.exist - repo.blockstore.has(b.key, b.extension, (err, exists) => { + repo.blockstore.has(b.key(), (err, exists) => { expect(err).to.not.exist expect(exists).to.equal(false) done() From f7e4047938e74ffb6341dba215473c13f40adcab Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 1 Oct 2016 07:52:30 +0100 Subject: [PATCH 2/2] feat: blockstore gets blockBlobs instead of blocks (the difference is that now it receives the key in which it should store it --- package.json | 4 +- src/stores/blockstore.js | 95 ++++++++++++++++++++++++++-------------- test/blockstore-test.js | 30 +++++-------- 3 files changed, 75 
insertions(+), 54 deletions(-) diff --git a/package.json b/package.json index f96c0b12..99abddb7 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,7 @@ "dependencies": { "babel-runtime": "^6.11.6", "base32.js": "^0.1.0", - "ipfs-block": "^0.3.0", + "ipfs-block": "^0.4.0", "lock": "^0.1.3", "multihashes": "^0.2.2", "pull-defer": "^0.2.2", @@ -67,4 +67,4 @@ "nginnever ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/stores/blockstore.js b/src/stores/blockstore.js index 54ce0df9..4f1765dd 100644 --- a/src/stores/blockstore.js +++ b/src/stores/blockstore.js @@ -1,23 +1,23 @@ 'use strict' const Block = require('ipfs-block') -const pull = require('pull-stream') const Lock = require('lock') const base32 = require('base32.js') const path = require('path') -const pullWrite = require('pull-write') const parallel = require('run-parallel') +const pull = require('pull-stream') +const pullWrite = require('pull-write') const pullDefer = require('pull-defer/source') const PREFIX_LENGTH = 5 +const EXTENSION = 'data' exports = module.exports function multihashToPath (multihash) { - const extension = 'data' const encoder = new base32.Encoder() const hash = encoder.write(multihash).finalize() - const filename = `${hash}.${extension}` + const filename = `${hash}.${EXTENSION}` const folder = filename.slice(0, PREFIX_LENGTH) return path.join(folder, filename) @@ -27,17 +27,19 @@ exports.setUp = (basePath, BlobStore, locks) => { const store = new BlobStore(basePath + '/blocks') const lock = new Lock() - function writeBlock (block, callback) { - if (!block || !block.data) { + // blockBlob is an object with: + // { data: <>, key: <> } + function writeBlock (blockBlob, callback) { + if (!blockBlob || !blockBlob.data) { return callback(new Error('Invalid block')) } - const key = multihashToPath(block.key()) + const key = multihashToPath(blockBlob.key) lock(key, (release) => { pull( pull.values([ - block.data + blockBlob.data ]), store.write(key, 
release(released)) ) @@ -84,35 +86,32 @@ exports.setUp = (basePath, BlobStore, locks) => { return deferred }, - // returns a pull-stream to write blocks into - // TODO use a more explicit name, given that getStream is just for - // one block, multiple blocks should have different naming + /* + * putStream - write multiple blocks + * + * returns a pull-stream that expects blockBlobs + * + * NOTE: blockBlob is a { data: <>, key: <> } and not an + * ipfs-block instance. This is because Block instances support + * several types of hashing and it is up to the BlockService + * to understand the right one to use (given the CID) + */ + // TODO + // consider using a more explicit name, this can cause some confusion + // since the natural association is + // getStream - createReadStream - read one + // putStream - createWriteStream - write one + // where in fact it is: + // getStream - createReadStream - read one (the same) + // putStream - createFilesWriteStream - write several + // putStream () { let ended = false let written = [] let push = null - const sink = pullWrite((blocks, cb) => { - const tasks = blocks.map((block) => { - return (cb) => { - writeBlock(block, (err, meta) => { - if (err) { - return cb(err) - } - - if (push) { - const read = push - push = null - read(null, meta) - return cb() - } - - written.push(meta) - cb() - }) - } - }) - + const sink = pullWrite((blockBlobs, cb) => { + const tasks = writeTasks(blockBlobs) parallel(tasks, cb) }, null, 100, (err) => { ended = err || true @@ -121,7 +120,6 @@ exports.setUp = (basePath, BlobStore, locks) => { } }) - // TODO ??Why does a putStream need to be a source as well?? 
const source = (end, cb) => { if (end) { ended = end @@ -137,7 +135,36 @@ exports.setUp = (basePath, BlobStore, locks) => { push = cb } - return { source: source, sink: sink } + /* + * Creates individual tasks to write each block blob that can be + * executed in parallel + */ + function writeTasks (blockBlobs) { + return blockBlobs.map((blockBlob) => { + return (cb) => { + writeBlock(blockBlob, (err, meta) => { + if (err) { + return cb(err) + } + + if (push) { + const read = push + push = null + read(null, meta) + return cb() + } + + written.push(meta) + cb() + }) + } + }) + } + + return { + source: source, + sink: sink + } }, has (key, callback) { diff --git a/test/blockstore-test.js b/test/blockstore-test.js index 7252e7b8..68726e79 100644 --- a/test/blockstore-test.js +++ b/test/blockstore-test.js @@ -12,15 +12,15 @@ module.exports = (repo) => { describe('blockstore', () => { const helloKey = 'CIQLS/CIQLSTJHXGJU2PQIUUXFFV62PWV7VREE57RXUU4A52IIR55M4LX432I.data' - const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.data' - const blockCollection = _.range(100).map((i) => new Block(new Buffer(`hello-${i}-${Math.random()}`))) describe('.putStream', () => { it('simple', (done) => { const b = new Block('hello world') pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect((err, meta) => { expect(err).to.not.exist @@ -43,13 +43,17 @@ module.exports = (repo) => { } pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect(finish) ) pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect(finish) ) @@ -59,6 +63,9 @@ module.exports = (repo) => { parallel(_.range(50).map(() => (cb) => { pull( pull.values(blockCollection), + pull.map((b) => { + return { data: b.data, key: b.key() } + }), repo.blockstore.putStream(), pull.collect((err, meta) => { 
expect(err).to.not.exist @@ -69,19 +76,6 @@ module.exports = (repo) => { }), done) }) - it('custom extension', function (done) { - const b = new Block('hello world 2') - pull( - pull.values([b]), - repo.blockstore.putStream(), - pull.collect((err, meta) => { - expect(err).to.not.exist - expect(meta[0].key).to.be.eql(helloIpldKey) - done() - }) - ) - }) - it('returns an error on invalid block', (done) => { pull( pull.values(['hello']),