@@ -24,8 +24,8 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
24
24
*/
25
25
const dataHandler = ( messageSet , topic , partition ) => Promise . each ( messageSet , async ( m ) => {
26
26
const message = m . message . value . toString ( 'utf8' )
27
- logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${
28
- m . offset } ; Message: ${ message } .`)
27
+ // logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
28
+ // m.offset }; Message: ${message}.`)
29
29
let messageJSON
30
30
try {
31
31
messageJSON = JSON . parse ( message )
@@ -39,7 +39,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
39
39
}
40
40
41
41
if ( messageJSON . topic !== topic ) {
42
- logger . error ( `The message topic ${ messageJSON . topic } doesn't match the Kafka topic ${ topic } .` )
42
+ logger . error ( `The message topic ${ messageJSON . topic } doesn't match the Kafka topic ${ topic } . Message: ${ JSON . stringify ( messageJSON ) } ` )
43
43
44
44
// commit the message and ignore it
45
45
await consumer . commitOffset ( { topic, partition, offset : m . offset } )
@@ -64,13 +64,14 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
64
64
65
65
try {
66
66
if ( topic === config . CREATE_CHALLENGE_TOPIC ) {
67
- await ProcessorService . processCreate ( messageJSON )
67
+ await ProcessorService . processMessage ( messageJSON )
68
68
} else {
69
- await ProcessorService . processUpdate ( messageJSON )
69
+ await ProcessorService . processMessage ( messageJSON )
70
70
}
71
71
72
- logger . debug ( 'Successfully processed message' )
72
+ // logger.debug('Successfully processed message')
73
73
} catch ( err ) {
74
+ logger . error ( `Error processing message ${ JSON . stringify ( messageJSON ) } ` )
74
75
logger . logFullError ( err )
75
76
} finally {
76
77
// Commit offset regardless of error
0 commit comments