5
5
6
6
import config from 'config' ;
7
7
import _ from 'lodash' ;
8
+ import Promise from 'bluebird' ;
8
9
import util from '../../util' ;
9
10
10
11
const ES_PROJECT_INDEX = config . get ( 'elasticsearchConfig.indexName' ) ;
@@ -19,37 +20,22 @@ const eClient = util.getElasticSearchClient();
19
20
* @param {Object } channel channel to ack, nack
20
21
* @returns {undefined }
21
22
*/
22
- const projectAttachmentAddedHandler = ( logger , msg , channel ) => {
23
- const data = JSON . parse ( msg . content . toString ( ) ) ;
24
-
25
- eClient . get ( {
26
- index : ES_PROJECT_INDEX ,
27
- type : ES_PROJECT_TYPE ,
28
- id : data . projectId ,
29
- } ) . then ( ( doc ) => {
23
+ const projectAttachmentAddedHandler = Promise . coroutine ( function * ( logger , msg , channel ) { // eslint-disable-line func-names
24
+ try {
25
+ const data = JSON . parse ( msg . content . toString ( ) ) ;
26
+ const doc = yield eClient . get ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . projectId } ) ;
30
27
const attachments = _ . isArray ( doc . _source . attachments ) ? doc . _source . attachments : [ ] ; // eslint-disable-line no-underscore-dangle
31
28
attachments . push ( data ) ;
32
29
const merged = _ . merge ( doc . _source , { attachments } ) ; // eslint-disable-line no-underscore-dangle
33
- eClient . update ( {
34
- index : ES_PROJECT_INDEX ,
35
- type : ES_PROJECT_TYPE ,
36
- id : data . projectId ,
37
- body : {
38
- doc : merged ,
39
- } ,
40
- } ) . then ( ( ) => {
41
- logger . debug ( 'project attachment added to project document successfully' ) ;
42
- channel . ack ( msg ) ;
43
- } ) . catch ( ( error ) => {
44
- logger . error ( 'failed to add project attachment to project document' , error ) ;
45
- channel . nack ( msg , false , ! msg . fields . redelivered ) ;
46
- } ) ;
47
- } ) . catch ( ( error ) => {
48
- logger . error ( 'Error fetching project document from elasticsearch' , error ) ;
30
+ yield eClient . update ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . projectId , body : { doc : merged } } ) ;
31
+ logger . debug ( 'project attachment added to project document successfully' ) ;
32
+ channel . ack ( msg ) ;
33
+ } catch ( error ) {
34
+ logger . error ( 'Error handling project.attachment.added event' , error ) ;
49
35
// if the message has been redelivered dont attempt to reprocess it
50
36
channel . nack ( msg , false , ! msg . fields . redelivered ) ;
51
- } ) ;
52
- } ;
37
+ }
38
+ } ) ;
53
39
54
40
/**
55
41
* Handler for project attachment updated event
@@ -58,41 +44,33 @@ const projectAttachmentAddedHandler = (logger, msg, channel) => {
58
44
* @param {Object } channel channel to ack, nack
59
45
* @returns {undefined }
60
46
*/
61
- const projectAttachmentUpdatedHandler = ( logger , msg , channel ) => {
62
- const data = JSON . parse ( msg . content . toString ( ) ) ;
63
-
64
- eClient . get ( {
65
- index : ES_PROJECT_INDEX ,
66
- type : ES_PROJECT_TYPE ,
67
- id : data . original . projectId ,
68
- } ) . then ( ( doc ) => {
47
+ const projectAttachmentUpdatedHandler = Promise . coroutine ( function * ( logger , msg , channel ) { // eslint-disable-line func-names
48
+ try {
49
+ const data = JSON . parse ( msg . content . toString ( ) ) ;
50
+ const doc = yield eClient . get ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . original . projectId } ) ;
69
51
const attachments = _ . map ( doc . _source . attachments , ( single ) => { // eslint-disable-line no-underscore-dangle
70
52
if ( single . id === data . original . id ) {
71
53
return _ . merge ( single , data . updated ) ;
72
54
}
73
55
return single ;
74
56
} ) ;
75
57
const merged = _ . merge ( doc . _source , { attachments } ) ; // eslint-disable-line no-underscore-dangle
76
- eClient . update ( {
58
+ yield eClient . update ( {
77
59
index : ES_PROJECT_INDEX ,
78
60
type : ES_PROJECT_TYPE ,
79
61
id : data . original . projectId ,
80
62
body : {
81
63
doc : merged ,
82
64
} ,
83
- } ) . then ( ( ) => {
84
- logger . debug ( 'elasticsearch index updated, project attachment updated successfully' ) ;
85
- channel . ack ( msg ) ;
86
- } ) . catch ( ( error ) => {
87
- logger . error ( 'failed to update project attachment for project document' , error ) ;
88
- channel . nack ( msg , false , ! msg . fields . redelivered ) ;
89
65
} ) ;
90
- } ) . catch ( ( error ) => {
91
- logger . error ( 'Error fetching project document from elasticsearch' , error ) ;
66
+ logger . debug ( 'elasticsearch index updated, project attachment updated successfully' ) ;
67
+ channel . ack ( msg ) ;
68
+ } catch ( error ) {
69
+ logger . error ( 'Error handling project.attachment.updated event' , error ) ;
92
70
// if the message has been redelivered dont attempt to reprocess it
93
71
channel . nack ( msg , false , ! msg . fields . redelivered ) ;
94
- } ) ;
95
- } ;
72
+ }
73
+ } ) ;
96
74
97
75
/**
98
76
* Handler for project attachment deleted event
@@ -101,35 +79,28 @@ const projectAttachmentUpdatedHandler = (logger, msg, channel) => {
101
79
* @param {Object } channel channel to ack, nack
102
80
* @returns {undefined }
103
81
*/
104
- const projectAttachmentRemovedHandler = ( logger , msg , channel ) => {
105
- const data = JSON . parse ( msg . content . toString ( ) ) ;
106
- eClient . get ( {
107
- index : ES_PROJECT_INDEX ,
108
- type : ES_PROJECT_TYPE ,
109
- id : data . projectId ,
110
- } ) . then ( ( doc ) => {
82
+ const projectAttachmentRemovedHandler = Promise . coroutine ( function * ( logger , msg , channel ) { // eslint-disable-line func-names
83
+ try {
84
+ const data = JSON . parse ( msg . content . toString ( ) ) ;
85
+ const doc = yield eClient . get ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . projectId } ) ;
111
86
const attachments = _ . filter ( doc . _source . attachments , single => single . id !== data . id ) ; // eslint-disable-line no-underscore-dangle
112
87
const merged = _ . merge ( doc . _source , { attachments } ) ; // eslint-disable-line no-underscore-dangle
113
- eClient . update ( {
88
+ yield eClient . update ( {
114
89
index : ES_PROJECT_INDEX ,
115
90
type : ES_PROJECT_TYPE ,
116
91
id : data . projectId ,
117
92
body : {
118
93
doc : merged ,
119
94
} ,
120
- } ) . then ( ( ) => {
121
- logger . debug ( 'project attachment removed from project document successfully' ) ;
122
- channel . ack ( msg ) ;
123
- } ) . catch ( ( error ) => {
124
- logger . error ( 'failed to remove project attachment from project document' , error ) ;
125
- channel . nack ( msg , false , ! msg . fields . redelivered ) ;
126
95
} ) ;
127
- } ) . catch ( ( error ) => {
96
+ logger . debug ( 'project attachment removed from project document successfully' ) ;
97
+ channel . ack ( msg ) ;
98
+ } catch ( error ) {
128
99
logger . error ( 'Error fetching project document from elasticsearch' , error ) ;
129
100
// if the message has been redelivered dont attempt to reprocess it
130
101
channel . nack ( msg , false , ! msg . fields . redelivered ) ;
131
- } ) ;
132
- } ;
102
+ }
103
+ } ) ;
133
104
134
105
135
106
module . exports = {
0 commit comments