Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit adc2fcc

Browse files
committed
Better tests + implementation
1 parent a8e2502 commit adc2fcc

File tree

2 files changed

+93
-46
lines changed

2 files changed

+93
-46
lines changed

src/api/pubsub.js

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,21 @@ var Stream = require('stream')
77
// const Wreck = require('wreck')
88
var http = require('http')
99

10+
let activeSubscriptions = []
11+
12+
const subscriptionExists = (subscriptions, topic) => {
13+
return subscriptions.indexOf(topic) !== -1
14+
}
15+
const removeSubscription = (subscriptions, topic) => {
16+
const indexToRemove = subscriptions.indexOf(topic)
17+
return subscriptions.filter((el, index) => {
18+
return index !== indexToRemove
19+
})
20+
}
21+
const addSubscription = (subscriptions, topic) => {
22+
return subscriptions.concat([topic])
23+
}
24+
1025
module.exports = (send) => {
1126
const api = {
1227
sub: promisify((topic, options, callback) => {
@@ -18,12 +33,17 @@ module.exports = (send) => {
1833
options = {}
1934
}
2035

21-
2236
var rs = new Stream()
2337
rs.readable = true
2438

25-
console.log('Sub', topic)
26-
http.get({
39+
if (!subscriptionExists(activeSubscriptions, topic)) {
40+
activeSubscriptions = addSubscription(activeSubscriptions, topic)
41+
} else {
42+
return callback(new Error('Already subscribed to ' + topic), null)
43+
}
44+
45+
// we're using http.get here to have more control
46+
const request = http.get({
2747
host: 'localhost',
2848
port: 5001,
2949
path: '/api/v0/pubsub/sub/' + topic
@@ -32,7 +52,7 @@ module.exports = (send) => {
3252
var parsed = JSON.parse(d)
3353

3454
// skip "double subscription" error
35-
if(!parsed.Message) {
55+
if (!parsed.Message) {
3656
parsed.from = bs58.encode(parsed.from)
3757
parsed.data = Base64.decode(parsed.data)
3858
parsed.seqno = Base64.decode(parsed.seqno)
@@ -42,21 +62,17 @@ module.exports = (send) => {
4262
response.on('end', function () {
4363
rs.emit('end')
4464
})
45-
46-
callback(null, rs)
65+
rs.cancel = () => {
66+
request.abort()
67+
response.destroy()
68+
activeSubscriptions = removeSubscription(activeSubscriptions, topic)
69+
}
4770
})
48-
49-
// send({
50-
// path: 'pubsub/sub/' + topic
51-
// }, (err, response) => {
52-
// console.log('RESULT', err, response)
53-
// if (err) {
54-
// return callback(err)
55-
// }
56-
57-
// callback(null, response.pipe(ndjson.parse()))
58-
// // callback(null, result) // result is a Stream
59-
// })
71+
rs.cancel = () => {
72+
request.abort()
73+
activeSubscriptions = removeSubscription(activeSubscriptions, topic)
74+
}
75+
callback(null, rs)
6076
}),
6177
pub: promisify((topic, data, options, callback) => {
6278
if (typeof options === 'function') {
@@ -81,7 +97,6 @@ module.exports = (send) => {
8197
if (err) {
8298
return callback(err)
8399
}
84-
85100
callback(null, true)
86101
})
87102
})

test/ipfs-api/pubsub.spec.js

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@ const ipfsApi = require('../../src')
99

1010
const useLocalDaemon = true
1111

12+
const topicName = 'js-ipfs-api-tests'
13+
14+
const publish = (ipfs, data, callback) => {
15+
ipfs.pubsub.pub(topicName, data, (err, result) => {
16+
expect(err).to.not.exist
17+
expect(result).to.equal(true)
18+
callback()
19+
})
20+
}
21+
1222
describe('.pubsub', () => {
1323
if (!isNode) {
1424
return
@@ -33,40 +43,62 @@ describe('.pubsub', () => {
3343
// // fc.dismantle(done)
3444
// })
3545

36-
it.only('sub', (done) => {
37-
ipfs.pubsub.sub('testi1', (err, result) => {
38-
expect(err).to.not.exist
39-
result.on('data', function (d) {
40-
// console.log("-->", d)
41-
expect(d.data).to.equal('hi')
42-
done()
43-
})
44-
result.on('end', function () {
45-
console.log("END!!")
46-
})
46+
describe('.publish', () => {
47+
it('message from string', (done) => {
48+
publish(ipfs, 'hello friend', done)
49+
})
50+
it('message from buffer', (done) => {
51+
publish(ipfs, new Buffer('hello friend'), done)
4752
})
48-
setTimeout(() => {
49-
ipfs.pubsub.pub('testi1', 'hi')
50-
}, 100)
5153
})
5254

53-
describe('.pub', () => {
54-
it.only('publishes a message - from string', (done) => {
55-
const data = 'hello friend'
56-
ipfs.pubsub.pub('testi1', data, (err, result) => {
55+
describe('.subscribe', () => {
56+
it('one topic', (done) => {
57+
ipfs.pubsub.sub(topicName, (err, subscription) => {
5758
expect(err).to.not.exist
58-
expect(result).to.equal(true)
59-
done()
59+
subscription.on('data', (d) => {
60+
expect(d.data).to.equal('hi')
61+
subscription.cancel()
62+
})
63+
subscription.on('end', () => {
64+
done()
65+
})
6066
})
67+
setTimeout(publish.bind(null, ipfs, 'hi', () => {}), 0)
68+
})
69+
it('fails when already subscribed', (done) => {
70+
ipfs.pubsub.sub(topicName, (firstErr, firstSub) => {
71+
expect(firstErr).to.not.exist
72+
ipfs.pubsub.sub(topicName, (secondErr, secondSub) => {
73+
expect(secondErr).to.be.an('error')
74+
expect(secondErr.toString()).to.equal('Error: Already subscribed to ' + topicName)
75+
firstSub.cancel()
76+
done()
77+
}).catch(done)
78+
}).catch(done)
6179
})
80+
it('receive multiple messages', (done) => {
81+
let receivedMessages = []
82+
let interval = null
83+
const expectedMessages = 2
84+
ipfs.pubsub.sub(topicName, (err, subscription) => {
85+
expect(err).to.not.exists
86+
subscription.on('data', (d) => {
87+
receivedMessages.push(d.data)
88+
if (receivedMessages.length === expectedMessages) {
89+
receivedMessages.forEach((msg) => {
90+
expect(msg).to.be.equal('hi')
91+
})
92+
clearInterval(interval)
93+
subscription.cancel()
94+
done()
95+
}
96+
})
97+
}).catch(done)
6298

63-
it.only('publishes a message - from Buffer', (done) => {
64-
const data = new Buffer('hello buffer')
65-
ipfs.pubsub.pub('testi1', data, (err, result) => {
66-
expect(err).to.not.exist
67-
expect(result).to.equal(true)
68-
done()
69-
})
99+
setTimeout(() => {
100+
interval = setInterval(publish.bind(null, ipfs, 'hi', () => {}), 10)
101+
}, 10)
70102
})
71103
})
72104
})

0 commit comments

Comments
 (0)