diff --git a/examples/pubsub/index.html b/examples/pubsub/index.html
new file mode 100644
index 000000000..b6e8e0876
--- /dev/null
+++ b/examples/pubsub/index.html
@@ -0,0 +1,194 @@
+<!-- "Pubsub in the browser" demo page. -->
+<!-- Heading: "Pubsub". Sections: "Connect to peer", "Subscribe to pubsub topic", "Send pubsub message". -->
diff --git a/package.json b/package.json
index 673b5e58d..a5cdbdda1 100644
--- a/package.json
+++ b/package.json
@@ -17,7 +17,8 @@
"browser": {
"glob": false,
"fs": false,
- "stream": "readable-stream"
+ "stream": "readable-stream",
+ "./src/lib/configure.js": "./src/lib/configure.browser.js"
},
"repository": "github:ipfs/js-ipfs-http-client",
"scripts": {
@@ -33,6 +34,7 @@
"coverage": "npx nyc -r html npm run test:node -- --bail"
},
"dependencies": {
+ "abort-controller": "^3.0.0",
"async": "^2.6.1",
"bignumber.js": "^9.0.0",
"bl": "^3.0.0",
@@ -44,6 +46,7 @@
"detect-node": "^2.0.4",
"end-of-stream": "^1.4.1",
"err-code": "^1.1.2",
+ "explain-error": "^1.0.4",
"flatmap": "0.0.3",
"glob": "^7.1.3",
"ipfs-block": "~0.8.1",
@@ -56,6 +59,7 @@
"is-stream": "^2.0.0",
"iso-stream-http": "~0.1.2",
"iso-url": "~0.4.6",
+ "iterable-ndjson": "^1.1.0",
"just-kebab-case": "^1.1.0",
"just-map-keys": "^1.1.0",
"kind-of": "^6.0.2",
@@ -65,6 +69,7 @@
"multicodec": "~0.5.1",
"multihashes": "~0.4.14",
"ndjson": "github:hugomrdias/ndjson#feat/readable-stream3",
+ "node-fetch": "^2.6.0",
"once": "^1.4.0",
"peer-id": "~0.12.2",
"peer-info": "~0.15.1",
@@ -86,7 +91,7 @@
"cross-env": "^5.2.0",
"dirty-chai": "^2.0.1",
"go-ipfs-dep": "~0.4.21",
- "interface-ipfs-core": "^0.107.1",
+ "interface-ipfs-core": "github:ipfs/interface-js-ipfs-core#fix/max-pubsub-reqs-browser",
"ipfsd-ctl": "~0.43.0",
"nock": "^10.0.2",
"stream-equal": "^1.1.1"
diff --git a/src/lib/callbackify.js b/src/lib/callbackify.js
new file mode 100644
index 000000000..3a041612d
--- /dev/null
+++ b/src/lib/callbackify.js
@@ -0,0 +1,17 @@
+'use strict'
+
+module.exports = (fn, opts) => {
+ opts = opts || {}
+ // Min number of non-callback args
+ opts.minArgs = opts.minArgs == null ? 0 : opts.minArgs
+
+ return (...args) => {
+ const cb = args[args.length - 1]
+
+ if (typeof cb !== 'function' || args.length === opts.minArgs) {
+ return fn(...args)
+ }
+
+ fn(...args.slice(0, -1)).then(res => cb(null, res), cb)
+ }
+}
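For reviewers, a minimal usage sketch of this wrapper (the example functions are hypothetical; the require path assumes the repo root): a wrapped function returns a promise when no trailing callback is supplied, and `minArgs` prevents a trailing handler argument, as in `subscribe(topic, handler)`, from being mistaken for a callback.

```js
const callbackify = require('./src/lib/callbackify')

const double = async n => n * 2
const doubleCb = callbackify(double)

doubleCb(21).then(console.log)                      // promise style → 42
doubleCb(21, (err, res) => console.log(err, res))   // callback style → null 42

// With minArgs: 2 a call like subscribe(topic, handler) keeps its trailing
// function as a normal argument instead of treating it as the callback
const subscribe = async (topic, handler) => { /* ... */ }
const subscribeCb = callbackify(subscribe, { minArgs: 2 })

subscribeCb('news', msg => console.log(msg))                       // → promise
subscribeCb('news', msg => console.log(msg), err => { /* cb */ })  // → callback
```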
diff --git a/src/lib/configure.browser.js b/src/lib/configure.browser.js
new file mode 100644
index 000000000..3597554ec
--- /dev/null
+++ b/src/lib/configure.browser.js
@@ -0,0 +1,45 @@
+'use strict'
+/* eslint-env browser */
+
+const { toUri } = require('./multiaddr')
+
+// Set default configuration options and call the create function with them
+module.exports = create => config => {
+ config = config || {}
+
+ if (typeof config === 'string') {
+ config = { apiAddr: config }
+ } else if (config.constructor && config.constructor.isMultiaddr) {
+ config = { apiAddr: config }
+ } else {
+ config = { ...config }
+ }
+
+ config.fetch = config.fetch || require('./fetch').fetch
+ config.apiAddr = (config.apiAddr || getDefaultApiAddr(config)).toString()
+ config.apiAddr = config.apiAddr.startsWith('/')
+ ? toUri(config.apiAddr)
+ : config.apiAddr
+ config.apiPath = config.apiPath || config['api-path'] || '/api/v0'
+
+ if (config.apiPath.endsWith('/')) {
+ config.apiPath = config.apiPath.slice(0, -1)
+ }
+
+ config.headers = new Headers(config.headers)
+
+ return create(config)
+}
+
+function getDefaultApiAddr ({ protocol, host, port }) {
+ if (!protocol) {
+ protocol = location.protocol.startsWith('http')
+ ? location.protocol.split(':')[0]
+ : 'http'
+ }
+
+ host = host || location.hostname
+ port = port || location.port
+
+ return `${protocol}://${host}${port ? ':' + port : ''}`
+}
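A rough sketch of the defaults this browser variant derives (the page origin below is assumed purely for illustration): with no explicit address, `protocol`, `host` and `port` fall back to `window.location`, and multiaddr strings are converted to HTTP(S) URLs.

```js
// Assume the page is served from https://webui.example.com:8080
const configure = require('./src/lib/configure.browser')

// Pass an identity create function so we can inspect the normalised config
const echo = configure(config => config)

echo().apiAddr                           // 'https://webui.example.com:8080'
echo('/ip4/127.0.0.1/tcp/5001').apiAddr  // 'http://127.0.0.1:5001'
echo().apiPath                           // '/api/v0'
```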
diff --git a/src/lib/configure.js b/src/lib/configure.js
new file mode 100644
index 000000000..9dd48b32a
--- /dev/null
+++ b/src/lib/configure.js
@@ -0,0 +1,43 @@
+'use strict'
+
+const { Headers } = require('node-fetch')
+const { toUri } = require('./multiaddr')
+const pkg = require('../../package.json')
+
+// Set default configuration options and call the create function with them
+module.exports = create => config => {
+ config = config || {}
+
+ if (typeof config === 'string') {
+ config = { apiAddr: config }
+ } else if (config.constructor && config.constructor.isMultiaddr) {
+ config = { apiAddr: config }
+ } else {
+ config = { ...config }
+ }
+
+ config.fetch = config.fetch || require('./fetch').fetch
+
+ if (config.protocol || config.host || config.port) {
+ const port = config.port ? `:${config.port}` : ''
+ config.apiAddr = `${config.protocol || 'http'}://${config.host || 'localhost'}${port}`
+ }
+
+ config.apiAddr = (config.apiAddr || 'http://localhost:5001').toString()
+ config.apiAddr = config.apiAddr.startsWith('/')
+ ? toUri(config.apiAddr)
+ : config.apiAddr
+ config.apiPath = config.apiPath || config['api-path'] || '/api/v0'
+
+ if (config.apiPath.endsWith('/')) {
+ config.apiPath = config.apiPath.slice(0, -1)
+ }
+
+ config.headers = new Headers(config.headers)
+
+ if (!config.headers.has('User-Agent')) {
+ config.headers.append('User-Agent', `${pkg.name}/${pkg.version}`)
+ }
+
+ return create(config)
+}
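For reference, a short sketch of the input shapes the Node.js variant accepts (the hostnames are illustrative; `http://localhost:5001` is the library default):

```js
const configure = require('./src/lib/configure')

// Identity create function so the normalised config is returned as-is
const echo = configure(config => config)

echo().apiAddr                                    // 'http://localhost:5001'
echo('/ip4/127.0.0.1/tcp/5001').apiAddr           // 'http://127.0.0.1:5001'
echo({ protocol: 'https', host: 'ipfs.example.com', port: 443 }).apiAddr
                                                  // 'https://ipfs.example.com:443'
echo({ apiPath: '/api/v0/' }).apiPath             // '/api/v0' (trailing slash stripped)
echo().headers.get('User-Agent')                  // 'ipfs-http-client/<version>'
```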
diff --git a/src/lib/fetch.js b/src/lib/fetch.js
new file mode 100644
index 000000000..b3cf03cbb
--- /dev/null
+++ b/src/lib/fetch.js
@@ -0,0 +1,53 @@
+'use strict'
+
+const explain = require('explain-error')
+
+exports.fetch = require('node-fetch')
+
+// Ensure the fetch response is ok (status in the 2xx range)
+// and if not, attempt to JSON parse the body, extract the error message and throw
+exports.ok = async res => {
+ res = await res
+
+ if (!res.ok) {
+ const { status } = res
+ const defaultMsg = `unexpected status ${status}`
+ let msg
+ try {
+ let data = await res.text()
+ try {
+ data = JSON.parse(data)
+ msg = data.message || data.Message
+ } catch (err) {
+ msg = data
+ }
+ } catch (err) {
+ throw Object.assign(explain(err, defaultMsg), { status })
+ }
+ throw Object.assign(new Error(msg || defaultMsg), { status })
+ }
+
+ return res
+}
+
+exports.toIterable = body => {
+ if (body[Symbol.asyncIterator]) return body
+
+ if (body.getReader) {
+ return (async function * () {
+ const reader = body.getReader()
+
+ try {
+ while (true) {
+ const { done, value } = await reader.read()
+ if (done) return
+ yield value
+ }
+ } finally {
+ reader.releaseLock()
+ }
+ })()
+ }
+
+ throw new Error('unknown stream')
+}
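A brief usage sketch (the daemon address and endpoint are illustrative): `ok()` rejects with the server-supplied error message and a `status` property on non-2xx responses, while `toIterable()` lets the same consumer loop over a WHATWG `ReadableStream` in the browser or a Node.js stream from node-fetch.

```js
const { fetch, ok, toIterable } = require('./src/lib/fetch')

async function main () {
  // Rejects with err.status and the daemon's error message if the response is not ok
  const res = await ok(fetch('http://127.0.0.1:5001/api/v0/version', { method: 'POST' }))

  // Works for both stream flavours; chunks are Buffers in Node, Uint8Arrays in browsers
  for await (const chunk of toIterable(res.body)) {
    process.stdout.write(chunk)
  }
}

main().catch(err => console.error(err.status, err.message))
```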
diff --git a/src/lib/multiaddr.js b/src/lib/multiaddr.js
new file mode 100644
index 000000000..09462ab34
--- /dev/null
+++ b/src/lib/multiaddr.js
@@ -0,0 +1,18 @@
+'use strict'
+
+// Convert a multiaddr to a URI
+// Assumes the multiaddr is in a format that can be converted to an HTTP(S) URI
+exports.toUri = ma => {
+ const parts = `${ma}`.split('/')
+ const port = getPort(parts)
+ return `${getProtocol(parts)}://${parts[2]}${port == null ? '' : ':' + port}`
+}
+
+function getProtocol (maParts) {
+ return maParts.indexOf('https') === -1 ? 'http' : 'https'
+}
+
+function getPort (maParts) {
+ const tcpIndex = maParts.indexOf('tcp')
+ return tcpIndex === -1 ? null : maParts[tcpIndex + 1]
+}
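A few conversions worked by hand against the logic above (hostnames are illustrative), keeping in mind the stated assumption that the multiaddr is HTTP(S)-convertible:

```js
const { toUri } = require('./src/lib/multiaddr')

toUri('/ip4/127.0.0.1/tcp/5001')               // 'http://127.0.0.1:5001'
toUri('/dns4/ipfs.example.com/tcp/443/https')  // 'https://ipfs.example.com:443'
toUri('/dns4/ipfs.example.com/https')          // 'https://ipfs.example.com' (no tcp part, no port)
```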
diff --git a/src/lib/querystring.js b/src/lib/querystring.js
new file mode 100644
index 000000000..07de0edda
--- /dev/null
+++ b/src/lib/querystring.js
@@ -0,0 +1,16 @@
+'use strict'
+
+const Qs = require('qs')
+
+// Convert an object to a query string INCLUDING leading ?
+// Excludes null/undefined values
+exports.objectToQuery = obj => {
+ if (!obj) return ''
+
+ const qs = Object.entries(obj).reduce((obj, [key, value]) => {
+ if (value != null) obj[key] = value
+ return obj
+ }, {})
+
+ return Object.keys(qs).length ? `?${Qs.stringify(qs, { arrayFormat: 'repeat' })}` : ''
+}
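Illustrative inputs and outputs; the `repeat` array format matters because the go-ipfs API expects repeated `arg` parameters rather than `arg[]`:

```js
const { objectToQuery } = require('./src/lib/querystring')

objectToQuery({ arg: 'my-topic', discover: true })  // '?arg=my-topic&discover=true'
objectToQuery({ arg: ['topic-a', 'topic-b'] })      // '?arg=topic-a&arg=topic-b'
objectToQuery({ arg: 'x', verbose: undefined })     // '?arg=x' (null/undefined dropped)
objectToQuery()                                     // ''
```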
diff --git a/src/pubsub.js b/src/pubsub.js
deleted file mode 100644
index 6b298351d..000000000
--- a/src/pubsub.js
+++ /dev/null
@@ -1,212 +0,0 @@
-'use strict'
-
-const promisify = require('promisify-es6')
-const EventEmitter = require('events')
-const eos = require('end-of-stream')
-const isNode = require('detect-node')
-const setImmediate = require('async/setImmediate')
-const PubsubMessageStream = require('./utils/pubsub-message-stream')
-const stringlistToArray = require('./utils/stringlist-to-array')
-const moduleConfig = require('./utils/module-config')
-
-const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser')
-
-/* Public API */
-module.exports = (arg) => {
- const send = moduleConfig(arg)
-
- /* Internal subscriptions state and functions */
- const ps = new EventEmitter()
- const subscriptions = {}
- ps.id = Math.random()
- return {
- subscribe: (topic, handler, options, callback) => {
- const defaultOptions = {
- discover: false
- }
-
- if (typeof options === 'function') {
- callback = options
- options = defaultOptions
- }
-
- if (!options) {
- options = defaultOptions
- }
-
- // Throw an error if ran in the browsers
- if (!isNode) {
- if (!callback) {
- return Promise.reject(NotSupportedError())
- }
-
- return setImmediate(() => callback(NotSupportedError()))
- }
-
- // promisify doesn't work as we always pass a
- // function as last argument (`handler`)
- if (!callback) {
- return new Promise((resolve, reject) => {
- subscribe(topic, handler, options, (err) => {
- if (err) {
- return reject(err)
- }
- resolve()
- })
- })
- }
-
- subscribe(topic, handler, options, callback)
- },
- unsubscribe: (topic, handler, callback) => {
- if (!isNode) {
- if (!callback) {
- return Promise.reject(NotSupportedError())
- }
-
- return setImmediate(() => callback(NotSupportedError()))
- }
-
- if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) {
- const err = new Error(`Not subscribed to '${topic}'`)
-
- if (!callback) {
- return Promise.reject(err)
- }
-
- return setImmediate(() => callback(err))
- }
-
- if (!handler && !callback) {
- ps.removeAllListeners(topic)
- } else {
- ps.removeListener(topic, handler)
- }
-
- // Drop the request once we are actually done
- if (ps.listenerCount(topic) === 0) {
- if (!callback) {
- return new Promise((resolve, reject) => {
- // When the response stream has ended, resolve the promise
- eos(subscriptions[topic].res, (err) => {
- // FIXME: Artificial timeout needed to ensure unsubscribed
- setTimeout(() => {
- if (err) return reject(err)
- resolve()
- })
- })
- subscriptions[topic].req.abort()
- subscriptions[topic] = null
- })
- }
-
- // When the response stream has ended, call the callback
- eos(subscriptions[topic].res, (err) => {
- // FIXME: Artificial timeout needed to ensure unsubscribed
- setTimeout(() => callback(err))
- })
- subscriptions[topic].req.abort()
- subscriptions[topic] = null
- return
- }
-
- if (!callback) {
- return Promise.resolve()
- }
-
- setImmediate(() => callback())
- },
- publish: promisify((topic, data, callback) => {
- if (!isNode) {
- return callback(NotSupportedError())
- }
-
- if (!Buffer.isBuffer(data)) {
- return callback(new Error('data must be a Buffer'))
- }
-
- const request = {
- path: 'pubsub/pub',
- args: [topic, data]
- }
-
- send(request, callback)
- }),
- ls: promisify((callback) => {
- if (!isNode) {
- return callback(NotSupportedError())
- }
-
- const request = {
- path: 'pubsub/ls'
- }
-
- send.andTransform(request, stringlistToArray, callback)
- }),
- peers: promisify((topic, callback) => {
- if (!isNode) {
- return callback(NotSupportedError())
- }
-
- const request = {
- path: 'pubsub/peers',
- args: [topic]
- }
-
- send.andTransform(request, stringlistToArray, callback)
- }),
- setMaxListeners (n) {
- return ps.setMaxListeners(n)
- }
- }
-
- function subscribe (topic, handler, options, callback) {
- ps.on(topic, handler)
-
- if (subscriptions[topic]) {
- // TODO: should a callback error be returned?
- return callback()
- }
-
- // Request params
- const request = {
- path: 'pubsub/sub',
- args: [topic],
- qs: {
- discover: options.discover
- }
- }
-
- // Start the request and transform the response
- // stream to Pubsub messages stream
- subscriptions[topic] = {}
- subscriptions[topic].req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
- if (err) {
- subscriptions[topic] = null
- ps.removeListener(topic, handler)
- return callback(err)
- }
-
- subscriptions[topic].res = stream
-
- stream.on('data', (msg) => {
- ps.emit(topic, msg)
- })
-
- stream.on('error', (err) => {
- ps.emit('error', err)
- })
-
- eos(stream, (err) => {
- if (err) {
- ps.emit('error', err)
- }
-
- subscriptions[topic] = null
- ps.removeListener(topic, handler)
- })
-
- callback()
- })
- }
-}
diff --git a/src/pubsub/index.js b/src/pubsub/index.js
new file mode 100644
index 000000000..8562e9ea3
--- /dev/null
+++ b/src/pubsub/index.js
@@ -0,0 +1,18 @@
+'use strict'
+
+const callbackify = require('../lib/callbackify')
+
+// This file is temporary and exists only for compatibility with legacy callback-style usage
+module.exports = (send, options) => {
+ if (typeof send !== 'function') {
+ options = send
+ }
+
+ return {
+ ls: callbackify(require('./ls')(options)),
+ peers: callbackify(require('./peers')(options)),
+ publish: callbackify(require('./publish')(options)),
+ subscribe: callbackify(require('./subscribe')(options), { minArgs: 2 }),
+ unsubscribe: callbackify(require('./unsubscribe')(options), { minArgs: 2 })
+ }
+}
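A sketch of both calling styles this shim keeps working (daemon address and topic are illustrative):

```js
const pubsub = require('./src/pubsub')({ protocol: 'http', host: 'localhost', port: 5001 })

// Promise style
pubsub.ls().then(topics => console.log(topics))

// Legacy callback style
pubsub.ls((err, topics) => {
  if (err) throw err
  console.log(topics)
})

// subscribe/unsubscribe use minArgs: 2, so the message handler is never
// mistaken for a trailing callback
pubsub.subscribe('my-topic', msg => console.log(msg.data.toString()))
```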
diff --git a/src/pubsub/ls.js b/src/pubsub/ls.js
new file mode 100644
index 000000000..bfbf8239e
--- /dev/null
+++ b/src/pubsub/ls.js
@@ -0,0 +1,20 @@
+'use strict'
+
+const configure = require('../lib/configure')
+const { ok } = require('../lib/fetch')
+const { objectToQuery } = require('../lib/querystring')
+
+module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
+ return async (options) => {
+ options = options || {}
+
+ const qs = objectToQuery(options.qs)
+ const url = `${apiAddr}${apiPath}/pubsub/ls${qs}`
+ const res = await ok(fetch(url, {
+ signal: options.signal,
+ headers: options.headers || headers
+ }))
+ const data = await res.json()
+ return data.Strings || []
+ }
+})
diff --git a/src/pubsub/peers.js b/src/pubsub/peers.js
new file mode 100644
index 000000000..7ee860e33
--- /dev/null
+++ b/src/pubsub/peers.js
@@ -0,0 +1,29 @@
+'use strict'
+
+const { objectToQuery } = require('../lib/querystring')
+const configure = require('../lib/configure')
+const { ok } = require('../lib/fetch')
+
+module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
+ return async (topic, options) => {
+ if (!options && typeof topic === 'object') {
+ options = topic
+ topic = null
+ }
+
+ options = options || {}
+
+ const qs = objectToQuery({
+ arg: topic,
+ ...(options.qs || {})
+ })
+
+ const url = `${apiAddr}${apiPath}/pubsub/peers${qs}`
+ const res = await ok(fetch(url, {
+ signal: options.signal,
+ headers: options.headers || headers
+ }))
+ const data = await res.json()
+ return data.Strings || []
+ }
+})
diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js
new file mode 100644
index 000000000..d214ddbc8
--- /dev/null
+++ b/src/pubsub/publish.js
@@ -0,0 +1,51 @@
+'use strict'
+
+const { Buffer } = require('buffer')
+const configure = require('../lib/configure')
+const { objectToQuery } = require('../lib/querystring')
+const { ok } = require('../lib/fetch')
+
+module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
+ return async (topic, data, options) => {
+ options = options || {}
+
+ if (!Buffer.isBuffer(data)) {
+ throw new Error('data must be a Buffer')
+ }
+
+ let qs = objectToQuery(options.qs)
+ qs = qs ? `&${qs.slice(1)}` : qs
+
+ const url = `${apiAddr}${apiPath}/pubsub/pub?arg=${encodeURIComponent(topic)}&arg=${encodeBuffer(data)}${qs}`
+ const res = await ok(fetch(url, {
+ method: 'POST',
+ signal: options.signal,
+ headers: options.headers || headers
+ }))
+
+ return res.text()
+ }
+})
+
+function encodeBuffer (buf) {
+ let uriEncoded = ''
+ for (const byte of buf) {
+ // https://tools.ietf.org/html/rfc3986#page-14
+ // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E),
+ // underscore (%5F), or tilde (%7E)
+ if (
+ (byte >= 0x41 && byte <= 0x5A) ||
+ (byte >= 0x61 && byte <= 0x7A) ||
+ (byte >= 0x30 && byte <= 0x39) ||
+ (byte === 0x2D) ||
+ (byte === 0x2E) ||
+ (byte === 0x5F) ||
+ (byte === 0x7E)
+ ) {
+ uriEncoded += String.fromCharCode(byte)
+ } else {
+ uriEncoded += `%${byte.toString(16).padStart(2, '0')}`
+ }
+ }
+ return uriEncoded
+}
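To illustrate the encoding split (daemon address and topic are illustrative): the topic goes through `encodeURIComponent`, while the payload is percent-encoded byte-by-byte by the internal `encodeBuffer` helper so arbitrary, non-UTF-8 binary data survives the query string.

```js
const publish = require('./src/pubsub/publish')('/ip4/127.0.0.1/tcp/5001')

// This request ends up hitting:
//   http://127.0.0.1:5001/api/v0/pubsub/pub?arg=my%20topic&arg=hi%20there
publish('my topic', Buffer.from('hi there')).catch(console.error)

// Worked by hand against the RFC 3986 unreserved set used in encodeBuffer:
//   Buffer.from('hello')      → 'hello'   (all bytes unreserved, passed through)
//   Buffer.from([0x00, 0xff]) → '%00%ff'  (raw bytes that are not valid UTF-8)
```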
diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js
new file mode 100644
index 000000000..90eeaed0f
--- /dev/null
+++ b/src/pubsub/subscribe.js
@@ -0,0 +1,88 @@
+'use strict'
+
+const ndjson = require('iterable-ndjson')
+const explain = require('explain-error')
+const bs58 = require('bs58')
+const { Buffer } = require('buffer')
+const log = require('debug')('ipfs-http-client:pubsub:subscribe')
+const { objectToQuery } = require('../lib/querystring')
+const configure = require('../lib/configure')
+const { ok, toIterable } = require('../lib/fetch')
+const SubscriptionTracker = require('./subscription-tracker')
+
+module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
+ const subsTracker = SubscriptionTracker.singleton()
+ const publish = require('./publish')({ fetch, apiAddr, apiPath, headers })
+
+ return async (topic, handler, options) => {
+ options = options || {}
+ options.signal = subsTracker.subscribe(topic, handler, options.signal)
+
+ const qs = objectToQuery({
+ arg: topic,
+ discover: options.discover,
+ ...(options.qs || {})
+ })
+
+ const url = `${apiAddr}${apiPath}/pubsub/sub${qs}`
+ let res
+
+ // In Firefox, the initial call to fetch does not resolve until some data
+ // is received. If this doesn't happen within 1 second, send an empty message
+ // to kickstart the process.
+ const ffWorkaround = setTimeout(async () => {
+ log(`Publishing empty message to "${topic}" to resolve subscription request`)
+ try {
+ await publish(topic, Buffer.alloc(0), options)
+ } catch (err) {
+ log('Failed to publish empty message', err)
+ }
+ }, 1000)
+
+ try {
+ res = await ok(fetch(url, {
+ method: 'POST',
+ signal: options.signal,
+ headers: options.headers || headers
+ }))
+ } catch (err) { // Initial subscribe fail, ensure we clean up
+ subsTracker.unsubscribe(topic, handler)
+ throw err
+ }
+
+ clearTimeout(ffWorkaround)
+
+ readMessages(ndjson(toIterable(res.body)), {
+ onMessage: handler,
+ onEnd: () => subsTracker.unsubscribe(topic, handler),
+ onError: options.onError
+ })
+ }
+})
+
+async function readMessages (msgStream, { onMessage, onEnd, onError }) {
+ onError = onError || log
+
+ try {
+ for await (const msg of msgStream) {
+ try {
+ onMessage({
+ from: bs58.encode(Buffer.from(msg.from, 'base64')).toString(),
+ data: Buffer.from(msg.data, 'base64'),
+ seqno: Buffer.from(msg.seqno, 'base64'),
+ topicIDs: msg.topicIDs
+ })
+ } catch (err) {
+ onError(explain(err, 'Failed to parse pubsub message'), false, msg) // Not fatal
+ }
+ }
+ } catch (err) {
+ // FIXME: In testing with Chrome, err.type is undefined (should not be!)
+ // Temporarily use the name property instead.
+ if (err.type !== 'aborted' && err.name !== 'AbortError') {
+ onError(err, true) // Fatal
+ }
+ } finally {
+ onEnd()
+ }
+}
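End to end, a subscription now looks like the sketch below (daemon address and topic are illustrative; the daemon must run with `--enable-pubsub-experiment`). `subscribe` resolves once the HTTP request is established, messages are delivered to the handler, and `unsubscribe` aborts the underlying request via the subscription tracker.

```js
const ipfsClient = require('ipfs-http-client')

async function main () {
  const ipfs = ipfsClient('/ip4/127.0.0.1/tcp/5001')

  const handler = msg => {
    // msg is { from, data, seqno, topicIDs } with Buffers for data and seqno
    console.log(`${msg.from}: ${msg.data.toString()}`)
  }

  await ipfs.pubsub.subscribe('my-topic', handler, {
    onError: err => console.error('stream error', err) // non-fatal parse errors also land here
  })

  await ipfs.pubsub.publish('my-topic', Buffer.from('hello'))

  // Later: stop receiving; this aborts the long-lived HTTP request
  await ipfs.pubsub.unsubscribe('my-topic', handler)
}

main().catch(console.error)
```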
diff --git a/src/pubsub/subscription-tracker.js b/src/pubsub/subscription-tracker.js
new file mode 100644
index 000000000..bbd7c2d7a
--- /dev/null
+++ b/src/pubsub/subscription-tracker.js
@@ -0,0 +1,52 @@
+'use strict'
+
+const AbortController = require('abort-controller')
+
+class SubscriptionTracker {
+ constructor () {
+ this._subs = new Map()
+ }
+
+ static singleton () {
+ if (SubscriptionTracker.instance) return SubscriptionTracker.instance
+ SubscriptionTracker.instance = new SubscriptionTracker()
+ return SubscriptionTracker.instance
+ }
+
+ subscribe (topic, handler, signal) {
+ const topicSubs = this._subs.get(topic) || []
+
+ if (topicSubs.find(s => s.handler === handler)) {
+ throw new Error(`Already subscribed to ${topic} with this handler`)
+ }
+
+ // Create controller so a call to unsubscribe can cancel the request
+ const controller = new AbortController()
+
+ this._subs.set(topic, [{ handler, controller }].concat(topicSubs))
+
+ // If the caller supplied a signal, unsubscribe (and so abort the request) when it aborts
+ if (signal) {
+ signal.addEventListener('abort', () => this.unsubscribe(topic, handler))
+ }
+
+ return controller.signal
+ }
+
+ unsubscribe (topic, handler) {
+ const subs = this._subs.get(topic) || []
+ let unsubs
+
+ if (handler) {
+ this._subs.set(topic, subs.filter(s => s.handler !== handler))
+ unsubs = subs.filter(s => s.handler === handler)
+ } else {
+ this._subs.set(topic, [])
+ unsubs = subs
+ }
+
+ unsubs.forEach(s => s.controller.abort())
+ }
+}
+
+module.exports = SubscriptionTracker
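A sketch of how the tracker composes abort signals (topic and handler are illustrative): every handler gets its own internal `AbortController`, and an optional caller-supplied signal is chained so aborting it unsubscribes, which in turn aborts the HTTP request.

```js
const AbortController = require('abort-controller')
const SubscriptionTracker = require('./src/pubsub/subscription-tracker')

const tracker = SubscriptionTracker.singleton()
const handler = msg => console.log(msg)
const external = new AbortController()

// The returned signal is what subscribe() hands to fetch()
const signal = tracker.subscribe('my-topic', handler, external.signal)
console.log(signal.aborted)            // false

// Either of these cancels the request for this handler:
external.abort()                       // via the chained external signal...
// tracker.unsubscribe('my-topic', handler)  // ...or directly

console.log(signal.aborted)            // true
```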
diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js
new file mode 100644
index 000000000..a8bd14944
--- /dev/null
+++ b/src/pubsub/unsubscribe.js
@@ -0,0 +1,10 @@
+'use strict'
+
+const configure = require('../lib/configure')
+const SubscriptionTracker = require('./subscription-tracker')
+
+module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
+ const subsTracker = SubscriptionTracker.singleton()
+ // eslint-disable-next-line require-await
+ return async (topic, handler) => subsTracker.unsubscribe(topic, handler)
+})
diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js
deleted file mode 100644
index 992529213..000000000
--- a/src/utils/pubsub-message-stream.js
+++ /dev/null
@@ -1,34 +0,0 @@
-'use strict'
-
-const TransformStream = require('readable-stream').Transform
-const PubsubMessage = require('./pubsub-message-utils')
-
-class PubsubMessageStream extends TransformStream {
- constructor (options) {
- const opts = Object.assign(options || {}, { objectMode: true })
- super(opts)
- }
-
- static from (inputStream, callback) {
- let outputStream = inputStream.pipe(new PubsubMessageStream())
- inputStream.on('end', () => outputStream.emit('end'))
- callback(null, outputStream)
- }
-
- _transform (obj, enc, callback) {
- // go-ipfs returns '{}' as the very first object atm, we skip that
- if (Object.keys(obj).length === 0) {
- return callback()
- }
-
- try {
- const msg = PubsubMessage.deserialize(obj, 'base64')
- this.push(msg)
- callback()
- } catch (err) {
- return callback(err)
- }
- }
-}
-
-module.exports = PubsubMessageStream
diff --git a/src/utils/pubsub-message-utils.js b/src/utils/pubsub-message-utils.js
deleted file mode 100644
index 53d1e397a..000000000
--- a/src/utils/pubsub-message-utils.js
+++ /dev/null
@@ -1,39 +0,0 @@
-'use strict'
-
-const bs58 = require('bs58')
-
-module.exports = {
- deserialize (data, enc) {
- enc = enc ? enc.toLowerCase() : 'json'
-
- if (enc === 'json') {
- return deserializeFromJson(data)
- } else if (enc === 'base64') {
- return deserializeFromBase64(data)
- }
-
- throw new Error(`Unsupported encoding: '${enc}'`)
- }
-}
-
-function deserializeFromJson (data) {
- const json = JSON.parse(data)
- return deserializeFromBase64(json)
-}
-
-function deserializeFromBase64 (obj) {
- if (!isPubsubMessage(obj)) {
- throw new Error(`Not a pubsub message`)
- }
-
- return {
- from: bs58.encode(Buffer.from(obj.from, 'base64')).toString(),
- seqno: Buffer.from(obj.seqno, 'base64'),
- data: Buffer.from(obj.data, 'base64'),
- topicIDs: obj.topicIDs || obj.topicCIDs
- }
-}
-
-function isPubsubMessage (obj) {
- return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
-}
diff --git a/src/utils/stringlist-to-array.js b/src/utils/stringlist-to-array.js
deleted file mode 100644
index df28ee6df..000000000
--- a/src/utils/stringlist-to-array.js
+++ /dev/null
@@ -1,9 +0,0 @@
-'use strict'
-
-// Converts a go-ipfs "stringList" to an array
-// { Strings: ['A', 'B'] } --> ['A', 'B']
-function stringlistToArray (res, cb) {
- cb(null, res.Strings || [])
-}
-
-module.exports = stringlistToArray
diff --git a/test/interface.spec.js b/test/interface.spec.js
index 1c7ed2b7d..d5fe86288 100644
--- a/test/interface.spec.js
+++ b/test/interface.spec.js
@@ -49,6 +49,10 @@ describe('interface-ipfs-core tests', () => {
{
name: 'replace',
reason: 'FIXME Waiting for fix on go-ipfs https://github.com/ipfs/js-ipfs-http-client/pull/307#discussion_r69281789 and https://github.com/ipfs/go-ipfs/issues/2927'
+ },
+ {
+ name: 'profile',
+ reason: 'TODO not yet implemented https://github.com/ipfs/js-ipfs-http-client/pull/1030'
}
]
})
@@ -182,7 +186,14 @@ describe('interface-ipfs-core tests', () => {
]
})
- tests.filesMFS(defaultCommonFactory)
+ tests.filesMFS(defaultCommonFactory, {
+ skip: [
+ {
+ name: 'should ls directory with long option',
+ reason: 'TODO unskip when go-ipfs supports --long https://github.com/ipfs/go-ipfs/pull/6528'
+ }
+ ]
+ })
tests.key(defaultCommonFactory, {
skip: [
@@ -219,7 +230,7 @@ describe('interface-ipfs-core tests', () => {
tests.namePubsub(CommonFactory.create({
spawnOptions: {
args: ['--enable-namesys-pubsub'],
- initOptions: { bits: 1024 }
+ initOptions: { bits: 1024, profile: 'test' }
}
}), {
skip: [
@@ -260,22 +271,20 @@ describe('interface-ipfs-core tests', () => {
tests.pubsub(CommonFactory.create({
spawnOptions: {
args: ['--enable-pubsub-experiment'],
- initOptions: { bits: 1024 }
+ initOptions: { bits: 1024, profile: 'test' }
}
}), {
- skip: isNode ? [
+ skip: isWindows ? [
// pubsub.subscribe
- isWindows ? {
+ {
name: 'should send/receive 100 messages',
reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778'
- } : null,
- isWindows ? {
+ },
+ {
name: 'should receive multiple messages',
reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778'
- } : null
- ] : {
- reason: 'FIXME pubsub is not supported in the browser https://github.com/ipfs/js-ipfs-http-client/issues/518'
- }
+ }
+ ] : null
})
tests.repo(defaultCommonFactory)
diff --git a/test/pubsub-in-browser.spec.js b/test/pubsub-in-browser.spec.js
deleted file mode 100644
index ff1a22347..000000000
--- a/test/pubsub-in-browser.spec.js
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- We currently don't support pubsub when run in the browser,
- and we test it with separate set of tests to make sure
- if it's being used in the browser, pubsub errors.
-
- More info: https://github.com/ipfs/js-ipfs-http-client/issues/518
-
- This means:
- - You can use pubsub from js-ipfs-http-client in Node.js
- - You can use pubsub from js-ipfs-http-client in Electron
- (when js-ipfs-http-client is ran in the main process of Electron)
-
- - You can't use pubsub from js-ipfs-http-client in the browser
- - You can't use pubsub from js-ipfs-http-client in Electron's
- renderer process
-
- - You can use pubsub from js-ipfs in the browsers
- - You can use pubsub from js-ipfs in Node.js
- - You can use pubsub from js-ipfs in Electron
- (in both the main process and the renderer process)
- - See https://github.com/ipfs/js-ipfs for details on
- pubsub in js-ipfs
-*/
-
-/* eslint-env mocha */
-/* eslint max-nested-callbacks: ['error', 8] */
-'use strict'
-
-const isNode = require('detect-node')
-const chai = require('chai')
-const dirtyChai = require('dirty-chai')
-const expect = chai.expect
-chai.use(dirtyChai)
-
-const ipfsClient = require('../src')
-const f = require('./utils/factory')
-
-const expectedError = 'pubsub is currently not supported when run in the browser'
-
-describe('.pubsub is not supported in the browser, yet!', function () {
- this.timeout(50 * 1000)
-
- if (isNode) { return }
-
- const topic = 'pubsub-tests'
- let ipfs
- let ipfsd
-
- before((done) => {
- f.spawn({ initOptions: { bits: 1024, profile: 'test' } }, (err, _ipfsd) => {
- expect(err).to.not.exist()
- ipfsd = _ipfsd
- ipfs = ipfsClient(_ipfsd.apiAddr)
- done()
- })
- })
-
- after((done) => {
- if (!ipfsd) return done()
- ipfsd.stop(done)
- })
-
- describe('everything errors', () => {
- describe('Callback API', () => {
- describe('.publish', () => {
- it('throws an error if called in the browser', (done) => {
- ipfs.pubsub.publish(topic, 'hello friend', (err, topics) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- done()
- })
- })
- })
-
- describe('.subscribe', () => {
- const handler = () => {}
- it('throws an error if called in the browser', (done) => {
- ipfs.pubsub.subscribe(topic, handler, {}, (err, topics) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- done()
- })
- })
- })
-
- describe('.peers', () => {
- it('throws an error if called in the browser', (done) => {
- ipfs.pubsub.peers(topic, (err, topics) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- done()
- })
- })
- })
-
- describe('.ls', () => {
- it('throws an error if called in the browser', (done) => {
- ipfs.pubsub.ls((err, topics) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- done()
- })
- })
- })
- })
-
- describe('Promise API', () => {
- describe('.publish', () => {
- it('throws an error if called in the browser', () => {
- return ipfs.pubsub.publish(topic, 'hello friend')
- .catch((err) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- })
- })
- })
-
- describe('.subscribe', () => {
- const handler = () => {}
- it('throws an error if called in the browser', (done) => {
- ipfs.pubsub.subscribe(topic, handler, {})
- .catch((err) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- done()
- })
- })
- })
-
- describe('.peers', () => {
- it('throws an error if called in the browser', (done) => {
- ipfs.pubsub.peers(topic)
- .catch((err) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- done()
- })
- })
- })
-
- describe('.ls', () => {
- it('throws an error if called in the browser', () => {
- return ipfs.pubsub.ls()
- .catch((err) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- })
- })
- })
- })
-
- describe('.unsubscribe', () => {
- it('throws an error if called in the browser', (done) => {
- ipfs.pubsub.unsubscribe('test', () => {}, (err) => {
- expect(err).to.exist()
- expect(err.message).to.equal(expectedError)
- done()
- })
- })
- })
- })
-})