Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit a8e2502

Browse files
committed
Pubsub commands working (WIP)
1 parent fef4325 commit a8e2502

File tree

2 files changed

+11
-34
lines changed

2 files changed

+11
-34
lines changed

src/api/pubsub.js

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
const promisify = require('promisify-es6')
44
const bs58 = require('bs58')
55
var Base64 = require('js-base64').Base64
6+
var Stream = require('stream')
67
// const Wreck = require('wreck')
78
var http = require('http')
8-
var Writable = require('stream').Writable;
99

1010
module.exports = (send) => {
1111
const api = {
@@ -19,48 +19,28 @@ module.exports = (send) => {
1919
}
2020

2121

22-
// var ws = Writable();
23-
var Stream = require('stream');
24-
var rs = new Stream;
25-
rs.readable = true;
26-
27-
// ws._write = function (chunk, enc, next) {
28-
// // console.log('>', d.toString())
29-
// var parsed = JSON.parse(chunk)
30-
// parsed.from = bs58.encode(parsed.from)
31-
// parsed.data = Base64.decode(parsed.data)
32-
// parsed.seqno = Base64.decode(parsed.seqno)
33-
// console.log(parsed)
34-
// next();
35-
// };
22+
var rs = new Stream()
23+
rs.readable = true
3624

3725
console.log('Sub', topic)
3826
http.get({
3927
host: 'localhost',
4028
port: 5001,
4129
path: '/api/v0/pubsub/sub/' + topic
4230
}, function (response) {
43-
// Continuously update stream with data
44-
// var body = ''
45-
// response.pipe
4631
response.on('data', function (d) {
47-
// console.log("chunk", d.toString())
4832
var parsed = JSON.parse(d)
49-
parsed.from = bs58.encode(parsed.from)
50-
parsed.data = Base64.decode(parsed.data)
51-
parsed.seqno = Base64.decode(parsed.seqno)
52-
// console.log(parsed)
53-
rs.emit('data', parsed)
54-
// rs.push(JSON.stringify(parsed))
55-
// ws.write(d)
56-
// body += d
33+
34+
// skip "double subscription" error
35+
if(!parsed.Message) {
36+
parsed.from = bs58.encode(parsed.from)
37+
parsed.data = Base64.decode(parsed.data)
38+
parsed.seqno = Base64.decode(parsed.seqno)
39+
rs.emit('data', parsed)
40+
}
5741
})
5842
response.on('end', function () {
5943
rs.emit('end')
60-
// Data reception is done, do whatever with it!
61-
// var parsed = JSON.parse(body)
62-
// console.log(parsed)
63-
// callback(null, parsed)
6444
})
6545

6646
callback(null, rs)
@@ -74,7 +54,6 @@ module.exports = (send) => {
7454
// return callback(err)
7555
// }
7656

77-
// console.log("THIS IS THE CALLBACK")
7857
// callback(null, response.pipe(ndjson.parse()))
7958
// // callback(null, result) // result is a Stream
8059
// })

test/ipfs-api/pubsub.spec.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@ describe('.pubsub', () => {
3535

3636
it.only('sub', (done) => {
3737
ipfs.pubsub.sub('testi1', (err, result) => {
38-
// console.log('RESULT1', err, result)
3938
expect(err).to.not.exist
40-
// expect(result.length).to.equal(1)
4139
result.on('data', function (d) {
4240
// console.log("-->", d)
4341
expect(d.data).to.equal('hi')

0 commit comments

Comments
 (0)