Skip to content

Commit

Permalink
feat(csi-650): updated transferTimeout handler to take into account e…
Browse files Browse the repository at this point in the history
…xternalParticipant (#1107)

* feat(csi-650): updated transferTimeout handler to take into account externalParticipant

* feat(csi-650): fixed ep1.externalParticipantId field

* feat(csi-650): used leftJoin for externalParticipant table

* feat(csi-650): added externalPayeeName as source to timeout handler

* feat(csi-650): updated fxTimeout logic to take into account externalParticipant info

* feat(csi-650): code cleaning up

* feat(csi-650): code cleaning up
  • Loading branch information
geka-evk authored Sep 17, 2024
1 parent 31d1e35 commit 2b0c76d
Show file tree
Hide file tree
Showing 18 changed files with 480 additions and 330 deletions.
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.7.6",
"@mojaloop/central-services-shared": "18.8.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down Expand Up @@ -132,7 +132,7 @@
"get-port": "5.1.1",
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.4",
"nodemon": "3.1.5",
"npm-check-updates": "17.1.1",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
Expand Down
151 changes: 102 additions & 49 deletions src/handlers/timeouts/handler.js

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions src/handlers/transfers/FxFulfilService.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class FxFulfilService {
}

async getFxTransferDetails(commitRequestId, functionality) {
const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId)
const fxTransfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId)

