diff --git a/package.json b/package.json index f96c0b12..99abddb7 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,7 @@ "dependencies": { "babel-runtime": "^6.11.6", "base32.js": "^0.1.0", - "ipfs-block": "^0.3.0", + "ipfs-block": "^0.4.0", "lock": "^0.1.3", "multihashes": "^0.2.2", "pull-defer": "^0.2.2", @@ -67,4 +67,4 @@ "nginnever ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/stores/blockstore.js b/src/stores/blockstore.js index 21312a9e..4f1765dd 100644 --- a/src/stores/blockstore.js +++ b/src/stores/blockstore.js @@ -1,23 +1,23 @@ 'use strict' const Block = require('ipfs-block') -const pull = require('pull-stream') const Lock = require('lock') const base32 = require('base32.js') const path = require('path') -const write = require('pull-write') const parallel = require('run-parallel') -const defer = require('pull-defer/source') +const pull = require('pull-stream') +const pullWrite = require('pull-write') +const pullDefer = require('pull-defer/source') const PREFIX_LENGTH = 5 +const EXTENSION = 'data' exports = module.exports -function multihashToPath (multihash, extension) { - extension = extension || 'data' +function multihashToPath (multihash) { const encoder = new base32.Encoder() const hash = encoder.write(multihash).finalize() - const filename = `${hash}.${extension}` + const filename = `${hash}.${EXTENSION}` const folder = filename.slice(0, PREFIX_LENGTH) return path.join(folder, filename) @@ -27,82 +27,103 @@ exports.setUp = (basePath, BlobStore, locks) => { const store = new BlobStore(basePath + '/blocks') const lock = new Lock() - function writeBlock (block, cb) { - if (!block || !block.data) { - return cb(new Error('Invalid block')) + // blockBlob is an object with: + // { data: <>, key: <> } + function writeBlock (blockBlob, callback) { + if (!blockBlob || !blockBlob.data) { + return callback(new Error('Invalid block')) } - const key = multihashToPath(block.key, block.extension) - - lock(key, (release) => pull( - pull.values([block.data]), - store.write(key, release((err) => { - if (err) { - return cb(err) - } - cb(null, {key}) - })) - )) + const key = multihashToPath(blockBlob.key) + + lock(key, (release) => { + pull( + pull.values([ + blockBlob.data + ]), + store.write(key, release(released)) + ) + }) + + // called once the lock is released + function released (err) { + if (err) { + return callback(err) + } + callback(null, { key: key }) + } } return { - getStream (key, extension) { + // returns a pull-stream of one block being read + getStream (key) { if (!key) { return pull.error(new Error('Invalid key')) } - const p = multihashToPath(key, extension) - const deferred = defer() + const blockPath = multihashToPath(key) + const deferred = pullDefer() - lock(p, (release) => { - const ext = extension === 'data' ? 'protobuf' : extension + lock(blockPath, (release) => { pull( - store.read(p), - pull.collect(release((err, data) => { - if (err) { - return deferred.abort(err) - } - - deferred.resolve(pull.values([ - new Block(Buffer.concat(data), ext) - ])) - })) + store.read(blockPath), + pull.collect(release(released)) ) }) + function released (err, data) { + if (err) { + return deferred.abort(err) + } + + deferred.resolve( + pull.values([ + new Block(Buffer.concat(data)) + ]) + ) + } + return deferred }, + /* + * putStream - write multiple blocks + * + * returns a pull-stream that expects blockBlobs + * + * NOTE: blockBlob is a { data: <>, key: <> } and not a + * ipfs-block instance. This is because Block instances support + * several types of hashing and it is up to the BlockService + * to understand the right one to use (given the CID) + */ + // TODO + // consider using a more explicit name, this can cause some confusion + // since the natural association is + // getStream - createReadStream - read one + // putStream - createWriteStream - write one + // where in fact it is: + // getStream - createReadStream - read one (the same) + // putStream - createFilesWriteStream = write several + // putStream () { let ended = false let written = [] let push = null - const sink = write((blocks, cb) => { - parallel(blocks.map((block) => (cb) => { - writeBlock(block, (err, meta) => { - if (err) { - return cb(err) - } - - if (push) { - const read = push - push = null - read(null, meta) - return cb() - } - - written.push(meta) - cb() - }) - }), cb) + const sink = pullWrite((blockBlobs, cb) => { + const tasks = writeTasks(blockBlobs) + parallel(tasks, cb) }, null, 100, (err) => { ended = err || true - if (push) push(ended) + if (push) { + push(ended) + } }) const source = (end, cb) => { - if (end) ended = end + if (end) { + ended = end + } if (ended) { return cb(ended) } @@ -114,35 +135,54 @@ exports.setUp = (basePath, BlobStore, locks) => { push = cb } - return {source, sink} - }, + /* + * Creates individual tasks to write each block blob that can be + * exectured in parallel + */ + function writeTasks (blockBlobs) { + return blockBlobs.map((blockBlob) => { + return (cb) => { + writeBlock(blockBlob, (err, meta) => { + if (err) { + return cb(err) + } + + if (push) { + const read = push + push = null + read(null, meta) + return cb() + } + + written.push(meta) + cb() + }) + } + }) + } - has (key, extension, cb) { - if (typeof extension === 'function') { - cb = extension - extension = undefined + return { + source: source, + sink: sink } + }, + has (key, callback) { if (!key) { - return cb(new Error('Invalid key')) + return callback(new Error('Invalid key')) } - const p = multihashToPath(key, extension) - store.exists(p, cb) + const blockPath = multihashToPath(key) + store.exists(blockPath, callback) }, - delete (key, extension, cb) { - if (typeof extension === 'function') { - cb = extension - extension = undefined - } - + delete (key, callback) { if (!key) { - return cb(new Error('Invalid key')) + return callback(new Error('Invalid key')) } - const p = multihashToPath(key, extension) - store.remove(p, cb) + const blockPath = multihashToPath(key) + store.remove(blockPath, callback) } } } diff --git a/src/stores/locks.js b/src/stores/locks.js index 03f6c345..f9bf3244 100644 --- a/src/stores/locks.js +++ b/src/stores/locks.js @@ -13,13 +13,17 @@ exports.setUp = (basePath, BlobStore) => { lock (callback) { function createLock () { pull( - pull.values([new Buffer('LOCK')]), + pull.values([ + new Buffer('LOCK') + ]), store.write(lockFile, callback) ) } function doesExist (err, exists) { - if (err) return callback(err) + if (err) { + return callback(err) + } if (exists) { // default 100ms @@ -37,16 +41,22 @@ exports.setUp = (basePath, BlobStore) => { unlock (callback) { series([ - (cb) => store.remove(lockFile, cb), - (cb) => store.exists(lockFile, (err, exists) => { - if (err) return cb(err) - - if (exists) { - return cb(new Error('failed to remove lock')) - } - - cb() - }) + (cb) => { + store.remove(lockFile, cb) + }, + (cb) => { + store.exists(lockFile, (err, exists) => { + if (err) { + return cb(err) + } + + if (exists) { + return cb(new Error('failed to remove lock')) + } + + cb() + }) + } ], callback) } } diff --git a/test/blockstore-test.js b/test/blockstore-test.js index 2814610b..68726e79 100644 --- a/test/blockstore-test.js +++ b/test/blockstore-test.js @@ -11,14 +11,16 @@ const _ = require('lodash') module.exports = (repo) => { describe('blockstore', () => { const helloKey = 'CIQLS/CIQLSTJHXGJU2PQIUUXFFV62PWV7VREE57RXUU4A52IIR55M4LX432I.data' - const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.ipld' + const blockCollection = _.range(100).map((i) => new Block(new Buffer(`hello-${i}-${Math.random()}`))) describe('.putStream', () => { it('simple', (done) => { const b = new Block('hello world') pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect((err, meta) => { expect(err).to.not.exist @@ -41,13 +43,17 @@ module.exports = (repo) => { } pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect(finish) ) pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect(finish) ) @@ -57,6 +63,9 @@ module.exports = (repo) => { parallel(_.range(50).map(() => (cb) => { pull( pull.values(blockCollection), + pull.map((b) => { + return { data: b.data, key: b.key() } + }), repo.blockstore.putStream(), pull.collect((err, meta) => { expect(err).to.not.exist @@ -67,19 +76,6 @@ module.exports = (repo) => { }), done) }) - it('custom extension', function (done) { - const b = new Block('hello world 2', 'ipld') - pull( - pull.values([b]), - repo.blockstore.putStream(), - pull.collect((err, meta) => { - expect(err).to.not.exist - expect(meta[0].key).to.be.eql(helloIpldKey) - done() - }) - ) - }) - it('returns an error on invalid block', (done) => { pull( pull.values(['hello']), @@ -97,10 +93,10 @@ module.exports = (repo) => { const b = new Block('hello world') pull( - repo.blockstore.getStream(b.key), + repo.blockstore.getStream(b.key()), pull.collect((err, data) => { expect(err).to.not.exist - expect(data[0]).to.be.eql(b) + expect(data[0].key()).to.be.eql(b.key()) done() }) @@ -111,30 +107,17 @@ module.exports = (repo) => { parallel(_.range(20 * 100).map((i) => (cb) => { const j = i % blockCollection.length pull( - repo.blockstore.getStream(blockCollection[j].key), + repo.blockstore.getStream(blockCollection[j].key()), pull.collect((err, meta) => { expect(err).to.not.exist - expect(meta).to.be.eql([blockCollection[j]]) + expect(meta[0].key()) + .to.be.eql(blockCollection[j].key()) cb() }) ) }), done) }) - it('custom extension', (done) => { - const b = new Block('hello world 2', 'ipld') - - pull( - repo.blockstore.getStream(b.key, b.extension), - pull.collect((err, data) => { - expect(err).to.not.exist - expect(data[0]).to.be.eql(b) - - done() - }) - ) - }) - it('returns an error on invalid block', (done) => { pull( repo.blockstore.getStream(), @@ -150,17 +133,7 @@ module.exports = (repo) => { it('existing block', (done) => { const b = new Block('hello world') - repo.blockstore.has(b.key, (err, exists) => { - expect(err).to.not.exist - expect(exists).to.equal(true) - done() - }) - }) - - it('with extension', (done) => { - const b = new Block('hello world') - - repo.blockstore.has(b.key, 'data', (err, exists) => { + repo.blockstore.has(b.key(), (err, exists) => { expect(err).to.not.exist expect(exists).to.equal(true) done() @@ -170,7 +143,7 @@ module.exports = (repo) => { it('non existent block', (done) => { const b = new Block('wooot') - repo.blockstore.has(b.key, (err, exists) => { + repo.blockstore.has(b.key(), (err, exists) => { expect(err).to.not.exist expect(exists).to.equal(false) done() @@ -182,24 +155,10 @@ module.exports = (repo) => { it('simple', (done) => { const b = new Block('hello world') - repo.blockstore.delete(b.key, (err) => { - expect(err).to.not.exist - - repo.blockstore.has(b.key, (err, exists) => { - expect(err).to.not.exist - expect(exists).to.equal(false) - done() - }) - }) - }) - - it('custom extension', (done) => { - const b = new Block('hello world', 'ipld') - - repo.blockstore.delete(b.key, b.extension, (err) => { + repo.blockstore.delete(b.key(), (err) => { expect(err).to.not.exist - repo.blockstore.has(b.key, b.extension, (err, exists) => { + repo.blockstore.has(b.key(), (err, exists) => { expect(err).to.not.exist expect(exists).to.equal(false) done()