+ import EventEmitter from 'events';
+
import Redis from 'ioredis';
- import { RateLimiterConfig } from '../@types/rateLimit';
+
+ import { RateLimiter, RateLimiterConfig, RateLimiterResponse } from '../@types/rateLimit';
import TokenBucket from '../rateLimiters/tokenBucket';
import SlidingWindowCounter from '../rateLimiters/slidingWindowCounter';
import SlidingWindowLog from '../rateLimiters/slidingWindowLog';
@@ -9,46 +12,50 @@ import FixedWindow from '../rateLimiters/fixedWindow';
 * Instantiate the rate-limiting algorithm class based on the developer's selection and options
 *
 * @export
- * @param {RateLimiterConfig} rateLimiter limiter selection and option
+ * @param {RateLimiterConfig} rateLimiterConfig limiter selection and options
 * @param {Redis} client
 * @param {number} keyExpiry
- * @return {*}
+ * @return {RateLimiter}
 */
export default function setupRateLimiter(
-     rateLimiter: RateLimiterConfig,
+     rateLimiterConfig: RateLimiterConfig,
    client: Redis,
    keyExpiry: number
- ) {
+ ): RateLimiter {
+     let rateLimiter: RateLimiter;
+
    try {
-         switch (rateLimiter.type) {
+         switch (rateLimiterConfig.type) {
            case 'TOKEN_BUCKET':
-                 return new TokenBucket(
-                     rateLimiter.capacity,
-                     rateLimiter.refillRate,
+                 rateLimiter = new TokenBucket(
+                     rateLimiterConfig.capacity,
+                     rateLimiterConfig.refillRate,
                    client,
                    keyExpiry
                );
                break;
            case 'LEAKY_BUCKET':
                throw new Error('Leaky Bucket algorithm has not been implemented.');
            case 'FIXED_WINDOW':
-                 return new FixedWindow(
-                     rateLimiter.capacity,
-                     rateLimiter.windowSize,
+                 rateLimiter = new FixedWindow(
+                     rateLimiterConfig.capacity,
+                     rateLimiterConfig.windowSize,
                    client,
                    keyExpiry
                );
+                 break;
            case 'SLIDING_WINDOW_LOG':
-                 return new SlidingWindowLog(
-                     rateLimiter.windowSize,
-                     rateLimiter.capacity,
+                 rateLimiter = new SlidingWindowLog(
+                     rateLimiterConfig.windowSize,
+                     rateLimiterConfig.capacity,
                    client,
                    keyExpiry
                );
+                 break;
            case 'SLIDING_WINDOW_COUNTER':
-                 return new SlidingWindowCounter(
-                     rateLimiter.windowSize,
-                     rateLimiter.capacity,
+                 rateLimiter = new SlidingWindowCounter(
+                     rateLimiterConfig.windowSize,
+                     rateLimiterConfig.capacity,
                    client,
                    keyExpiry
                );
@@ -57,6 +64,78 @@ export default function setupRateLimiter(
                // TypeScript should never let us invoke this function with anything other than the options above
                throw new Error('Selected rate limiting algorithm is not supported');
        }
+
+         const processRequest = rateLimiter.processRequest.bind(rateLimiter);
+
+         /**
+          * We use a queue and an event emitter to handle situations where a user has two concurrent requests being processed.
+          * A trailing request is added to the queue and awaits the processing of the prior request by the rate limiter.
+          * This maintains the consistency and accuracy of the cache when under load from a single user.
+          */
+         // stores request IDs for each user in an array, to be processed in order
+         const requestQueues: { [index: string]: string[] } = {};
+         // manages processing of the request queues
+         const requestEvents = new EventEmitter();
+
+         // processes requests (by resolving their promises) that have been throttled by throttledProcess
+         // eslint-disable-next-line no-inner-declarations
+         async function processRequestResolver(
+             userId: string,
+             timestamp: number,
+             tokens: number,
+             resolve: (value: RateLimiterResponse | PromiseLike<RateLimiterResponse>) => void,
+             reject: (reason: unknown) => void
+         ) {
+             try {
+                 const response = await processRequest(userId, timestamp, tokens);
+                 requestQueues[userId] = requestQueues[userId].slice(1);
+                 resolve(response);
+                 // trigger the next queued request, or delete the queue for this user if there are no more requests to process
+                 if (requestQueues[userId].length === 0) delete requestQueues[userId];
+                 else requestEvents.emit(requestQueues[userId][0]);
+             } catch (err) {
+                 reject(err);
+             }
+         }
+
+         /**
+          * Throttles rateLimiter.processRequest per user to prevent inaccurate Redis reads.
+          * Throttling is based on an event-driven promise-fulfillment approach.
+          * Each time a request is received, a promise is added to the user's request queue. The promise "subscribes"
+          * to the previous request in the user's queue, then calls processRequest and resolves once the previous request
+          * is complete.
+          * @param userId
+          * @param timestamp
+          * @param tokens
+          * @returns the rate limiter's response once this request's turn in the queue arrives
+          */
+         // eslint-disable-next-line no-inner-declarations
+         async function throttledProcess(
+             userId: string,
+             timestamp: number,
+             tokens = 1
+         ): Promise<RateLimiterResponse> {
+             // Alternatively, use crypto.randomUUID() to generate a collision-free request ID
+             const requestId = `${timestamp}${tokens}`;
+
+             if (!requestQueues[userId]) {
+                 requestQueues[userId] = [];
+             }
+             requestQueues[userId].push(requestId);
+
+             return new Promise((resolve, reject) => {
+                 if (requestQueues[userId].length > 1) {
+                     requestEvents.once(requestId, async () => {
+                         await processRequestResolver(userId, timestamp, tokens, resolve, reject);
+                     });
+                 } else {
+                     processRequestResolver(userId, timestamp, tokens, resolve, reject);
+                 }
+             });
+         }
+
+         rateLimiter.processRequest = throttledProcess;
+         return rateLimiter;
    } catch (err) {
        throw new Error(`Error in expressGraphQLRateLimiter setting up rate-limiter: ${err}`);
    }
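
The queue-plus-EventEmitter technique in this commit generalizes to any per-key serialization of async work. As a standalone illustration (every name below is invented for the sketch, not taken from this commit), the same pattern can be written as:

import EventEmitter from 'events';

// Per-key FIFO serialization of async work, mirroring the pattern above:
// every call registers an ID in its key's queue; all but the head wait for
// a one-shot event emitted when the previous call for that key finishes.
const queues: { [key: string]: string[] } = {};
const events = new EventEmitter();
let nextId = 0;

async function serialize<T>(key: string, work: () => Promise<T>): Promise<T> {
    const id = `${key}:${(nextId += 1)}`;
    if (!queues[key]) queues[key] = [];
    queues[key].push(id);

    // wait for our turn unless this call is already at the head of the queue
    if (queues[key].length > 1) {
        await new Promise<void>((resolve) => events.once(id, resolve));
    }
    try {
        return await work();
    } finally {
        queues[key] = queues[key].slice(1);
        if (queues[key].length === 0) delete queues[key];
        else events.emit(queues[key][0]);
    }
}

Unlike the requestId above (timestamp plus token count), the counter-based ID in this sketch cannot collide when two calls arrive in the same millisecond with the same token count.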
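A hypothetical caller might look like the following; the module path, the RateLimiterConfig fields, and the Redis connection details are assumptions inferred from the constructor arguments above, not part of this commit:

import Redis from 'ioredis';
import setupRateLimiter from './setupRateLimiter'; // hypothetical module path

async function demo() {
    const client = new Redis(); // assumes a Redis instance on localhost:6379
    const limiter = setupRateLimiter(
        { type: 'TOKEN_BUCKET', capacity: 10, refillRate: 1 }, // assumed config shape
        client,
        60000 // keyExpiry, in whatever unit the limiter implementations expect
    );

    // Two concurrent requests from the same user: the second is queued and only
    // hits Redis after the first settles, keeping the bucket state consistent.
    const now = Date.now();
    const results = await Promise.all([
        limiter.processRequest('user-1', now, 4),
        limiter.processRequest('user-1', now, 3),
    ]);
    console.log(results);
}

demo().catch(console.error);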