Skip to content

Commit

Permalink
Feature/#1334 patch request notif (#751)
Browse files Browse the repository at this point in the history
* added handling of request for notification by payee functionality
* improved coverage and added missing action letter

Co-authored-by: Valentin <valentin.genev@modusbox.com>
  • Loading branch information
vgenev and Valentin authored Jun 30, 2020
1 parent d63bc73 commit 5ad8b83
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 43 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "10.5.1",
"version": "10.5.2",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -86,7 +86,7 @@
"@mojaloop/central-services-health": "10.4.0",
"@mojaloop/central-services-logger": "10.4.0",
"@mojaloop/central-services-metrics": "9.5.0",
"@mojaloop/central-services-shared": "10.5.1",
"@mojaloop/central-services-shared": "10.5.2",
"@mojaloop/central-services-stream": "9.5.0",
"@mojaloop/event-sdk": "10.4.0",
"@mojaloop/forensic-logging-client": "8.3.0",
Expand Down
26 changes: 16 additions & 10 deletions src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
const Logger = require('@mojaloop/central-services-logger')
const EventSdk = require('@mojaloop/event-sdk')
const TransferService = require('../../domain/transfer')
const TransferObjectTransform = require('../../domain/transfer/transform')
const PositionService = require('../../domain/position')
const Utility = require('@mojaloop/central-services-shared').Util
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
Expand Down Expand Up @@ -115,15 +116,16 @@ const positions = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, { method: 'positions' }))

const actionLetter = action === Enum.Events.Event.Action.PREPARE ? Enum.Events.ActionLetter.prepare
: (action === Enum.Events.Event.Action.COMMIT ? Enum.Events.ActionLetter.commit
: (action === Enum.Events.Event.Action.REJECT ? Enum.Events.ActionLetter.reject
: (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort
: (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout
: (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare
: (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved
: (action === Enum.Events.Event.Action.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown))))))))
: (action === Enum.Events.Event.Action.RESERVE ? Enum.Events.ActionLetter.reserve
: (action === Enum.Events.Event.Action.COMMIT ? Enum.Events.ActionLetter.commit
: (action === Enum.Events.Event.Action.REJECT ? Enum.Events.ActionLetter.reject
: (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort
: (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout
: (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare
: (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved
: (action === Enum.Events.Event.Action.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown)))))))))
const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer }
const eventDetail = { action }
if (![Enum.Events.Event.Action.BULK_PREPARE, Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED, Enum.Events.Event.Action.BULK_ABORT].includes(action)) {
Expand Down Expand Up @@ -156,7 +158,7 @@ const positions = async (error, messages) => {
throw fspiopError
}
}
} else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.BULK_COMMIT].includes(action)) {
} else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.RESERVE, Enum.Events.Event.Action.BULK_COMMIT].includes(action)) {
Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, { path: 'commit' }))
const transferInfo = await TransferService.getTransferInfoToChangePosition(transferId, Enum.Accounts.TransferParticipantRoleType.PAYEE_DFSP, Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) {
Expand All @@ -172,6 +174,10 @@ const positions = async (error, messages) => {
transferStateId: Enum.Transfers.TransferState.COMMITTED
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
if (action === Enum.Events.Event.Action.RESERVE) {
const transfer = await TransferService.getById(transferInfo.transferId)
message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
}
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId, action })
return true
Expand Down
56 changes: 29 additions & 27 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,17 +274,19 @@ const fulfil = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, { method: `fulfil:${action}` }))

