Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit fef4325

Browse files
committed
Fix stuff
1 parent ff6e6c8 commit fef4325

File tree

4 files changed

+100
-60
lines changed

4 files changed

+100
-60
lines changed

src/api/pubsub.js

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const bs58 = require('bs58')
55
var Base64 = require('js-base64').Base64
66
// const Wreck = require('wreck')
77
var http = require('http')
8+
var Writable = require('stream').Writable;
89

910
module.exports = (send) => {
1011
const api = {
@@ -17,50 +18,65 @@ module.exports = (send) => {
1718
options = {}
1819
}
1920

21+
22+
// var ws = Writable();
23+
var Stream = require('stream');
24+
var rs = new Stream;
25+
rs.readable = true;
26+
27+
// ws._write = function (chunk, enc, next) {
28+
// // console.log('>', d.toString())
29+
// var parsed = JSON.parse(chunk)
30+
// parsed.from = bs58.encode(parsed.from)
31+
// parsed.data = Base64.decode(parsed.data)
32+
// parsed.seqno = Base64.decode(parsed.seqno)
33+
// console.log(parsed)
34+
// next();
35+
// };
36+
2037
console.log('Sub', topic)
2138
http.get({
2239
host: 'localhost',
2340
port: 5001,
2441
path: '/api/v0/pubsub/sub/' + topic
2542
}, function (response) {
2643
// Continuously update stream with data
27-
var body = ''
44+
// var body = ''
45+
// response.pipe
2846
response.on('data', function (d) {
29-
// console.log('>', d.toString())
47+
// console.log("chunk", d.toString())
3048
var parsed = JSON.parse(d)
3149
parsed.from = bs58.encode(parsed.from)
3250
parsed.data = Base64.decode(parsed.data)
3351
parsed.seqno = Base64.decode(parsed.seqno)
34-
console.log(parsed)
35-
body += d
52+
// console.log(parsed)
53+
rs.emit('data', parsed)
54+
// rs.push(JSON.stringify(parsed))
55+
// ws.write(d)
56+
// body += d
3657
})
3758
response.on('end', function () {
59+
rs.emit('end')
3860
// Data reception is done, do whatever with it!
39-
var parsed = JSON.parse(body)
40-
console.log(parsed)
41-
callback(null, parsed)
61+
// var parsed = JSON.parse(body)
62+
// console.log(parsed)
63+
// callback(null, parsed)
4264
})
43-
})
44-
45-
// Wreck.get('http://localhost:5001/api/v0/pubsub/sub/' + topic, (err, res, payload) => {
46-
// if (err) {
47-
// return callback(err)
48-
// }
4965

50-
// console.log(payload.toString())
51-
// // const result = JSON.parse(res)
52-
// callback(null, payload)
53-
// })
66+
callback(null, rs)
67+
})
5468

5569
// send({
5670
// path: 'pubsub/sub/' + topic
57-
// }, (err, result) => {
58-
// console.log('RESULT', err, result)
71+
// }, (err, response) => {
72+
// console.log('RESULT', err, response)
5973
// if (err) {
6074
// return callback(err)
6175
// }
6276