if (!transfer) {
if (!fxTransfer) {
const fspiopError = fspiopErrorFactory.fxTransferNotFound()
const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING)
const eventDetail = {
Expand All @@ -72,8 +72,8 @@ class FxFulfilService {
throw fspiopError
}

this.log.debug('fxTransfer is found', { transfer })
return transfer
this.log.debug('fxTransfer is found', { fxTransfer })
return fxTransfer
}

async validateHeaders({ transfer, headers, payload }) {
Expand Down Expand Up @@ -302,12 +302,13 @@ class FxFulfilService {
const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING)
const eventDetail = {
functionality: Type.POSITION,
action
action // FX_ABORT
}
this.log.warn('FX_ABORT case', { eventDetail, apiFSPIOPError })

await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, action, apiFSPIOPError)
const cyrilResult = await this.cyril.processFxAbortMessage(transfer.commitRequestId)
// todo: add externalParticipantId to the message here?

this.params.message.value.content.context = {
...this.params.message.value.content.context,
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/transfers/createRemittanceEntity.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ const createRemittanceEntity = (isFx) => {
},

/**
* A determiningTransferCheckResult.
* @typedef {Object} DeterminingTransferCheckResult
*
* @property {boolean} determiningTransferExists - Indicates if the determining transfer exists.
* @property {Array<{participantName, currencyId}>} participantCurrencyValidationList - List of validations for participant currencies.
* @property {Object} [transferRecord] - Determining transfer for the FX transfer (optional).
Expand Down
1 change: 1 addition & 0 deletions src/lib/proxyCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const getCache = () => {

/**
* @typedef {Object} ProxyOrParticipant - An object containing the inScheme status, proxyId and FSP name
*
* @property {boolean} inScheme - Is FSP in the scheme.
* @property {string|null} proxyId - Proxy, associated with the FSP, if FSP is not in the scheme.
* @property {string} name - FSP name.
Expand Down
13 changes: 6 additions & 7 deletions src/models/fxTransfer/fxTransfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const { TABLE_NAMES } = require('../../shared/constants')
const Db = require('../../lib/db')
const participant = require('../participant/facade')
const ParticipantCachedModel = require('../participant/participantCached')
const externalParticipantModel = require('../participant/externalParticipant')
const TransferExtensionModel = require('./fxTransferExtension')

const { TransferInternalState } = Enum.Transfers
Expand Down Expand Up @@ -196,6 +195,7 @@ const getAllDetailsByCommitRequestIdForProxiedFxTransfer = async (commitRequestI
return transferResult
})
} catch (err) {
logger.warn('error in getAllDetailsByCommitRequestIdForProxiedFxTransfer', err)
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}
Expand Down Expand Up @@ -272,8 +272,8 @@ const savePreparedRequest = async (
ledgerEntryTypeId: Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
}
if (proxyObligation.isInitiatingFspProxy) {
initiatingParticipantRecord.externalParticipantId = await externalParticipantModel
.getIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId)
initiatingParticipantRecord.externalParticipantId = await participant
.getExternalParticipantIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId)
}

const counterPartyParticipantRecord1 = {
Expand All @@ -286,8 +286,8 @@ const savePreparedRequest = async (
ledgerEntryTypeId: Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
}
if (proxyObligation.isCounterPartyFspProxy) {
counterPartyParticipantRecord1.externalParticipantId = await externalParticipantModel
.getIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId)
counterPartyParticipantRecord1.externalParticipantId = await participant
.getExternalParticipantIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId)
}

let counterPartyParticipantRecord2 = null
Expand Down Expand Up @@ -377,7 +377,6 @@ const savePreparedRequest = async (
}
}

// todo: clarify this code
const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopError) => {
const histTimerSaveFulfilResponseEnd = Metrics.getHistogram(
'fx_model_transfer',
Expand Down Expand Up @@ -562,10 +561,10 @@ module.exports = {
getByDeterminingTransferId,
getByIdLight,
getAllDetailsByCommitRequestId,
getAllDetailsByCommitRequestIdForProxiedFxTransfer,
getFxTransferParticipant,
savePreparedRequest,
saveFxFulfilResponse,
saveFxTransfer,
getAllDetailsByCommitRequestIdForProxiedFxTransfer,
updateFxPrepareReservedForwarded
}
33 changes: 14 additions & 19 deletions src/models/participant/externalParticipant.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,24 @@ const create = async ({ name, proxyId }) => {
return result
} catch (err) {
log.error('error in create', err)
// If the cache is not up-to-date, then will get an error when inserting a record and that record already exists
// reload the cache at that point.
// todo: to implement above requirement, we need to detect duplication restriction error, and don't rethrow error
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

// const getAll = async (options = {}) => {
// try {
// const result = await Db.from(TABLE).find({}, options)
// log.debug('getAll result:', { result })
// return result
// } catch (err) {
// log.error('error in getAll:', err)
// throw ErrorHandler.Factory.reformatFSPIOPError(err)
// }
// }

const getOneBy = async (criteria, options) => {
try {
const result = await Db.from(TABLE).findOne(criteria, options)
Expand All @@ -80,24 +94,6 @@ const getOneByNameCached = async (name, options = {}) => {
return data
}

const getIdByNameOrCreate = async ({ name, proxyId }) => {
try {
let dfsp = await getOneByNameCached(name)
if (!dfsp) {
await create({ name, proxyId })
// todo: check if create returns id (to avoid getOneByNameCached call)
dfsp = await getOneByNameCached(name)
}
const id = dfsp?.[ID_FIELD]
log.verbose('getIdByNameOrCreate result:', { id, name })
return id
} catch (err) {
log.child({ name, proxyId }).warn('error in getIdByNameOrCreate:', err)
return null
// todo: think, if we need to rethrow an error here?
}
}

const destroyBy = async (criteria) => {
try {
const result = await Db.from(TABLE).destroy(criteria)
Expand All @@ -114,7 +110,6 @@ const destroyByName = async (name) => destroyBy({ name })
// todo: think, if we need update method
module.exports = {
create,
getIdByNameOrCreate,
getOneByNameCached,
getOneByName,
getOneById,
Expand Down
36 changes: 33 additions & 3 deletions src/models/participant/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@
* @module src/models/participant/facade/
*/

const Db = require('../../lib/db')
const Time = require('@mojaloop/central-services-shared').Util.Time
const { Enum } = require('@mojaloop/central-services-shared')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Metrics = require('@mojaloop/central-services-metrics')

const Db = require('../../lib/db')
const Cache = require('../../lib/cache')
const ParticipantModelCached = require('../../models/participant/participantCached')
const ParticipantCurrencyModelCached = require('../../models/participant/participantCurrencyCached')
const ParticipantLimitCached = require('../../models/participant/participantLimitCached')
const externalParticipant = require('../../models/participant/externalParticipant')
const Config = require('../../lib/config')
const SettlementModelModel = require('../settlement/settlementModel')
const { Enum } = require('@mojaloop/central-services-shared')
const { logger } = require('../../shared/logger')

const getByNameAndCurrency = async (name, currencyId, ledgerAccountTypeId, isCurrencyActive) => {
const histTimerParticipantGetByNameAndCurrencyEnd = Metrics.getHistogram(
Expand Down Expand Up @@ -773,6 +776,32 @@ const getAllNonHubParticipantsWithCurrencies = async (trx) => {
}
}

const getExternalParticipantIdByNameOrCreate = async ({ name, proxyId }) => {
try {
let external = await externalParticipant.getOneByNameCached(name)
if (!external) {
const proxy = await ParticipantModelCached.getByName(proxyId)
if (!proxy) {
throw new Error(`Proxy participant not found: ${proxyId}`)
}
await externalParticipant.create({
name,
proxyId: proxy.participantId
})
// todo: - check if create returns id (to avoid getOneByNameCached call)
// - if isCreated === false, re-load all external participants cache
external = await externalParticipant.getOneByNameCached(name)
}
const id = external?.externalParticipantId
logger.verbose('getExternalParticipantIdByNameOrCreate result:', { id, name })
return id
} catch (err) {
logger.child({ name, proxyId }).warn('error in getExternalParticipantIdByNameOrCreate:', err)
return null
// todo: think, if we need to rethrow an error here?
}
}

module.exports = {
addHubAccountAndInitPosition,
getByNameAndCurrency,
Expand All @@ -789,5 +818,6 @@ module.exports = {
getParticipantLimitsByParticipantId,
getAllAccountsByNameAndCurrency,
getLimitsForAllParticipants,
getAllNonHubParticipantsWithCurrencies
getAllNonHubParticipantsWithCurrencies,
getExternalParticipantIdByNameOrCreate
}
Loading

0 comments on commit 2b0c76d

Please sign in to comment.