Skip to content

Commit

Permalink
fix: position changes (#1108)
Browse files Browse the repository at this point in the history
* fix: from argument in kafka notification for abort

* fix: position changes

* fix: to number

* fix: position change in timeout

* fix: related fxtransfer check

* fix: unit tests

* fix: timeout

* chore: deps

* fix fx-abort tests

* fix fx-timeout tests

* chore: added a comment

* fix more tests

* fix: invalid fulfilment

* fix: unit test

* chore(snapshot): 17.8.0-snapshot.28

* chore(snapshot): 17.8.0-snapshot.29

* fix: lint

* chore(snapshot): 17.8.0-snapshot.30

---------

Co-authored-by: Kevin Leyow <kleyow@gmail.com>
  • Loading branch information
vijayg10 and kleyow authored Sep 18, 2024
1 parent 4bf39b7 commit 610031f
Show file tree
Hide file tree
Showing 23 changed files with 432 additions and 81 deletions.
46 changes: 46 additions & 0 deletions migrations/310404_participantPositionChange-change.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Contributors
--------------
This is the official list of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.
* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>
* ModusBox
- Vijaya Kumar Guthi <vijaya.guthi@infitx.com>
--------------
******/

'use strict'

exports.up = async (knex) => {
return await knex.schema.hasTable('participantPositionChange').then(function(exists) {
if (exists) {
return knex.schema.alterTable('participantPositionChange', (t) => {
t.decimal('change', 18, 2).notNullable()
})
}
})
}

exports.down = async (knex) => {
return await knex.schema.hasTable('participantPositionChange').then(function(exists) {
if (exists) {
return knex.schema.alterTable('participantPositionChange', (t) => {
t.dropColumn('change')
})
}
})
}
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "17.8.0-snapshot.28",
"version": "17.8.0-snapshot.31",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "v18.8.0",
"@mojaloop/central-services-shared": "18.8.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down Expand Up @@ -133,7 +133,7 @@
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.5",
"npm-check-updates": "17.1.1",
"npm-check-updates": "17.1.2",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
Expand Down
4 changes: 2 additions & 2 deletions src/domain/fx/cyril.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList) => {
commitRequestId,
notifyTo: fxRecord.initiatingFspName,
participantCurrencyId: fxPositionChange.participantCurrencyId,
amount: -fxPositionChange.value
amount: -fxPositionChange.change
})
})
}
Expand All @@ -238,7 +238,7 @@ const _getPositionChanges = async (commitRequestIdList, transferIdList) => {
transferId,
notifyTo: transferRecord.payerFsp,
participantCurrencyId: transferPositionChange.participantCurrencyId,
amount: -transferPositionChange.value
amount: -transferPositionChange.change
})
})
}
Expand Down
13 changes: 9 additions & 4 deletions src/domain/position/abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const processPositionAbortBin = async (
accumulatedTransferStatesCopy[positionChangeToBeProcessed.transferId] = transferStateId
}
binItem.result = { success: true }
const from = binItem.message.value.from
cyrilResult.positionChanges[positionChangeIndex].isDone = true
const nextIndex = cyrilResult.positionChanges.findIndex(positionChange => !positionChange.isDone)
if (nextIndex === -1) {
Expand All @@ -91,11 +92,11 @@ const processPositionAbortBin = async (
for (const positionChange of cyrilResult.positionChanges) {
if (positionChange.isFxTransferStateChange) {
// Construct notification message for fx transfer state change
const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, Config.HUB_NAME, positionChange.notifyTo)
const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, from, positionChange.notifyTo)
resultMessages.push({ binItem, message: resultMessage })
} else {
// Construct notification message for transfer state change
const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, Config.HUB_NAME, positionChange.notifyTo)
const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, from, positionChange.notifyTo)
resultMessages.push({ binItem, message: resultMessage })
}
}
Expand Down Expand Up @@ -127,7 +128,9 @@ const processPositionAbortBin = async (

const _constructAbortResultMessage = (binItem, id, from, notifyTo) => {
let apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_REJECTION
if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION) {
let fromCalculated = from
if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION || binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.ABORT_VALIDATION) {
fromCalculated = Config.HUB_NAME
apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR
}
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
Expand All @@ -153,8 +156,8 @@ const _constructAbortResultMessage = (binItem, id, from, notifyTo) => {
)
const resultMessage = Utility.StreamingProtocol.createMessage(
id,
from,
notifyTo,
fromCalculated,
metadata,
binItem.message.value.content.headers, // Headers don't really matter here. ml-api-adapter will ignore them and create their own.
fspiopError,
Expand All @@ -173,6 +176,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand All @@ -194,6 +198,7 @@ const _handleParticipantPositionChangeFx = (runningPosition, transferAmount, com
commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId
fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
2 changes: 2 additions & 0 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand All @@ -286,6 +287,7 @@ const _handleParticipantPositionChangeFx = (runningPosition, transferAmount, com
commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId
fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
3 changes: 2 additions & 1 deletion src/domain/position/fx-prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const processFxPositionPrepareBin = async (
let resultMessage
const fxTransfer = binItem.decodedPayload
const cyrilResult = binItem.message.value.content.context.cyrilResult
const transferAmount = fxTransfer.targetAmount.currency === cyrilResult.currencyId ? new MLNumber(fxTransfer.targetAmount.amount) : new MLNumber(fxTransfer.sourceAmount.amount)
const transferAmount = fxTransfer.targetAmount.currency === cyrilResult.currencyId ? fxTransfer.targetAmount.amount : fxTransfer.sourceAmount.amount

Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::transfer:processingMessage: ${JSON.stringify(fxTransfer)}`)

Expand Down Expand Up @@ -205,6 +205,7 @@ const processFxPositionPrepareBin = async (
commitRequestId: fxTransfer.commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId
fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries
value: currentPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
Expand Down
3 changes: 2 additions & 1 deletion src/domain/position/fx-timeout-reserved.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const processPositionFxTimeoutReservedBin = async (
} else {
Logger.isDebugEnabled && Logger.debug(`accumulatedFxTransferStates: ${JSON.stringify(accumulatedFxTransferStates)}`)

const transferAmount = fetchedReservedPositionChangesByCommitRequestIds[commitRequestId][participantAccountId].value
const transferAmount = fetchedReservedPositionChangesByCommitRequestIds[commitRequestId][participantAccountId].change

// Construct payee notification message
const resultMessage = _constructFxTimeoutReservedResultMessage(
Expand Down Expand Up @@ -141,6 +141,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, commi
commitRequestId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
1 change: 1 addition & 0 deletions src/domain/position/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ const processPositionPrepareBin = async (
transferId: transfer.transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: currentPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
Expand Down
1 change: 1 addition & 0 deletions src/domain/position/timeout-reserved.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ const _handleParticipantPositionChange = (runningPosition, transferAmount, trans
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: updatedRunningPosition.toNumber(),
change: transferAmount,
reservedValue: accumulatedPositionReservedValue
}

Expand Down
32 changes: 29 additions & 3 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,40 @@ const processFulfilMessage = async (message, functionality, span) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInvalidFulfilment--${actionLetter}9`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'invalid fulfilment')
const apiFSPIOPError = fspiopError.toApiErrorObject(Config.ERROR_HANDLING)
await TransferService.handlePayeeResponse(transferId, payload, action, apiFSPIOPError)
await TransferService.handlePayeeResponse(transferId, payload, TransferEventAction.ABORT_VALIDATION, apiFSPIOPError)
const eventDetail = { functionality: TransferEventType.POSITION, action: TransferEventAction.ABORT_VALIDATION }
/**
* TODO: BulkProcessingHandler (not in scope of #967) The individual transfer is ABORTED by notification is never sent.
*/
// Key position validation abort with payer account id
const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME })

const cyrilResult = await FxService.Cyril.processAbortMessage(transferId)

params.message.value.content.context = {
...params.message.value.content.context,
cyrilResult
}
if (cyrilResult.positionChanges.length > 0) {
const participantCurrencyId = cyrilResult.positionChanges[0].participantCurrencyId
await Kafka.proceed(
Config.KAFKA_CONFIG,
params,
{
consumerCommit,
fspiopError: apiFSPIOPError,
eventDetail,
messageKey: participantCurrencyId.toString(),
topicNameOverride: Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.ABORT,
hubName: Config.HUB_NAME
}
)
} else {
const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError('Invalid cyril result')
throw fspiopError
}

// const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
// await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME })

// emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE
if (action === TransferEventAction.RESERVE) {
Expand Down
1 change: 1 addition & 0 deletions src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ const prepare = async (error, messages) => {
if (proxyEnabled) {
const [initiatingFsp, counterPartyFsp] = isFx ? [payload.initiatingFsp, payload.counterPartyFsp] : [payload.payerFsp, payload.payeeFsp]

// TODO: We need to double check the following validation logic incase of payee side currency conversion
const payeeFspLookupOptions = isFx ? null : { validateCurrencyAccounts: true, accounts: [{ currency: payload.amount.currency, accountType: Enum.Accounts.LedgerAccountType.POSITION }] }

;[proxyObligation.initiatingFspProxyOrParticipantId, proxyObligation.counterPartyFspProxyOrParticipantId] = await Promise.all([
Expand Down
26 changes: 26 additions & 0 deletions src/handlers/transfers/validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const Config = require('../../lib/config')
const Participant = require('../../domain/participant')
const Transfer = require('../../domain/transfer')
const FxTransferModel = require('../../models/fxTransfer')
// const TransferStateChangeModel = require('../../models/transfer/transferStateChange')
const FxTransferStateChangeModel = require('../../models/fxTransfer/stateChange')
const CryptoConditions = require('../../cryptoConditions')
const Crypto = require('crypto')
const base64url = require('base64url')
Expand Down Expand Up @@ -208,6 +210,30 @@ const validatePrepare = async (payload, headers, isFx = false, determiningTransf
const initiatingFsp = isFx ? payload.initiatingFsp : payload.payerFsp
const counterPartyFsp = isFx ? payload.counterPartyFsp : payload.payeeFsp

// Check if determining transfers are failed
if (determiningTransferCheckResult.watchListRecords && determiningTransferCheckResult.watchListRecords.length > 0) {
// Iterate through determiningTransferCheckResult.watchListRecords
for (const watchListRecord of determiningTransferCheckResult.watchListRecords) {
if (isFx) {
// TODO: Check the transfer state of determiningTransferId
// const latestTransferStateChange = await TransferStateChangeModel.getByTransferId(watchListRecord.determiningTransferId)
// if (latestTransferStateChange.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED) {
// reasons.push('Related Transfer is not in reserved state')
// validationPassed = false
// return { validationPassed, reasons }
// }
} else {
// Check the transfer state of commitRequestId
const latestFxTransferStateChange = await FxTransferStateChangeModel.getByCommitRequestId(watchListRecord.commitRequestId)
if (latestFxTransferStateChange.transferStateId !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL_DEPENDENT) {
reasons.push('Related FX Transfer is not fulfilled')
validationPassed = false
return { validationPassed, reasons }
}
}
}
}

// Skip usual validation if preparing a proxy transfer or fxTransfer
if (!(proxyObligation?.isInitiatingFspProxy || proxyObligation?.isCounterPartyFspProxy)) {
validationPassed = (
Expand Down
4 changes: 3 additions & 1 deletion src/models/position/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,13 @@ const prepareChangeParticipantPositionTransaction = async (transferList) => {
const processedTransfersKeysList = Object.keys(processedTransfers)
const batchParticipantPositionChange = []
for (const keyIndex in processedTransfersKeysList) {
const { runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]]
const { transferAmount, runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]]
const participantPositionChange = {
participantPositionId: initialParticipantPosition.participantPositionId,
participantCurrencyId: participantCurrency.participantCurrencyId,
transferStateChangeId: processedTransferStateChangeIdList[keyIndex],
value: runningPosition,
change: transferAmount.toNumber(),
// processBatch: <uuid> - a single value uuid for this entire batch to make sure the set of transfers in this batch can be clearly grouped
reservedValue: runningReservedValue
}
Expand Down Expand Up @@ -294,6 +295,7 @@ const changeParticipantPositionTransaction = async (participantCurrencyId, isRev
participantCurrencyId,
transferStateChangeId: insertedTransferStateChange.transferStateChangeId,
value: latestPosition,
change: isReversal ? -amount : amount,
reservedValue: participantPosition.reservedValue,
createdDate: transactionTimestamp
}
Expand Down
Loading

0 comments on commit 610031f

Please sign in to comment.