@@ -14,6 +14,7 @@ const models = require('./models');
14
14
* @param {Object } handlers the handlers
15
15
*/
16
16
async function configureKafkaConsumer ( handlers ) {
17
+
17
18
// create group consumer
18
19
const options = { brokers : config . KAFKA_URL . split ( ',' ) } ;
19
20
if ( config . KAFKA_CLIENT_CERT && config . KAFKA_CLIENT_CERT_KEY ) {
@@ -23,17 +24,22 @@ async function configureKafkaConsumer(handlers) {
23
24
const consumer = kafka . consumer ( { groupId : config . KAFKA_GROUP_ID } ) ;
24
25
await consumer . connect ( )
25
26
await consumer . subscribe ( { topics : _ . keys ( handlers ) } ) ;
26
- dataHandler ( consumer ) . catch ( ( err ) => {
27
+ dataHandler ( consumer , handlers ) . catch ( ( err ) => {
27
28
logger . error ( err ) ;
28
29
} ) ;
29
30
}
30
31
31
32
32
- async function dataHandler ( consumer ) {
33
+ async function dataHandler ( consumer , handlers ) {
33
34
await consumer . run ( {
34
- eachMessage : async ( { topic, partition, msg } ) => {
35
+ eachMessage : async ( data ) => {
36
+ const topic = data . topic
37
+ const msg = data . message
38
+ const partition = data . partition
39
+ //If there is no message, return
40
+ if ( ! msg ) return
35
41
const message = msg . value . toString ( 'utf8' )
36
- logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${ m . offset } ; Message: ${ message } .` ) ;
42
+ logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Message: ${ message } .` ) ;
37
43
// ignore configured Kafka topic prefix
38
44
let topicName = topic ;
39
45
// find handler
@@ -43,37 +49,49 @@ async function dataHandler(consumer) {
43
49
// return null to ignore this message
44
50
return null ;
45
51
}
46
- let emailModel = { } ;
52
+ console . log ( [ 1 ] )
53
+ const emailModel = await models . loadEmailModule ( )
47
54
const busPayload = JSON . parse ( message ) ;
48
55
const messageJSON = busPayload . payload ;
49
56
try {
50
57
51
- const result = await models . Email . create ( {
58
+ const emailInfo = {
52
59
status : 'PENDING' ,
53
60
topicName,
54
61
data : JSON . stringify ( messageJSON ) ,
55
62
recipients : JSON . stringify ( messageJSON . recipients ) ,
56
- } )
63
+ }
64
+
65
+ try {
66
+ console . log ( emailModel )
67
+ await emailModel . create ( emailInfo )
57
68
58
- logger . log ( 'info' , 'Email sent' , {
69
+ } catch ( err ) {
70
+ console . log ( err )
71
+ }
72
+ const result = await handler ( topicName , messageJSON ) ;
73
+
74
+ logger . info ( 'info' , 'Email sent' , {
59
75
sender : 'Connect' ,
60
76
to_address : messageJSON . recipients . join ( ',' ) ,
61
77
from_address : config . EMAIL_FROM ,
62
78
status : result . success ? 'Message accepted' : 'Message rejected' ,
63
79
error : result . error ? result . error . toString ( ) : 'No error message' ,
64
80
} ) ;
81
+ console . log ( "******************* result *******************" , result )
65
82
66
83
if ( result . success ) {
67
84
emailTries [ topicName ] = 0 ;
68
85
emailModel . status = 'SUCCESS' ;
69
- emailModel . save ( ) ;
86
+ await emailModel . save ( ) ;
70
87
} else {
71
88
// emailTries[topicName] += 1; //temporary disabling this feature
72
89
if ( result . error ) {
73
- logger . log ( 'error' , 'Send email error details' , result . error ) ;
90
+ logger . error ( 'error' , 'Send email error details' , result . error ) ;
74
91
}
75
92
}
76
93
} catch ( e ) {
94
+ console . log ( e )
77
95
logger . error ( e )
78
96
}
79
97
@@ -83,31 +101,31 @@ async function dataHandler(consumer) {
83
101
} ,
84
102
} )
85
103
86
- const errorTypes = [ 'unhandledRejection' , 'uncaughtException' ]
87
- const signalTraps = [ 'SIGTERM' , 'SIGINT' , 'SIGUSR2' ]
88
-
89
- errorTypes . forEach ( type => {
90
- process . on ( type , async e => {
91
- try {
92
- console . log ( `process.on ${ type } ` )
93
- console . error ( e )
94
- await consumer . disconnect ( )
95
- process . exit ( 0 )
96
- } catch ( _ ) {
97
- process . exit ( 1 )
98
- }
99
- } )
100
- } )
101
-
102
- signalTraps . forEach ( type => {
103
- process . once ( type , async ( ) => {
104
- try {
105
- await consumer . disconnect ( )
106
- } finally {
107
- process . kill ( process . pid , type )
108
- }
109
- } )
110
- } )
104
+ // const errorTypes = ['unhandledRejection', 'uncaughtException']
105
+ // const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
106
+
107
+ // errorTypes.forEach(type => {
108
+ // process.on(type, async e => {
109
+ // try {
110
+ // console.log(`process.on ${type}`)
111
+ // console.error(e)
112
+ // await consumer.disconnect()
113
+ // process.exit(0)
114
+ // } catch (_) {
115
+ // process.exit(1)
116
+ // }
117
+ // })
118
+ // })
119
+
120
+ // signalTraps.forEach(type => {
121
+ // process.once(type, async () => {
122
+ // try {
123
+ // await consumer.disconnect()
124
+ // } finally {
125
+ // process.kill(process.pid, type)
126
+ // }
127
+ // })
128
+ // })
111
129
112
130
}
113
131
@@ -116,34 +134,32 @@ async function dataHandler(consumer) {
116
134
* Callback to retry sending email.
117
135
* @param {Object } handlers the handlers
118
136
*/
119
- function retryEmail ( handlers ) {
120
- return models . Email . findAll ( { where : { status : 'FAILED' , createdAt : { $gt : new Date ( new Date ( ) - config . EMAIL_RETRY_MAX_AGE ) } } } )
121
- . then ( ( models ) => {
122
- if ( models . length > 0 ) {
123
- logger . info ( `Found ${ models . length } e-mails to be resent` ) ;
124
- return Promise . each ( models , ( m ) => {
125
- // find handler
126
- const handler = handlers [ m . topicName ] ;
127
- if ( ! handler ) {
128
- logger . warn ( `No handler configured for topic: ${ m . topicName } ` ) ;
129
- return m ;
130
- }
131
- const handlerAsync = Promise . promisify ( handler ) ;
132
- const messageJSON = { data : JSON . parse ( m . data ) , recipients : JSON . parse ( m . recipients ) } ;
133
- return handlerAsync ( m . topicName , messageJSON ) . then ( ( result ) => { // save email
134
- if ( result . success ) {
135
- logger . info ( `Email model with ${ m . id } was sent correctly` ) ;
136
- m . status = 'SUCCESS' ;
137
- return m . save ( ) ;
138
- }
139
- logger . info ( `Email model with ${ m . id } wasn't sent correctly` ) ;
140
- return m ;
141
- } ) ;
142
- } ) ;
143
- } else {
144
- return models ;
137
+ async function retryEmail ( handlers ) {
138
+ const models = await models . Email . findAll ( { where : { status : 'FAILED' , createdAt : { $gt : new Date ( new Date ( ) - config . EMAIL_RETRY_MAX_AGE ) } } } )
139
+
140
+ if ( models . length > 0 ) {
141
+ logger . info ( `Found ${ models . length } e-mails to be resent` ) ;
142
+ models . map ( async m => {
143
+ // find handler
144
+ const handler = handlers [ m . topicName ] ;
145
+ if ( ! handler ) {
146
+ logger . warn ( `No handler configured for topic: ${ m . topicName } ` ) ;
147
+ return m ;
148
+ }
149
+ const messageJSON = { data : JSON . parse ( m . data ) , recipients : JSON . parse ( m . recipients ) } ;
150
+ const result = await handler ( m . topicName , messageJSON ) ;
151
+ if ( result . success ) {
152
+ logger . info ( `Email model with ${ m . id } was sent correctly` ) ;
153
+ m . status = 'SUCCESS' ;
154
+ return m . save ( ) ;
145
155
}
156
+ logger . info ( `Email model with ${ m . id } wasn't sent correctly` ) ;
157
+ return m ;
146
158
} ) ;
159
+ } else {
160
+ return models ;
161
+ }
162
+
147
163
}
148
164
149
165
async function initServer ( handlers ) {
0 commit comments