@@ -78,6 +78,31 @@ const projectCreatedHandler = Promise.coroutine(function* (logger, msg, channel)
78
78
}
79
79
} ) ;
80
80
81
+ /**
82
+ * Handler for project creation event
83
+ *
84
+ * we call this handle only for the sake of creating topics for the phases
85
+ *
86
+ * @param {Object } logger logger to log along with trace id
87
+ * @param {Object } msg event payload
88
+ * @param {Object } channel channel to ack, nack
89
+ * @returns {undefined }
90
+ */
91
+ const projectCreatedHandlerForPhases = Promise . coroutine ( function * ( logger , msg , channel ) { // eslint-disable-line func-names
92
+ const project = JSON . parse ( msg . content . toString ( ) ) ;
93
+ try {
94
+ if ( project . phases && project . phases . length > 0 ) {
95
+ logger . debug ( 'Phases found for the project, trying to create topics for each phase.' ) ;
96
+ const topicPromises = _ . map ( project . phases , phase => createPhaseTopic ( logger , phase ) ) ;
97
+ yield Promise . all ( topicPromises ) ;
98
+ }
99
+ channel . ack ( msg ) ;
100
+ } catch ( error ) {
101
+ logger . error ( `Error processing event (projectId: ${ project . id } )` , error ) ;
102
+ channel . nack ( msg , false , ! msg . fields . redelivered ) ;
103
+ }
104
+ } ) ;
105
+
81
106
/**
82
107
* Handler for project updated event
83
108
* @param {Object } logger logger to log along with trace id
@@ -190,6 +215,7 @@ async function projectUpdatedKafkaHandler(app, topic, payload) {
190
215
191
216
module . exports = {
192
217
projectCreatedHandler,
218
+ projectCreatedHandlerForPhases,
193
219
projectUpdatedHandler,
194
220
projectDeletedHandler,
195
221
projectUpdatedKafkaHandler,
0 commit comments