Skip to content

Commit 5e506c2

Browse files
authored
Merge pull request #597 from topcoder-platform/hotfix/kafka-handlers-config
[DEV] [HOTFIX] Kafka Topics Handlers Config
2 parents c44dd43 + 26c30c3 commit 5e506c2

File tree

2 files changed

+71
-25
lines changed

2 files changed

+71
-25
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
"pg-native": "^3.0.0",
7575
"sequelize": "^5.8.7",
7676
"swagger-ui-express": "^4.0.6",
77-
"tc-core-library-js": "github:appirio-tech/tc-core-library-js#v2.6.3",
77+
"tc-core-library-js": "github:appirio-tech/tc-core-library-js#v2.6.6",
7878
"traverse": "^0.6.6",
7979
"urlencode": "^1.1.0",
8080
"yamljs": "^0.3.0"

src/events/kafkaHandlers.js

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
/**
22
* BUS Event Handlers
33
*/
4-
import { CONNECT_NOTIFICATION_EVENT, BUS_API_EVENT, RESOURCES } from '../constants';
54
import {
6-
projectCreatedKafkaHandler,
7-
projectUpdatedKafkaHandler } from './projects';
8-
import { projectPhaseAddedKafkaHandler, projectPhaseRemovedKafkaHandler,
9-
projectPhaseUpdatedKafkaHandler } from './projectPhases';
5+
CONNECT_NOTIFICATION_EVENT,
6+
BUS_API_EVENT,
7+
RESOURCES,
8+
} from '../constants';
109
import {
11-
timelineAdjustedKafkaHandler,
12-
} from './timelines';
10+
projectCreatedKafkaHandler,
11+
projectUpdatedKafkaHandler,
12+
} from './projects';
1313
import {
14-
milestoneUpdatedKafkaHandler,
15-
} from './milestones';
14+
projectPhaseAddedKafkaHandler,
15+
projectPhaseRemovedKafkaHandler,
16+
projectPhaseUpdatedKafkaHandler,
17+
} from './projectPhases';
18+
import { timelineAdjustedKafkaHandler } from './timelines';
19+
import { milestoneUpdatedKafkaHandler } from './milestones';
1620

1721
const kafkaHandlers = {
1822
/**
@@ -33,22 +37,64 @@ const kafkaHandlers = {
3337
// Events coming from timeline/milestones (considering it as a separate module/service in future)
3438
[CONNECT_NOTIFICATION_EVENT.MILESTONE_TRANSITION_COMPLETED]: milestoneUpdatedKafkaHandler,
3539
[CONNECT_NOTIFICATION_EVENT.TIMELINE_ADJUSTED]: timelineAdjustedKafkaHandler,
40+
};
3641

37-
/**
38-
* New Unified Bus Events
39-
*/
40-
[BUS_API_EVENT.PROJECT_CREATED]: {
41-
[RESOURCES.PROJECT]: projectCreatedKafkaHandler,
42-
},
43-
[BUS_API_EVENT.PROJECT_PHASE_CREATED]: {
44-
[RESOURCES.PHASE]: projectPhaseAddedKafkaHandler,
45-
},
46-
[BUS_API_EVENT.PROJECT_PHASE_UPDATED]: {
47-
[RESOURCES.PHASE]: projectPhaseUpdatedKafkaHandler,
48-
},
49-
[BUS_API_EVENT.PROJECT_PHASE_DELETED]: {
50-
[RESOURCES.PHASE]: projectPhaseRemovedKafkaHandler,
51-
},
42+
/**
43+
* Register New Unified Bus Event Handlers
44+
*
45+
* We need this special method so it would properly merge topics with the same names
46+
* but different resources.
47+
*
48+
* @param {String} topic Kafka topic name
49+
* @param {String} resource resource name
50+
* @param {Function} handler handler method
51+
*
52+
* @returns {void}
53+
*/
54+
const registerKafkaHandler = (topic, resource, handler) => {
55+
let topicConfig = kafkaHandlers[topic];
56+
57+
// if config for topic is not yet initialized, create it
58+
if (!topicConfig) {
59+
topicConfig = {};
60+
kafkaHandlers[topic] = topicConfig;
61+
}
62+
63+
if (typeof topicConfig !== 'object') {
64+
throw new Error(
65+
`Topic "${topic}" should be defined as object with resource names as keys.`,
66+
);
67+
}
68+
69+
if (topicConfig[resource]) {
70+
throw new Error(
71+
`Handler for topic "${topic}" with resource ${resource} has been already registered.`,
72+
);
73+
}
74+
75+
topicConfig[resource] = handler;
5276
};
5377

78+
registerKafkaHandler(
79+
BUS_API_EVENT.PROJECT_CREATED,
80+
RESOURCES.PROJECT,
81+
projectCreatedKafkaHandler,
82+
);
83+
registerKafkaHandler(
84+
BUS_API_EVENT.PROJECT_PHASE_CREATED,
85+
RESOURCES.PHASE,
86+
projectPhaseAddedKafkaHandler,
87+
);
88+
registerKafkaHandler(
89+
BUS_API_EVENT.PROJECT_PHASE_UPDATED,
90+
RESOURCES.PHASE,
91+
projectPhaseUpdatedKafkaHandler,
92+
);
93+
registerKafkaHandler(
94+
BUS_API_EVENT.PROJECT_PHASE_DELETED,
95+
RESOURCES.PHASE,
96+
projectPhaseRemovedKafkaHandler,
97+
);
98+
99+
54100
export default kafkaHandlers;

0 commit comments

Comments
 (0)