Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mojaloop/#3904): add position event timeout reserved batch handling #1033

Merged
merged 10 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
"BULK_PREPARE": null,
"COMMIT": null,
"BULK_COMMIT": null,
"RESERVE": null
"RESERVE": null,
"TIMEOUT_RESERVED": null
}
},
"TOPIC_TEMPLATES": {
Expand Down
26 changes: 13 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
"docdash": "2.0.2",
"event-stream": "4.0.1",
"five-bells-condition": "5.0.1",
"glob": "10.3.12",
"glob": "10.3.15",
"hapi-auth-basic": "5.0.0",
"hapi-auth-bearer-token": "8.0.0",
"hapi-swagger": "17.2.1",
Expand Down
37 changes: 35 additions & 2 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const PositionPrepareDomain = require('./prepare')
const PositionFxPrepareDomain = require('./fx-prepare')
const PositionFulfilDomain = require('./fulfil')
const PositionFxFulfilDomain = require('./fx-fulfil')
const PositionTimeoutReservedDomain = require('./timeout-reserved')
const SettlementModelCached = require('../../models/settlement/settlementModelCached')
const Enum = require('@mojaloop/central-services-shared').Enum
const ErrorHandler = require('@mojaloop/central-services-error-handling')
Expand Down Expand Up @@ -105,8 +106,15 @@ const processBins = async (bins, trx) => {
array2.every((element) => array1.includes(element))
// If non-prepare/non-commit action found, log error
// We need to remove this once we implement all the actions
if (!isSubset([Enum.Events.Event.Action.PREPARE, Enum.Events.Event.Action.FX_PREPARE, Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.RESERVE, Enum.Events.Event.Action.FX_RESERVE], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/fx-prepare/commit actions are allowed in a batch')
if (!isSubset([
Enum.Events.Event.Action.PREPARE,
Enum.Events.Event.Action.FX_PREPARE,
Enum.Events.Event.Action.COMMIT,
Enum.Events.Event.Action.RESERVE,
Enum.Events.Event.Action.FX_RESERVE,
Enum.Events.Event.Action.TIMEOUT_RESERVED
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/fx-prepare/commit/reserve/timeout reserved actions are allowed in a batch')
}

const settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value
Expand Down Expand Up @@ -142,6 +150,29 @@ const processBins = async (bins, trx) => {
accumulatedFxTransferStateChanges = accumulatedFxTransferStateChanges.concat(fxFulfilActionResult.accumulatedFxTransferStateChanges)
notifyMessages = notifyMessages.concat(fxFulfilActionResult.notifyMessages)

// If timeout-reserved action found then call processPositionTimeoutReserveBin function
const timeoutReservedActionResult = await PositionTimeoutReservedDomain.processPositionTimeoutReservedBin(
accountBin[Enum.Events.Event.Action.TIMEOUT_RESERVED],
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
accumulatedFxTransferStates,
latestTransferInfoByTransferId,
reservedActionTransfers
)

// Update accumulated values
accumulatedPositionValue = timeoutReservedActionResult.accumulatedPositionValue
accumulatedPositionReservedValue = timeoutReservedActionResult.accumulatedPositionReservedValue
accumulatedTransferStates = timeoutReservedActionResult.accumulatedTransferStates
accumulatedFxTransferStates = timeoutReservedActionResult.accumulatedFxTransferStates
// Append accumulated arrays
accumulatedTransferStateChanges = accumulatedTransferStateChanges.concat(timeoutReservedActionResult.accumulatedTransferStateChanges)
accumulatedFxTransferStateChanges = accumulatedFxTransferStateChanges.concat(timeoutReservedActionResult.accumulatedFxTransferStateChanges)
accumulatedPositionChanges = accumulatedPositionChanges.concat(timeoutReservedActionResult.accumulatedPositionChanges)
notifyMessages = notifyMessages.concat(timeoutReservedActionResult.notifyMessages)
followupMessages = followupMessages.concat(timeoutReservedActionResult.followupMessages)

// If fulfil action found then call processPositionPrepareBin function
const fulfilActionResult = await PositionFulfilDomain.processPositionFulfilBin(
[accountBin.commit, accountBin.reserve],
Expand Down Expand Up @@ -299,6 +330,8 @@ const _getTransferIdList = async (bins) => {
} else if (action === Enum.Events.Event.Action.RESERVE) {
transferIdList.push(item.message.value.content.uriParams.id)
reservedActionTransferIdList.push(item.message.value.content.uriParams.id)
} else if (action === Enum.Events.Event.Action.TIMEOUT_RESERVED) {
transferIdList.push(item.message.value.content.uriParams.id)
} else if (action === Enum.Events.Event.Action.FX_PREPARE) {
commitRequestIdList.push(item.decodedPayload.commitRequestId)
} else if (action === Enum.Events.Event.Action.FX_RESERVE) {
Expand Down
169 changes: 169 additions & 0 deletions src/domain/position/timeout-reserved.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
const { Enum } = require('@mojaloop/central-services-shared')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Config = require('../../lib/config')
const Utility = require('@mojaloop/central-services-shared').Util
const MLNumber = require('@mojaloop/ml-number')
const Logger = require('@mojaloop/central-services-logger')

/**
* @function processPositionTimeoutReservedBin
*
* @async
* @description This is the domain function to process a bin of timeout-reserved messages of a single participant account.
*
* @param {array} timeoutReservedBins - an array containing timeout-reserved action bins
* @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing
* @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency
* @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output.
* @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function.
* @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed
*/
const processPositionTimeoutReservedBin = async (
timeoutReservedBins,
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
accumulatedFxTransferStates,
transferInfoList,
reservedActionTransfers
) => {
const transferStateChanges = []
const fxTransferStateChanges = []
const participantPositionChanges = []
const resultMessages = []
const followupMessages = []
const accumulatedTransferStatesCopy = Object.assign({}, accumulatedTransferStates)
const accumulatedFxTransferStatesCopy = Object.assign({}, accumulatedFxTransferStates)
let runningPosition = new MLNumber(accumulatedPositionValue)
// Position action RESERVED_TIMEOUT event messages are keyed with payer account id.
// We need to revert the payer's position for the amount of the transfer.
// We need to notify the payee of the timeout.
if (timeoutReservedBins && timeoutReservedBins.length > 0) {
for (const binItem of timeoutReservedBins) {
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::binItem: ${JSON.stringify(binItem.message.value)}`)
const transferId = binItem.message.value.content.uriParams.id
const payeeFsp = binItem.message.value.to
const payerFsp = binItem.message.value.from

// If the transfer is not in `RESERVED_TIMEOUT`, a position timeout-reserved message was incorrectly published.
// i.e Something has gone extremely wrong.
if (accumulatedTransferStates[transferId] !== Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
throw ErrorHandler.Factory.createInternalServerFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR.message)
} else {
Logger.isDebugEnabled && Logger.debug(`accumulatedTransferStates: ${JSON.stringify(accumulatedTransferStates)}`)

const transferAmount = transferInfoList[transferId].amount

// Construct payee notification message
const resultMessage = _constructTimeoutReservedResultMessage(
binItem,
transferId,
payeeFsp,
payerFsp
)
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::resultMessage: ${JSON.stringify(resultMessage)}`)

// Revert payer's position for the amount of the transfer
const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } =
_handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue)
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`)
runningPosition = updatedRunningPosition
binItem.result = { success: true }
participantPositionChanges.push(participantPositionChange)
transferStateChanges.push(transferStateChange)
accumulatedTransferStatesCopy[transferId] = transferStateId
resultMessages.push({ binItem, message: resultMessage })
}
}
}

return {
accumulatedPositionValue: runningPosition.toNumber(),
accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after fulfil processing
accumulatedFxTransferStates: accumulatedFxTransferStatesCopy, // finalized transfer state after fx fulfil processing
accumulatedPositionReservedValue, // not used but kept for consistency
accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order
accumulatedFxTransferStateChanges: fxTransferStateChanges, // fx-transfer state changes to be persisted in order
accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order
notifyMessages: resultMessages, // array of objects containing bin item and result message. {binItem, message}
followupMessages // array of objects containing bin item, message key and followup message. {binItem, messageKey, message}
}
}

const _constructTimeoutReservedResultMessage = (binItem, transferId, payeeFsp, payerFsp) => {
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = payerFsp
delete headers['content-length']

// Create a FSPIOPError object for timeout payee notification
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED,
null,
null,
null,
null
).toApiErrorObject(Config.ERROR_HANDLING)

const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)

// Create metadata for the message, associating the payee notification
// with the position event timeout-reserved action
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.POSITION,
Enum.Events.Event.Action.TIMEOUT_RESERVED,
state
)
const resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payeeFsp,
payerFsp,
metadata,
headers, // Unsure what headers need to be passed here
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 couldn't tell from the non batch code what the headers for this are.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ml-api-adapter deals with notifying the payee and payer, so headers don't matter and the adapter will construct the proper headers!
https://github.com/mojaloop/ml-api-adapter/blob/main/src/handlers/notification/index.js#L545

As long as the to and from are the payer and payee, which they have been and the action is timeout-reserved ml adapter will notify both.

fspiopError,
{ id: transferId },
'application/json'
)

return resultMessage
}

const _handleParticipantPositionChange = (runningPosition, transferAmount, transferId, accumulatedPositionReservedValue) => {
const transferStateId = Enum.Transfers.TransferInternalState.EXPIRED_RESERVED
// Revert payer's position for the amount of the transfer
// NOTE: We are pulling the transferAmount using
// const latestTransferInfoByTransferId = await BatchPositionModel.getTransferInfoList(
// trx,
// transferIdList,
// Enum.Accounts.TransferParticipantRoleType.PAYEE_DFSP,
// Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
// )
// Is it safe using this value of the payee even accounting for the negative value?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After dicussions this value is safe to use to revert the payer's position.

const updatedRunningPosition = new MLNumber(runningPosition.add(transferAmount).toFixed(Config.AMOUNT.SCALE))
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::_handleParticipantPositionChange::updatedRunningPosition: ${updatedRunningPosition.toString()}`)
Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::_handleParticipantPositionChange::transferAmount: ${transferAmount}`)
// Construct participant position change object
const participantPositionChange = {
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
reservedValue: accumulatedPositionReservedValue
}

// Construct transfer state change object
const transferStateChange = {
transferId,
transferStateId,
reason: ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message
}
return { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition }
}

module.exports = {
processPositionTimeoutReservedBin
}
9 changes: 8 additions & 1 deletion src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,14 @@ const positions = async (error, messages) => {
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED, null, null, null, payload.extensionList)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail })
await Kafka.proceed(
Config.KAFKA_CONFIG,
params,
{
consumerCommit,
fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING),
eventDetail
})
throw fspiopError
}
} else {
Expand Down
12 changes: 11 additions & 1 deletion src/handlers/timeouts/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,17 @@ const timeout = async () => {
message.metadata.event.type = Enum.Events.Event.Type.POSITION
message.metadata.event.action = Enum.Events.Event.Action.TIMEOUT_RESERVED
// Key position timeouts with payer account id
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.TIMEOUT_RESERVED, message, state, result[i].payerParticipantCurrencyId?.toString(), span)
await Kafka.produceGeneralMessage(
Config.KAFKA_CONFIG,
Producer,
Enum.Kafka.Topics.POSITION,
Enum.Events.Event.Action.TIMEOUT_RESERVED,
message,
state,
result[i].payerParticipantCurrencyId?.toString(),
span,
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.TIMEOUT_RESERVED
)
}
} else { // individual transfer from a bulk
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
Expand Down
Loading