diff --git a/src/events/index.js b/src/events/index.js index c4c121b8..1ddcd82c 100644 --- a/src/events/index.js +++ b/src/events/index.js @@ -1,6 +1,7 @@ import { EVENT, CONNECT_NOTIFICATION_EVENT } from '../constants'; import { projectCreatedHandler, + projectCreatedHandlerForPhases, projectUpdatedKafkaHandler } from './projects'; import { projectPhaseAddedHandler, projectPhaseRemovedHandler, projectPhaseUpdatedHandler } from './projectPhases'; @@ -39,7 +40,7 @@ const voidRabbitHandler = (logger, msg, channel) => { // we should completely remove the handlers for this events. export const rabbitHandlers = { 'project.initial': projectCreatedHandler, // is only used `seedElasticsearchIndex.js` and can be removed - [EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: projectCreatedHandlerForPhases, // we have to call it, because it triggers topics creating for phases [EVENT.ROUTING_KEY.PROJECT_UPDATED]: voidRabbitHandler, // DISABLED [EVENT.ROUTING_KEY.PROJECT_DELETED]: voidRabbitHandler, // DISABLED [EVENT.ROUTING_KEY.PROJECT_MEMBER_ADDED]: voidRabbitHandler, // DISABLED diff --git a/src/events/projects/index.js b/src/events/projects/index.js index ab53c704..7b89eb92 100644 --- a/src/events/projects/index.js +++ b/src/events/projects/index.js @@ -78,6 +78,31 @@ const projectCreatedHandler = Promise.coroutine(function* (logger, msg, channel) } }); +/** + * Handler for project creation event + * + * we call this handle only for the sake of creating topics for the phases + * + * @param {Object} logger logger to log along with trace id + * @param {Object} msg event payload + * @param {Object} channel channel to ack, nack + * @returns {undefined} + */ +const projectCreatedHandlerForPhases = Promise.coroutine(function* (logger, msg, channel) { // eslint-disable-line func-names + const project = JSON.parse(msg.content.toString()); + try { + if (project.phases && project.phases.length > 0) { + logger.debug('Phases found for the project, trying to create topics for each phase.'); + const topicPromises = _.map(project.phases, phase => createPhaseTopic(logger, phase)); + yield Promise.all(topicPromises); + } + channel.ack(msg); + } catch (error) { + logger.error(`Error processing event (projectId: ${project.id})`, error); + channel.nack(msg, false, !msg.fields.redelivered); + } +}); + /** * Handler for project updated event * @param {Object} logger logger to log along with trace id @@ -190,6 +215,7 @@ async function projectUpdatedKafkaHandler(app, topic, payload) { module.exports = { projectCreatedHandler, + projectCreatedHandlerForPhases, projectUpdatedHandler, projectDeletedHandler, projectUpdatedKafkaHandler,