@@ -24,6 +24,88 @@ export default function setupRateLimiter(
24
24
) : RateLimiter {
25
25
let rateLimiter : RateLimiter ;
26
26
27
+ /**
28
+ * We are using a queue and event emitter to handle situations where a user has two concurrent requests being processed.
29
+ * The trailing request will be added to the queue to and await the prior request processing by the rate-limiter
30
+ * This will maintain the consistency and accuracy of the cache when under load from one user
31
+ */
32
+ // stores request IDs for each user in an array to be processed
33
+ const requestQueues : { [ index : string ] : string [ ] } = { } ;
34
+ // Manages processing of requests queue
35
+ const requestEvents = new EventEmitter ( ) ;
36
+
37
+ // processes requests (by resolving promises) that have been throttled by throttledProcess
38
+ async function processRequestResolver (
39
+ userId : string ,
40
+ timestamp : number ,
41
+ tokens : number ,
42
+ processRequest : (
43
+ userId : string ,
44
+ timestamp : number ,
45
+ tokens : number
46
+ ) => Promise < RateLimiterResponse > ,
47
+ resolve : ( value : RateLimiterResponse | PromiseLike < RateLimiterResponse > ) => void ,
48
+ reject : ( reason : unknown ) => void
49
+ ) {
50
+ try {
51
+ const response = await processRequest ( userId , timestamp , tokens ) ;
52
+ requestQueues [ userId ] = requestQueues [ userId ] . slice ( 1 ) ;
53
+ resolve ( response ) ;
54
+ // trigger the next event and delete the request queue for this user if there are no more requests to process
55
+ requestEvents . emit ( requestQueues [ userId ] [ 0 ] ) ;
56
+ if ( requestQueues [ userId ] . length === 0 ) delete requestQueues [ userId ] ;
57
+ } catch ( err ) {
58
+ reject ( err ) ;
59
+ }
60
+ }
61
+
62
+ /**
63
+ * Throttle rateLimiter.processRequest based on user IP to prevent inaccurate redis reads
64
+ * Throttling is based on a event driven promise fulfillment approach.
65
+ * Each time a request is received a promise is added to the user's request queue. The promise "subscribes"
66
+ * to the previous request in the user's queue then calls processRequest and resolves once the previous request
67
+ * is complete.
68
+ * @param userId
69
+ * @param timestamp
70
+ * @param tokens
71
+ * @returns
72
+ */
73
+ async function throttledProcess (
74
+ processRequest : (
75
+ userId : string ,
76
+ timestamp : number ,
77
+ tokens : number
78
+ ) => Promise < RateLimiterResponse > ,
79
+ userId : string ,
80
+ timestamp : number ,
81
+ tokens = 1
82
+ ) : Promise < RateLimiterResponse > {
83
+ // Alternatively use crypto.randomUUID() to generate a random uuid
84
+ const requestId = `${ timestamp } ${ tokens } ` ;
85
+
86
+ if ( ! requestQueues [ userId ] ) {
87
+ requestQueues [ userId ] = [ ] ;
88
+ }
89
+ requestQueues [ userId ] . push ( requestId ) ;
90
+
91
+ return new Promise ( ( resolve , reject ) => {
92
+ if ( requestQueues [ userId ] . length > 1 ) {
93
+ requestEvents . once ( requestId , async ( ) => {
94
+ processRequestResolver (
95
+ userId ,
96
+ timestamp ,
97
+ tokens ,
98
+ processRequest ,
99
+ resolve ,
100
+ reject
101
+ ) ;
102
+ } ) ;
103
+ } else {
104
+ processRequestResolver ( userId , timestamp , tokens , processRequest , resolve , reject ) ;
105
+ }
106
+ } ) ;
107
+ }
108
+
27
109
try {
28
110
switch ( rateLimiterConfig . type ) {
29
111
case 'TOKEN_BUCKET' :
@@ -65,38 +147,7 @@ export default function setupRateLimiter(
65
147
throw new Error ( 'Selected rate limiting algorithm is not suppported' ) ;
66
148
}
67
149
68
- const processRequest = rateLimiter . processRequest . bind ( rateLimiter ) ;
69
-
70
- /**
71
- * We are using a queue and event emitter to handle situations where a user has two concurrent requests being processed.
72
- * The trailing request will be added to the queue to and await the prior request processing by the rate-limiter
73
- * This will maintain the consistency and accuracy of the cache when under load from one user
74
- */
75
- // stores request IDs for each user in an array to be processed
76
- const requestQueues : { [ index : string ] : string [ ] } = { } ;
77
- // Manages processing of requests queue
78
- const requestEvents = new EventEmitter ( ) ;
79
-
80
- // processes requests (by resolving promises) that have been throttled by throttledProcess
81
- // eslint-disable-next-line no-inner-declarations
82
- async function processRequestResolver (
83
- userId : string ,
84
- timestamp : number ,
85
- tokens : number ,
86
- resolve : ( value : RateLimiterResponse | PromiseLike < RateLimiterResponse > ) => void ,
87
- reject : ( reason : unknown ) => void
88
- ) {
89
- try {
90
- const response = await processRequest ( userId , timestamp , tokens ) ;
91
- requestQueues [ userId ] = requestQueues [ userId ] . slice ( 1 ) ;
92
- resolve ( response ) ;
93
- // trigger the next event and delete the request queue for this user if there are no more requests to process
94
- requestEvents . emit ( requestQueues [ userId ] [ 0 ] ) ;
95
- if ( requestQueues [ userId ] . length === 0 ) delete requestQueues [ userId ] ;
96
- } catch ( err ) {
97
- reject ( err ) ;
98
- }
99
- }
150
+ const boundProcessRequest = rateLimiter . processRequest . bind ( rateLimiter ) ;
100
151
101
152
/**
102
153
* Throttle rateLimiter.processRequest based on user IP to prevent inaccurate redis reads
@@ -109,32 +160,13 @@ export default function setupRateLimiter(
109
160
* @param tokens
110
161
* @returns
111
162
*/
112
- // eslint-disable-next-line no-inner-declarations
113
- async function throttledProcess (
163
+ rateLimiter . processRequest = async (
114
164
userId : string ,
115
165
timestamp : number ,
116
166
tokens = 1
117
- ) : Promise < RateLimiterResponse > {
118
- // Alternatively use crypto.randomUUID() to generate a random uuid
119
- const requestId = `${ timestamp } ${ tokens } ` ;
120
-
121
- if ( ! requestQueues [ userId ] ) {
122
- requestQueues [ userId ] = [ ] ;
123
- }
124
- requestQueues [ userId ] . push ( requestId ) ;
125
-
126
- return new Promise ( ( resolve , reject ) => {
127
- if ( requestQueues [ userId ] . length > 1 ) {
128
- requestEvents . once ( requestId , async ( ) => {
129
- await processRequestResolver ( userId , timestamp , tokens , resolve , reject ) ;
130
- } ) ;
131
- } else {
132
- processRequestResolver ( userId , timestamp , tokens , resolve , reject ) ;
133
- }
134
- } ) ;
135
- }
167
+ ) : Promise < RateLimiterResponse > =>
168
+ throttledProcess ( boundProcessRequest , userId , timestamp , tokens ) ;
136
169
137
- rateLimiter . processRequest = throttledProcess ;
138
170
return rateLimiter ;
139
171
} catch ( err ) {
140
172
throw new Error ( `Error in expressGraphQLRateLimiter setting up rate-limiter: ${ err } ` ) ;
0 commit comments