Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(csi-1016): fx abort fixes #1133

Merged
merged 13 commits into from
Dec 12, 2024
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.4.1",
"@mojaloop/central-services-shared": "18.14.0",
"@mojaloop/central-services-shared": "18.14.1",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down
50 changes: 35 additions & 15 deletions src/domain/fx/cyril.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,12 @@ const processFxFulfilMessage = async (commitRequestId) => {
*
* @param {Array<string>} commitRequestIdList - List of commit request IDs to retrieve FX-related position changes.
* @param {Array<string>} transferIdList - List of transfer IDs to retrieve regular transfer-related position changes.
* @returns {Promise<PositionChangeItem[]>} - A promise that resolves to an array of position change objects.
* @returns {Promise<{ PositionChangeItem[], TransferStateChangeItem[] }>} - A promise that resolves to an object containing
* array of position change objects and transfer state change objects (transfer state changes with no position changes).
*/
const _getPositionChanges = async (commitRequestIdList, transferIdList, originalId) => {
const _getPositionChanges = async (commitRequestIdList, transferIdList, originalId, isAbort = false) => {
const positionChanges = []
const transferStateChanges = []
for (const commitRequestId of commitRequestIdList) {
const fxRecord = await fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId)
const fxPositionChanges = await ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId(commitRequestId)
Expand All @@ -250,19 +252,35 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList, original
for (const transferId of transferIdList) {
const transferRecord = await TransferFacade.getById(transferId)
const transferPositionChanges = await ParticipantPositionChangesModel.getReservedPositionChangesByTransferId(transferId)
transferPositionChanges.forEach((transferPositionChange) => {
positionChanges.push({
isFxTransferStateChange: false,

// Context: processing interscheme transfer abort with accompanying fx transfer where the payee DFSP is proxied
//
// If the transferPositionChanges is empty and there is a commitRequestId and the tranferId is the same as the originalId,
// then it is a case where the transfer has no position change for the transfer in the buffer scheme but has position change for the fx transfer.
// In that case we need to add a transfer state change so that we can notify the payer and update the transfer state.
if (isAbort && transferRecord && transferRecord.payeeIsProxy && transferPositionChanges.length === 0 && !!commitRequestIdList.length && originalId === transferId) {
transferStateChanges.push({
transferId,
isOriginalId: originalId === transferId,
notifyTo: transferRecord.externalPayerName || transferRecord.payerFsp,
participantCurrencyId: transferPositionChange.participantCurrencyId,
amount: -transferPositionChange.change
transferStateId: Enum.Transfers.TransferInternalState.ABORTED_ERROR,
reason: null,
isOriginalId: originalId === transferId, // added to help in constructing the notification
notifyTo: transferRecord.externalPayerName || transferRecord.payerFsp // added to help in constructing the notification
})
})
} else {
transferPositionChanges.forEach((transferPositionChange) => {
positionChanges.push({
isFxTransferStateChange: false,
transferId,
isOriginalId: originalId === transferId,
notifyTo: transferRecord.externalPayerName || transferRecord.payerFsp,
participantCurrencyId: transferPositionChange.participantCurrencyId,
amount: -transferPositionChange.change
})
})
}
}

return positionChanges
return { positionChanges, transferStateChanges }
}

/**
Expand All @@ -282,11 +300,12 @@ const processFxAbortMessage = async (commitRequestId) => {
const relatedFxTransferRecords = await fxTransfer.getByDeterminingTransferId(fxTransferRecord.determiningTransferId)

// Get position changes
const positionChanges = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [fxTransferRecord.determiningTransferId], commitRequestId)
const { positionChanges, transferStateChanges } = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [fxTransferRecord.determiningTransferId], commitRequestId, false)

histTimer({ success: true })
return {
positionChanges
positionChanges,
transferStateChanges
}
}

Expand All @@ -301,11 +320,12 @@ const processAbortMessage = async (transferId) => {
const relatedFxTransferRecords = await fxTransfer.getByDeterminingTransferId(transferId)

// Get position changes
const positionChanges = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [transferId], transferId)
const { positionChanges, transferStateChanges } = await _getPositionChanges(relatedFxTransferRecords.map(item => item.commitRequestId), [transferId], transferId, true)

histTimer({ success: true })
return {
positionChanges
positionChanges,
transferStateChanges
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/domain/position/abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ const processPositionAbortBin = async (
resultMessages.push({ binItem, message: Utility.clone(resultMessage) })
}
}
// Add notifications in the transferStateChanges if available
if (Array.isArray(cyrilResult.transferStateChanges)) {
for (const transferStateChange of cyrilResult.transferStateChanges) {
const resultMessage = _constructAbortResultMessage(binItem, transferStateChange.transferId, from, transferStateChange.notifyTo, transferStateChange.isOriginalId, false)
resultMessages.push({ binItem, message: Utility.clone(resultMessage) })
}
}
oderayi marked this conversation as resolved.
Show resolved Hide resolved
} else {
// There are still position changes to be processed
// Send position-commit kafka message again for the next item
Expand Down
19 changes: 10 additions & 9 deletions src/domain/position/fx-prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const processFxPositionPrepareBin = async (

Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::transfer:processingMessage: ${JSON.stringify(fxTransfer)}`)

/* eslint-disable brace-style */
// Check if fxTransfer is in correct state for processing, produce an internal error message
if (accumulatedFxTransferStates[fxTransfer.commitRequestId] !== Enum.Transfers.TransferInternalState.RECEIVED_PREPARE) {
Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::transferState: ${accumulatedFxTransferStates[fxTransfer.commitRequestId]} !== ${Enum.Transfers.TransferInternalState.RECEIVED_PREPARE}`)
Expand Down Expand Up @@ -109,9 +110,9 @@ const processFxPositionPrepareBin = async (
)

binItem.result = { success: false }

// Check if payer has insufficient liquidity, produce an error message and abort transfer
} else if (changePositions && availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) {
}
// Check if payer has insufficient liquidity, produce an error message and abort transfer
else if (changePositions && availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) {
transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY.message

Expand Down Expand Up @@ -152,9 +153,9 @@ const processFxPositionPrepareBin = async (
)

binItem.result = { success: false }

// Check if payer has surpassed their limit, produce an error message and abort transfer
} else if (changePositions && availablePositionBasedOnPayerLimit.toNumber() < transferAmount) {
}
// Check if payer has surpassed their limit, produce an error message and abort transfer
else if (changePositions && availablePositionBasedOnPayerLimit.toNumber() < transferAmount) {
transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR.message

Expand Down Expand Up @@ -195,9 +196,9 @@ const processFxPositionPrepareBin = async (
)

binItem.result = { success: false }

// Payer has sufficient liquidity and limit
} else {
}
// Payer has sufficient liquidity and limit
else {
transferStateId = Enum.Transfers.TransferInternalState.RESERVED

if (changePositions) {
Expand Down
4 changes: 3 additions & 1 deletion src/handlers/transfers/FxFulfilService.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ class FxFulfilService {

async validateTransferState(transfer, functionality) {
if (transfer.transferState !== TransferInternalState.RESERVED &&
transfer.transferState !== TransferInternalState.RESERVED_FORWARDED) {
transfer.transferState !== TransferInternalState.RESERVED_FORWARDED &&
transfer.transferState !== TransferInternalState.RECEIVED_FULFIL_DEPENDENT // for the case where we need to abort an fx transfer whose actual transfer is rejected/aborted by payee
) {
const fspiopError = fspiopErrorFactory.fxTransferNonReservedState()
const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING)
const eventDetail = {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ const definePositionParticipant = async ({ isFx, payload, determiningTransferChe
.getPositionParticipant(payload, determiningTransferCheckResult, proxyObligation)

let messageKey
// On a proxied transfer prepare if there is a corresponding fx transfer `getPositionParticipant`
// On a proxied transfer prepare, if there is a corresponding fx transfer, `getPositionParticipant`
// should return the fxp's proxy as the participantName since the fxp proxy would be saved as the counterPartyFsp
// in the prior fx transfer prepare.
// Following interscheme rules, if the debtor(fxTransfer FXP) and the creditor(transfer payee) are
Expand Down
42 changes: 41 additions & 1 deletion test/unit/domain/fx/cyril.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,45 @@ Test('Cyril', cyrilTest => {
participantCurrencyId: 1,
amount: -433.88
}
],
transferStateChanges: []
})
test.pass('Error not thrown')
test.end()
} catch (e) {
test.fail('Error Thrown')
test.end()
}
})

processAbortMessageTest.test('return transferStateChanges if no position changes but has commitRequestId', async (test) => {
try {
fxTransfer.getByDeterminingTransferId.returns(Promise.resolve([
{ commitRequestId: fxPayload.commitRequestId }
]))
// Mocks for _getPositionChnages
fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve({
initiatingFspName: fxPayload.initiatingFsp
}))
ParticipantPositionChangesModel.getReservedPositionChangesByCommitRequestId.returns(Promise.resolve([]))
TransferFacade.getById.returns(Promise.resolve({
payerFsp: payload.payerFsp,
payeeIsProxy: true
}))
ParticipantPositionChangesModel.getReservedPositionChangesByTransferId.returns(Promise.resolve([]))

const result = await Cyril.processAbortMessage(payload.transferId)

test.deepEqual(result, {
positionChanges: [],
transferStateChanges: [
{
isOriginalId: true,
notifyTo: 'dfsp1',
reason: null,
transferId: 'b51ec534-ee48-4575-b6a9-ead2955b8999',
transferStateId: Enum.Transfers.TransferInternalState.ABORTED_ERROR
}
]
})
test.pass('Error not thrown')
Expand Down Expand Up @@ -1198,7 +1237,8 @@ Test('Cyril', cyrilTest => {
notifyTo: 'dfsp1',
participantCurrencyId: 1,
amount: -433.88
}]
}],
transferStateChanges: []
})
test.pass('Error not thrown')
test.end()
Expand Down
6 changes: 4 additions & 2 deletions test/unit/domain/position/abort.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ const abortMessage1 = {
participantCurrencyId: 2,
amount: -10
}
]
],
transferStateChanges: []
}
}
},
Expand Down Expand Up @@ -147,7 +148,8 @@ const abortMessage2 = {
participantCurrencyId: 1,
amount: -10
}
]
],
transferStateChanges: []
}
}
},
Expand Down