From 5f34e61a9edf34b8dba2f555c8b3544742a69cba Mon Sep 17 00:00:00 2001
From: Stephan Halarewicz
Date: Thu, 11 Aug 2022 23:16:04 -0400
Subject: [PATCH 1/4] throttled call to processRequest during rate limiter
 setup rather than express middleware

---
 src/middleware/index.ts            |  69 +----------------
 src/middleware/rateLimiterSetup.ts | 115 ++++++++++++++++++++++++-----
 test/middleware/express.test.ts    |   2 +-
 3 files changed, 99 insertions(+), 87 deletions(-)

diff --git a/src/middleware/index.ts b/src/middleware/index.ts
index 06584c4..49a4b70 100644
--- a/src/middleware/index.ts
+++ b/src/middleware/index.ts
@@ -1,11 +1,9 @@
-import EventEmitter from 'events';
 import { parse, validate } from 'graphql';
 import { GraphQLSchema } from 'graphql/type/schema';
 import { Request, Response, NextFunction, RequestHandler } from 'express';
 import buildTypeWeightsFromSchema, { defaultTypeWeightsConfig } from '../analysis/buildTypeWeights';
 import setupRateLimiter from './rateLimiterSetup';
 import { ExpressMiddlewareConfig, ExpressMiddlewareSet } from '../@types/expressMiddleware';
-import { RateLimiterResponse } from '../@types/rateLimit';
 import { connect } from '../utils/redis';
 import ASTParser from '../analysis/ASTParser';

@@ -66,71 +64,6 @@ export default function expressGraphQLRateLimiter(
         middlewareSetup.redis.keyExpiry
     );

-    /**
-     * We are using a queue and an event emitter to handle situations where a user has two concurrent requests being processed.
-     * The trailing request will be added to the queue and will await processing of the prior request by the rate limiter.
-     * This maintains the consistency and accuracy of the cache when under load from one user.
-     */
-    // stores request IDs for each user in an array to be processed
-    const requestQueues: { [index: string]: string[] } = {};
-    // Manages processing of the requests queue
-    const requestEvents = new EventEmitter();
-
-    // processes requests (by resolving promises) that have been throttled by throttledProcess
-    async function processRequestResolver(
-        userId: string,
-        timestamp: number,
-        tokens: number,
-        resolve: (value: RateLimiterResponse | PromiseLike<RateLimiterResponse>) => void,
-        reject: (reason: any) => void
-    ) {
-        try {
-            const response = await rateLimiter.processRequest(userId, timestamp, tokens);
-            requestQueues[userId] = requestQueues[userId].slice(1);
-            resolve(response);
-            // trigger the next event, and delete the request queue for this user if there are no more requests to process
-            requestEvents.emit(requestQueues[userId][0]);
-            if (requestQueues[userId].length === 0) delete requestQueues[userId];
-        } catch (err) {
-            reject(err);
-        }
-    }
-
-    /**
-     * Throttle rateLimiter.processRequest based on user IP to prevent inaccurate Redis reads.
-     * Throttling is based on an event-driven promise fulfillment approach.
-     * Each time a request is received, a promise is added to the user's request queue. The promise "subscribes"
-     * to the previous request in the user's queue, then calls processRequest and resolves once the previous request
-     * is complete.
-     * @param userId
-     * @param timestamp
-     * @param tokens
-     * @returns
-     */
-    async function throttledProcess(
-        userId: string,
-        timestamp: number,
-        tokens: number
-    ): Promise<RateLimiterResponse> {
-        // Alternatively use crypto.randomUUID() to generate a random uuid
-        const requestId = `${timestamp}${tokens}`;
-
-        if (!requestQueues[userId]) {
-            requestQueues[userId] = [];
-        }
-        requestQueues[userId].push(requestId);
-
-        return new Promise((resolve, reject) => {
-            if (requestQueues[userId].length > 1) {
-                requestEvents.once(requestId, async () => {
-                    await processRequestResolver(userId, timestamp, tokens, resolve, reject);
-                });
-            } else {
-                processRequestResolver(userId, timestamp, tokens, resolve, reject);
-            }
-        });
-    }
-
     /** Rate-limiting middleware */
     return async (
         req: Request,
         res: Response,
         next: NextFunction
@@ -169,7 +102,7 @@ export default function expressGraphQLRateLimiter(
         const queryComplexity = queryParser.processQuery(queryAST);

         try {
-            const rateLimiterResponse = await throttledProcess(
+            const rateLimiterResponse = await rateLimiter.processRequest(
                 ip,
                 requestTimestamp,
                 queryComplexity
diff --git a/src/middleware/rateLimiterSetup.ts b/src/middleware/rateLimiterSetup.ts
index 26166f8..a245a4e 100644
--- a/src/middleware/rateLimiterSetup.ts
+++ b/src/middleware/rateLimiterSetup.ts
@@ -1,5 +1,8 @@
+import EventEmitter from 'events';
+
 import Redis from 'ioredis';
-import { RateLimiterConfig } from '../@types/rateLimit';
+
+import { RateLimiter, RateLimiterConfig, RateLimiterResponse } from '../@types/rateLimit';
 import TokenBucket from '../rateLimiters/tokenBucket';
 import SlidingWindowCounter from '../rateLimiters/slidingWindowCounter';
 import SlidingWindowLog from '../rateLimiters/slidingWindowLog';
@@ -9,22 +12,24 @@ import FixedWindow from '../rateLimiters/fixedWindow';
  * Instantiate the rate-limiting algorithm class based on the developer's selection and options
  *
  * @export
- * @param {RateLimiterConfig} rateLimiter limiter selection and options
+ * @param {RateLimiterConfig} rateLimiterConfig limiter selection and options
  * @param {Redis} client
  * @param {number} keyExpiry
- * @return {*}
+ * @return {RateLimiter}
  */
 export default function setupRateLimiter(
-    rateLimiter: RateLimiterConfig,
+    rateLimiterConfig: RateLimiterConfig,
     client: Redis,
     keyExpiry: number
-) {
+): RateLimiter {
+    let rateLimiter: RateLimiter;
+
     try {
-        switch (rateLimiter.type) {
+        switch (rateLimiterConfig.type) {
             case 'TOKEN_BUCKET':
-                return new TokenBucket(
-                    rateLimiter.capacity,
-                    rateLimiter.refillRate,
+                rateLimiter = new TokenBucket(
+                    rateLimiterConfig.capacity,
+                    rateLimiterConfig.refillRate,
                     client,
                     keyExpiry
                 );
@@ -32,23 +37,25 @@ export default function setupRateLimiter(
+                break;
             case 'LEAKY_BUCKET':
                 throw new Error('Leaky Bucket algorithm has not been implemented.');
             case 'FIXED_WINDOW':
-                return new FixedWindow(
-                    rateLimiter.capacity,
-                    rateLimiter.windowSize,
+                rateLimiter = new FixedWindow(
+                    rateLimiterConfig.capacity,
+                    rateLimiterConfig.windowSize,
                     client,
                     keyExpiry
                 );
+                break;
             case 'SLIDING_WINDOW_LOG':
-                return new SlidingWindowLog(
-                    rateLimiter.windowSize,
-                    rateLimiter.capacity,
+                rateLimiter = new SlidingWindowLog(
+                    rateLimiterConfig.windowSize,
+                    rateLimiterConfig.capacity,
                     client,
                     keyExpiry
                 );
+                break;
             case 'SLIDING_WINDOW_COUNTER':
-                return new SlidingWindowCounter(
-                    rateLimiter.windowSize,
-                    rateLimiter.capacity,
+                rateLimiter = new SlidingWindowCounter(
+                    rateLimiterConfig.windowSize,
+                    rateLimiterConfig.capacity,
                     client,
                     keyExpiry
                 );
@@ -57,6 +64,78 @@ export default function setupRateLimiter(
             // typescript should never let us invoke this function with anything other than the options above
             throw new Error('Selected rate limiting algorithm is not supported');
         }
+
+        const processRequest = rateLimiter.processRequest.bind(rateLimiter);
+
+        /**
+         * We are using a queue and an event emitter to handle situations where a user has two concurrent requests being processed.
+         * The trailing request will be added to the queue and will await processing of the prior request by the rate limiter.
+         * This maintains the consistency and accuracy of the cache when under load from one user.
+         */
+        // stores request IDs for each user in an array to be processed
+        const requestQueues: { [index: string]: string[] } = {};
+        // Manages processing of the requests queue
+        const requestEvents = new EventEmitter();
+
+        // processes requests (by resolving promises) that have been throttled by throttledProcess
+        // eslint-disable-next-line no-inner-declarations
+        async function processRequestResolver(
+            userId: string,
+            timestamp: number,
+            tokens: number,
+            resolve: (value: RateLimiterResponse | PromiseLike<RateLimiterResponse>) => void,
+            reject: (reason: unknown) => void
+        ) {
+            try {
+                const response = await processRequest(userId, timestamp, tokens);
+                requestQueues[userId] = requestQueues[userId].slice(1);
+                resolve(response);
+                // trigger the next event, and delete the request queue for this user if there are no more requests to process
+                requestEvents.emit(requestQueues[userId][0]);
+                if (requestQueues[userId].length === 0) delete requestQueues[userId];
+            } catch (err) {
+                reject(err);
+            }
+        }
+
+        /**
+         * Throttle rateLimiter.processRequest based on user IP to prevent inaccurate Redis reads.
+         * Throttling is based on an event-driven promise fulfillment approach.
+         * Each time a request is received, a promise is added to the user's request queue. The promise "subscribes"
+         * to the previous request in the user's queue, then calls processRequest and resolves once the previous request
+         * is complete.
+         * @param userId
+         * @param timestamp
+         * @param tokens
+         * @returns
+         */
+        // eslint-disable-next-line no-inner-declarations
+        async function throttledProcess(
+            userId: string,
+            timestamp: number,
+            tokens = 1
+        ): Promise<RateLimiterResponse> {
+            // Alternatively use crypto.randomUUID() to generate a random uuid
+            const requestId = `${timestamp}${tokens}`;
+
+            if (!requestQueues[userId]) {
+                requestQueues[userId] = [];
+            }
+            requestQueues[userId].push(requestId);
+
+            return new Promise((resolve, reject) => {
+                if (requestQueues[userId].length > 1) {
+                    requestEvents.once(requestId, async () => {
+                        await processRequestResolver(userId, timestamp, tokens, resolve, reject);
+                    });
+                } else {
+                    processRequestResolver(userId, timestamp, tokens, resolve, reject);
+                }
+            });
+        }
+
+        rateLimiter.processRequest = throttledProcess;
+        return rateLimiter;
     } catch (err) {
         throw new Error(`Error in expressGraphQLRateLimiter setting up rate-limiter: ${err}`);
     }
diff --git a/test/middleware/express.test.ts b/test/middleware/express.test.ts
index 6518d2a..54e5806 100644
--- a/test/middleware/express.test.ts
+++ b/test/middleware/express.test.ts
@@ -375,7 +375,7 @@ describe('Express Middleware tests', () => {

     describe('Adds expected properties to res.locals', () => {
         test('Adds UNIX timestamp', async () => {
-            jest.useRealTimers();
+            // jest.useRealTimers();
             await middleware(mockRequest as Request, mockResponse as Response, nextFunction);

             jest.useFakeTimers();
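Taken out of the diff, the pattern this patch moves into setupRateLimiter is easier to see in isolation. Below is a minimal, self-contained TypeScript sketch of the same queue-plus-EventEmitter serialization. RateLimiterResponse is reduced to a hypothetical two-field shape, fakeProcessRequest stands in for the real Redis-backed processRequest, and a random suffix is appended to the request ID (the patch derives the ID from timestamp and token count alone, noting crypto.randomUUID() as an alternative):

import EventEmitter from 'events';

// Hypothetical stand-in for the library's RateLimiterResponse type.
interface RateLimiterResponse {
    success: boolean;
    tokens: number;
}

const requestQueues: { [userId: string]: string[] } = {};
const requestEvents = new EventEmitter();

// Stub for the async, Redis-backed processRequest; any async function works here.
async function fakeProcessRequest(
    userId: string,
    timestamp: number,
    tokens: number
): Promise<RateLimiterResponse> {
    return { success: true, tokens };
}

// Serialize calls per user: a call runs immediately if it is alone in the
// queue; otherwise it waits until the call ahead of it emits its request ID.
async function throttledProcess(
    userId: string,
    timestamp: number,
    tokens = 1
): Promise<RateLimiterResponse> {
    const requestId = `${timestamp}${tokens}${Math.random()}`;
    if (!requestQueues[userId]) requestQueues[userId] = [];
    requestQueues[userId].push(requestId);

    return new Promise((resolve, reject) => {
        const run = async () => {
            try {
                const response = await fakeProcessRequest(userId, timestamp, tokens);
                requestQueues[userId] = requestQueues[userId].slice(1);
                resolve(response);
                // wake the next queued request, then clean up an empty queue
                requestEvents.emit(requestQueues[userId][0]);
                if (requestQueues[userId].length === 0) delete requestQueues[userId];
            } catch (err) {
                reject(err);
            }
        };
        if (requestQueues[userId].length > 1) requestEvents.once(requestId, run);
        else run();
    });
}

The queue gives each user strict FIFO ordering: a trailing request only touches the store after the request ahead of it has finished, so two concurrent requests from one user can never interleave their reads and writes.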
From 91a5672ff8501ff312c51e4ea079bcf4beab1f17 Mon Sep 17 00:00:00 2001
From: Stephan Halarewicz
Date: Wed, 17 Aug 2022 22:33:13 -0400
Subject: [PATCH 2/4] refactored throttling functions during rateLimiter setup

---
 src/middleware/rateLimiterSetup.ts | 140 ++++++++++++++++++-----------
 1 file changed, 86 insertions(+), 54 deletions(-)

diff --git a/src/middleware/rateLimiterSetup.ts b/src/middleware/rateLimiterSetup.ts
index a245a4e..576906e 100644
--- a/src/middleware/rateLimiterSetup.ts
+++ b/src/middleware/rateLimiterSetup.ts
@@ -24,6 +24,88 @@ export default function setupRateLimiter(
 ): RateLimiter {
     let rateLimiter: RateLimiter;

+    /**
+     * We are using a queue and an event emitter to handle situations where a user has two concurrent requests being processed.
+     * The trailing request will be added to the queue and will await processing of the prior request by the rate limiter.
+     * This maintains the consistency and accuracy of the cache when under load from one user.
+     */
+    // stores request IDs for each user in an array to be processed
+    const requestQueues: { [index: string]: string[] } = {};
+    // Manages processing of the requests queue
+    const requestEvents = new EventEmitter();
+
+    // processes requests (by resolving promises) that have been throttled by throttledProcess
+    async function processRequestResolver(
+        userId: string,
+        timestamp: number,
+        tokens: number,
+        processRequest: (
+            userId: string,
+            timestamp: number,
+            tokens: number
+        ) => Promise<RateLimiterResponse>,
+        resolve: (value: RateLimiterResponse | PromiseLike<RateLimiterResponse>) => void,
+        reject: (reason: unknown) => void
+    ) {
+        try {
+            const response = await processRequest(userId, timestamp, tokens);
+            requestQueues[userId] = requestQueues[userId].slice(1);
+            resolve(response);
+            // trigger the next event, and delete the request queue for this user if there are no more requests to process
+            requestEvents.emit(requestQueues[userId][0]);
+            if (requestQueues[userId].length === 0) delete requestQueues[userId];
+        } catch (err) {
+            reject(err);
+        }
+    }
+
+    /**
+     * Throttle rateLimiter.processRequest based on user IP to prevent inaccurate Redis reads.
+     * Throttling is based on an event-driven promise fulfillment approach.
+     * Each time a request is received, a promise is added to the user's request queue. The promise "subscribes"
+     * to the previous request in the user's queue, then calls processRequest and resolves once the previous request
+     * is complete.
+     * @param userId
+     * @param timestamp
+     * @param tokens
+     * @returns
+     */
+    async function throttledProcess(
+        processRequest: (
+            userId: string,
+            timestamp: number,
+            tokens: number
+        ) => Promise<RateLimiterResponse>,
+        userId: string,
+        timestamp: number,
+        tokens = 1
+    ): Promise<RateLimiterResponse> {
+        // Alternatively use crypto.randomUUID() to generate a random uuid
+        const requestId = `${timestamp}${tokens}`;
+
+        if (!requestQueues[userId]) {
+            requestQueues[userId] = [];
+        }
+        requestQueues[userId].push(requestId);
+
+        return new Promise((resolve, reject) => {
+            if (requestQueues[userId].length > 1) {
+                requestEvents.once(requestId, async () => {
+                    processRequestResolver(
+                        userId,
+                        timestamp,
+                        tokens,
+                        processRequest,
+                        resolve,
+                        reject
+                    );
+                });
+            } else {
+                processRequestResolver(userId, timestamp, tokens, processRequest, resolve, reject);
+            }
+        });
+    }
+
     try {
         switch (rateLimiterConfig.type) {
             case 'TOKEN_BUCKET':
@@ -65,38 +147,7 @@ export default function setupRateLimiter(
             throw new Error('Selected rate limiting algorithm is not supported');
         }

-        const processRequest = rateLimiter.processRequest.bind(rateLimiter);
-
-        /**
-         * We are using a queue and an event emitter to handle situations where a user has two concurrent requests being processed.
-         * The trailing request will be added to the queue and will await processing of the prior request by the rate limiter.
-         * This maintains the consistency and accuracy of the cache when under load from one user.
-         */
-        // stores request IDs for each user in an array to be processed
-        const requestQueues: { [index: string]: string[] } = {};
-        // Manages processing of the requests queue
-        const requestEvents = new EventEmitter();
-
-        // processes requests (by resolving promises) that have been throttled by throttledProcess
-        // eslint-disable-next-line no-inner-declarations
-        async function processRequestResolver(
-            userId: string,
-            timestamp: number,
-            tokens: number,
-            resolve: (value: RateLimiterResponse | PromiseLike<RateLimiterResponse>) => void,
-            reject: (reason: unknown) => void
-        ) {
-            try {
-                const response = await processRequest(userId, timestamp, tokens);
-                requestQueues[userId] = requestQueues[userId].slice(1);
-                resolve(response);
-                // trigger the next event, and delete the request queue for this user if there are no more requests to process
-                requestEvents.emit(requestQueues[userId][0]);
-                if (requestQueues[userId].length === 0) delete requestQueues[userId];
-            } catch (err) {
-                reject(err);
-            }
-        }
+        const boundProcessRequest = rateLimiter.processRequest.bind(rateLimiter);

         /**
          * Throttle rateLimiter.processRequest based on user IP to prevent inaccurate Redis reads.
          * Throttling is based on an event-driven promise fulfillment approach.
          * Each time a request is received, a promise is added to the user's request queue. The promise "subscribes"
          * to the previous request in the user's queue, then calls processRequest and resolves once the previous request
          * is complete.
          * @param userId
          * @param timestamp
          * @param tokens
          * @returns
          */
-        // eslint-disable-next-line no-inner-declarations
-        async function throttledProcess(
+        rateLimiter.processRequest = async (
             userId: string,
             timestamp: number,
             tokens = 1
-        ): Promise<RateLimiterResponse> {
-            // Alternatively use crypto.randomUUID() to generate a random uuid
-            const requestId = `${timestamp}${tokens}`;
-
-            if (!requestQueues[userId]) {
-                requestQueues[userId] = [];
-            }
-            requestQueues[userId].push(requestId);
-
-            return new Promise((resolve, reject) => {
-                if (requestQueues[userId].length > 1) {
-                    requestEvents.once(requestId, async () => {
-                        await processRequestResolver(userId, timestamp, tokens, resolve, reject);
-                    });
-                } else {
-                    processRequestResolver(userId, timestamp, tokens, resolve, reject);
-                }
-            });
-        }
+        ): Promise<RateLimiterResponse> =>
+            throttledProcess(boundProcessRequest, userId, timestamp, tokens);

-        rateLimiter.processRequest = throttledProcess;
         return rateLimiter;
     } catch (err) {
         throw new Error(`Error in expressGraphQLRateLimiter setting up rate-limiter: ${err}`);
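The final shape of this patch — bind the original method, then overwrite it with a wrapper that delegates — is a general decoration pattern. The sketch below applies the same idea with a plain promise chain in place of the EventEmitter queue; the Limiter interface and the serializePerUser name are hypothetical, and the chain map is never pruned, a simplification the patch's queue cleanup avoids:

interface Limiter {
    processRequest(userId: string, timestamp: number, tokens?: number): Promise<number>;
}

function serializePerUser(limiter: Limiter): Limiter {
    // bind() captures `this`, so the original implementation keeps its own state
    const original = limiter.processRequest.bind(limiter);
    const chains: Record<string, Promise<unknown>> = {};

    limiter.processRequest = async (userId, timestamp, tokens = 1) => {
        const prev = chains[userId] ?? Promise.resolve();
        const next = prev.then(() => original(userId, timestamp, tokens));
        // keep the chain alive even if this call rejects
        chains[userId] = next.catch(() => undefined);
        return next;
    };
    return limiter;
}

Chaining on the previous promise yields the same per-user FIFO guarantee; the EventEmitter variant in the patch additionally tracks queue length, which is what lets it delete a user's queue once it drains.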
From 200948d98cd0eaebc8c0e58b2ba3c44906efb498 Mon Sep 17 00:00:00 2001
From: Stephan Halarewicz
Date: Wed, 17 Aug 2022 22:45:30 -0400
Subject: [PATCH 3/4] lint fix

---
 src/middleware/index.ts              |  9 +++++----
 src/middleware/rateLimiterSetup.ts   | 11 -----------
 src/rateLimiters/slidingWindowLog.ts |  2 +-
 3 files changed, 6 insertions(+), 16 deletions(-)

diff --git a/src/middleware/index.ts b/src/middleware/index.ts
index 49a4b70..5908f9f 100644
--- a/src/middleware/index.ts
+++ b/src/middleware/index.ts
@@ -69,7 +69,7 @@ export default function expressGraphQLRateLimiter(
         req: Request,
         res: Response,
         next: NextFunction
-    ): Promise<void | Response<Record<string, any>>> => {
+    ): Promise<void | Response<Record<string, unknown>>> => {
         const requestTimestamp = new Date().valueOf();
         // access the query and variables passed to the server in the body or query string
         let query;
@@ -82,8 +82,9 @@ export default function expressGraphQLRateLimiter(
             variables = req.body.variables;
         }
         if (!query) {
+            // eslint-disable-next-line no-console
             console.error(
-                'Error in expressGraphQLRateLimiter: There is no query on the request. Rate-Limiting skipped'
+                '[graphql-gate] Error in expressGraphQLRateLimiter: There is no query on the request. Rate-Limiting skipped'
             );
             return next();
         }
@@ -140,9 +141,9 @@ export default function expressGraphQLRateLimiter(
             }
             return next();
         } catch (err) {
-            // log the error to the console and pass the request onto the next middleware.
+            // eslint-disable-next-line no-console
             console.error(
-                `Error in expressGraphQLRateLimiter processing query. Rate limiting is skipped: ${err}`
+                `[graphql-gate] Error in expressGraphQLRateLimiter processing query. Rate limiting is skipped: ${err}`
             );
             return next(err);
         }
diff --git a/src/middleware/rateLimiterSetup.ts b/src/middleware/rateLimiterSetup.ts
index 576906e..03bef3c 100644
--- a/src/middleware/rateLimiterSetup.ts
+++ b/src/middleware/rateLimiterSetup.ts
@@ -149,17 +149,6 @@ export default function setupRateLimiter(

         const boundProcessRequest = rateLimiter.processRequest.bind(rateLimiter);

-        /**
-         * Throttle rateLimiter.processRequest based on user IP to prevent inaccurate Redis reads.
-         * Throttling is based on an event-driven promise fulfillment approach.
-         * Each time a request is received, a promise is added to the user's request queue. The promise "subscribes"
-         * to the previous request in the user's queue, then calls processRequest and resolves once the previous request
-         * is complete.
-         * @param userId
-         * @param timestamp
-         * @param tokens
-         * @returns
-         */
         rateLimiter.processRequest = async (
             userId: string,
             timestamp: number,
diff --git a/src/rateLimiters/slidingWindowLog.ts b/src/rateLimiters/slidingWindowLog.ts
index 3407920..98020cd 100644
--- a/src/rateLimiters/slidingWindowLog.ts
+++ b/src/rateLimiters/slidingWindowLog.ts
@@ -1,5 +1,5 @@
 import Redis from 'ioredis';
-import { RateLimiter, RateLimiterResponse, RedisBucket, RedisLog } from '../@types/rateLimit';
+import { RateLimiter, RateLimiterResponse, RedisLog } from '../@types/rateLimit';

 /**
  * The SlidingWindowLog instance of a RateLimiter limits requests based on a unique user ID.
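For context, this is roughly how the finished factory might be invoked. The TOKEN_BUCKET config fields and the throttled-by-default processRequest come straight from the patches; the concrete values, the import path, and the keyExpiry units are assumptions:

import Redis from 'ioredis';
import setupRateLimiter from './src/middleware/rateLimiterSetup';

(async () => {
    const client = new Redis(); // assumes a Redis instance on localhost:6379

    const limiter = setupRateLimiter(
        { type: 'TOKEN_BUCKET', capacity: 10, refillRate: 1 },
        client,
        86400 // keyExpiry — value and units are illustrative
    );

    // processRequest is now the throttled wrapper, so concurrent calls for the
    // same user (the middleware passes the request IP) settle in arrival order.
    const ts = Date.now();
    const results = await Promise.all([
        limiter.processRequest('203.0.113.7', ts, 3),
        limiter.processRequest('203.0.113.7', ts, 3),
    ]);
    console.log(results);
    client.disconnect();
})();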
From 7cf49b599e173ea2829bb07611d3553b020987b2 Mon Sep 17 00:00:00 2001
From: Stephan Halarewicz
Date: Fri, 19 Aug 2022 16:26:40 -0400
Subject: [PATCH 4/4] removed redundant timer controls in express test. added
 comments on throttled processRequest

---
 src/middleware/rateLimiterSetup.ts | 2 ++
 test/middleware/express.test.ts    | 2 --
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/middleware/rateLimiterSetup.ts b/src/middleware/rateLimiterSetup.ts
index 03bef3c..caea6e9 100644
--- a/src/middleware/rateLimiterSetup.ts
+++ b/src/middleware/rateLimiterSetup.ts
@@ -147,6 +147,8 @@ export default function setupRateLimiter(
             throw new Error('Selected rate limiting algorithm is not supported');
         }

+        // Overwrite the processRequest method with a throttled implementation to ensure async Redis interactions are handled
+        // sequentially for each user.
         const boundProcessRequest = rateLimiter.processRequest.bind(rateLimiter);

         rateLimiter.processRequest = async (
diff --git a/test/middleware/express.test.ts b/test/middleware/express.test.ts
index 54e5806..10462d1 100644
--- a/test/middleware/express.test.ts
+++ b/test/middleware/express.test.ts
@@ -375,9 +375,7 @@ describe('Express Middleware tests', () => {

     describe('Adds expected properties to res.locals', () => {
         test('Adds UNIX timestamp', async () => {
-            // jest.useRealTimers();
             await middleware(mockRequest as Request, mockResponse as Response, nextFunction);
-            jest.useFakeTimers();

             // confirm that this is timestamp +/- 5 minutes of now.
const now: number = Date.now().valueOf();
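The failure mode the queued processRequest guards against is easiest to see with a deliberately naive read-modify-write limiter — a Map standing in for Redis, nothing here taken from the library:

const store = new Map<string, number>(); // stands in for the Redis cache

async function naiveProcessRequest(userId: string, tokens: number): Promise<number> {
    const current = store.get(userId) ?? 10;     // read
    await new Promise((r) => setTimeout(r, 10)); // simulated Redis round-trip
    store.set(userId, current - tokens);         // write, clobbering concurrent writers
    return current - tokens;
}

(async () => {
    await Promise.all([naiveProcessRequest('u1', 3), naiveProcessRequest('u1', 3)]);
    // Both calls read 10 before either wrote, so the bucket ends at 7 instead
    // of 4 — exactly the inaccurate Redis read the request queue prevents.
    console.log(store.get('u1')); // 7
})();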