Skip to content

Commit 5c6eaad

Browse files
author
Sachin Maheshwari
committed
deploying on dev
1 parent 918f970 commit 5c6eaad

File tree

5 files changed

+92
-65
lines changed

5 files changed

+92
-65
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ workflows:
102102
context : org-global
103103
filters:
104104
branches:
105-
only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/platform-filtering']
105+
only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/bulk-notification']
106106
- "build-prod":
107107
context : org-global
108108
filters:

src/common/broadcastAPIHelper.js

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ const m2m = m2mAuth(config);
1111

1212
const logPrefix = "BroadcastAPI: "
1313

14+
/**
15+
* Helper Function - get m2m token
16+
*/
1417
async function getM2MToken() {
1518
return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
1619
}
@@ -70,27 +73,6 @@ async function getUserGroup(userId) {
7073
})
7174
}
7275

73-
async function checkBroadcastMessageForUser(userId, bulkMessage) {
74-
return new Promise(function (resolve, reject) {
75-
Promise.all([
76-
checkUserSkill(userId, bulkMessage),
77-
checkUserGroup(userId, bulkMessage),
78-
]).then((results) => {
79-
let flag = true // TODO need to be sure about default value
80-
_.map(results, (r) => {
81-
flag = !r ? false : flag // TODO recheck condition
82-
})
83-
logger.info(`Final condition result is: ${flag}`)
84-
resolve({
85-
record: bulkMessage,
86-
result: flag
87-
})
88-
}).catch((err) => {
89-
reject(`${logPrefix} got issue in checking recipient condition. ${err}`)
90-
})
91-
}) // promise end
92-
}
93-
9476
/**
9577
* Helper function - check Skill condition
9678
*/
@@ -147,6 +129,33 @@ async function checkUserGroup(userId, bulkMessage) {
147129
})
148130
}
149131

132+
/**
133+
* Main Function - check if broadcast message is for current user or not
134+
*
135+
* @param {Integer} userId
136+
* @param {Object} bulkMessage
137+
*/
138+
async function checkBroadcastMessageForUser(userId, bulkMessage) {
139+
return new Promise(function (resolve, reject) {
140+
Promise.all([
141+
checkUserSkill(userId, bulkMessage),
142+
checkUserGroup(userId, bulkMessage),
143+
]).then((results) => {
144+
let flag = true // TODO need to be sure about default value
145+
_.map(results, (r) => {
146+
flag = !r ? false : flag // TODO recheck condition
147+
})
148+
logger.info(`Final condition result is: ${flag}`)
149+
resolve({
150+
record: bulkMessage,
151+
result: flag
152+
})
153+
}).catch((err) => {
154+
reject(`${logPrefix} got issue in checking recipient condition. ${err}`)
155+
})
156+
}) // promise end
157+
}
158+
150159
module.exports = {
151160
checkBroadcastMessageForUser,
152161
}

src/hooks/hookBulkMessage.js

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async function checkBulkMessageForUser(userId) {
3737
let result = true
3838
if (tUserRefs < tBulkMessages) {
3939
logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`)
40-
result = await syncBulkMessageForUser(userId)
40+
result = await syncBulkMessageForUser(userId)
4141
}
4242
resolve(result) // resolve here
4343
}).catch((e) => {
@@ -102,23 +102,25 @@ async function isBroadCastMessageForUser(userId, bulkMessage) {
102102
}
103103

104104
/**
105-
* Helper function
105+
* Helper function - Insert in bulkMessage user reference table
106+
*
106107
* @param {Integer} userId
107108
* @param {Integer} bulkMessageId
108109
* @param {Integer} notificationId
109110
*/
110111
async function insertUserRefs(userId, bulkMessageId, notificationId) {
111-
return new Promise(function (resolve, reject) {
112-
models.BulkMessageUserRefs.create({
112+
try {
113+
const r = await models.BulkMessageUserRefs.create({
113114
bulk_message_id: bulkMessageId,
114115
user_id: userId,
115116
notification_id: notificationId,
116-
}).then((b) => {
117-
resolve(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`)
118-
}).catch((e) => {
119-
reject(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`)
120117
})
121-
})
118+
logger.info(`${logPrefix} Inserted userRef record for bulk message id ${r.id} for current user ${userId}`)
119+
return r
120+
} catch (e) {
121+
logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`)
122+
return e
123+
}
122124
}
123125

