Skip to content

Commit

Permalink
fix(csi-1016): fx abort fixes (#1133)
Browse files Browse the repository at this point in the history
  • Loading branch information
oderayi authored Dec 12, 2024
1 parent 8e9da0e commit b480bc8
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 34 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.4.1",
"@mojaloop/central-services-shared": "18.14.0",
"@mojaloop/central-services-shared": "18.14.1",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down
50 changes: 35 additions & 15 deletions src/domain/fx/cyril.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,12 @@ const processFxFulfilMessage = async (commitRequestId) => {
*
* @param {Array<string>} commitRequestIdList - List of commit request IDs to retrieve FX-related position changes.
* @param {Array<string>} transferIdList - List of transfer IDs to retrieve regular transfer-related position changes.
* @returns {Promise<PositionChangeItem[]>} - A promise that resolves to an array of position change objects.
* @returns {Promise<{ PositionChangeItem[], TransferStateChangeItem[] }>} - A promise that resolves to an object containing
* array of position change objects and transfer state change objects (transfer state changes with no position changes).
*/
const _getPositionChanges = async (commitRequestIdList, transferIdList, originalId) => {
const _getPositionChanges = async (commitRequestIdList, transferIdList, originalId, isAbort = false) => {
const positionChanges = []
const transferStateChanges = []
for (const commitRequestId of commitRequestIdList) {
const fxRecord = await fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId)
const fxPositionChanges = await ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId(commitRequestId)
Expand All @@ -250,19 +252,35 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList, original
for (const transferId of transferIdList) {
const transferRecord = await TransferFacade.getById(transferId)
const transferPositionChanges = await ParticipantPositionChangesModel.getReservedPositionChangesByTransferId(transferId)
transferPositionChanges.forEach((transferPositionChange) => {
positionChanges.push({
isFxTransferStateChange: false,

// Context: processing interscheme transfer abort with accompanying fx transfer where the payee DFSP is proxied
//
// If the transferPositionChanges is empty and there is a commitRequestId and the tranferId is the same as the originalId,
// then it is a case where the transfer has no position change for the transfer in the buffer scheme but has position change for the fx transfer.
// In that case we need to add a transfer state change so that we can notify the payer and update the transfer state.
if (isAbort && transferRecord && transferRecord.payeeIsProxy && transferPositionChanges.length === 0 && !!commitRequestIdList.length && originalId === transferId) {
transferStateChanges.push({
transferId,
isOriginalId: originalId === transferId,
notifyTo: transferRecord.externalPayerName || transferRecord.payerFsp,
participantCurrencyId: transferPositionChange.participantCurrencyId,
amount: -transferPositionChange.change
transferStateId: Enum.Transfers.TransferInternalState.ABORTED_ERROR,
reason: null,
isOriginalId: originalId === transferId, // added to help in constructing the notification
notifyTo: transferRecord.externalPayerName || transferRecord.payerFsp // added to help in constructing the notification
})
})
} else {
transferPositionChanges.forEach((transferPositionChange) => {
positionChanges.push({
isFxTransferStateChange: false,
transferId,
isOriginalId: originalId === transferId,
notifyTo: transferRecord.externalPayerName || transferRecord.payerFsp,
participantCurrencyId: transferPositionChange.participantCurrencyId,
amount: -transferPositionChange.change
})
})
}
}

return positionChanges
return { positionChanges, transferStateChanges }
}

/**
Expand All @@ -282,11 +300,12 @@ const processFxAbortMessage = async (commitRequestId) => {
const relatedFxTransferRecords = await fxTransfer.getByDeterminingTransferId(fxTransferRecord.determiningTransferId)

// Get position changes
const positionChanges = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [fxTransferRecord.determiningTransferId], commitRequestId)
const { positionChanges, transferStateChanges } = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [fxTransferRecord.determiningTransferId], commitRequestId, false)

histTimer({ success: true })
return {
positionChanges
positionChanges,
transferStateChanges
}
}

Expand All @@ -301,11 +320,12 @@ const processAbortMessage = async (transferId) => {
const relatedFxTransferRecords = await fxTransfer.getByDeterminingTransferId(transferId)

// Get position changes
const positionChanges = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [transferId], transferId)
const { positionChanges, transferStateChanges } = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [transferId], transferId, true)

histTimer({ success: true })
return {
positionChanges
positionChanges,
transferStateChanges
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/domain/position/abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ const processPositionAbortBin = async (
resultMessages.push({ binItem, message: Utility.clone(resultMessage) })
}
}
// Add notifications in the transferStateChanges if available
if (Array.isArray(cyrilResult.transferStateChanges)) {
for (const transferStateChange of cyrilResult.transferStateChanges) {
const resultMessage = _constructAbortResultMessage(binItem, transferStateChange.transferId, from, transferStateChange.notifyTo, transferStateChange.isOriginalId, false)
resultMessages.push({ binItem, message: Utility.clone(resultMessage) })
delete transferStateChange.isOriginalId
delete transferStateChange.notifyTo
transferStateChanges.push({ ...transferStateChange })
accumulatedTransferStatesCopy[transferStateChange.transferId] = transferStateChange.transferStateId
}
}
} else {
// There are still position changes to be processed
// Send position-commit kafka message again for the next item
Expand Down
19 changes: 10 additions & 9 deletions src/domain/position/fx-prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const processFxPositionPrepareBin = async (

Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::transfer:processingMessage: ${JSON.stringify(fxTransfer)}`)

/* eslint-disable brace-style */
// Check if fxTransfer is in correct state for processing, produce an internal error message
if (accumulatedFxTransferStates[fxTransfer.commitRequestId] !== Enum.Transfers.TransferInternalState.RECEIVED_PREPARE) {
Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::transferState: ${accumulatedFxTransferStates[fxTransfer.commitRequestId]} !== ${Enum.Transfers.TransferInternalState.RECEIVED_PREPARE}`)
Expand Down Expand Up @@ -109,9 +110,9 @@ const processFxPositionPrepareBin = async (
)

binItem.result = { success: false }

// Check if payer has insufficient liquidity, produce an error message and abort transfer
} else if (changePositions && availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) {
}
// Check if payer has insufficient liquidity, produce an error message and abort transfer
else if (changePositions && availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) {
transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY.message

Expand Down Expand Up @@ -152,9 +153,9 @@ const processFxPositionPrepareBin = async (
)

binItem.result = { success: false }

// Check if payer has surpassed their limit, produce an error message and abort transfer
} else if (changePositions && availablePositionBasedOnPayerLimit.toNumber() < transferAmount) {
}
// Check if payer has surpassed their limit, produce an error message and abort transfer
else if (changePositions && availablePositionBasedOnPayerLimit.toNumber() < transferAmount) {
transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR.message

Expand Down Expand Up @@ -195,9 +196,9 @@ const processFxPositionPrepareBin = async (
)

binItem.result = { success: false }

// Payer has sufficient liquidity and limit
} else {
}
// Payer has sufficient liquidity and limit
else {
transferStateId = Enum.Transfers.TransferInternalState.RESERVED

if (changePositions) {
Expand Down
4 changes: 3 additions & 1 deletion src/handlers/transfers/FxFulfilService.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ class FxFulfilService {

async validateTransferState(transfer, functionality) {
if (transfer.transferState !== TransferInternalState.RESERVED &&
transfer.transferState !== TransferInternalState.RESERVED_FORWARDED) {
transfer.transferState !== TransferInternalState.RESERVED_FORWARDED &&
transfer.transferState !== TransferInternalState.RECEIVED_FULFIL_DEPENDENT // for the case where we need to abort an fx transfer whose actual transfer is rejected/aborted by payee
) {
const fspiopError = fspiopErrorFactory.fxTransferNonReservedState()
const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING)
const eventDetail = {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ const definePositionParticipant = async ({ isFx, payload, determiningTransferChe
.getPositionParticipant(payload, determiningTransferCheckResult, proxyObligation)

let messageKey
// On a proxied transfer prepare if there is a corresponding fx transfer `getPositionParticipant`
// On a proxied transfer prepare, if there is a corresponding fx transfer, `getPositionParticipant`
// should return the fxp's proxy as the participantName since the fxp proxy would be saved as the counterPartyFsp
// in the prior fx transfer prepare.
// Following interscheme rules, if the debtor(fxTransfer FXP) and the creditor(transfer payee) are
Expand Down
42 changes: 41 additions & 1 deletion test/unit/domain/fx/cyril.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,45 @@ Test('Cyril', cyrilTest => {
participantCurrencyId: 1,
amount: -433.88
}
],
transferStateChanges: []
})
test.pass('Error not thrown')
test.end()
} catch (e) {
test.fail('Error Thrown')
test.end()
}
})

processAbortMessageTest.test('return transferStateChanges if no position changes but has commitRequestId', async (test) => {
try {
fxTransfer.getByDeterminingTransferId.returns(Promise.resolve([
{ commitRequestId: fxPayload.commitRequestId }
]))
// Mocks for _getPositionChnages
fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve({
initiatingFspName: fxPayload.initiatingFsp
}))
ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId.returns(Promise.resolve([]))
TransferFacade.getById.returns(Promise.resolve({
payerFsp: payload.payerFsp,
payeeIsProxy: true
}))
ParticipantPositionChangesModel.getReservedPositionChangesByTransferId.returns(Promise.resolve([]))

const result = await Cyril.processAbortMessage(payload.transferId)

test.deepEqual(result, {
positionChanges: [],
transferStateChanges: [
{
isOriginalId: true,
notifyTo: 'dfsp1',
reason: null,
transferId: 'b51ec534-ee48-4575-b6a9-ead2955b8999',
transferStateId: Enum.Transfers.TransferInternalState.ABORTED_ERROR
}
]
})
test.pass('Error not thrown')
Expand Down Expand Up @@ -1198,7 +1237,8 @@ Test('Cyril', cyrilTest => {
notifyTo: 'dfsp1',
participantCurrencyId: 1,
amount: -433.88
}]
}],
transferStateChanges: []
})
test.pass('Error not thrown')
test.end()
Expand Down
6 changes: 4 additions & 2 deletions test/unit/domain/position/abort.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ const abortMessage1 = {
participantCurrencyId: 2,
amount: -10
}
]
],
transferStateChanges: []
}
}
},
Expand Down Expand Up @@ -147,7 +148,8 @@ const abortMessage2 = {
participantCurrencyId: 1,
amount: -10
}
]
],
transferStateChanges: []
}
}
},
Expand Down

0 comments on commit b480bc8

Please sign in to comment.