@@ -36,32 +36,38 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
36
36
throw err
37
37
}
38
38
39
- // eslint-disable-next-line no-console
40
- const onError = options . onError || ( err => console . error ( err ) )
39
+ readMessages ( ndjson ( toIterable ( res . body ) ) , {
40
+ onMessage : handler ,
41
+ onEnd : ( ) => subsTracker . unsubscribe ( topic , handler ) ,
42
+ onError : options . onError
43
+ } )
44
+ }
45
+ } )
46
+
47
+ async function readMessages ( msgStream , { onMessage, onEnd, onError } ) {
48
+ // eslint-disable-next-line no-console
49
+ onError = onError || ( err => console . error ( err ) )
41
50
42
- ; ( async ( ) => {
51
+ try {
52
+ for await ( const msg of msgStream ) {
43
53
try {
44
- for await ( const msg of ndjson ( toIterable ( res . body ) ) ) {
45
- try {
46
- handler ( {
47
- from : bs58 . encode ( Buffer . from ( msg . from , 'base64' ) ) . toString ( ) ,
48
- data : Buffer . from ( msg . data , 'base64' ) ,
49
- seqno : Buffer . from ( msg . seqno , 'base64' ) ,
50
- topicIDs : msg . topicIDs
51
- } )
52
- } catch ( err ) {
53
- onError ( explain ( err , 'Failed to parse pubsub message' ) , false ) // Not fatal
54
- }
55
- }
54
+ onMessage ( {
55
+ from : bs58 . encode ( Buffer . from ( msg . from , 'base64' ) ) . toString ( ) ,
56
+ data : Buffer . from ( msg . data , 'base64' ) ,
57
+ seqno : Buffer . from ( msg . seqno , 'base64' ) ,
58
+ topicIDs : msg . topicIDs
59
+ } )
56
60
} catch ( err ) {
57
- // FIXME: In testing with Chrome, err.type is undefined (should not be!)
58
- // Temporarily use the name property instead.
59
- if ( err . type !== 'aborted' && err . name !== 'AbortError' ) {
60
- onError ( err , true ) // Fatal
61
- }
62
- } finally {
63
- subsTracker . unsubscribe ( topic , handler )
61
+ onError ( explain ( err , 'Failed to parse pubsub message' ) , false ) // Not fatal
64
62
}
65
- } ) ( )
63
+ }
64
+ } catch ( err ) {
65
+ // FIXME: In testing with Chrome, err.type is undefined (should not be!)
66
+ // Temporarily use the name property instead.
67
+ if ( err . type !== 'aborted' && err . name !== 'AbortError' ) {
68
+ onError ( err , true ) // Fatal
69
+ }
70
+ } finally {
71
+ onEnd ( )
66
72
}
67
- } )
73
+ }
0 commit comments