63-
// callback(null, result) // result is a Stream
77+
// console.log("THIS IS THE CALLBACK")
78+
// callback(null, response.pipe(ndjson.parse()))
79+
// // callback(null, result) // result is a Stream
6480
// })
6581
}),
6682
pub: promisify((topic, data, options, callback) => {
@@ -81,8 +97,7 @@ module.exports = (send) => {
8197

8298
send({
8399
path: 'pubsub/pub',
84-
qs: { topic: topic },
85-
files: buf
100+
args: [topic, buf]
86101
}, (err, result) => {
87102
if (err) {
88103
return callback(err)

test.js

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,40 @@
33
var http = require('http')
44
var Base64 = require('js-base64').Base64
55
const bs58 = require('bs58')
6+
const ipfsApi = require('./src')
67

7-
http.get({
8-
host: 'localhost',
9-
port: 5001,
10-
path: '/api/v0/pubsub/sub/testi1'
11-
}, function (response) {
12-
// Continuously update stream with data
13-
var body = ''
14-
response.on('data', function (d) {
15-
// console.log(">", d.toString())
16-
var parsed = JSON.parse(d)
17-
parsed.from = bs58.encode(parsed.from)
18-
parsed.data = Base64.decode(parsed.data)
19-
parsed.seqno = Base64.decode(parsed.seqno)
20-
console.log(parsed)
21-
body += d
22-
})
23-
response.on('end', function () {
24-
// Data reception is done, do whatever with it!
25-
var parsed = JSON.parse(body)
26-
console.log(parsed)
27-
// callback(null, parsed)
8+
const ipfs = ipfsApi()
9+
10+
ipfs.pubsub.sub("testi1", (err, stream) => {
11+
stream.on('data', function (d) {
12+
console.log("<" + d.from + ">", d.data)
2813
})
2914
})
15+
16+
setInterval(() => {
17+
ipfs.pubsub.pub("testi1", "hi")
18+
}, 1000)
19+
20+
// http.get({
21+
// host: 'localhost',
22+
// port: 5001,
23+
// path: '/api/v0/pubsub/sub/testi1'
24+
// }, function (response) {
25+
// // Continuously update stream with data
26+
// var body = ''
27+
// response.on('data', function (d) {
28+
// // console.log(">", d.toString())
29+
// var parsed = JSON.parse(d)
30+
// parsed.from = bs58.encode(parsed.from)
31+
// parsed.data = Base64.decode(parsed.data)
32+
// parsed.seqno = Base64.decode(parsed.seqno)
33+
// console.log(parsed)
34+
// body += d
35+
// })
36+
// response.on('end', function () {
37+
// // Data reception is done, do whatever with it!
38+
// var parsed = JSON.parse(body)
39+
// console.log(parsed)
40+
// // callback(null, parsed)
41+
// })
42+
// })

test/factory/daemon-spawner.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,15 @@ function Factory () {
6363
}
6464

6565
function spawnLocalNode (callback) {
66+
console.log("Spawn local node")
6667
ipfsd.local((err, node) => {
6768
if (err) {
6869
return callback(err)
6970
}
7071

72+
console.log("start daemon")
7173
node.startDaemon((err, ipfs) => {
74+
console.log("started")
7275
if (err) {
7376
return callback(err)
7477
}

test/ipfs-api/pubsub.spec.js

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
const expect = require('chai').expect
66
const isNode = require('detect-node')
77
const FactoryClient = require('../factory/factory-client')
8+
const ipfsApi = require('../../src')
89

910
const useLocalDaemon = true
1011

@@ -18,33 +19,41 @@ describe('.pubsub', () => {
1819

1920
before(function (done) {
2021
this.timeout(20 * 1000) // slow CI
21-
fc = new FactoryClient()
22-
fc.spawnNode(null, null, useLocalDaemon, (err, node) => {
23-
expect(err).to.not.exist
24-
ipfs = node
22+
// fc = new FactoryClient()
23+
// fc.spawnNode(null, null, useLocalDaemon, (err, node) => {
24+
// console.log("aaaa", err, node)
25+
// expect(err).to.not.exist
26+
// if(err) done(err)
27+
ipfs = ipfsApi()
2528
done()
26-
})
29+
// })
2730
})
2831

29-
after((done) => {
30-
fc.dismantle(done)
31-
})
32+
// after((done) => {
33+
// // fc.dismantle(done)
34+
// })
3235

3336
it.only('sub', (done) => {
34-
console.log('1')
3537
ipfs.pubsub.sub('testi1', (err, result) => {
36-
console.log('RESULT1', err, result)
38+
// console.log('RESULT1', err, result)
3739
expect(err).to.not.exist
38-
expect(result.length).to.equal(1)
39-
done()
40+
// expect(result.length).to.equal(1)
41+
result.on('data', function (d) {
42+
// console.log("-->", d)
43+
expect(d.data).to.equal('hi')
44+
done()
45+
})
46+
result.on('end', function () {
47+
console.log("END!!")
48+
})
4049
})
41-
// setTimeout(() => {
42-
// ipfs.pubsub.pub('testi1', 'hi')
43-
// }, 1000)
50+
setTimeout(() => {
51+
ipfs.pubsub.pub('testi1', 'hi')
52+
}, 100)
4453
})
4554

4655
describe('.pub', () => {
47-
it('publishes a message - from string', (done) => {
56+
it.only('publishes a message - from string', (done) => {
4857
const data = 'hello friend'
4958
ipfs.pubsub.pub('testi1', data, (err, result) => {
5059
expect(err).to.not.exist
@@ -53,7 +62,7 @@ describe('.pubsub', () => {
5362
})
5463
})
5564

56-
it('publishes a message - from Buffer', (done) => {
65+
it.only('publishes a message - from Buffer', (done) => {
5766
const data = new Buffer('hello buffer')
5867
ipfs.pubsub.pub('testi1', data, (err, result) => {
5968
expect(err).to.not.exist

0 commit comments

Comments
 (0)