Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit ff6e6c8

Browse files
committed
Pubsub
1 parent 2f6ba3d commit ff6e6c8

File tree

5 files changed

+194
-1
lines changed

5 files changed

+194
-1
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
"hapi": "^15.0.2",
5252
"interface-ipfs-core": "^0.14.2",
5353
"ipfsd-ctl": "^0.14.0",
54+
"js-base64": "^2.1.9",
5455
"pre-commit": "^1.1.3",
5556
"socket.io": "^1.4.8",
5657
"socket.io-client": "^1.4.8",
@@ -100,4 +101,4 @@
100101
"url": "https://github.com/ipfs/js-ipfs-api/issues"
101102
},
102103
"homepage": "https://github.com/ipfs/js-ipfs-api"
103-
}
104+
}

src/api/pubsub.js

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const bs58 = require('bs58')
5+
var Base64 = require('js-base64').Base64
6+
// const Wreck = require('wreck')
7+
var http = require('http')
8+
9+
module.exports = (send) => {
10+
const api = {
11+
sub: promisify((topic, options, callback) => {
12+
if (typeof options === 'function') {
13+
callback = options
14+
options = {}
15+
}
16+
if (!options) {
17+
options = {}
18+
}
19+
20+
console.log('Sub', topic)
21+
http.get({
22+
host: 'localhost',
23+
port: 5001,
24+
path: '/api/v0/pubsub/sub/' + topic
25+
}, function (response) {
26+
// Continuously update stream with data
27+
var body = ''
28+
response.on('data', function (d) {
29+
// console.log('>', d.toString())
30+
var parsed = JSON.parse(d)
31+
parsed.from = bs58.encode(parsed.from)
32+
parsed.data = Base64.decode(parsed.data)
33+
parsed.seqno = Base64.decode(parsed.seqno)
34+
console.log(parsed)
35+
body += d
36+
})
37+
response.on('end', function () {
38+
// Data reception is done, do whatever with it!
39+
var parsed = JSON.parse(body)
40+
console.log(parsed)
41+
callback(null, parsed)
42+
})
43+
})
44+
45+
// Wreck.get('http://localhost:5001/api/v0/pubsub/sub/' + topic, (err, res, payload) => {
46+
// if (err) {
47+
// return callback(err)
48+
// }
49+
50+
// console.log(payload.toString())
51+
// // const result = JSON.parse(res)
52+
// callback(null, payload)
53+
// })
54+
55+
// send({
56+
// path: 'pubsub/sub/' + topic
57+
// }, (err, result) => {
58+
// console.log('RESULT', err, result)
59+
// if (err) {
60+
// return callback(err)
61+
// }
62+
63+
// callback(null, result) // result is a Stream
64+
// })
65+
}),
66+
pub: promisify((topic, data, options, callback) => {
67+
if (typeof options === 'function') {
68+
callback = options
69+
options = {}
70+
}
71+
if (!options) {
72+
options = {}
73+
}
74+
75+
let buf
76+
if (Buffer.isBuffer(data)) {
77+
buf = data
78+
} else {
79+
buf = new Buffer(data)
80+
}
81+
82+
send({
83+
path: 'pubsub/pub',
84+
qs: { topic: topic },
85+
files: buf
86+
}, (err, result) => {
87+
if (err) {
88+
return callback(err)
89+
}
90+
91+
callback(null, true)
92+
})
93+
})
94+
}
95+
96+
return api
97+
}

src/load-commands.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ function requireCommands () {
2222
object: require('./api/object'),
2323
pin: require('./api/pin'),
2424
ping: require('./api/ping'),
25+
pubsub: require('./api/pubsub'),
2526
refs: require('./api/refs'),
2627
repo: require('./api/repo'),
2728
swarm: require('./api/swarm'),

test.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict'
2+
3+
var http = require('http')
4+
var Base64 = require('js-base64').Base64
5+
const bs58 = require('bs58')
6+
7+
http.get({
8+
host: 'localhost',
9+
port: 5001,
10+
path: '/api/v0/pubsub/sub/testi1'
11+
}, function (response) {
12+
// Continuously update stream with data
13+
var body = ''
14+
response.on('data', function (d) {
15+
// console.log(">", d.toString())
16+
var parsed = JSON.parse(d)
17+
parsed.from = bs58.encode(parsed.from)
18+
parsed.data = Base64.decode(parsed.data)
19+
parsed.seqno = Base64.decode(parsed.seqno)
20+
console.log(parsed)
21+
body += d
22+
})
23+
response.on('end', function () {
24+
// Data reception is done, do whatever with it!
25+
var parsed = JSON.parse(body)
26+
console.log(parsed)
27+
// callback(null, parsed)
28+
})
29+
})

test/ipfs-api/pubsub.spec.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/* eslint-env mocha */
2+
/* eslint max-nested-callbacks: ['error', 8] */
3+
'use strict'
4+
5+
const expect = require('chai').expect
6+
const isNode = require('detect-node')
7+
const FactoryClient = require('../factory/factory-client')
8+
9+
const useLocalDaemon = true
10+
11+
describe('.pubsub', () => {
12+
if (!isNode) {
13+
return
14+
}
15+
16+
let ipfs
17+
let fc
18+
19+
before(function (done) {
20+
this.timeout(20 * 1000) // slow CI
21+
fc = new FactoryClient()
22+
fc.spawnNode(null, null, useLocalDaemon, (err, node) => {
23+
expect(err).to.not.exist
24+
ipfs = node
25+
done()
26+
})
27+
})
28+
29+
after((done) => {
30+
fc.dismantle(done)
31+
})
32+
33+
it.only('sub', (done) => {
34+
console.log('1')
35+
ipfs.pubsub.sub('testi1', (err, result) => {
36+
console.log('RESULT1', err, result)
37+
expect(err).to.not.exist
38+
expect(result.length).to.equal(1)
39+
done()
40+
})
41+
// setTimeout(() => {
42+
// ipfs.pubsub.pub('testi1', 'hi')
43+
// }, 1000)
44+
})
45+
46+
describe('.pub', () => {
47+
it('publishes a message - from string', (done) => {
48+
const data = 'hello friend'
49+
ipfs.pubsub.pub('testi1', data, (err, result) => {
50+
expect(err).to.not.exist
51+
expect(result).to.equal(true)
52+
done()
53+
})
54+
})
55+
56+
it('publishes a message - from Buffer', (done) => {
57+
const data = new Buffer('hello buffer')
58+
ipfs.pubsub.pub('testi1', data, (err, result) => {
59+
expect(err).to.not.exist
60+
expect(result).to.equal(true)
61+
done()
62+
})
63+
})
64+
})
65+
})

0 commit comments

Comments
 (0)