Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 02985ae

Browse files
committed
feat: modularises pubsub
License: MIT Signed-off-by: Alan Shaw <alan@tableflip.io>
1 parent 76612ed commit 02985ae

File tree

8 files changed

+842
-678
lines changed

8 files changed

+842
-678
lines changed

js/src/pubsub.js

Lines changed: 0 additions & 678 deletions
This file was deleted.

js/src/pubsub/index.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
'use strict'
2+
const { createSuite } = require('../utils/suite')
3+
4+
const tests = {
5+
publish: require('./publish'),
6+
subscribe: require('./subscribe'),
7+
unsubscribe: require('./unsubscribe'),
8+
peers: require('./peers'),
9+
ls: require('./ls')
10+
}
11+
12+
module.exports = createSuite(tests)

js/src/pubsub/ls.js

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/* eslint-env mocha */
2+
'use strict'
3+
4+
const chai = require('chai')
5+
const dirtyChai = require('dirty-chai')
6+
const each = require('async/each')
7+
const { getTopic } = require('./utils')
8+
const { getDescribe, getIt } = require('../utils/mocha')
9+
10+
const expect = chai.expect
11+
chai.use(dirtyChai)
12+
13+
module.exports = (createCommon, options) => {
14+
const describe = getDescribe(options)
15+
const it = getIt(options)
16+
const common = createCommon()
17+
18+
describe('.pubsub.ls', function () {
19+
this.timeout(80 * 1000)
20+
21+
let ipfs
22+
23+
before(function (done) {
24+
// CI takes longer to instantiate the daemon, so we need to increase the
25+
// timeout for the before step
26+
this.timeout(60 * 1000)
27+
28+
common.setup((err, factory) => {
29+
expect(err).to.not.exist()
30+
factory.spawnNode((err, node) => {
31+
expect(err).to.not.exist()
32+
ipfs = node
33+
done()
34+
})
35+
})
36+
})
37+
38+
after((done) => common.teardown(done))
39+
40+
it('should return an empty list when no topics are subscribed', (done) => {
41+
ipfs.pubsub.ls((err, topics) => {
42+
expect(err).to.not.exist()
43+
expect(topics.length).to.equal(0)
44+
done()
45+
})
46+
})
47+
48+
it('should return a list with 1 subscribed topic', (done) => {
49+
const sub1 = (msg) => {}
50+
const topic = getTopic()
51+
52+
ipfs.pubsub.subscribe(topic, sub1, (err) => {
53+
expect(err).to.not.exist()
54+
55+
ipfs.pubsub.ls((err, topics) => {
56+
expect(err).to.not.exist()
57+
expect(topics).to.be.eql([topic])
58+
59+
ipfs.pubsub.unsubscribe(topic, sub1, done)
60+
})
61+
})
62+
})
63+
64+
it('should return a list with 3 subscribed topics', (done) => {
65+
const topics = [{
66+
name: 'one',
67+
handler () {}
68+
}, {
69+
name: 'two',
70+
handler () {}
71+
}, {
72+
name: 'three',
73+
handler () {}
74+
}]
75+
76+
each(topics, (t, cb) => {
77+
ipfs.pubsub.subscribe(t.name, t.handler, cb)
78+
}, (err) => {
79+
expect(err).to.not.exist()
80+
81+
ipfs.pubsub.ls((err, list) => {
82+
expect(err).to.not.exist()
83+
84+
expect(list.sort())
85+
.to.eql(topics.map((t) => t.name).sort())
86+
87+
each(topics, (t, cb) => {
88+
ipfs.pubsub.unsubscribe(t.name, t.handler, cb)
89+
}, done)
90+
})
91+
})
92+
})
93+
})
94+
}

