@@ -4,13 +4,15 @@ const ndjson = require('iterable-ndjson')
4
4
const explain = require ( 'explain-error' )
5
5
const bs58 = require ( 'bs58' )
6
6
const { Buffer } = require ( 'buffer' )
7
+ const log = require ( 'debug' ) ( 'ipfs-http-client:pubsub:subscribe' )
7
8
const { objectToQuery } = require ( '../lib/querystring' )
8
9
const configure = require ( '../lib/configure' )
9
10
const { ok, toIterable } = require ( '../lib/fetch' )
10
11
const SubscriptionTracker = require ( './subscription-tracker' )
11
12
12
13
module . exports = configure ( ( { fetch, apiAddr, apiPath, headers } ) => {
13
14
const subsTracker = SubscriptionTracker . singleton ( )
15
+ const publish = require ( './publish' ) ( { fetch, apiAddr, apiPath, headers } )
14
16
15
17
return async ( topic , handler , options ) => {
16
18
options = options || { }
@@ -25,6 +27,18 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
25
27
const url = `${ apiAddr } ${ apiPath } /pubsub/sub${ qs } `
26
28
let res
27
29
30
+ // In Firefox, the initial call to fetch does not resolve until some data
31
+ // is received. If this doesn't happen within 1 second send an empty message
32
+ // to kickstart the process.
33
+ const ffWorkaround = setTimeout ( async ( ) => {
34
+ log ( `Publishing empty message to "${ topic } " to resolve subscription request` )
35
+ try {
36
+ await publish ( topic , Buffer . alloc ( 0 ) , options )
37
+ } catch ( err ) {
38
+ log ( 'Failed to publish empty message' , err )
39
+ }
40
+ } , 1000 )
41
+
28
42
try {
29
43
res = await ok ( fetch ( url , {
30
44
method : 'POST' ,
@@ -36,6 +50,8 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
36
50
throw err
37
51
}
38
52
53
+ clearTimeout ( ffWorkaround )
54
+
39
55
readMessages ( ndjson ( toIterable ( res . body ) ) , {
40
56
onMessage : handler ,
41
57
onEnd : ( ) => subsTracker . unsubscribe ( topic , handler ) ,
@@ -45,8 +61,7 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
45
61
} )
46
62
47
63
async function readMessages ( msgStream , { onMessage, onEnd, onError } ) {
48
- // eslint-disable-next-line no-console
49
- onError = onError || ( err => console . error ( err ) )
64
+ onError = onError || log
50
65
51
66
try {
52
67
for await ( const msg of msgStream ) {
@@ -58,7 +73,7 @@ async function readMessages (msgStream, { onMessage, onEnd, onError }) {
58
73
topicIDs : msg . topicIDs
59
74
} )
60
75
} catch ( err ) {
61
- onError ( explain ( err , 'Failed to parse pubsub message' ) , false ) // Not fatal
76
+ onError ( explain ( err , 'Failed to parse pubsub message' ) , false , msg ) // Not fatal
62
77
}
63
78
}
64
79
} catch ( err ) {
0 commit comments