@@ -8,10 +8,6 @@ const { Kafka } = require('kafkajs')
8
8
const logger = require ( './common/logger' ) ;
9
9
const models = require ( './models' ) ;
10
10
11
- let emailTries = { } ;
12
-
13
-
14
-
15
11
16
12
/**
17
13
* Configure Kafka consumer.
@@ -25,81 +21,97 @@ async function configureKafkaConsumer(handlers) {
25
21
}
26
22
const kafka = new Kafka ( options )
27
23
const consumer = kafka . consumer ( { groupId : config . KAFKA_GROUP_ID } ) ;
28
- // data handler
29
- const dataHandler = ( messageSet , topic , partition ) => Promise . all ( messageSet , ( m ) => {
30
- console . log ( "messageSet" , messageSet ) ;
31
- const message = m . message . value . toString ( 'utf8' ) ;
32
- logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${ m . offset } ; Message: ${ message } .` ) ;
33
- // ignore configured Kafka topic prefix
34
- let topicName = topic ;
35
-
36
- // find handler
37
- const handler = handlers [ topicName ] ;
38
- if ( ! handler ) {
39
- logger . info ( `No handler configured for topic: ${ topicName } ` ) ;
40
- // return null to ignore this message
41
- return null ;
42
- }
43
- let emailModel = { } ;
44
- const busPayload = JSON . parse ( message ) ;
45
- const messageJSON = busPayload . payload ;
46
- const handlerAsync = Promise . promisify ( handler ) ;
47
- // use handler to create notification instances for each recipient
48
- return models . Email . create (
49
- Object . assign ( { status : 'PENDING' } , {
50
- topicName,
51
- data : JSON . stringify ( messageJSON . data ) ,
52
- recipients : JSON . stringify ( messageJSON . recipients ) ,
53
- } )
54
- ) . then ( ( model ) => {
55
- emailModel = model ;
56
- return handlerAsync ( topicName , messageJSON ) ;
57
- } ) . then ( ( result ) => { // save email
58
- const now = new Date ( ) ;
59
- logger . log ( 'info' , 'Email sent' , {
60
- sender : 'Connect' ,
61
- to_address : messageJSON . recipients . join ( ',' ) ,
62
- from_address : config . EMAIL_FROM ,
63
- status : result . success ? 'Message accepted' : 'Message rejected' ,
64
- error : result . error ? result . error . toString ( ) : 'No error message' ,
65
- } ) ;
66
- if ( result . success ) {
67
- emailTries [ topicName ] = 0 ;
68
- emailModel . status = 'SUCCESS' ;
69
- return emailModel . save ( ) ;
70
- } else {
71
- // emailTries[topicName] += 1; //temporary disabling this feature
72
- if ( result . error ) {
73
- logger . log ( 'error' , 'Send email error details' , result . error ) ;
74
- }
75
- emailModel . status = 'FAILED' ;
76
- return emailModel . save ( ) . then ( ( ) => {
24
+ await consumer . connect ( )
25
+ await consumer . subscribe ( { topics : _ . keys ( handlers ) } ) ;
26
+ dataHandler ( consumer ) . catch ( ( err ) => {
27
+ logger . error ( err ) ;
28
+ } ) ;
29
+ }
30
+
31
+
32
+ async function dataHandler ( consumer ) {
33
+ await consumer . run ( {
34
+ eachMessage : async ( { topic, partition, msg } ) => {
35
+ const message = msg . value . toString ( 'utf8' )
36
+ logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${ m . offset } ; Message: ${ message } .` ) ;
37
+ // ignore configured Kafka topic prefix
38
+ let topicName = topic ;
39
+ // find handler
40
+ const handler = handlers [ topicName ] ;
41
+ if ( ! handler ) {
42
+ logger . info ( `No handler configured for topic: ${ topicName } ` ) ;
43
+ // return null to ignore this message
44
+ return null ;
45
+ }
46
+ let emailModel = { } ;
47
+ const busPayload = JSON . parse ( message ) ;
48
+ const messageJSON = busPayload . payload ;
49
+ try {
50
+
51
+ const result = await models . Email . create ( {
52
+ status : 'PENDING' ,
53
+ topicName,
54
+ data : JSON . stringify ( messageJSON ) ,
55
+ recipients : JSON . stringify ( messageJSON . recipients ) ,
56
+ } )
57
+
58
+ logger . log ( 'info' , 'Email sent' , {
59
+ sender : 'Connect' ,
60
+ to_address : messageJSON . recipients . join ( ',' ) ,
61
+ from_address : config . EMAIL_FROM ,
62
+ status : result . success ? 'Message accepted' : 'Message rejected' ,
63
+ error : result . error ? result . error . toString ( ) : 'No error message' ,
77
64
} ) ;
65
+
66
+ if ( result . success ) {
67
+ emailTries [ topicName ] = 0 ;
68
+ emailModel . status = 'SUCCESS' ;
69
+ emailModel . save ( ) ;
70
+ } else {
71
+ // emailTries[topicName] += 1; //temporary disabling this feature
72
+ if ( result . error ) {
73
+ logger . log ( 'error' , 'Send email error details' , result . error ) ;
74
+ }
75
+ }
76
+ } catch ( e ) {
77
+ logger . error ( e )
78
78
}
79
- } ) . then ( ( ) => consumer . commitOffset ( { topic, partition, offset : m . offset } ) ) // commit offset
80
- . catch ( ( err ) => logger . error ( err ) ) ;
81
- } ) ;
82
79
83
80
84
- return startKafkaConsumer ( consumer , handlers , dataHandler ) ;
85
- }
86
81
87
- /**
88
- * Start Kafka consumer.
89
- * @param {Object } consumer the kafka consumer
90
- * @param {Object } handlers the handlers map
91
- * @param {Object } dataHandler the kafka data handler function
92
- */
93
- async function startKafkaConsumer ( consumer , handlers , dataHandler ) {
94
- console . log ( consumer , handlers , dataHandler )
95
- await consumer . connect ( )
96
- await Promise . all ( _ . keys ( handlers ) , ( topicName ) => { // add back the ignored topic prefix to use full topic name
97
- emailTries [ topicName ] = 0 ;
98
- return consumer . subscribe ( topicName , dataHandler ) ;
82
+
83
+ } ,
84
+ } )
85
+
86
+ const errorTypes = [ 'unhandledRejection' , 'uncaughtException' ]
87
+ const signalTraps = [ 'SIGTERM' , 'SIGINT' , 'SIGUSR2' ]
88
+
89
+ errorTypes . forEach ( type => {
90
+ process . on ( type , async e => {
91
+ try {
92
+ console . log ( `process.on ${ type } ` )
93
+ console . error ( e )
94
+ await consumer . disconnect ( )
95
+ process . exit ( 0 )
96
+ } catch ( _ ) {
97
+ process . exit ( 1 )
98
+ }
99
+ } )
100
+ } )
101
+
102
+ signalTraps . forEach ( type => {
103
+ process . once ( type , async ( ) => {
104
+ try {
105
+ await consumer . disconnect ( )
106
+ } finally {
107
+ process . kill ( process . pid , type )
108
+ }
109
+ } )
99
110
} )
100
111
101
112
}
102
113
114
+
103
115
/**
104
116
* Callback to retry sending email.
105
117
* @param {Object } handlers the handlers
0 commit comments