js/src/pubsub/peers.js

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/* eslint-env mocha */
2+
'use strict'
3+
4+
const chai = require('chai')
5+
const dirtyChai = require('dirty-chai')
6+
const parallel = require('async/parallel')
7+
const auto = require('async/auto')
8+
const { spawnNodesWithId } = require('../utils/spawn')
9+
const { waitForPeers, getTopic } = require('./utils')
10+
const { getDescribe, getIt } = require('../utils/mocha')
11+
12+
const expect = chai.expect
13+
chai.use(dirtyChai)
14+
15+
module.exports = (createCommon, options) => {
16+
const describe = getDescribe(options)
17+
const it = getIt(options)
18+
const common = createCommon()
19+
20+
describe('.pubsub.peers', function () {
21+
this.timeout(80 * 1000)
22+
23+
let ipfs1
24+
let ipfs2
25+
let ipfs3
26+
27+
before(function (done) {
28+
// CI takes longer to instantiate the daemon, so we need to increase the
29+
// timeout for the before step
30+
this.timeout(100 * 1000)
31+
32+
common.setup((err, factory) => {
33+
if (err) return done(err)
34+
35+
spawnNodesWithId(3, factory, (err, nodes) => {
36+
if (err) return done(err)
37+
38+
ipfs1 = nodes[0]
39+
ipfs2 = nodes[1]
40+
ipfs3 = nodes[2]
41+
42+
done()
43+
})
44+
})
45+
})
46+
47+
after((done) => common.teardown(done))
48+
49+
before((done) => {
50+
const ipfs2Addr = ipfs2.peerId.addresses.find((a) => a.includes('127.0.0.1'))
51+
const ipfs3Addr = ipfs3.peerId.addresses.find((a) => a.includes('127.0.0.1'))
52+
53+
parallel([
54+
(cb) => ipfs1.swarm.connect(ipfs2Addr, cb),
55+
(cb) => ipfs1.swarm.connect(ipfs3Addr, cb),
56+
(cb) => ipfs2.swarm.connect(ipfs3Addr, cb)
57+
], done)
58+
})
59+
60+
describe('.peers', () => {
61+
it('should not error when not subscribed to a topic', (done) => {
62+
const topic = getTopic()
63+
ipfs1.pubsub.peers(topic, (err, peers) => {
64+
expect(err).to.not.exist()
65+
// Should be empty() but as mentioned below go-ipfs returns more than it should
66+
// expect(peers).to.be.empty()
67+
68+
done()
69+
})
70+
})
71+
72+
it('should not return extra peers', (done) => {
73+
// Currently go-ipfs returns peers that have not been
74+
// subscribed to the topic. Enable when go-ipfs has been fixed
75+
const sub1 = (msg) => {}
76+
const sub2 = (msg) => {}
77+
const sub3 = (msg) => {}
78+
79+
const topic = getTopic()
80+
const topicOther = topic + 'different topic'
81+
82+
parallel([
83+
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
84+
(cb) => ipfs2.pubsub.subscribe(topicOther, sub2, cb),
85+
(cb) => ipfs3.pubsub.subscribe(topicOther, sub3, cb)
86+
], (err) => {
87+
expect(err).to.not.exist()
88+
89+
ipfs1.pubsub.peers(topic, (err, peers) => {
90+
expect(err).to.not.exist()
91+
expect(peers).to.be.empty()
92+
93+
parallel([
94+
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
95+
(cb) => ipfs2.pubsub.unsubscribe(topicOther, sub2, cb),
96+
(cb) => ipfs3.pubsub.unsubscribe(topicOther, sub3, cb)
97+
], done)
98+
})
99+
})
100+
})
101+
102+
it('should return peers for a topic - one peer', (done) => {
103+
// Currently go-ipfs returns peers that have not been
104+
// subscribed to the topic. Enable when go-ipfs has been fixed
105+
const sub1 = (msg) => {}
106+
const sub2 = (msg) => {}
107+
const sub3 = (msg) => {}
108+
const topic = getTopic()
109+
110+
auto({
111+
sub1: (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
112+
sub2: (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
113+
sub3: (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
114+
peers: ['sub1', 'sub2', 'sub3', (_, cb) => {
115+
waitForPeers(ipfs1, topic, [ipfs2.peerId.id], cb)
116+
}]
117+
}, (err) => {
118+
expect(err).to.not.exist()
119+
120+
parallel([
121+
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
122+
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb),
123+
(cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb)
124+
], done)
125+
})
126+
})
127+
128+
it('should return peers for a topic - multiple peers', (done) => {
129+
const sub1 = (msg) => {}
130+
const sub2 = (msg) => {}
131+
const sub3 = (msg) => {}
132+
const topic = getTopic()
133+
134+
auto({
135+
sub1: (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
136+
sub2: (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
137+
sub3: (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
138+
peers: ['sub1', 'sub2', 'sub3', (_, cb) => {
139+
waitForPeers(ipfs1, topic, [
140+
ipfs2.peerId.id,
141+
ipfs3.peerId.id
142+
], cb)
143+
}]
144+
}, (err) => {
145+
expect(err).to.not.exist()
146+
147+
parallel([
148+
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
149+
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb),
150+
(cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb)
151+
], done)
152+
})
153+
})
154+
})
155+
})
156+
}

js/src/pubsub/publish.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/* eslint-env mocha */
2+
/* eslint max-nested-callbacks: ['error', 8] */
3+
'use strict'
4+
5+
const chai = require('chai')
6+
const dirtyChai = require('dirty-chai')
7+
const times = require('async/times')
8+
const hat = require('hat')
9+
const { getTopic } = require('./utils')
10+
const { getDescribe, getIt } = require('../utils/mocha')
11+
12+
const expect = chai.expect
13+
chai.use(dirtyChai)
14+
15+
module.exports = (createCommon, options) => {
16+
const describe = getDescribe(options)
17+
const it = getIt(options)
18+
const common = createCommon()
19+
20+
describe('.pubsub.publish', function () {
21+
this.timeout(80 * 1000)
22+
23+
let ipfs
24+
25+
before(function (done) {
26+
// CI takes longer to instantiate the daemon, so we need to increase the
27+
// timeout for the before step
28+
this.timeout(60 * 1000)
29+
30+
common.setup((err, factory) => {
31+
expect(err).to.not.exist()
32+
factory.spawnNode((err, node) => {
33+
expect(err).to.not.exist()
34+
ipfs = node
35+
done()
36+
})
37+
})
38+
})
39+
40+
after((done) => common.teardown(done))
41+
42+
it('should error on string messags', (done) => {
43+
const topic = getTopic()
44+
ipfs.pubsub.publish(topic, 'hello friend', (err) => {
45+
expect(err).to.exist()
46+
done()
47+
})
48+
})
49+
50+
it('should publish message from buffer', (done) => {
51+
const topic = getTopic()
52+
ipfs.pubsub.publish(topic, Buffer.from(hat()), done)
53+
})
54+
55+
it('should publish 10 times within time limit', (done) => {
56+
const count = 10
57+
const topic = getTopic()
58+
59+
times(count, (_, cb) => {
60+
ipfs.pubsub.publish(topic, Buffer.from(hat()), cb)
61+
}, done)
62+
})
63+
})
64+
}

0 commit comments

Comments
 (0)