@@ -19,6 +19,67 @@ module.exports = (arg) => {
19
19
const ps = new EventEmitter ( )
20
20
const subscriptions = { }
21
21
ps . id = Math . random ( )
22
+ // unsubscribe from topics
23
+ const _internalUnsubscribe = ( input ) => {
24
+ let { topic, handler, callback} = input
25
+ if ( ! isNode ) {
26
+ if ( ! callback ) {
27
+ return Promise . reject ( NotSupportedError ( ) )
28
+ }
29
+
30
+ return setImmediate ( ( ) => callback ( NotSupportedError ( ) ) )
31
+ }
32
+
33
+ if ( ps . listenerCount ( topic ) === 0 || ! subscriptions [ topic ] ) {
34
+ const err = new Error ( `Not subscribed to '${ topic } '` )
35
+
36
+ if ( ! callback ) {
37
+ return Promise . reject ( err )
38
+ }
39
+
40
+ return setImmediate ( ( ) => callback ( err ) )
41
+ }
42
+
43
+ // unsubscribe from specific or all handlers
44
+ if ( handler ) {
45
+ ps . removeListener ( topic , handler )
46
+ } else {
47
+ ps . removeAllListeners ( topic )
48
+ }
49
+
50
+ // Drop the request once we are actually done
51
+ if ( ps . listenerCount ( topic ) === 0 ) {
52
+ if ( ! callback ) {
53
+ return new Promise ( ( resolve , reject ) => {
54
+ // When the response stream has ended, resolve the promise
55
+ eos ( subscriptions [ topic ] . res , ( err ) => {
56
+ // FIXME: Artificial timeout needed to ensure unsubscribed
57
+ setTimeout ( ( ) => {
58
+ if ( err ) return reject ( err )
59
+ resolve ( )
60
+ } )
61
+ } )
62
+ subscriptions [ topic ] . req . abort ( )
63
+ subscriptions [ topic ] = null
64
+ } )
65
+ }
66
+
67
+ // When the response stream has ended, call the callback
68
+ eos ( subscriptions [ topic ] . res , ( err ) => {
69
+ // FIXME: Artificial timeout needed to ensure unsubscribed
70
+ setTimeout ( ( ) => callback ( err ) )
71
+ } )
72
+ subscriptions [ topic ] . req . abort ( )
73
+ subscriptions [ topic ] = null
74
+ return
75
+ }
76
+
77
+ if ( ! callback ) {
78
+ return Promise . resolve ( )
79
+ }
80
+
81
+ setImmediate ( ( ) => callback ( ) )
82
+ }
22
83
return {
23
84
subscribe : ( topic , handler , options , callback ) => {
24
85
const defaultOptions = {
@@ -59,58 +120,10 @@ module.exports = (arg) => {
59
120
subscribe ( topic , handler , options , callback )
60
121
} ,
61
122
unsubscribe : ( topic , handler , callback ) => {
62
- if ( ! isNode ) {
63
- if ( ! callback ) {
64
- return Promise . reject ( NotSupportedError ( ) )
65
- }
66
-
67
- return setImmediate ( ( ) => callback ( NotSupportedError ( ) ) )
68
- }
69
-
70
- if ( ps . listenerCount ( topic ) === 0 || ! subscriptions [ topic ] ) {
71
- const err = new Error ( `Not subscribed to '${ topic } '` )
72
-
73
- if ( ! callback ) {
74
- return Promise . reject ( err )
75
- }
76
-
77
- return setImmediate ( ( ) => callback ( err ) )
78
- }
79
-
80
- ps . removeListener ( topic , handler )
81
-
82
- // Drop the request once we are actually done
83
- if ( ps . listenerCount ( topic ) === 0 ) {
84
- if ( ! callback ) {
85
- return new Promise ( ( resolve , reject ) => {
86
- // When the response stream has ended, resolve the promise
87
- eos ( subscriptions [ topic ] . res , ( err ) => {
88
- // FIXME: Artificial timeout needed to ensure unsubscribed
89
- setTimeout ( ( ) => {
90
- if ( err ) return reject ( err )
91
- resolve ( )
92
- } )
93
- } )
94
- subscriptions [ topic ] . req . abort ( )
95
- subscriptions [ topic ] = null
96
- } )
97
- }
98
-
99
- // When the response stream has ended, call the callback
100
- eos ( subscriptions [ topic ] . res , ( err ) => {
101
- // FIXME: Artificial timeout needed to ensure unsubscribed
102
- setTimeout ( ( ) => callback ( err ) )
103
- } )
104
- subscriptions [ topic ] . req . abort ( )
105
- subscriptions [ topic ] = null
106
- return
107
- }
108
-
109
- if ( ! callback ) {
110
- return Promise . resolve ( )
111
- }
112
-
113
- setImmediate ( ( ) => callback ( ) )
123
+ _internalUnsubscribe ( { topic :topic , handler :handler , callback :callback } ) ;
124
+ } ,
125
+ unsubscribeAll : ( topic , callback ) => {
126
+ _internalUnsubscribe ( { topic : topic , callback : callback } ) ;
114
127
} ,
115
128
publish : promisify ( ( topic , data , callback ) => {
116
129
if ( ! isNode ) {
0 commit comments