Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

(WIP) feature: floodsub #470

Merged
merged 2 commits into from
Dec 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"ipld-dag-pb": "^0.9.2",
"is-ipfs": "^0.2.1",
"isstream": "^0.1.2",
"js-base64": "^2.1.9",
"lru-cache": "^4.0.2",
"multiaddr": "^2.1.1",
"multipart-stream": "^2.0.1",
Expand Down Expand Up @@ -63,7 +64,7 @@
"gulp": "^3.9.1",
"hapi": "^16.0.1",
"interface-ipfs-core": "^0.22.0",
"ipfsd-ctl": "^0.17.0",
"@haad/ipfsd-ctl": "^0.18.0-beta.5",
"pre-commit": "^1.1.3",
"socket.io": "^1.7.1",
"socket.io-client": "^1.7.1",
Expand Down Expand Up @@ -115,4 +116,4 @@
"url": "https://github.com/ipfs/js-ipfs-api/issues"
},
"homepage": "https://github.com/ipfs/js-ipfs-api"
}
}
18 changes: 10 additions & 8 deletions src/api/add.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
'use strict'

const isStream = require('isstream')
const addToDagNodesTransform = require('../add-to-dagnode-transform')
const promisify = require('promisify-es6')
const DAGNodeStream = require('../dagnode-stream')

