Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/perf3.6 enhancements #175

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1fe920f
#395 Added the SEED for endpointType (#142)
shashi165 Sep 27, 2018
87c748d
Feature/461 Settlement Transfer (#150)
ggrg Sep 28, 2018
7f70832
Incrementing central ledger version (#152)
Oct 1, 2018
2d304e7
Bugfix/391 updating postman link in onboarding doc (#151)
Oct 1, 2018
244df6a
update code to remove transfer handler
rmothilal Oct 1, 2018
4c4aaa0
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 1, 2018
c9fa1aa
Feature/461 settlement transfer (#154)
ggrg Oct 2, 2018
43cf6b9
fixes for the timeout
rmothilal Oct 2, 2018
8ce854d
Updated populateTestData script to set endPoint Callback URLs for eac…
mdebarros Oct 2, 2018
3a68d27
validation for different FSPs added and tests are extended (#156)
vgenev Oct 2, 2018
2399d12
Story #484 - Added validation for negative values of NET_DEBIT_CAP (#…
shashi165 Oct 2, 2018
3eef19a
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 2, 2018
7b08b67
testPI2 -> test (#158)
vgenev Oct 3, 2018
f66f634
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 3, 2018
315ed4a
Merge branch 'feature/perf3.7_enhancements' of https://github.com/moj…
rmothilal Oct 3, 2018
d72547e
updated code
rmothilal Oct 3, 2018
3da656d
Fixed the listed occurrences of local timestamps (#157)
ggrg Oct 3, 2018
2f1e66c
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 3, 2018
ca44c88
Feature/461 Settlement Transfer - ParticipantFacade.adjustLimits (#160)
ggrg Oct 5, 2018
e80371b
Story #397 cleanup of unused files (#161)
shashi165 Oct 5, 2018
f529661
Bugfix/fix circleci build (#162)
rmothilal Oct 8, 2018
bdfb8cc
Bugfix/api not starting 08 10 (#163)
rmothilal Oct 8, 2018
1ab87d2
Feature/cleaning unsured code (#164)
rmothilal Oct 8, 2018
5216d17
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 9, 2018
ec9e851
added coverage tests (#165)
shashi165 Oct 9, 2018
55de7cd
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 11, 2018
993a69e
Merge branch 'feature/perf3.7_enhancements' of https://github.com/moj…
rmothilal Oct 11, 2018
7caf82f
Story #397 cleanup and coverage tests (#167)
shashi165 Oct 11, 2018
b4c5d68
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 11, 2018
e3197b4
Merge branch 'feature/perf3.7_enhancements' of https://github.com/moj…
rmothilal Oct 12, 2018
5bf141d
Feature/perf3.6 enhancements (#169)
rmothilal Oct 12, 2018
64c0cb6
fix for integration test
rmothilal Oct 12, 2018
370aa2f
Merge branch 'develop' into feature/perf3.6_enhancements
rmothilal Oct 12, 2018
af66ce8
Feature/perf3.6 enhancements (#172)
rmothilal Oct 12, 2018
9b235c2
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 12, 2018
c284973
Feature/509 Participant Accounts Endpoint (#170)
ggrg Oct 12, 2018
52a3567
Feature/425 Timeout Handler Tests (#171)
ggrg Oct 12, 2018
0163d52
Fixed integration tests teardown test (#173)
ggrg Oct 12, 2018
fcfb2d9
Merge branch 'develop' of https://github.com/mojaloop/central-ledger …
rmothilal Oct 15, 2018
23bcb76
refactor code to commit message to kafka after all processing is comp…
rmothilal Oct 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion seeds/ledgerAccountType.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const ledgerAccountTypes = [
'description': 'Reflects the individual DFSP Settlement Accounts as held at the Settlement Bank'
},
{
'name': 'HUB_SETTLEMENT',
'name': 'HUB_LIABILITY',
'description': 'A single account for each currency with which the hub operates. The account is "held" by the Participant representing the hub in the switch'
}
]
Expand Down
8 changes: 4 additions & 4 deletions seeds/ledgerEntryType.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ const ledgerEntryTypes = [
'description': 'Participant is settlement net sender'
},
{
'name': 'SETTLEMENT_ACCOUNT_DEPOSIT',
'description': 'Settlement account deposit'
'name': 'RECORD_FUNDS_IN',
'description': 'Settlement account funds in'
},
{
'name': 'SETTLEMENT_ACCOUNT_WITHDRAWAL',
'description': 'Settlement account withdrawal'
'name': 'RECORD_FUNDS_OUT',
'description': 'Settlement account funds out'
}
]

Expand Down
12 changes: 11 additions & 1 deletion src/admin/participants/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ const getPositions = async function (request, h) {
}
}

const getAccounts = async function (request, h) {
Sidecar.logRequest(request)
try {
return Participant.getAccounts(request.params.name, request.query)
} catch (err) {
throw Boom.badRequest()
}
}

module.exports = {
create,
getAll,
Expand All @@ -218,5 +227,6 @@ module.exports = {
addLimitAndInitialPosition,
getLimits,
adjustLimits,
getPositions
getPositions,
getAccounts
}
18 changes: 18 additions & 0 deletions src/admin/participants/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,23 @@ module.exports = [
}
}
}
},
{
method: 'GET',
path: '/participants/{name}/accounts',
handler: Handler.getAccounts,
options: {
id: 'participants_accounts_get',
tags: tags,
description: 'View participant accounts balances',
validate: {
params: {
name: nameValidator
},
query: {
currency: currencyValidator
}
}
}
}
]
43 changes: 36 additions & 7 deletions src/domain/participant/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ const getParticipantCurrencyById = async (participantCurrencyId) => {
const destroyByName = async (name) => {
try {
let participant = await ParticipantModel.getByName(name)
await ParticipantLimitModel.destroyByParticipantId(participant.participantId)
await ParticipantPositionModel.destroyByParticipantId(participant.participantId)
await ParticipantCurrencyModel.destroyByParticipantId(participant.participantId)
return await ParticipantModel.destroyByName(name)
} catch (err) {
Expand Down Expand Up @@ -254,16 +256,18 @@ const addLimitAndInitialPosition = async (participantName, limitAndInitialPositi
try {
const participant = await ParticipantFacade.getByNameAndCurrency(participantName, limitAndInitialPositionObj.currency, Enum.LedgerAccountType.POSITION)
participantExists(participant)
const settlementAccount = await ParticipantFacade.getByNameAndCurrency(participantName, limitAndInitialPositionObj.currency, Enum.LedgerAccountType.SETTLEMENT)
const existingLimit = await ParticipantLimitModel.getByParticipantCurrencyId(participant.participantCurrencyId)
const existingPosition = await ParticipantPositionModel.getByParticipantCurrencyId(participant.participantCurrencyId)
if (existingLimit || existingPosition) {
const existingSettlementPosition = await ParticipantPositionModel.getByParticipantCurrencyId(settlementAccount.participantCurrencyId)
if (existingLimit || existingPosition || existingSettlementPosition) {
throw new Error('Participant Limit or Initial Position already set')
}
let limitAndInitialPosition = limitAndInitialPositionObj
if (limitAndInitialPosition.initialPosition == null) {
limitAndInitialPosition.initialPosition = Config.PARTICIPANT_INITIAL_POSTITION
}
return ParticipantFacade.addLimitAndInitialPosition(participant.participantCurrencyId, limitAndInitialPosition)
return ParticipantFacade.addLimitAndInitialPosition(participant.participantCurrencyId, settlementAccount.participantCurrencyId, limitAndInitialPosition)
} catch (err) {
throw err
}
Expand Down Expand Up @@ -468,27 +472,27 @@ const getPositions = async (name, query) => {
if (query.currency) {
const participant = await ParticipantFacade.getByNameAndCurrency(name, query.currency, Enum.LedgerAccountType.POSITION)
participantExists(participant)
const result = await PositionFacade.getByNameAndCurrency(name, query.currency, Enum.LedgerAccountType.POSITION)
const result = await PositionFacade.getByNameAndCurrency(name, Enum.LedgerAccountType.POSITION, query.currency, Enum.LedgerAccountType.POSITION)
let position = {}
if (Array.isArray(result) && result.length > 0) {
position = {
currency: result[0].currencyId,
value: result[0].value,
updatedTime: result[0].changedDate
changedDate: result[0].changedDate
}
}
return position
} else {
const participant = await ParticipantModel.getByName(name)
participantExists(participant)
const result = await await PositionFacade.getByNameAndCurrency(name, null, Enum.LedgerAccountType.POSITION)
const result = await await PositionFacade.getByNameAndCurrency(name, Enum.LedgerAccountType.POSITION)
let positions = []
if (Array.isArray(result) && result.length > 0) {
result.forEach(item => {
positions.push({
currency: item.currencyId,
value: item.value,
updatedTime: item.changedDate
changedDate: item.changedDate
})
})
}
Expand All @@ -499,6 +503,30 @@ const getPositions = async (name, query) => {
}
}

const getAccounts = async (name, query) => {
try {
const participant = await ParticipantModel.getByName(name)
participantExists(participant)
const result = await PositionFacade.getAllByNameAndCurrency(name, query.currency)
let positions = []
if (Array.isArray(result) && result.length > 0) {
result.forEach(item => {
positions.push({
id: item.participantCurrencyId,
ledgerAccountType: item.ledgerAccountType,
currency: item.currencyId,
value: item.value,
reservedValue: item.reservedValue,
changedDate: item.changedDate
})
})
}
return positions
} catch (err) {
throw err
}
}

module.exports = {
create,
getAll,
Expand All @@ -520,5 +548,6 @@ module.exports = {
destroyParticipantLimitByNameAndCurrency,
getLimits,
adjustLimits,
getPositions
getPositions,
getAccounts
}
78 changes: 13 additions & 65 deletions src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,20 @@ const positions = async (error, messages) => {
message = Object.assign({}, messages)
}
Logger.info('PositionHandler::positions')
let consumer = {}
let consumer
let kafkaTopic
const payload = message.value.content && message.value.content.payload || {}
payload.transferId = message.value.id
kafkaTopic = message.topic
try {
consumer = Kafka.Consumer.getConsumer(kafkaTopic)
} catch (e) {
Logger.info(`No consumer found for topic ${kafkaTopic}`)
Logger.error(e)
return true
}
if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.PREPARE) {
Logger.info('PositionHandler::positions::prepare')
kafkaTopic = message.topic
try {
consumer = Kafka.Consumer.getConsumer(kafkaTopic)
} catch (e) {
Logger.info(`No consumer found for topic ${kafkaTopic}`)
Logger.error(e)
return true
}
const {preparedMessagesList, limitAlarms} = await PositionService.calculatePreparePositionsBatch(prepareBatch)
for (let prepareMessage of preparedMessagesList) {
const {transferState, rawMessage} = prepareMessage
Expand All @@ -111,12 +111,6 @@ const positions = async (error, messages) => {
return true
} else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.COMMIT) {
Logger.info('PositionHandler::positions::commit')
kafkaTopic = message.topic
consumer = Kafka.Consumer.getConsumer(kafkaTopic)
if (!consumer) {
Logger.info(`No consumer found for topic ${kafkaTopic}`)
return true
}
// Check current transfer state
const transferInfo = await TransferService.getTransferInfoToChangePosition(payload.transferId, Enum.TransferParticipantRoleType.PAYEE_DFSP, Enum.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== TransferState.RECEIVED_FULFIL) {
Expand All @@ -131,22 +125,10 @@ const positions = async (error, messages) => {
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
}
if (!Kafka.Consumer.isConsumerAutoCommitEnabled(kafkaTopic)) {
await consumer.commitMessageSync(message)
}
// Will follow framework flow in future
await Utility.produceGeneralMessage(TransferEventType.NOTIFICATION, TransferEventAction.COMMIT, message.value, Utility.ENUMS.STATE.SUCCESS)
return true
} else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.REJECT) {
Logger.info('PositionHandler::positions::reject')
kafkaTopic = message.topic
try {
consumer = Kafka.Consumer.getConsumer(kafkaTopic)
} catch (e) {
Logger.info(`No consumer found for topic ${kafkaTopic}`)
Logger.error(e)
return true
}
const transferInfo = await TransferService.getTransferInfoToChangePosition(payload.transferId, Enum.TransferParticipantRoleType.PAYER_DFSP, Enum.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== TransferState.REJECTED) {
Logger.info('PositionHandler::positions::reject::validationFailed::notRejectedState')
Expand All @@ -161,18 +143,13 @@ const positions = async (error, messages) => {
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
}
if (!Kafka.Consumer.isConsumerAutoCommitEnabled(kafkaTopic)) {
await consumer.commitMessageSync(message)
}
// Will follow framework flow in future
await Utility.produceGeneralMessage(TransferEventType.NOTIFICATION, TransferEventAction.REJECT, message.value, Utility.ENUMS.STATE.SUCCESS)
return true
} else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.TIMEOUT_RESERVED) {
Logger.info('PositionHandler::positions::timeout')
const transferInfo = await TransferService.getTransferInfoToChangePosition(payload.transferId, Enum.TransferParticipantRoleType.PAYER_DFSP, Enum.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== TransferState.RESERVED_TIMEOUT) {
Logger.info('PositionHandler::positions::commit::validationFailed::notReceivedFulfilState')
// throw Error 2001
throw new Error('Internal server error')
} else { // transfer state check success
const isReversal = true
Expand All @@ -185,44 +162,15 @@ const positions = async (error, messages) => {
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
let newMessage = Object.assign({}, message)
newMessage.value.content.payload = Utility.createPrepareErrorStatus(3303, reason, newMessage.value.content.payload.extensionList)
kafkaTopic = message.topic
try {
consumer = Kafka.Consumer.getConsumer(kafkaTopic)
} catch (e) {
Logger.info(`No consumer found for topic ${kafkaTopic}`)
Logger.error(e)
return true
}
if (!Kafka.Consumer.isConsumerAutoCommitEnabled(kafkaTopic)) {
await consumer.commitMessageSync(message)
}
await Utility.produceGeneralMessage(TransferEventType.NOTIFICATION, TransferEventAction.ABORT, newMessage.value, Utility.createState(Utility.ENUMS.STATE.FAILURE.status, 4001, transferStateChange.reason))
return true
}
// TODO: Need to understand the purpose of this branch.
// } else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.FAIL) {
// Logger.info('PositionHandler::positions::fail')
// kafkaTopic = Utility.transformAccountToTopicName(message.value.from, TransferEventType.POSITION, TransferEventAction.ABORT)
// consumer = Kafka.Consumer.getConsumer(kafkaTopic)
// if (!Kafka.Consumer.isConsumerAutoCommitEnabled(kafkaTopic)) {
// await consumer.commitMessageSync(message)
// }
// throw new Error('Position Fail messaged received - What do we do here??')
} else {
Logger.info('PositionHandler::positions::invalidEventTypeOrAction')
kafkaTopic = message.topic
try {
consumer = Kafka.Consumer.getConsumer(kafkaTopic)
} catch (e) {
Logger.info(`No consumer found for topic ${kafkaTopic}`)
Logger.error(e)
return true
}
if (!Kafka.Consumer.isConsumerAutoCommitEnabled(kafkaTopic)) {
await consumer.commitMessageSync(message)
}
throw new Error('Event type or action is invalid')
}
if (!Kafka.Consumer.isConsumerAutoCommitEnabled(kafkaTopic)) {
await consumer.commitMessageSync(message)
}
return true
} catch (error) {
Logger.error(error)
throw error
Expand Down
6 changes: 4 additions & 2 deletions src/handlers/timeouts/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ const isRunning = async () => {
*/
const stop = async () => {
if (isRegistered) {
await timeoutJob.destroy()
await timeoutJob.stop()
isRegistered = undefined
}
}
Expand Down Expand Up @@ -183,7 +183,9 @@ const registerAllHandlers = async () => {
}

module.exports = {
timeout,
registerAllHandlers,
registerTimeoutHandler,
isRunning
isRunning,
stop
}
Loading