Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Shapeup#4 CQRS standards update #125

Merged
merged 11 commits into from
Sep 20, 2021
Merged
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ workflows:
branches:
only:
- develop
- feature/shapeup4-cqrs-update
- feature/shapeup4-cqrs-update2

# Production builds are exectuted only on tagged commits to the
# master branch.
Expand Down
10 changes: 10 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ module.exports = {

BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',

TOPCODER_GROUP_API: process.env.TOPCODER_GROUP_API || 'https://api.topcoder-dev.com/v5/groups',

KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',

Expand Down Expand Up @@ -63,6 +65,14 @@ module.exports = {
password: process.env.ELASTICCLOUD_PASSWORD
},

USER_ACHIEVEMENT_PROPERTY_NAME: process.env.USER_ACHIEVEMENT_PROPERTY_NAME || 'achievements',
USER_EXTERNALPROFILE_PROPERTY_NAME: process.env.USER_EXTERNALPROFILE_PROPERTY_NAME || 'externalProfiles',
USER_ATTRIBUTE_PROPERTY_NAME: process.env.USER_ATTRIBUTE_PROPERTY_NAME || 'attributes',
USER_ROLE_PROPERTY_NAME: process.env.USER_ROLE_PROPERTY_NAME || 'roles',
USER_SKILL_PROPERTY_NAME: process.env.USER_SKILL_PROPERTY_NAME || 'skills',
USER_GROUP_PROPERTY_NAME: process.env.USER_GROUP_PROPERTY_NAME || 'groups',

ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders',
// es mapping: _index, _type, _id
DOCUMENTS: {
achievementprovider: {
Expand Down
272 changes: 255 additions & 17 deletions src/common/es-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ const helper = require('../common/helper')
const appConst = require('../consts')
const esClient = require('./es-client').getESClient()

const {
TopResources,
UserResources,
OrganizationResources
} = appConst

const DOCUMENTS = config.ES.DOCUMENTS
const RESOURCES = Object.keys(DOCUMENTS)

Expand Down Expand Up @@ -283,21 +289,213 @@ function escapeRegex (str) {
/* eslint-enable no-useless-escape */
}

/**
* Function to get user from es
* @param {String} userId
* @returns {Object} user
*/
async function getUser (userId) {
const { body: user } = await esClient.get({ index: TopResources.user.index, type: TopResources.user.type, id: userId })
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
}

/**
* Function to update es user
* @param {String} userId
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {Object} body
*/
async function updateUser (userId, body, seqNo, primaryTerm) {
try {
await esClient.index({
index: TopResources.user.index,
type: TopResources.user.type,
id: userId,
body,
if_seq_no: seqNo,
if_primary_term: primaryTerm,
pipeline: TopResources.user.ingest.pipeline.id,
refresh: 'wait_for'
})
logger.debug('Update user completed')
} catch (err) {
if (err && err.meta && err.meta.body && err.meta.body.error) {
logger.debug(JSON.stringify(err.meta.body.error, null, 4))
}
logger.debug(JSON.stringify(err))
throw err
}
}

/**
* Function to get org from es
* @param {String} organizationId
* @returns {Object} organization
*/
async function getOrg (organizationId) {
const { body: org } = await esClient.get({ index: TopResources.organization.index, type: TopResources.organization.type, id: organizationId })
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
}

/**
* Function to update es organization
* @param {String} organizationId
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {Object} body
*/
async function updateOrg (organizationId, body, seqNo, primaryTerm) {
await esClient.index({
index: TopResources.organization.index,
type: TopResources.organization.type,
id: organizationId,
body,
if_seq_no: seqNo,
if_primary_term: primaryTerm,
refresh: 'wait_for'
})
await esClient.enrich.executePolicy({ name: TopResources.organization.enrich.policyName })
}

/**
* Process create entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processCreate (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.index({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
body: entity,
refresh: 'true'
})
logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`)
if (_.includes(_.keys(TopResources), resource)) {
// process the top resources such as user, skill...
helper.validProperties(entity, ['id'])
await esClient.index({
index: TopResources[resource].index,
type: TopResources[resource].type,
id: entity.id,
body: _.omit(entity, ['resource', 'originalTopic']),
pipeline: TopResources[resource].ingest ? TopResources[resource].ingest.pipeline.id : undefined,
refresh: 'true'
})
if (TopResources[resource].enrich) {
await esClient.enrich.executePolicy({
name: TopResources[resource].enrich.policyName
})
}
} else if (_.includes(_.keys(UserResources), resource)) {
// process user resources such as userSkill, userAttribute...
const userResource = UserResources[resource]
userResource.validate(entity)
const { seqNo, primaryTerm, user } = await getUser(entity.userId)
const relateId = entity[userResource.relateKey]
if (!user[userResource.propertyName]) {
user[userResource.propertyName] = []
}

// import groups for a new user
if (resource === 'externalprofile' && entity.externalId) {
const userGroups = await helper.getUserGroup(entity.externalId)
user[config.get('ES.USER_GROUP_PROPERTY_NAME')] = _.unionBy(user[config.get('ES.USER_GROUP_PROPERTY_NAME')], userGroups, 'id')
}

// check the resource does not exist
if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId}`)
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
} else {
user[userResource.propertyName].push(entity)
await updateUser(entity.userId, user, seqNo, primaryTerm)
}
} else if (_.includes(_.keys(OrganizationResources), resource)) {
// process org resources such as org skill provider
const orgResources = OrganizationResources[resource]
orgResources.validate(entity)
const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
const relateId = entity[orgResources.relateKey]
if (!org[orgResources.propertyName]) {
org[orgResources.propertyName] = []
}

// check the resource does not exist
if (_.some(org[orgResources.propertyName], [orgResources.relateKey, relateId])) {
logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${entity.organizationId}`)
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
} else {
org[orgResources.propertyName].push(entity)
await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
}
} else {
logger.info(`Ignore this message since resource is not in [${_.union(_.values(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
}
}

/**
* Process update entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processUpdate (resource, entity) {
if (_.includes(_.keys(TopResources), resource)) {
logger.info(`Processing top level resource: ${resource}`)
// process the top resources such as user, skill...
helper.validProperties(entity, ['id'])
const { index, type } = TopResources[resource]
const id = entity.id
const { body: source } = await esClient.get({ index, type, id })
await esClient.index({
index,
type,
id,
body: _.assign(source._source, _.omit(entity, ['resource', 'originalTopic'])),
pipeline: TopResources[resource].ingest ? TopResources[resource].ingest.pipeline.id : undefined,
if_seq_no: source._seq_no,
if_primary_term: source._primary_term,
refresh: 'true'
})
if (TopResources[resource].enrich) {
await esClient.enrich.executePolicy({
name: TopResources[resource].enrich.policyName
})
}
} else if (_.includes(_.keys(UserResources), resource)) {
// process user resources such as userSkill, userAttribute...
const userResource = UserResources[resource]
const relateId = entity[userResource.relateKey]
logger.info(`Processing user level resource: ${resource}:${relateId}`)
userResource.validate(entity)
logger.info(`Resource validated for ${relateId}`)
const { seqNo, primaryTerm, user } = await getUser(entity.userId)
logger.info(`User fetched ${user.id} and ${relateId}`)

// check the resource exist
if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId} not exist`)
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
} else {
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
user[userResource.propertyName].splice(updateIndex, 1, entity)
logger.info(`Updating ${user.id} and ${relateId}`)
await updateUser(entity.userId, user, seqNo, primaryTerm)
logger.info(`Updated ${user.id} and ${relateId}`)
}
} else if (_.includes(_.keys(OrganizationResources), resource)) {
logger.info(`Processing org level resource: ${resource}`)
// process org resources such as org skill providers
const orgResource = OrganizationResources[resource]
orgResource.validate(entity)
const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
const relateId = entity[orgResource.relateKey]

// check the resource exist
if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) {
logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${entity.organizationId} not exist`)
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
} else {
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
org[orgResource.propertyName].splice(updateIndex, 1, entity)
await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
}
} else {
logger.info(`Ignore this message since resource is not in [${_.union(_.values(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
}
}

/**
Expand All @@ -306,13 +504,53 @@ async function processCreate (resource, entity) {
* @param {Object} entity entity object
*/
async function processDelete (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.delete({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
refresh: 'wait_for'
})
if (_.includes(_.keys(TopResources), resource)) {
// process the top resources such as user, skill...
helper.validProperties(entity, ['id'])
await esClient.delete({
index: TopResources[resource].index,
type: TopResources[resource].type,
id: entity.id,
refresh: 'wait_for'
})
if (TopResources[resource].enrich) {
await esClient.enrich.executePolicy({
name: TopResources[resource].enrich.policyName
})
}
} else if (_.includes(_.keys(UserResources), resource)) {
// process user resources such as userSkill, userAttribute...
const userResource = UserResources[resource]
userResource.validate(entity)
const { seqNo, primaryTerm, user } = await getUser(entity.userId)
const relateId = entity[userResource.relateKey]

// check the resource exist
if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId} not exist`)
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
} else {
_.remove(user[userResource.propertyName], [userResource.relateKey, relateId])
await updateUser(entity.userId, user, seqNo, primaryTerm)
}
} else if (_.includes(_.keys(OrganizationResources), resource)) {
// process user resources such as org skill provider
const orgResource = OrganizationResources[resource]
orgResource.validate(entity)
const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
const relateId = entity[orgResource.relateKey]

// check the resource exist
if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) {
logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${entity.organizationId} not exist`)
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
} else {
_.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId])
await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
}
} else {
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
}
}

async function getOrganizationId (handle) {
Expand Down Expand Up @@ -1487,7 +1725,7 @@ async function searchAchievementValues ({ organizationId, keyword }) {

module.exports = {
processCreate,
processUpdate: processCreate,
processUpdate,
processDelete,
searchElasticSearch,
getFromElasticSearch,
Expand Down
Loading