@@ -21,7 +21,7 @@ const healthcheck = require('topcoder-healthcheck-dropin');
21
21
// helps in health checking in case of unhandled rejection of promises
22
22
const unhandledRejections = [ ] ;
23
23
process . on ( 'unhandledRejection' , ( reason , promise ) => {
24
- logger . debug ( 'Unhandled Rejection at:' , promise , 'reason:' , reason ) ;
24
+ console . log ( 'Unhandled Rejection at:' , promise , 'reason:' , reason ) ;
25
25
// aborts the process to let the HA of the container to restart the task
26
26
// process.abort();
27
27
unhandledRejections . push ( promise ) ;
@@ -31,7 +31,7 @@ process.on('unhandledRejection', (reason, promise) => {
31
31
// from the unhandledRejections array. We just remove the first element from the array as we only care
32
32
// about the count every time an unhandled rejection promise is handled
33
33
process . on ( 'rejectionHandled' , ( promise ) => {
34
- logger . debug ( 'Handled Rejection at:' , promise ) ;
34
+ console . log ( 'Handled Rejection at:' , promise ) ;
35
35
unhandledRejections . shift ( ) ;
36
36
} ) ;
37
37
@@ -93,10 +93,10 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
93
93
} ) ;
94
94
} ) ;
95
95
96
- let latestSubscriptions = null ;
96
+ var latestSubscriptions = null ;
97
97
98
98
const check = function ( ) {
99
- logger . debug ( ' Checking health' ) ;
99
+ logger . debug ( " Checking health" ) ;
100
100
if ( unhandledRejections && unhandledRejections . length > 0 ) {
101
101
logger . error ( 'Found unhandled promises. Application is potentially in stalled state.' ) ;
102
102
return false ;
@@ -106,12 +106,12 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
106
106
return false ;
107
107
}
108
108
let connected = true ;
109
- const currentSubscriptions = consumer . subscriptions ;
110
- for ( const sIdx of currentSubscriptions ) {
109
+ let currentSubscriptions = consumer . subscriptions ;
110
+ for ( var sIdx in currentSubscriptions ) {
111
111
// current subscription
112
- const sub = currentSubscriptions [ sIdx ] ;
112
+ let sub = currentSubscriptions [ sIdx ] ;
113
113
// previous subscription
114
- const prevSub = latestSubscriptions ? latestSubscriptions [ sIdx ] : null ;
114
+ let prevSub = latestSubscriptions ? latestSubscriptions [ sIdx ] : null ;
115
115
// levarage the `paused` field (https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L66) to
116
116
// determine if there was a possibility of an unhandled exception. If we find paused status for the same
117
117
// topic in two consecutive health checks, we assume it was stuck because of unhandled error
@@ -123,11 +123,11 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
123
123
// stores the latest subscription status in global variable
124
124
latestSubscriptions = consumer . subscriptions ;
125
125
consumer . client . initialBrokers . forEach ( conn => {
126
- logger . debug ( `url ${ conn . server ( ) } - connected=${ conn . connected } ` ) ;
127
- connected = conn . connected & connected ;
126
+ logger . debug ( `url ${ conn . server ( ) } - connected=${ conn . connected } ` )
127
+ connected = conn . connected & connected
128
128
} ) ;
129
- return connected ;
130
- } ;
129
+ return connected
130
+ }
131
131
132
132
consumer
133
133
. init ( )
0 commit comments