1
1
/**
2
2
* BUS Event Handlers
3
3
*/
4
- import { CONNECT_NOTIFICATION_EVENT , BUS_API_EVENT , RESOURCES } from '../constants' ;
5
4
import {
6
- projectCreatedKafkaHandler ,
7
- projectUpdatedKafkaHandler } from './projects' ;
8
- import { projectPhaseAddedKafkaHandler , projectPhaseRemovedKafkaHandler ,
9
- projectPhaseUpdatedKafkaHandler } from './projectPhases ' ;
5
+ CONNECT_NOTIFICATION_EVENT ,
6
+ BUS_API_EVENT ,
7
+ RESOURCES ,
8
+ } from '../constants ' ;
10
9
import {
11
- timelineAdjustedKafkaHandler ,
12
- } from './timelines' ;
10
+ projectCreatedKafkaHandler ,
11
+ projectUpdatedKafkaHandler ,
12
+ } from './projects' ;
13
13
import {
14
- milestoneUpdatedKafkaHandler ,
15
- } from './milestones' ;
14
+ projectPhaseAddedKafkaHandler ,
15
+ projectPhaseRemovedKafkaHandler ,
16
+ projectPhaseUpdatedKafkaHandler ,
17
+ } from './projectPhases' ;
18
+ import { timelineAdjustedKafkaHandler } from './timelines' ;
19
+ import { milestoneUpdatedKafkaHandler } from './milestones' ;
16
20
17
21
const kafkaHandlers = {
18
22
/**
@@ -33,22 +37,64 @@ const kafkaHandlers = {
33
37
// Events coming from timeline/milestones (considering it as a separate module/service in future)
34
38
[ CONNECT_NOTIFICATION_EVENT . MILESTONE_TRANSITION_COMPLETED ] : milestoneUpdatedKafkaHandler ,
35
39
[ CONNECT_NOTIFICATION_EVENT . TIMELINE_ADJUSTED ] : timelineAdjustedKafkaHandler ,
40
+ } ;
36
41
37
- /**
38
- * New Unified Bus Events
39
- */
40
- [ BUS_API_EVENT . PROJECT_CREATED ] : {
41
- [ RESOURCES . PROJECT ] : projectCreatedKafkaHandler ,
42
- } ,
43
- [ BUS_API_EVENT . PROJECT_PHASE_CREATED ] : {
44
- [ RESOURCES . PHASE ] : projectPhaseAddedKafkaHandler ,
45
- } ,
46
- [ BUS_API_EVENT . PROJECT_PHASE_UPDATED ] : {
47
- [ RESOURCES . PHASE ] : projectPhaseUpdatedKafkaHandler ,
48
- } ,
49
- [ BUS_API_EVENT . PROJECT_PHASE_DELETED ] : {
50
- [ RESOURCES . PHASE ] : projectPhaseRemovedKafkaHandler ,
51
- } ,
42
+ /**
43
+ * Register New Unified Bus Event Handlers
44
+ *
45
+ * We need this special method so it would properly merge topics with the same names
46
+ * but different resources.
47
+ *
48
+ * @param {String } topic Kafka topic name
49
+ * @param {String } resource resource name
50
+ * @param {Function } handler handler method
51
+ *
52
+ * @returns {void }
53
+ */
54
+ const registerKafkaHandler = ( topic , resource , handler ) => {
55
+ let topicConfig = kafkaHandlers [ topic ] ;
56
+
57
+ // if config for topic is not yet initialized, create it
58
+ if ( ! topicConfig ) {
59
+ topicConfig = { } ;
60
+ kafkaHandlers [ topic ] = topicConfig ;
61
+ }
62
+
63
+ if ( typeof topicConfig !== 'object' ) {
64
+ throw new Error (
65
+ `Topic "${ topic } " should be defined as object with resource names as keys.` ,
66
+ ) ;
67
+ }
68
+
69
+ if ( topicConfig [ resource ] ) {
70
+ throw new Error (
71
+ `Handler for topic "${ topic } " with resource ${ resource } has been already registered.` ,
72
+ ) ;
73
+ }
74
+
75
+ topicConfig [ resource ] = handler ;
52
76
} ;
53
77
78
+ registerKafkaHandler (
79
+ BUS_API_EVENT . PROJECT_CREATED ,
80
+ RESOURCES . PROJECT ,
81
+ projectCreatedKafkaHandler ,
82
+ ) ;
83
+ registerKafkaHandler (
84
+ BUS_API_EVENT . PROJECT_PHASE_CREATED ,
85
+ RESOURCES . PHASE ,
86
+ projectPhaseAddedKafkaHandler ,
87
+ ) ;
88
+ registerKafkaHandler (
89
+ BUS_API_EVENT . PROJECT_PHASE_UPDATED ,
90
+ RESOURCES . PHASE ,
91
+ projectPhaseUpdatedKafkaHandler ,
92
+ ) ;
93
+ registerKafkaHandler (
94
+ BUS_API_EVENT . PROJECT_PHASE_DELETED ,
95
+ RESOURCES . PHASE ,
96
+ projectPhaseRemovedKafkaHandler ,
97
+ ) ;
98
+
99
+
54
100
export default kafkaHandlers ;
0 commit comments