Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 370a801

Browse files
author
Alan Shaw
committed
refactor: more readable code for consuming message stream
License: MIT Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
1 parent 34682d4 commit 370a801

File tree

1 file changed

+30
-24
lines changed

1 file changed

+30
-24
lines changed

src/pubsub/subscribe.js

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,32 +36,38 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => {
3636
throw err
3737
}
3838

39-
// eslint-disable-next-line no-console
40-
const onError = options.onError || (err => console.error(err))
39+
readMessages(ndjson(toIterable(res.body)), {
40+
onMessage: handler,
41+
onEnd: () => subsTracker.unsubscribe(topic, handler),
42+
onError: options.onError
43+
})
44+
}
45+
})
46+
47+
async function readMessages (msgStream, { onMessage, onEnd, onError }) {
48+
// eslint-disable-next-line no-console
49+
onError = onError || (err => console.error(err))
4150

42-
;(async () => {
51+
try {
52+
for await (const msg of msgStream) {
4353
try {
44-
for await (const msg of ndjson(toIterable(res.body))) {
45-
try {
46-
handler({
47-
from: bs58.encode(Buffer.from(msg.from, 'base64')).toString(),
48-
data: Buffer.from(msg.data, 'base64'),
49-
seqno: Buffer.from(msg.seqno, 'base64'),
50-
topicIDs: msg.topicIDs
51-
})
52-
} catch (err) {
53-
onError(explain(err, 'Failed to parse pubsub message'), false) // Not fatal
54-
}
55-
}
54+
onMessage({
55+
from: bs58.encode(Buffer.from(msg.from, 'base64')).toString(),
56+
data: Buffer.from(msg.data, 'base64'),
57+
seqno: Buffer.from(msg.seqno, 'base64'),
58+
topicIDs: msg.topicIDs
59+
})
5660
} catch (err) {
57-
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
58-
// Temporarily use the name property instead.
59-
if (err.type !== 'aborted' && err.name !== 'AbortError') {
60-
onError(err, true) // Fatal
61-
}
62-
} finally {
63-
subsTracker.unsubscribe(topic, handler)
61+
onError(explain(err, 'Failed to parse pubsub message'), false) // Not fatal
6462
}
65-
})()
63+
}
64+
} catch (err) {
65+
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
66+
// Temporarily use the name property instead.
67+
if (err.type !== 'aborted' && err.name !== 'AbortError') {
68+
onError(err, true) // Fatal
69+
}
70+
} finally {
71+
onEnd()
6672
}
67-
})
73+
}

0 commit comments

Comments
 (0)