124126
/**
@@ -127,8 +129,8 @@ async function insertUserRefs(userId, bulkMessageId, notificationId) {
127129
* @param {Object} bulkMessage
128130
*/
129131
async function createNotificationForUser(userId, bulkMessage) {
130-
return new Promise(function (resolve, reject) {
131-
models.Notification.create({
132+
try {
133+
const n = await models.Notification.create({
132134
userId: userId,
133135
type: bulkMessage.type,
134136
contents: {
@@ -140,14 +142,15 @@ async function createNotificationForUser(userId, bulkMessage) {
140142
read: false,
141143
seen: false,
142144
version: null,
143-
}).then(async (n) => {
144-
logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
145-
const result = await insertUserRefs(userId, bulkMessage.id, n.id)
146-
resolve(result)
147-
}).catch((err) => {
148-
reject(`${logPrefix} Error in inserting broadcast message: ${err} `)
149145
})
150-
})
146+
logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
147+
// TODO need to be in transaction so that rollback will be possible
148+
const result = await insertUserRefs(userId, bulkMessage.id, n.id)
149+
return result
150+
} catch (e) {
151+
logger.error(`${logPrefix} Error in inserting broadcast message: ${err} `)
152+
return e
153+
}
151154
}
152155

153156

src/models/BulkMessages.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@
1111

1212

1313
module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', {
14-
id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
15-
type: { type: DataTypes.STRING, allowNull: false },
16-
message: { type: DataTypes.TEXT, allowNull: false },
17-
recipients: { type: DataTypes.JSONB, allowNull: false },
18-
rules: {type: DataTypes.JSONB, allowNull: true}
19-
}, {});
20-
14+
id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
15+
type: { type: DataTypes.STRING, allowNull: false },
16+
message: { type: DataTypes.TEXT, allowNull: false },
17+
recipients: { type: DataTypes.JSONB, allowNull: false },
18+
}, {});
19+
2120
// sequelize will generate and manage createdAt, updatedAt fields
22-
Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
/**
22
* Bulk notification handler.
33
*/
4-
const co = require('co');
5-
const models = require('../../models');
4+
const joi = require('joi')
5+
const co = require('co')
6+
const models = require('../../models')
67
const logger = require('../../common/logger')
78

89
/**
@@ -14,22 +15,38 @@ const logger = require('../../common/logger')
1415
* @return {Promise} promise resolved to notifications
1516
*/
1617
const handle = (message, ruleSets) => co(function* () {
17-
return new Promise(function(resolve, reject){
18-
models.BulkMessages.create({
19-
type: message.topic,
20-
message: message.payload.message,
21-
recipients: message.payload.recipients,
22-
rules: message.payload.rules || null,
23-
}).then((bm) => {
24-
logger.info("Broadcast message recieved and inserted in db with id:", bm.id)
25-
resolve([]) // no notification need to insert at this point
26-
}).catch((e) => {
27-
logger.error("Broadcast processor failed in db operation. Error: ", e)
28-
reject(e)
29-
})
30-
})
18+
try {
19+
const bm = yield models.BulkMessages.create({
20+
type: message.topic,
21+
message: message.payload.message,
22+
recipients: message.payload.recipients,
23+
})
24+
logger.info("Broadcast message recieved and inserted in db with id:", bm.id)
25+
} catch (e) {
26+
logger.error(`Broadcast processor failed in db operation. Error: ${e}`)
27+
}
28+
return [] // this point of time, send empty notification object
3129
});
3230

31+
/**
32+
* validate kafka payload
33+
*/
34+
handle.schema = {
35+
message: joi.object().keys({
36+
topic: joi.string().required(),
37+
originator: joi.string().required(),
38+
timestamp: joi.date().required(),
39+
'mime-type': joi.string().required(),
40+
payload: joi.object().keys({
41+
message: joi.string().required(),
42+
recipients: joi.object().required(),
43+
}),
44+
}).required(),
45+
ruleSets: joi.object(),
46+
};
47+
3348
module.exports = {
3449
handle,
35-
};
50+
};
51+
52+
logger.buildService(module.exports);

0 commit comments

Comments
 (0)