@@ -18,6 +18,8 @@ const logger = require('./common/logger');
18
18
const errors = require ( './common/errors' ) ;
19
19
const models = require ( './models' ) ;
20
20
21
+ let emailTries = { } ;
22
+
21
23
/**
22
24
* Configure Kafka consumer.
23
25
* @param {Object } handlers the handlers
@@ -30,12 +32,7 @@ function configureKafkaConsumer(handlers) {
30
32
}
31
33
const pauseTime = parseInt ( config . EMAIL_PAUSE_TIME ) ;
32
34
const maxErrors = parseInt ( config . EMAIL_MAX_ERRORS ) ;
33
-
34
- // email tries
35
- let emailTries = 0 ;
36
-
37
- // Kafka Consumer
38
- const consumer = new Kafka . GroupConsumer ( options ) ;
35
+ const consumer = new Kafka . SimpleConsumer ( options ) ;
39
36
40
37
// data handler
41
38
const dataHandler = ( messageSet , topic , partition ) => Promise . each ( messageSet , ( m ) => {
@@ -53,11 +50,10 @@ function configureKafkaConsumer(handlers) {
53
50
// return null to ignore this message
54
51
return null ;
55
52
}
56
-
57
53
let emailModel = { } ;
58
54
const messageJSON = JSON . parse ( message ) ;
59
55
const handlerAsync = Promise . promisify ( handler ) ;
60
-
56
+ // use handler to create notification instances for each recipient
61
57
return models . Email . create (
62
58
Object . assign ( { status : 'PENDING' } , {
63
59
topicName,
@@ -76,23 +72,23 @@ function configureKafkaConsumer(handlers) {
76
72
status : result . success ? 'Message accepted' : 'Message rejected' ,
77
73
} ) ;
78
74
if ( result . success ) {
79
- emailTries = 0 ;
75
+ emailTries [ topicName ] = 0 ;
80
76
emailModel . status = 'SUCCESS' ;
81
77
return emailModel . save ( ) ;
82
78
} else {
83
- emailTries += 1 ;
79
+ emailTries [ topicName ] += 1 ;
84
80
emailModel . status = 'FAILED' ;
85
81
return emailModel . save ( ) . then ( ( ) => {
86
- const currentTries = emailTries ;
82
+ const currentTries = emailTries [ topicName ] ;
87
83
if ( currentTries > maxErrors ) {
88
84
logger . debug ( `Failed to send email. Will sleep for ${ pauseTime } s` ) ;
89
- emailTries = 0 ;
85
+ emailTries [ topicName ] = 0 ;
90
86
91
87
schedule . scheduleJob ( new Date ( now . getTime ( ) + pauseTime * 1000 ) , ( ) => {
92
- return startKafkaConsumer ( consumer , handlers , dataHandler ) ;
88
+ consumer . subscribe ( topic , dataHandler ) ;
93
89
} ) ;
94
90
95
- return consumer . end ( ) . then ( ( ) => {
91
+ return consumer . unsubscribe ( topic , partition ) . then ( ( ) => {
96
92
throw result . error
97
93
} ) ;
98
94
} else {
0 commit comments