3
3
*/
4
4
5
5
// Install bluebird as the global Promise implementation so that
// bluebird-specific helpers (e.g. Promise.each in the data handler
// below) are available on the global Promise.
global.Promise = require('bluebird')
const _ = require('lodash')
const config = require('config')
const logger = require('./common/logger')
const Kafka = require('no-kafka')
const co = require('co')
const ProcessorService = require('./services/ProcessorService')
const healthcheck = require('topcoder-healthcheck-dropin')
// create consumer
// handlerConcurrency: 1 processes one message batch at a time so events
// for the same entity are not handled concurrently.
const options = { connectionString: config.KAFKA_URL, handlerConcurrency: 1 }
// Enable TLS client authentication only when both the certificate and
// its private key are configured.
if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) {
  options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }
}
const consumer = new Kafka.SimpleConsumer(options)
18
20
19
21
// data handler
20
- const dataHandler = async ( messageSet , topic , partition ) => Promise . each ( messageSet , async ( m ) => {
22
+ const dataHandler = ( messageSet , topic , partition ) => Promise . each ( messageSet , ( m ) => {
21
23
const message = m . message . value . toString ( 'utf8' )
22
24
logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${
23
25
m . offset } ; Message: ${ message } .`)
@@ -35,26 +37,24 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
35
37
// ignore the message
36
38
return
37
39
}
38
- try {
40
+ return co ( function * ( ) {
39
41
switch ( topic ) {
40
42
case config . CREATE_DATA_TOPIC :
41
- await ProcessorService . create ( messageJSON )
43
+ yield ProcessorService . create ( messageJSON )
42
44
break
43
45
case config . UPDATE_DATA_TOPIC :
44
- await ProcessorService . update ( messageJSON )
46
+ yield ProcessorService . update ( messageJSON )
45
47
break
46
48
case config . DELETE_DATA_TOPIC :
47
- await ProcessorService . remove ( messageJSON )
49
+ yield ProcessorService . remove ( messageJSON )
48
50
break
49
51
default :
50
52
throw new Error ( `Invalid topic: ${ topic } ` )
51
53
}
52
-
54
+ } )
53
55
// commit offset
54
- await consumer . commitOffset ( { topic, partition, offset : m . offset } )
55
- } catch ( err ) {
56
- logger . error ( err )
57
- }
56
+ . then ( ( ) => consumer . commitOffset ( { topic, partition, offset : m . offset } ) )
57
+ . catch ( ( err ) => logger . error ( err ) )
58
58
} )
59
59
60
60
// check if there is kafka connection alive
@@ -70,14 +70,13 @@ function check () {
70
70
return connected
71
71
}
72
72
73
consumer
  .init()
  // consume configured topics
  .then(() => {
    // Register the Kafka liveness probe with the healthcheck drop-in,
    // then subscribe the data handler to each configured topic,
    // starting from the latest offset.
    healthcheck.init([check])

    const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
    _.each(topics, (tp) => consumer.subscribe(tp, { time: Kafka.LATEST_OFFSET }, dataHandler))
  })
  .catch((err) => logger.error(err))
0 commit comments