|
5 | 5 | global.Promise = require('bluebird')
|
6 | 6 | const config = require('config')
|
7 | 7 | const Kafka = require('no-kafka')
|
| 8 | +const _ = require('lodash') |
8 | 9 | const healthcheck = require('topcoder-healthcheck-dropin')
|
9 | 10 | const logger = require('./common/logger')
|
10 | 11 | const helper = require('./common/helper')
|
@@ -34,14 +35,18 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
|
34 | 35 | }
|
35 | 36 |
|
36 | 37 | return (async () => {
|
37 |
| - if (topic === config.CHALLENGE_CREATE_TOPIC) { |
38 |
| - await ProcessorService.handleChallengeCreate(messageJSON) |
39 |
| - } else if (topic === config.PROJECT_MEMBER_ADDED_TOPIC) { |
40 |
| - await ProcessorService.handleMemberAdded(messageJSON) |
41 |
| - } else if (topic === config.PROJECT_MEMBER_REMOVED_TOPIC) { |
42 |
| - await ProcessorService.handleMemberRemoved(messageJSON) |
| 38 | + if (_.includes(config.IGNORED_ORIGINATORS, messageJSON.originator)) { |
| 39 | + logger.error(`The message originator is in the ignored list. Originator: ${messageJSON.originator}`) |
| 40 | + } else { |
| 41 | + if (topic === config.CHALLENGE_CREATE_TOPIC) { |
| 42 | + await ProcessorService.handleChallengeCreate(messageJSON) |
| 43 | + } else if (topic === config.PROJECT_MEMBER_ADDED_TOPIC) { |
| 44 | + await ProcessorService.handleMemberAdded(messageJSON) |
| 45 | + } else if (topic === config.PROJECT_MEMBER_REMOVED_TOPIC) { |
| 46 | + await ProcessorService.handleMemberRemoved(messageJSON) |
| 47 | + } |
| 48 | + logger.debug('Successfully processed message') |
43 | 49 | }
|
44 |
| - logger.debug('Successfully processed message') |
45 | 50 | })()
|
46 | 51 | // commit offset
|
47 | 52 | .then(() => consumer.commitOffset({ topic, partition, offset: m.offset }))
|
|
0 commit comments