From 3a7e4a70e82e4806123f066cf79b6690d7b3d0aa Mon Sep 17 00:00:00 2001 From: eisbilir Date: Thu, 3 Jun 2021 23:19:21 +0300 Subject: [PATCH 1/3] action topic and service added --- README.md | 7 ++ config/default.js | 20 +++- package-lock.json | 133 +++++++++++++++++++-- package.json | 2 + src/app.js | 9 +- src/common/errors.js | 40 +++++++ src/common/helper.js | 71 ++++++++++- src/services/ActionProcessorService.js | 83 +++++++++++++ src/services/WorkPeriodProcessorService.js | 24 +++- test/unit/test.js | 12 +- 10 files changed, 371 insertions(+), 30 deletions(-) create mode 100644 src/common/errors.js create mode 100644 src/services/ActionProcessorService.js diff --git a/README.md b/README.md index 2abb251..257b38c 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,9 @@ The following parameters can be set in config files or in env variables: - `KAFKA_CLIENT_CERT_KEY`: Kafka connection private key, optional; if not provided, then SSL connection is not used, direct insecure connection is used; if provided, it can be either path to private key file or private key content +- `KAFKA_MESSAGE_ORIGINATOR`: The originator value for the kafka messages - `KAFKA_GROUP_ID`: the Kafka group id +- `topics.KAFKA_ERROR_TOPIC`: the error topic at which bus api will publish any errors - `topics.TAAS_JOB_CREATE_TOPIC`: the create job entity Kafka message topic - `topics.TAAS_JOB_UPDATE_TOPIC`: the update job entity Kafka message topic - `topics.TAAS_JOB_DELETE_TOPIC`: the delete job entity Kafka message topic @@ -41,6 +43,10 @@ The following parameters can be set in config files or in env variables: - `topics.TAAS_ROLE_CREATE_TOPIC`: the create role entity Kafka message topic - `topics.TAAS_ROLE_UPDATE_TOPIC`: the update role entity Kafka message topic - `topics.TAAS_ROLE_DELETE_TOPIC`: the delete role entity Kafka message topic +- `topics.TAAS_ACTION_RETRY_TOPIC`: the retry process Kafka message topic +- `MAX_RETRY`: maximum allowed retry count for failed operations for sending `taas.action.retry` message +- `BASE_RETRY_DELAY`: base amount of retry delay (ms) for failed operations +- `BUSAPI_URL`: Topcoder Bus API URL - `esConfig.HOST`: Elasticsearch host - `esConfig.AWS_REGION`: The Amazon region to use when using AWS Elasticsearch service - `esConfig.ELASTICCLOUD.id`: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this @@ -56,6 +62,7 @@ The following parameters can be set in config files or in env variables: - `auth0.AUTH0_CLIENT_ID`: Auth0 client id, used to get TC M2M token - `auth0.AUTH0_CLIENT_SECRET`: Auth0 client secret, used to get TC M2M token - `auth0.AUTH0_PROXY_SERVER_URL`: Proxy Auth0 URL, used to get TC M2M token +- `auth0.TOKEN_CACHE_TIME`: Auth0 token cache time, used to get TC M2M token - `zapier.ZAPIER_COMPANYID_SLUG`: your company id in zapier; numeric value - `zapier.ZAPIER_CONTACTID_SLUG`: your contact id in zapier; numeric value diff --git a/config/default.js b/config/default.js index ffb2c16..059ecdc 100644 --- a/config/default.js +++ b/config/default.js @@ -1,7 +1,7 @@ /** * The default configuration file. */ - +require('dotenv').config() module.exports = { PORT: process.env.PORT || 3001, LOG_LEVEL: process.env.LOG_LEVEL || 'debug', @@ -14,8 +14,12 @@ module.exports = { // Kafka group id KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'taas-es-processor', + // The originator value for the kafka messages + KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-es-processor', topics: { + // The error topic at which bus api will publish any errors + KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting', // topics for job service TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create', TAAS_JOB_UPDATE_TOPIC: process.env.TAAS_JOB_UPDATE_TOPIC || 'taas.job.update', @@ -42,8 +46,17 @@ module.exports = { // topics for role service TAAS_ROLE_CREATE_TOPIC: process.env.TAAS_ROLE_CREATE_TOPIC || 'taas.role.requested', TAAS_ROLE_UPDATE_TOPIC: process.env.TAAS_ROLE_UPDATE_TOPIC || 'taas.role.update', - TAAS_ROLE_DELETE_TOPIC: process.env.TAAS_ROLE_DELETE_TOPIC || 'taas.role.delete' + TAAS_ROLE_DELETE_TOPIC: process.env.TAAS_ROLE_DELETE_TOPIC || 'taas.role.delete', + // special kafka topics + TAAS_ACTION_RETRY_TOPIC: process.env.TAAS_ACTION_RETRY_TOPIC || 'taas.action.retry' + }, + // maximum allowed retry count for failed operations for sending `action.retry` message + MAX_RETRY: process.env.MAX_RETRY || 3, + // base amount of retry delay for failed operations + BASE_RETRY_DELAY: process.env.BASE_RETRY_DELAY || 500, + // Topcoder Bus API URL + BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5', esConfig: { HOST: process.env.ES_HOST || 'http://localhost:9200', @@ -67,7 +80,8 @@ module.exports = { AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE, AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID, AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET, - AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL + AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL, + TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME }, zapier: { diff --git a/package-lock.json b/package-lock.json index b198b5c..fec285f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -271,6 +271,79 @@ "resolved": "https://registry.npmjs.org/@tootallnate/once/-/once-1.1.2.tgz", "integrity": "sha512-RbzJvlNzmRq5c3O09UipeuXno4tA1FE6ikOjxZK0tuxVv3412l64l5t1W5pj4+rJq9vpkm/kwiR07aZXnsKPxw==" }, + "@topcoder-platform/topcoder-bus-api-wrapper": { + "version": "github:topcoder-platform/tc-bus-api-wrapper#f8cbd335a0e0b4d6edd7cae859473593271fd97f", + "from": "github:topcoder-platform/tc-bus-api-wrapper", + "requires": { + "joi": "^13.4.0", + "lodash": "^4.17.15", + "superagent": "^3.8.3", + "tc-core-library-js": "github:appirio-tech/tc-core-library-js#v2.6.4" + }, + "dependencies": { + "debug": { + "version": "3.2.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz", + "integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==", + "requires": { + "ms": "^2.1.1" + } + }, + "readable-stream": { + "version": "2.3.7", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.7.tgz", + "integrity": "sha512-Ebho8K4jIbHAxnuxi7o42OrZgF/ZTNcsZj6nRKyUmkhLFq8CHItp/fy6hQZuZmP/n3yZ9VBUbp4zz/mX8hmYPw==", + "requires": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.3", + "isarray": "~1.0.0", + "process-nextick-args": "~2.0.0", + "safe-buffer": "~5.1.1", + "string_decoder": "~1.1.1", + "util-deprecate": "~1.0.1" + } + }, + "string_decoder": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", + "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", + "requires": { + "safe-buffer": "~5.1.0" + } + }, + "superagent": { + "version": "3.8.3", + "resolved": "https://registry.npmjs.org/superagent/-/superagent-3.8.3.tgz", + "integrity": "sha512-GLQtLMCoEIK4eDv6OGtkOoSMt3D+oq0y3dsxMuYuDvaNUvuT8eFBuLmfR0iYYzHC1e8hpzC6ZsxbuP6DIalMFA==", + "requires": { + "component-emitter": "^1.2.0", + "cookiejar": "^2.1.0", + "debug": "^3.1.0", + "extend": "^3.0.0", + "form-data": "^2.3.1", + "formidable": "^1.2.0", + "methods": "^1.1.1", + "mime": "^1.4.1", + "qs": "^6.5.1", + "readable-stream": "^2.3.5" + } + }, + "tc-core-library-js": { + "version": "github:appirio-tech/tc-core-library-js#df0b36c51cf80918194cbff777214b3c0cf5a151", + "from": "github:appirio-tech/tc-core-library-js#v2.6.4", + "requires": { + "axios": "^0.19.0", + "bunyan": "^1.8.12", + "jsonwebtoken": "^8.5.1", + "jwks-rsa": "^1.6.0", + "lodash": "^4.17.15", + "millisecond": "^0.1.2", + "r7insight_node": "^1.8.4", + "request": "^2.88.0" + } + } + } + }, "@types/bluebird": { "version": "3.5.0", "resolved": "https://registry.npm.taobao.org/@types/bluebird/download/@types/bluebird-3.5.0.tgz", @@ -1133,8 +1206,7 @@ "component-emitter": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.0.tgz", - "integrity": "sha512-Rd3se6QB+sO1TwqZjscQrurpEPIfO0/yYnSin6Q/rD3mOutHvUrCAhJub3r90uNb+SESBuE0QYoB90YdfatsRg==", - "dev": true + "integrity": "sha512-Rd3se6QB+sO1TwqZjscQrurpEPIfO0/yYnSin6Q/rD3mOutHvUrCAhJub3r90uNb+SESBuE0QYoB90YdfatsRg==" }, "concat-map": { "version": "0.0.1", @@ -1238,8 +1310,7 @@ "cookiejar": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/cookiejar/-/cookiejar-2.1.2.tgz", - "integrity": "sha512-Mw+adcfzPxcPeI+0WlvRrr/3lGVO0bD75SxX6811cxSh1Wbxx7xZBGK1eVtDf6si8rg2lhnUjsVLMFMfbRIuwA==", - "dev": true + "integrity": "sha512-Mw+adcfzPxcPeI+0WlvRrr/3lGVO0bD75SxX6811cxSh1Wbxx7xZBGK1eVtDf6si8rg2lhnUjsVLMFMfbRIuwA==" }, "core-js": { "version": "2.6.12", @@ -1485,6 +1556,11 @@ "is-obj": "^2.0.0" } }, + "dotenv": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-10.0.0.tgz", + "integrity": "sha512-rlBi9d8jpv9Sf1klPjNfFAuWDjKLwTIJJ/VxtoTwIR6hnZxcEOQCZg2oIL3MWBYw5GpUDKOEnND7LXTbIpQ03Q==" + }, "dtrace-provider": { "version": "0.8.8", "resolved": "https://registry.npmjs.org/dtrace-provider/-/dtrace-provider-0.8.8.tgz", @@ -2310,7 +2386,6 @@ "version": "2.5.0", "resolved": "https://registry.npmjs.org/form-data/-/form-data-2.5.0.tgz", "integrity": "sha512-WXieX3G/8side6VIqx44ablyULoGruSde5PNTxoUyo5CeyAMX6nVWUd0rgist/EuX655cjhUhTo1Fo3tRYqbcA==", - "dev": true, "requires": { "asynckit": "^0.4.0", "combined-stream": "^1.0.6", @@ -2320,8 +2395,7 @@ "formidable": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/formidable/-/formidable-1.2.1.tgz", - "integrity": "sha512-Fs9VRguL0gqGHkXS5GQiMCr1VhZBxz0JnJs4JmMp/2jL18Fmbzvv7vOFRU+U8TBkHEE/CX1qDXzJplVULgsLeg==", - "dev": true + "integrity": "sha512-Fs9VRguL0gqGHkXS5GQiMCr1VhZBxz0JnJs4JmMp/2jL18Fmbzvv7vOFRU+U8TBkHEE/CX1qDXzJplVULgsLeg==" }, "forwarded": { "version": "0.1.2", @@ -2570,6 +2644,11 @@ "integrity": "sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==", "dev": true }, + "hoek": { + "version": "5.0.4", + "resolved": "https://registry.npmjs.org/hoek/-/hoek-5.0.4.tgz", + "integrity": "sha512-Alr4ZQgoMlnere5FZJsIyfIjORBqZll5POhDsF4q64dPuJR6rNxXdDxtHSQq8OXRurhmx+PWYEE8bXRROY8h0w==" + }, "hosted-git-info": { "version": "2.7.1", "resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-2.7.1.tgz", @@ -2931,6 +3010,21 @@ "resolved": "https://registry.npm.taobao.org/isarray/download/isarray-1.0.0.tgz", "integrity": "sha1-u5NdSFgsuhaMBoNJV6VKPgcSTxE=" }, + "isemail": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/isemail/-/isemail-3.2.0.tgz", + "integrity": "sha512-zKqkK+O+dGqevc93KNsbZ/TqTUFd46MwWjYOoMrjIMZ51eU7DtQG3Wmd9SQQT7i7RVnuTPEiYEWHU3MSbxC1Tg==", + "requires": { + "punycode": "2.x.x" + }, + "dependencies": { + "punycode": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.1.1.tgz", + "integrity": "sha512-XRsRjdf+j5ml+y/6GKHPZbrF/8p2Yga0JPtdqTIY2Xe5ohJPD9saDJJLPvp9+NSBprVvevdXZybnj2cv8OEd0A==" + } + } + }, "isexe": { "version": "2.0.0", "resolved": "https://registry.npm.taobao.org/isexe/download/isexe-2.0.0.tgz", @@ -3046,6 +3140,16 @@ "resolved": "https://registry.npm.taobao.org/jmespath/download/jmespath-0.15.0.tgz", "integrity": "sha1-o/Iiqarp+Wb10nx5ZRDigJF2Qhc=" }, + "joi": { + "version": "13.7.0", + "resolved": "https://registry.npmjs.org/joi/-/joi-13.7.0.tgz", + "integrity": "sha512-xuY5VkHfeOYK3Hdi91ulocfuFopwgbSORmIwzcwHKESQhC7w1kD5jaVSPnqDxS2I8t3RZ9omCKAxNwXN5zG1/Q==", + "requires": { + "hoek": "5.x.x", + "isemail": "3.x.x", + "topo": "3.x.x" + } + }, "js-tokens": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz", @@ -5697,6 +5801,21 @@ "express": "^4.16.3" } }, + "topo": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/topo/-/topo-3.0.3.tgz", + "integrity": "sha512-IgpPtvD4kjrJ7CRA3ov2FhWQADwv+Tdqbsf1ZnPUSAtCJ9e1Z44MmoSGDXGk4IppoZA7jd/QRkNddlLJWlUZsQ==", + "requires": { + "hoek": "6.x.x" + }, + "dependencies": { + "hoek": { + "version": "6.1.3", + "resolved": "https://registry.npmjs.org/hoek/-/hoek-6.1.3.tgz", + "integrity": "sha512-YXXAAhmF9zpQbC7LEcREFtXfGq5K1fmd+4PHkBq8NUqmzW3G+Dq10bI/i0KucLRwss3YYFQ0fSfoxBZYiGUqtQ==" + } + } + }, "touch": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/touch/-/touch-3.1.0.tgz", diff --git a/package.json b/package.json index 7bd0c50..77228e9 100644 --- a/package.json +++ b/package.json @@ -34,10 +34,12 @@ "dependencies": { "@elastic/elasticsearch": "^7.9.1", "@hapi/joi": "^15.1.0", + "@topcoder-platform/topcoder-bus-api-wrapper": "github:topcoder-platform/tc-bus-api-wrapper", "async-mutex": "^0.2.4", "aws-sdk": "^2.476.0", "bluebird": "^3.5.5", "config": "^3.1.0", + "dotenv": "^10.0.0", "get-parameter-names": "^0.3.0", "lodash": "^4.17.20", "no-kafka": "^3.4.3", diff --git a/src/app.js b/src/app.js index 5b403b0..bc6bb2a 100644 --- a/src/app.js +++ b/src/app.js @@ -16,6 +16,7 @@ const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorServic const InterviewProcessorService = require('./services/InterviewProcessorService') const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService') const RoleProcessorService = require('./services/RoleProcessorService') +const ActionProcessorService = require('./services/ActionProcessorService') const Mutex = require('async-mutex').Mutex const events = require('events') @@ -23,7 +24,6 @@ const eventEmitter = new events.EventEmitter() // healthcheck listening port process.env.PORT = config.PORT - const localLogger = { info: (message) => logger.info({ component: 'app', message }), debug: (message) => logger.debug({ component: 'app', message }), @@ -57,7 +57,9 @@ const topicServiceMapping = { // role [config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate, [config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate, - [config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete + [config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete, + // action + [config.topics.TAAS_ACTION_RETRY_TOPIC]: ActionProcessorService.processRetry } // Start kafka consumer @@ -179,5 +181,6 @@ if (!module.parent) { module.exports = { initConsumer, - eventEmitter + eventEmitter, + topicServiceMapping } diff --git a/src/common/errors.js b/src/common/errors.js new file mode 100644 index 0000000..1fee2b6 --- /dev/null +++ b/src/common/errors.js @@ -0,0 +1,40 @@ +/** + * This file defines application errors + */ +const util = require('util') + +/** + * Helper function to create generic error object with http status code + * @param {String} name the error name + * @param {Number} statusCode the http status code + * @returns {Function} the error constructor + * @private + */ +function createError (name, statusCode) { + /** + * The error constructor + * @param {String} message the error message + * @param {String} [cause] the error cause + * @constructor + */ + function ErrorCtor (message, cause) { + Error.call(this) + Error.captureStackTrace(this) + this.message = message || name + this.cause = cause + this.httpStatus = statusCode + } + + util.inherits(ErrorCtor, Error) + ErrorCtor.prototype.name = name + return ErrorCtor +} + +module.exports = { + BadRequestError: createError('BadRequestError', 400), + UnauthorizedError: createError('UnauthorizedError', 401), + ForbiddenError: createError('ForbiddenError', 403), + NotFoundError: createError('NotFoundError', 404), + ConflictError: createError('ConflictError', 409), + InternalServerError: createError('InternalServerError', 500) +} diff --git a/src/common/helper.js b/src/common/helper.js index 281b335..9530d99 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -6,10 +6,13 @@ const AWS = require('aws-sdk') const config = require('config') const request = require('superagent') const logger = require('./logger') +const errors = require('./errors') const elasticsearch = require('@elastic/elasticsearch') const _ = require('lodash') const { Mutex } = require('async-mutex') const m2mAuth = require('tc-core-library-js').auth.m2m +const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper') +const ActionProcessorService = require('../services/ActionProcessorService') AWS.config.region = config.esConfig.AWS_REGION @@ -91,7 +94,7 @@ function getESClient () { await esClient.create(data) } catch (err) { if (err.statusCode === 409) { - throw new Error(`id: ${data.id} "${data.index}" already exists`) + throw new errors.ConflictError(`id: ${data.id} "${data.index}" already exists`) } throw err } @@ -103,7 +106,7 @@ function getESClient () { await esClient.update(data) } catch (err) { if (err.statusCode === 404) { - throw new Error(`id: ${data.id} "${data.index}" not found`) + throw new errors.NotFoundError(`id: ${data.id} "${data.index}" not found`) } throw err } @@ -117,7 +120,7 @@ function getESClient () { doc = await esClient.getSource(data) } catch (err) { if (err.statusCode === 404) { - throw new Error(`id: ${data.id} "${data.index}" not found`) + throw new errors.NotFoundError(`id: ${data.id} "${data.index}" not found`) } throw err } @@ -131,7 +134,7 @@ function getESClient () { await esClient.delete(data) } catch (err) { if (err.statusCode === 404) { - throw new Error(`id: ${data.id} "${data.index}" not found`) + throw new errors.NotFoundError(`id: ${data.id} "${data.index}" not found`) } throw err } @@ -178,10 +181,68 @@ async function postMessageViaWebhook (webhook, message) { await request.post(webhook).send(message) } +/** + * Calls ActionProcessorService to attempt to retry failed process + * @param {String} topic the failed topic name + * @param {Object} payload the payload + * @param {String} id the id that was the subject of the operation failed + */ +async function retryFailedProcess (topic, payload, id) { + await ActionProcessorService.processCreate(topic, payload, id) +} + +let busApiClient + +/** + * Get bus api client. + * + * @returns {Object} the bus api client + */ +function getBusApiClient () { + if (busApiClient) { + return busApiClient + } + busApiClient = busApi( + _.assign(_.pick(config.auth0, [ + 'AUTH0_URL', + 'AUTH0_AUDIENCE', + 'TOKEN_CACHE_TIME', + 'AUTH0_CLIENT_ID', + 'AUTH0_CLIENT_SECRET', + 'AUTH0_PROXY_SERVER_URL' + ]), _.pick(config, 'BUSAPI_URL'), + _.pick(config.topics, 'KAFKA_ERROR_TOPIC')) + + ) + return busApiClient +} + +/** + * Send Kafka event message + * @param {String} topic the topic name + * @param {Object} payload the payload + */ +async function postEvent (topic, payload) { + logger.debug({ component: 'helper', context: 'postEvent', message: `Posting event to Kafka topic ${topic}, ${JSON.stringify(payload)}` }) + + const client = getBusApiClient() + const message = { + topic, + originator: config.KAFKA_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload + } + await client.postEvent(message) +} + module.exports = { getKafkaOptions, getESClient, checkEsMutexRelease, getM2MToken, - postMessageViaWebhook + postMessageViaWebhook, + retryFailedProcess, + getBusApiClient, + postEvent } diff --git a/src/services/ActionProcessorService.js b/src/services/ActionProcessorService.js new file mode 100644 index 0000000..3faac7c --- /dev/null +++ b/src/services/ActionProcessorService.js @@ -0,0 +1,83 @@ +/** + * Action Processor Service + */ + +const Joi = require('@hapi/joi') +const logger = require('../common/logger') +const helper = require('../common/helper') +const config = require('config') +const _ = require('lodash') + +const localLogger = { + debug: ({ context, message }) => logger.debug({ component: 'ActionProcessorService', context, message }) +} + +const retryMap = {} + +/** + * Process retry operation message + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processRetry (message, transactionId) { + if (message.originator !== config.KAFKA_MESSAGE_ORIGINATOR) { + localLogger.debug({ context: 'processRetry', message: `originator: ${message.originator} does not match with ${config.KAFKA_MESSAGE_ORIGINATOR} - ignored` }) + return + } + const { topicServiceMapping } = require('../app') + message.topic = message.payload.originalTopic + message.payload = message.payload.originalPayload + await topicServiceMapping[message.topic](message, transactionId) +} + +processRetry.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + originalTopic: Joi.string().required(), + originalPayload: Joi.object().required(), + retry: Joi.number().integer().min(1).required() + }).required() + }).required(), + transactionId: Joi.string().required() +} + +/** + * Analyzes the failed process and sends it to bus api to be received again. + * @param {String} originalTopic the failed topic name + * @param {Object} originalPayload the payload + * @param {String} id the id that was the subject of the operation failed + */ +async function processCreate (originalTopic, originalPayload, id) { + const retry = _.defaultTo(retryMap[id], 0) + 1 + if (retry > config.MAX_RETRY) { + localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${id} exceeds the max retry: ${config.MAX_RETRY} - ignored` }) + return + } + localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${id}` }) + retryMap[id] = retry + const payload = { + originalTopic, + originalPayload, + retry + } + setTimeout(async function () { + await helper.postEvent(config.topics.TAAS_ACTION_RETRY_TOPIC, payload) + }, 2 ** retry * config.BASE_RETRY_DELAY) +} + +processCreate.schema = { + originalTopic: Joi.string().required(), + originalPayload: Joi.object().required(), + id: Joi.string().uuid().required() +} + +module.exports = { + processRetry, + processCreate +} + +logger.buildService(module.exports, 'ActionProcessorService') diff --git a/src/services/WorkPeriodProcessorService.js b/src/services/WorkPeriodProcessorService.js index 770f774..3b91a91 100644 --- a/src/services/WorkPeriodProcessorService.js +++ b/src/services/WorkPeriodProcessorService.js @@ -18,11 +18,25 @@ const esClient = helper.getESClient() async function processCreate (message, transactionId) { const workPeriod = message.payload // Find related resourceBooking - const resourceBooking = await esClient.getExtra({ - index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - transactionId, - id: workPeriod.resourceBookingId - }) + let resourceBooking + try { + resourceBooking = await esClient.getExtra({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + transactionId, + id: workPeriod.resourceBookingId + }) + } catch (err) { + // if resource booking was not found, it may be because + // it has not yet been created. We should send a retry request. + if (err.httpStatus === 404) { + logger.logFullError(err, { component: 'WorkPeriodProcessorService', context: 'processCreate' }) + await helper.retryFailedProcess(message.topic, workPeriod, workPeriod.resourceBookingId) + return + } else { + throw err + } + } + console.log(`[RB value-999] before update: ${JSON.stringify(resourceBooking)}`) // Get ResourceBooking's existing workPeriods const workPeriods = _.isArray(resourceBooking.body.workPeriods) ? resourceBooking.body.workPeriods : [] diff --git a/test/unit/test.js b/test/unit/test.js index d2d3f66..b89a18c 100644 --- a/test/unit/test.js +++ b/test/unit/test.js @@ -17,7 +17,8 @@ const services = { JobCandidateProcessorService: require('../../src/services/JobCandidateProcessorService'), ResourceBookingProcessorService: require('../../src/services/ResourceBookingProcessorService'), WorkPeriodProcessorService: require('../../src/services/WorkPeriodProcessorService'), - WorkPeriodPaymentProcessorService: require('../../src/services/WorkPeriodPaymentProcessorService') + WorkPeriodPaymentProcessorService: require('../../src/services/WorkPeriodPaymentProcessorService'), + ActionProcessorService: require('../../src/services/ActionProcessorService') } // random transaction id here @@ -172,12 +173,9 @@ describe('General Logic Tests', () => { ) }) it(`Failure - processCreate - ${modelInSpaceCase} not found`, async () => { - try { - await services[`${model}ProcessorService`].processCreate(testData.messages[model].create.message, transactionId) - throw new Error() - } catch (err) { - should.equal(err.message, `id: ${testData.messages[parentModel].create.message.payload.id} "${index}" not found`) - } + const processCreateStub = sandbox.stub(services.ActionProcessorService, 'processCreate').callsFake(() => {}) + await services[`${model}ProcessorService`].processCreate(testData.messages[model].create.message, transactionId) + should.equal(processCreateStub.getCall(0).args[0], testData.messages[model].create.topic) }) it(`Failure - processUpdate - ${modelInSpaceCase} not found`, async () => { From 59ec92decb8838af019c10fe69f6d46fd4e0c97e Mon Sep 17 00:00:00 2001 From: eisbilir Date: Mon, 7 Jun 2021 23:50:09 +0300 Subject: [PATCH 2/3] use retry value from payload --- src/common/helper.js | 8 ++++---- src/services/ActionProcessorService.js | 21 +++++++++------------ src/services/WorkPeriodProcessorService.js | 12 +++++++++--- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/common/helper.js b/src/common/helper.js index 9530d99..ae07432 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -12,7 +12,6 @@ const _ = require('lodash') const { Mutex } = require('async-mutex') const m2mAuth = require('tc-core-library-js').auth.m2m const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper') -const ActionProcessorService = require('../services/ActionProcessorService') AWS.config.region = config.esConfig.AWS_REGION @@ -185,10 +184,11 @@ async function postMessageViaWebhook (webhook, message) { * Calls ActionProcessorService to attempt to retry failed process * @param {String} topic the failed topic name * @param {Object} payload the payload - * @param {String} id the id that was the subject of the operation failed + * @param {String} retry how many times has it been retried */ -async function retryFailedProcess (topic, payload, id) { - await ActionProcessorService.processCreate(topic, payload, id) +async function retryFailedProcess (topic, payload, retry) { + const ActionProcessorService = require('../services/ActionProcessorService') + await ActionProcessorService.processCreate(topic, payload, retry) } let busApiClient diff --git a/src/services/ActionProcessorService.js b/src/services/ActionProcessorService.js index 3faac7c..91fa310 100644 --- a/src/services/ActionProcessorService.js +++ b/src/services/ActionProcessorService.js @@ -6,14 +6,11 @@ const Joi = require('@hapi/joi') const logger = require('../common/logger') const helper = require('../common/helper') const config = require('config') -const _ = require('lodash') const localLogger = { debug: ({ context, message }) => logger.debug({ component: 'ActionProcessorService', context, message }) } -const retryMap = {} - /** * Process retry operation message * @param {Object} message the kafka message @@ -25,9 +22,10 @@ async function processRetry (message, transactionId) { return } const { topicServiceMapping } = require('../app') + const retry = message.payload.retry message.topic = message.payload.originalTopic message.payload = message.payload.originalPayload - await topicServiceMapping[message.topic](message, transactionId) + await topicServiceMapping[message.topic](message, transactionId, { retry }) } processRetry.schema = { @@ -49,22 +47,21 @@ processRetry.schema = { * Analyzes the failed process and sends it to bus api to be received again. * @param {String} originalTopic the failed topic name * @param {Object} originalPayload the payload - * @param {String} id the id that was the subject of the operation failed + * @param {Number} retry how many times has it been retried */ -async function processCreate (originalTopic, originalPayload, id) { - const retry = _.defaultTo(retryMap[id], 0) + 1 +async function processCreate (originalTopic, originalPayload, retry) { + retry = retry + 1 if (retry > config.MAX_RETRY) { - localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${id} exceeds the max retry: ${config.MAX_RETRY} - ignored` }) + localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${originalPayload.id} exceeds the max retry: ${config.MAX_RETRY} - ignored` }) return } - localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${id}` }) - retryMap[id] = retry + localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${originalPayload.id}` }) const payload = { originalTopic, originalPayload, retry } - setTimeout(async function () { + setTimeout(async () => { await helper.postEvent(config.topics.TAAS_ACTION_RETRY_TOPIC, payload) }, 2 ** retry * config.BASE_RETRY_DELAY) } @@ -72,7 +69,7 @@ async function processCreate (originalTopic, originalPayload, id) { processCreate.schema = { originalTopic: Joi.string().required(), originalPayload: Joi.object().required(), - id: Joi.string().uuid().required() + retry: Joi.number().integer().min(0).required() } module.exports = { diff --git a/src/services/WorkPeriodProcessorService.js b/src/services/WorkPeriodProcessorService.js index 3b91a91..e86b21d 100644 --- a/src/services/WorkPeriodProcessorService.js +++ b/src/services/WorkPeriodProcessorService.js @@ -14,8 +14,9 @@ const esClient = helper.getESClient() * Process create entity message * @param {Object} message the kafka message * @param {String} transactionId + * @param {Object} options */ -async function processCreate (message, transactionId) { +async function processCreate (message, transactionId, options) { const workPeriod = message.payload // Find related resourceBooking let resourceBooking @@ -30,7 +31,7 @@ async function processCreate (message, transactionId) { // it has not yet been created. We should send a retry request. if (err.httpStatus === 404) { logger.logFullError(err, { component: 'WorkPeriodProcessorService', context: 'processCreate' }) - await helper.retryFailedProcess(message.topic, workPeriod, workPeriod.resourceBookingId) + await helper.retryFailedProcess(message.topic, workPeriod, options.retry) return } else { throw err @@ -78,7 +79,12 @@ processCreate.schema = { updatedBy: Joi.string().uuid().allow(null) }).required() }).required(), - transactionId: Joi.string().required() + transactionId: Joi.string().required(), + options: Joi.object().keys({ + retry: Joi.number().integer().min(0).default(0) + }).default({ + retry: 0 + }) } /** From 3716995d1631c5e52135bb486ae58a785c43f05d Mon Sep 17 00:00:00 2001 From: eisbilir Date: Tue, 8 Jun 2021 21:11:36 +0300 Subject: [PATCH 3/3] avoid circular dependency --- src/common/helper.js | 12 ------------ src/services/WorkPeriodProcessorService.js | 3 ++- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/src/common/helper.js b/src/common/helper.js index ae07432..6725df1 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -180,17 +180,6 @@ async function postMessageViaWebhook (webhook, message) { await request.post(webhook).send(message) } -/** - * Calls ActionProcessorService to attempt to retry failed process - * @param {String} topic the failed topic name - * @param {Object} payload the payload - * @param {String} retry how many times has it been retried - */ -async function retryFailedProcess (topic, payload, retry) { - const ActionProcessorService = require('../services/ActionProcessorService') - await ActionProcessorService.processCreate(topic, payload, retry) -} - let busApiClient /** @@ -242,7 +231,6 @@ module.exports = { checkEsMutexRelease, getM2MToken, postMessageViaWebhook, - retryFailedProcess, getBusApiClient, postEvent } diff --git a/src/services/WorkPeriodProcessorService.js b/src/services/WorkPeriodProcessorService.js index e86b21d..ed24304 100644 --- a/src/services/WorkPeriodProcessorService.js +++ b/src/services/WorkPeriodProcessorService.js @@ -9,6 +9,7 @@ const constants = require('../common/constants') const config = require('config') const _ = require('lodash') const esClient = helper.getESClient() +const ActionProcessorService = require('../services/ActionProcessorService') /** * Process create entity message @@ -31,7 +32,7 @@ async function processCreate (message, transactionId, options) { // it has not yet been created. We should send a retry request. if (err.httpStatus === 404) { logger.logFullError(err, { component: 'WorkPeriodProcessorService', context: 'processCreate' }) - await helper.retryFailedProcess(message.topic, workPeriod, options.retry) + await ActionProcessorService.processCreate(message.topic, workPeriod, options.retry) return } else { throw err