module.exports = (send) => {
return promisify((files, callback) => {
const good = Buffer.isBuffer(files) ||
const ok = Buffer.isBuffer(files) ||
isStream.isReadable(files) ||
Array.isArray(files)

if (!good) {
callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
if (!ok) {
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
}

const sendWithTransform = send.withTransform(addToDagNodesTransform)

return sendWithTransform({
const request = {
path: 'add',
files: files
}, callback)
}

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
send.andTransform(request, transform, callback)
})
}
7 changes: 5 additions & 2 deletions src/api/dht.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return {
Expand All @@ -19,11 +20,13 @@ module.exports = (send) => {
opts = {}
}

send({
const request = {
path: 'dht/findprovs',
args: args,
qs: opts
}, callback)
}

send.andTransform(request, streamToValue, callback)
}),
get: promisify((key, opts, callback) => {
if (typeof opts === 'function' &&
Expand Down
15 changes: 8 additions & 7 deletions src/api/get.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'use strict'

const tarStreamToObjects = require('../tar-stream-to-objects')
const cleanMultihash = require('../clean-multihash')
const promisify = require('promisify-es6')
const cleanMultihash = require('../clean-multihash')
const TarStreamToObjects = require('../tar-stream-to-objects')

module.exports = (send) => {
return promisify(function get (path, opts, callback) {
return promisify((path, opts, callback) => {
if (typeof opts === 'function' &&
!callback) {
callback = opts
Expand All @@ -26,12 +26,13 @@ module.exports = (send) => {
return callback(err)
}

var sendWithTransform = send.withTransform(tarStreamToObjects)

sendWithTransform({
const request = {
path: 'get',
args: path,
qs: opts
}, callback)
}

// Convert the response stream to TarStream objects
send.andTransform(request, TarStreamToObjects.from, callback)
})
}
32 changes: 25 additions & 7 deletions src/api/ping.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return promisify((id, callback) => {
send({
const request = {
path: 'ping',
args: id,
qs: { n: 1 }
}, function (err, res) {
if (err) {
return callback(err, null)
}
callback(null, res[1])
})
}

// Transform the response stream to a value:
// { Success: <boolean>, Time: <number>, Text: <string> }
const transform = (res, callback) => {
streamToValue(res, (err, res) => {
if (err) {
return callback(err)
}

// go-ipfs http api currently returns 3 lines for a ping.
// they're a little messed, so take the correct values from each lines.
const pingResult = {
Success: res[1].Success,
Time: res[1].Time,
Text: res[2].Text
}

callback(null, pingResult)
})
}

send.andTransform(request, transform, callback)
})
}
98 changes: 98 additions & 0 deletions src/api/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
'use strict'

const promisify = require('promisify-es6')
const PubsubMessageStream = require('../pubsub-message-stream')
const stringlistToArray = require('../stringlist-to-array')

/* Internal subscriptions state and functions */
let subscriptions = {}

const addSubscription = (topic, request) => {
subscriptions[topic] = { request: request }
}

const removeSubscription = promisify((topic, callback) => {
if (!subscriptions[topic]) {
return callback(new Error(`Not subscribed to ${topic}`))
}

subscriptions[topic].request.abort()
delete subscriptions[topic]

if (callback) {
callback(null)
}
})

/* Public API */
module.exports = (send) => {
return {
subscribe: promisify((topic, options, callback) => {
const defaultOptions = {
discover: false
}

if (typeof options === 'function') {
callback = options
options = defaultOptions
}

if (!options) {
options = defaultOptions
}

// If we're already subscribed, return an error
if (subscriptions[topic]) {
return callback(new Error(`Already subscribed to '${topic}'`))
}

// Request params
const request = {
path: 'pubsub/sub',
args: [topic],
qs: { discover: options.discover }
}

// Start the request and transform the response stream to Pubsub messages stream
const req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
if (err) {
return callback(err)
}
// Add a cancel method to the stream so that the subscription can be cleanly cancelled
stream.cancel = promisify((cb) => removeSubscription(topic, cb))
// Add the request to the active subscriptions and return the stream
addSubscription(topic, req)
callback(null, stream)
})
}),
publish: promisify((topic, data, callback) => {
const buf = Buffer.isBuffer(data) ? data : new Buffer(data)

const request = {
path: 'pubsub/pub',
args: [topic, buf]
}

send(request, callback)
}),
ls: promisify((callback) => {
const request = {
path: 'pubsub/ls'
}

send.andTransform(request, stringlistToArray, callback)
}),
peers: promisify((topic, callback) => {
if (!subscriptions[topic]) {
return callback(new Error(`Not subscribed to '${topic}'`))
}

const request = {
path: 'pubsub/peers',
args: [topic]
}

send.andTransform(request, stringlistToArray, callback)
})
}
}
16 changes: 12 additions & 4 deletions src/api/refs.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
const refs = promisify((args, opts, callback) => {
if (typeof (opts) === 'function') {
callback = opts
opts = {}
}
return send({

const request = {
path: 'refs',
args: args,
qs: opts
}, callback)
}

send.andTransform(request, streamToValue, callback)
})

refs.local = promisify((opts, callback) => {
if (typeof (opts) === 'function') {
callback = opts
opts = {}
}
return send({

const request = {
path: 'refs',
qs: opts
}, callback)
}

send.andTransform(request, streamToValue, callback)
})

return refs
Expand Down
12 changes: 7 additions & 5 deletions src/api/util/fs-add.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use strict'

const isNode = require('detect-node')
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
const promisify = require('promisify-es6')
const DAGNodeStream = require('../../dagnode-stream')

module.exports = (send) => {
return promisify((path, opts, callback) => {
Expand All @@ -28,12 +28,14 @@ module.exports = (send) => {
return callback(new Error('"path" must be a string'))
}

const sendWithTransform = send.withTransform(addToDagNodesTransform)

sendWithTransform({
const request = {
path: 'add',
qs: opts,
files: path
}, callback)
}

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
send.andTransform(request, transform, callback)
})
}
12 changes: 7 additions & 5 deletions src/api/util/url-add.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
const promisify = require('promisify-es6')
const once = require('once')
const parseUrl = require('url').parse

const request = require('../../request')
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
const DAGNodeStream = require('../../dagnode-stream')

module.exports = (send) => {
return promisify((url, opts, callback) => {
Expand All @@ -28,7 +27,6 @@ module.exports = (send) => {
return callback(new Error('"url" param must be an http(s) url'))
}

const sendWithTransform = send.withTransform(addToDagNodesTransform)
callback = once(callback)

request(parseUrl(url).protocol)(url, (res) => {
Expand All @@ -37,11 +35,15 @@ module.exports = (send) => {
return callback(new Error(`Failed to download with ${res.statusCode}`))
}

sendWithTransform({
const params = {
path: 'add',
qs: opts,
files: res
}, callback)
}

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
send.andTransform(params, transform, callback)
}).end()
})
}
Loading