Skip to content

Commit 460fc4e

Browse files
Fix bug in processor
1 parent ac3e613 commit 460fc4e

File tree

5 files changed

+58
-41
lines changed

5 files changed

+58
-41
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -13,6 +13,7 @@ Configuration for the notification server is at `config/default.js`.
1313
The following parameters can be set in config files or in env variables:
1414

1515
- LOG_LEVEL: the log level; default value: 'debug'
16+
- KAFKA_GROUP_ID: group id of the consumer; default value: 'submission-processor-es-group'
1617
- KAFKA_URL: comma separated Kafka hosts; default value: 'localhost:9092'
1718
- KAFKA_CLIENT_CERT: Kafka connection certificate, optional; default value is undefined;
1819
if not provided, then SSL connection is not used, direct insecure connection is used;

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -6,6 +6,7 @@ module.exports = {
66
DISABLE_LOGGING: process.env.DISABLE_LOGGING || false, // If true, logging will be disabled
77
LOG_LEVEL: process.env.LOG_LEVEL || 'debug',
88

9+
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'submission-processor-es-group',
910
KAFKA_URL: process.env.KAFKA_URL || 'localhost:9092',
1011
// below are used for secure Kafka connection, they are optional
1112
// for the local Kafka, they are not needed

package-lock.json

Lines changed: 37 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -35,7 +35,7 @@
3535
"http-aws-es": "^6.0.0",
3636
"joi": "^9.0.4",
3737
"lodash": "^4.17.10",
38-
"no-kafka": "^3.2.4",
38+
"no-kafka": "^3.4.3",
3939
"topcoder-healthcheck-dropin": "^1.0.2",
4040
"winston": "^2.2.0"
4141
},

src/app.js

Lines changed: 18 additions & 17 deletions
Original file line number · Diff line number · Diff line change
@@ -3,23 +3,21 @@
33
*/
44

55
global.Promise = require('bluebird')
6-
const _ = require('lodash')
76
const config = require('config')
87
const logger = require('./common/logger')
98
const Kafka = require('no-kafka')
10-
const co = require('co')
119
const ProcessorService = require('./services/ProcessorService')
1210
const healthcheck = require('topcoder-healthcheck-dropin')
1311

1412
// create consumer
15-
const options = { connectionString: config.KAFKA_URL, handlerConcurrency: 1 }
13+
const options = { connectionString: config.KAFKA_URL, groupId: config.KAFKA_GROUP_ID, handlerConcurrency: 1 }
1614
if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) {
1715
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }
1816
}
19-
const consumer = new Kafka.SimpleConsumer(options)
17+
const consumer = new Kafka.GroupConsumer(options)
2018

2119
// data handler
22-
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
20+
const dataHandler = async (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
2321
const message = m.message.value.toString('utf8')
2422
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
2523
m.offset}; Message: ${message}.`)
@@ -37,24 +35,26 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
3735
// ignore the message
3836
return
3937
}
40-
return co(function * () {
38+
try {
4139
switch (topic) {
4240
case config.CREATE_DATA_TOPIC:
43-
yield ProcessorService.create(messageJSON)
41+
await ProcessorService.create(messageJSON)
4442
break
4543
case config.UPDATE_DATA_TOPIC:
46-
yield ProcessorService.update(messageJSON)
44+
await ProcessorService.update(messageJSON)
4745
break
4846
case config.DELETE_DATA_TOPIC:
49-
yield ProcessorService.remove(messageJSON)
47+
await ProcessorService.remove(messageJSON)
5048
break
5149
default:
5250
throw new Error(`Invalid topic: ${topic}`)
5351
}
54-
})
52+
5553
// commit offset
56-
.then(() => consumer.commitOffset({ topic, partition, offset: m.offset }))
57-
.catch((err) => logger.error(err))
54+
await consumer.commitOffset({ topic, partition, offset: m.offset })
55+
} catch (err) {
56+
logger.error(err)
57+
}
5858
})
5959

6060
// check if there is kafka connection alive
@@ -70,13 +70,14 @@ function check () {
7070
return connected
7171
}
7272

73+
const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
74+
// consume configured topics
7375
consumer
74-
.init()
75-
// consume configured topics
76+
.init([{
77+
subscriptions: topics,
78+
handler: dataHandler
79+
}])
7680
.then(() => {
7781
healthcheck.init([check])
78-
79-
const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
80-
_.each(topics, (tp) => consumer.subscribe(tp, { time: Kafka.LATEST_OFFSET }, dataHandler))
8182
})
8283
.catch((err) => logger.error(err))

0 commit comments

Comments (0)