const actionLetter = action === TransferEventAction.COMMIT ? Enum.Events.ActionLetter.commit
: (action === TransferEventAction.REJECT ? Enum.Events.ActionLetter.reject
: (action === TransferEventAction.ABORT ? Enum.Events.ActionLetter.abort
: (action === TransferEventAction.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === TransferEventAction.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown))))
: (action === TransferEventAction.RESERVE ? Enum.Events.ActionLetter.reserve
: (action === TransferEventAction.REJECT ? Enum.Events.ActionLetter.reject
: (action === TransferEventAction.ABORT ? Enum.Events.ActionLetter.abort
: (action === TransferEventAction.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === TransferEventAction.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown)))))
const functionality = action === TransferEventAction.COMMIT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.REJECT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.ABORT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.BULK_COMMIT ? TransferEventType.BULK_PROCESSING
: (action === TransferEventAction.BULK_ABORT ? TransferEventType.BULK_PROCESSING
: Enum.Events.ActionLetter.unknown))))
: (action === TransferEventAction.RESERVE ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.REJECT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.ABORT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.BULK_COMMIT ? TransferEventType.BULK_PROCESSING
: (action === TransferEventAction.BULK_ABORT ? TransferEventType.BULK_PROCESSING
: Enum.Events.ActionLetter.unknown)))))
// fulfil-specific declarations
const isTransferError = action === TransferEventAction.ABORT
const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer }
Expand Down Expand Up @@ -344,22 +346,22 @@ const fulfil = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'handleResend'))
if (transferStateEnum === TransferState.COMMITTED || transferStateEnum === TransferState.ABORTED) {
message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
if (!isTransferError) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized2--${actionLetter}3`))
const eventDetail = { functionality, action: TransferEventAction.FULFIL_DUPLICATE }
/**
* HOWTO: During bulk fulfil use an individualTransfer from a previous bulk fulfil
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized3--${actionLetter}4`))
const eventDetail = { functionality, action: TransferEventAction.ABORT_DUPLICATE }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
const eventDetail = { functionality, action }
if (action !== TransferEventAction.RESERVE) {
if (!isTransferError) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized2--${actionLetter}3`))
eventDetail.action = TransferEventAction.FULFIL_DUPLICATE
/**
* HOWTO: During bulk fulfil use an individualTransfer from a previous bulk fulfil
*/
} else {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized3--${actionLetter}4`))
eventDetail.action = TransferEventAction.ABORT_DUPLICATE
}
}
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else if (transferStateEnum === TransferState.RECEIVED || transferStateEnum === TransferState.RESERVED) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `inProgress2--${actionLetter}5`))
/**
Expand Down Expand Up @@ -401,7 +403,7 @@ const fulfil = async (error, messages) => {
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
} else { // !hasDuplicateId
if (type === TransferEventType.FULFIL && [TransferEventAction.COMMIT, TransferEventAction.REJECT, TransferEventAction.ABORT, TransferEventAction.BULK_COMMIT, TransferEventAction.BULK_ABORT].includes(action)) {
if (type === TransferEventType.FULFIL && [TransferEventAction.COMMIT, TransferEventAction.RESERVE, TransferEventAction.REJECT, TransferEventAction.ABORT, TransferEventAction.BULK_COMMIT, TransferEventAction.BULK_ABORT].includes(action)) {
Util.breadcrumb(location, { path: 'validationCheck' })
if (payload.fulfilment && !Validator.validateFulfilCondition(payload.fulfilment, transfer.condition)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInvalidFulfilment--${actionLetter}9`))
Expand Down Expand Up @@ -434,7 +436,7 @@ const fulfil = async (error, messages) => {
throw fspiopError
} else { // validations success
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, { path: 'validationPassed' }))
if ([TransferEventAction.COMMIT, TransferEventAction.BULK_COMMIT].includes(action)) {
if ([TransferEventAction.COMMIT, TransferEventAction.RESERVE, TransferEventAction.BULK_COMMIT].includes(action)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `positionTopic2--${actionLetter}12`))
await TransferService.handlePayeeResponse(transferId, payload, action)
const eventDetail = { functionality: TransferEventType.POSITION, action }
Expand Down
1 change: 1 addition & 0 deletions src/models/transfer/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ const savePayeeTransferResponse = async (transferId, payload, action, fspiopErro
switch (action) {
case TransferEventAction.COMMIT:
case TransferEventAction.BULK_COMMIT:
case TransferEventAction.RESERVE:
state = TransferInternalState.RECEIVED_FULFIL
extensionList = payload.extensionList
isFulfilment = true
Expand Down
24 changes: 24 additions & 0 deletions test/unit/handlers/transfers/handler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,30 @@ Test('Transfer handler', transferHandlerTest => {
test.end()
})

fulfilTest.test('produce message to position topic when validations pass and action is RESERVE', async (test) => {
const localfulfilMessages = MainUtil.clone(fulfilMessages)
localfulfilMessages[0].value.metadata.event.action = 'reserve'
await Consumer.createHandler(topicName, config, command)
Kafka.transformGeneralTopicName.returns(topicName)
TransferService.getById.returns(Promise.resolve({ condition: 'condition', payeeFsp: 'dfsp2', transferState: TransferState.RESERVED }))
ilp.update.returns(Promise.resolve())
Validator.validateFulfilCondition.returns(true)
localfulfilMessages[0].value.content.headers['fspiop-source'] = 'dfsp2'
localfulfilMessages[0].value.content.payload.fulfilment = 'condition'
Kafka.proceed.returns(true)

TransferService.getTransferDuplicateCheck.returns(Promise.resolve(null))
TransferService.saveTransferDuplicateCheck.returns(Promise.resolve(null))
Comparators.duplicateCheckComparator.withArgs(transfer.transferId, localfulfilMessages[0].value.content.payload).returns(Promise.resolve({
hasDuplicateId: false,
hasDuplicateHash: false
}))

const result = await allTransferHandlers.fulfil(null, localfulfilMessages)
test.equal(result, true)
test.end()
})

fulfilTest.test('produce message to position topic when BULK_COMMIT validations pass', async (test) => {
const localfulfilMessages = MainUtil.clone(fulfilMessages)
await Consumer.createHandler(topicName, config, command)
Expand Down

0 comments on commit 5ad8b83

Please sign in to comment.