Skip to content

Commit

Permalink
feat(mojaloop/#3904): add position event timeout reserved batch handl…
Browse files Browse the repository at this point in the history
…ing (#1033)

* chore: add integration test for batch

* unit

* reenable

* chore: comments

* cleanup function

* remove

* unskip

* fix potential int test failures

* reorder

* fix replace
  • Loading branch information
kleyow authored May 17, 2024
1 parent 031e16c commit 0b6606a
Show file tree
Hide file tree
Showing 13 changed files with 840 additions and 97 deletions.
3 changes: 2 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
"BULK_PREPARE": null,
"COMMIT": null,
"BULK_COMMIT": null,
"RESERVE": null
"RESERVE": null,
"TIMEOUT_RESERVED": null
}
},
"TOPIC_TEMPLATES": {
Expand Down
53 changes: 17 additions & 36 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-logger": "11.3.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.4.0-snapshot.12",
"@mojaloop/central-services-stream": "11.2.5",
"@mojaloop/central-services-stream": "11.2.6",
"@mojaloop/database-lib": "11.0.5",
"@mojaloop/event-sdk": "14.0.2",
"@mojaloop/ml-number": "11.2.4",
Expand All @@ -108,7 +108,7 @@
"docdash": "2.0.2",
"event-stream": "4.0.1",
"five-bells-condition": "5.0.1",
"glob": "10.3.12",
"glob": "10.3.15",
"hapi-auth-basic": "5.0.0",
"hapi-auth-bearer-token": "8.0.0",
"hapi-swagger": "17.2.1",
Expand All @@ -124,7 +124,6 @@
"mysql": "2.18.1"
},
"devDependencies": {
"async-retry": "1.3.3",
"audit-ci": "^6.6.1",
"get-port": "5.1.1",
"jsdoc": "4.0.3",
Expand Down
32 changes: 30 additions & 2 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const PositionPrepareDomain = require('./prepare')
const PositionFxPrepareDomain = require('./fx-prepare')
const PositionFulfilDomain = require('./fulfil')
const PositionFxFulfilDomain = require('./fx-fulfil')
const PositionTimeoutReservedDomain = require('./timeout-reserved')
const SettlementModelCached = require('../../models/settlement/settlementModelCached')
const Enum = require('@mojaloop/central-services-shared').Enum
const ErrorHandler = require('@mojaloop/central-services-error-handling')
Expand Down Expand Up @@ -105,8 +106,15 @@ const processBins = async (bins, trx) => {
array2.every((element) => array1.includes(element))
// If non-prepare/non-commit action found, log error
// We need to remove this once we implement all the actions
if (!isSubset([Enum.Events.Event.Action.PREPARE, Enum.Events.Event.Action.FX_PREPARE, Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.RESERVE, Enum.Events.Event.Action.FX_RESERVE], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/fx-prepare/commit actions are allowed in a batch')
if (!isSubset([
Enum.Events.Event.Action.PREPARE,
Enum.Events.Event.Action.FX_PREPARE,
Enum.Events.Event.Action.COMMIT,
Enum.Events.Event.Action.RESERVE,
Enum.Events.Event.Action.FX_RESERVE,
Enum.Events.Event.Action.TIMEOUT_RESERVED
], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/fx-prepare/commit/reserve/timeout reserved actions are allowed in a batch')
}

const settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value
Expand Down Expand Up @@ -165,6 +173,24 @@ const processBins = async (bins, trx) => {
notifyMessages = notifyMessages.concat(fulfilActionResult.notifyMessages)
followupMessages = followupMessages.concat(fulfilActionResult.followupMessages)

// If timeout-reserved action found then call processPositionTimeoutReserveBin function
const timeoutReservedActionResult = await PositionTimeoutReservedDomain.processPositionTimeoutReservedBin(
accountBin[Enum.Events.Event.Action.TIMEOUT_RESERVED],
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
latestTransferInfoByTransferId
)

// Update accumulated values
accumulatedPositionValue = timeoutReservedActionResult.accumulatedPositionValue
accumulatedPositionReservedValue = timeoutReservedActionResult.accumulatedPositionReservedValue
accumulatedTransferStates = timeoutReservedActionResult.accumulatedTransferStates
// Append accumulated arrays
accumulatedTransferStateChanges = accumulatedTransferStateChanges.concat(timeoutReservedActionResult.accumulatedTransferStateChanges)
accumulatedPositionChanges = accumulatedPositionChanges.concat(timeoutReservedActionResult.accumulatedPositionChanges)
notifyMessages = notifyMessages.concat(timeoutReservedActionResult.notifyMessages)

// If prepare action found then call processPositionPrepareBin function
const prepareActionResult = await PositionPrepareDomain.processPositionPrepareBin(
accountBin.prepare,
Expand Down Expand Up @@ -299,6 +325,8 @@ const _getTransferIdList = async (bins) => {
} else if (action === Enum.Events.Event.Action.RESERVE) {
transferIdList.push(item.message.value.content.uriParams.id)
reservedActionTransferIdList.push(item.message.value.content.uriParams.id)
} else if (action === Enum.Events.Event.Action.TIMEOUT_RESERVED) {
transferIdList.push(item.message.value.content.uriParams.id)
} else if (action === Enum.Events.Event.Action.FX_PREPARE) {
commitRequestIdList.push(item.decodedPayload.commitRequestId)
} else if (action === Enum.Events.Event.Action.FX_RESERVE) {
Expand Down
155 changes: 155 additions & 0 deletions src/domain/position/timeout-reserved.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
const { Enum } = require('@mojaloop/central-services-shared')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Config = require('../../lib/config')
const Utility = require('@mojaloop/central-services-shared').Util
const MLNumber = require('@mojaloop/ml-number')
const Logger = require('@mojaloop/central-services-logger')

/**
* @function processPositionTimeoutReservedBin
*
* @async
* @description This is the domain function to process a bin of timeout-reserved messages of a single participant account.
*
* @param {array} timeoutReservedBins - an array containing timeout-reserved action bins
* @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing
* @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency
* @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output.
* @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function.
* @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed
*/
const processPositionTimeoutReservedBin = async (
timeoutReservedBins,
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
transferInfoList
) => {
const transferStateChanges = []
const participantPositionChanges = []
const resultMessages = []
const accumulatedTransferStatesCopy = Object.assign({}, accumulatedTransferStates)
let runningPosition = new MLNumber(accumulatedPositionValue)
// Position action RESERVED_TIMEOUT event messages are keyed with payer account id.
// We need to revert the payer's position for the amount of the transfer.
// We need to notify the payee of the timeout.
if (timeoutReservedBins && timeoutReservedBins.length > 0) {
for (const binItem of timeoutReservedBins) {
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::binItem: ${JSON.stringify(binItem.message.value)}`)
const transferId = binItem.message.value.content.uriParams.id
const payeeFsp = binItem.message.value.to
const payerFsp = binItem.message.value.from

// If the transfer is not in `RESERVED_TIMEOUT`, a position timeout-reserved message was incorrectly published.
// i.e Something has gone extremely wrong.
if (accumulatedTransferStates[transferId] !== Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
throw ErrorHandler.Factory.createInternalServerFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR.message)
} else {
Logger.isDebugEnabled && Logger.debug(`accumulatedTransferStates: ${JSON.stringify(accumulatedTransferStates)}`)

const transferAmount = transferInfoList[transferId].amount

// Construct payee notification message
const resultMessage = _constructTimeoutReservedResultMessage(
binItem,
transferId,
payeeFsp,
payerFsp
)
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::resultMessage: ${JSON.stringify(resultMessage)}`)

// Revert payer's position for the amount of the transfer
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue)
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`)
runningPosition = updatedRunningPosition
binItem.result = { success: true }
participantPositionChanges.push(participantPositionChange)
transferStateChanges.push(transferStateChange)
accumulatedTransferStatesCopy[transferId] = transferStateId
resultMessages.push({ binItem, message: resultMessage })
}
}
}

return {
accumulatedPositionValue: runningPosition.toNumber(),
accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after fulfil processing
accumulatedPositionReservedValue, // not used but kept for consistency
accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order
accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order
notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message}
}
}

const _constructTimeoutReservedResultMessage = (binItem, transferId, payeeFsp, payerFsp) => {
// IMPORTANT: This singular message is taken by the ml-api-adapter and used to
// notify the payer and payee of the timeout.
// As long as the `to` and `from` message values are the payer and payee,
// and the action is `timeout-reserved`, the ml-api-adapter will notify both.
// Create a FSPIOPError object for timeout payee notification
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED,
null,
null,
null,
null
).toApiErrorObject(Config.ERROR_HANDLING)

const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)

// Create metadata for the message, associating the payee notification
// with the position event timeout-reserved action
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.POSITION,
Enum.Events.Event.Action.TIMEOUT_RESERVED,
state
)
const resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payeeFsp,
payerFsp,
metadata,
binItem.message.value.content.headers, // Headers don't really matter here. ml-api-adapter will ignore them and create their own.
fspiopError,
{ id: transferId },
'application/json'
)

return resultMessage
}

const _handleParticipantPositionChange = (runningPosition, transferAmount, transferId, accumulatedPositionReservedValue) => {
// NOTE: The transfer info amount is pulled from the payee records in a batch `SELECT` query.
// And will have a negative value. We add that value to the payer's position
// to revert the position for the amount of the transfer.
const transferStateId = Enum.Transfers.TransferInternalState.EXPIRED_RESERVED
// Revert payer's position for the amount of the transfer
const updatedRunningPosition = new MLNumber(runningPosition.add(transferAmount).toFixed(Config.AMOUNT.SCALE))
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::_handleParticipantPositionChange::updatedRunningPosition: ${updatedRunningPosition.toString()}`)
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::_handleParticipantPositionChange::transferAmount: ${transferAmount}`)
// Construct participant position change object
const participantPositionChange = {
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
reservedValue: accumulatedPositionReservedValue
}

// Construct transfer state change object
const transferStateChange = {
transferId,
transferStateId,
reason: ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message
}
return { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition }
}

module.exports = {
processPositionTimeoutReservedBin
}
9 changes: 8 additions & 1 deletion src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,14 @@ const positions = async (error, messages) => {
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED, null, null, null, payload.extensionList)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail })
await Kafka.proceed(
Config.KAFKA_CONFIG,
params,
{
consumerCommit,
fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING),
eventDetail
})
throw fspiopError
}
} else {
Expand Down
Loading

0 comments on commit 0b6606a

Please sign in to comment.