From 2b0c76debe9428bf3a726e613afde75e27425631 Mon Sep 17 00:00:00 2001 From: Eugen Klymniuk Date: Tue, 17 Sep 2024 18:00:57 +0100 Subject: [PATCH] feat(csi-650): updated transferTimeout handler to take into account externalParticipant (#1107) * feat(csi-650): updated transferTimeout handler to take into account externalParticipant * feat(csi-650): fixed ep1.externalParticipantId field * feat(csi-650): used leftJoin for externalParticipant table * feat(csi-650): added externalPayeeName as source to timeout handler * feat(csi-650): updated fxTimeout logic to take into account externalParticipant info * feat(csi-650): code cleaning up * feat(csi-650): code cleaning up --- package-lock.json | 16 +- package.json | 4 +- src/handlers/timeouts/handler.js | 151 +++++++++----- src/handlers/transfers/FxFulfilService.js | 11 +- .../transfers/createRemittanceEntity.js | 2 +- src/lib/proxyCache.js | 1 + src/models/fxTransfer/fxTransfer.js | 13 +- src/models/participant/externalParticipant.js | 33 ++-- src/models/participant/facade.js | 36 +++- src/models/transfer/facade.js | 89 +++++++-- test/fixtures.js | 28 ++- .../handlers/transfers/fxAbort.test.js | 2 +- .../handlers/transfers/fxTimeout.test.js | 14 +- .../prepare/prepare-internals.test.js | 177 +++++++++++++++++ test/unit/domain/fx/cyril.test.js | 16 +- .../participant/externalParticipant.test.js | 26 --- test/unit/models/transfer/facade.test.js | 187 ------------------ test/util/helpers.js | 4 +- 18 files changed, 480 insertions(+), 330 deletions(-) create mode 100644 test/integration-override/handlers/transfers/prepare/prepare-internals.test.js diff --git a/package-lock.json b/package-lock.json index 62c1423eb..f405d13b9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,7 +20,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.7.6", + "@mojaloop/central-services-shared": "18.8.0", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", @@ -57,7 +57,7 @@ "get-port": "5.1.1", "jsdoc": "4.0.3", "jsonpath": "1.1.1", - "nodemon": "3.1.4", + "nodemon": "3.1.5", "npm-check-updates": "17.1.1", "nyc": "17.0.0", "pre-commit": "1.2.2", @@ -1623,9 +1623,9 @@ } }, "node_modules/@mojaloop/central-services-shared": { - "version": "18.7.6", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.7.6.tgz", - "integrity": "sha512-kcatwRT6qqIgKHnckj2PFASok99Gvox6JiAV9dyxfMj4Yy9vr7tJqSVcnDQmCoAsx/rVBz3bLMzgVuzyIXRmqA==", + "version": "18.8.0", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.8.0.tgz", + "integrity": "sha512-Y9U9ohOjF3ZqTH1gzOxPZcqvQO3GtPs0cyvpy3Wcr4Gnxqh02hWe7wjlgwlBvQArsQqstMs6/LWdESIwsJCpog==", "dependencies": { "@hapi/catbox": "12.1.1", "@hapi/catbox-memory": "5.0.1", @@ -9497,9 +9497,9 @@ "dev": true }, "node_modules/nodemon": { - "version": "3.1.4", - "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-3.1.4.tgz", - "integrity": "sha512-wjPBbFhtpJwmIeY2yP7QF+UKzPfltVGtfce1g/bB15/8vCGZj8uxD62b/b9M9/WVgme0NZudpownKN+c0plXlQ==", + "version": "3.1.5", + "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-3.1.5.tgz", + "integrity": "sha512-V5UtfYc7hjFD4SI3EzD5TR8ChAHEZ+Ns7Z5fBk8fAbTVAj+q3G+w7sHJrHxXBkVn6ApLVTljau8wfHwqmGUjMw==", "dev": true, "dependencies": { "chokidar": "^3.5.2", diff --git a/package.json b/package.json index 55275ab12..1cf793cf6 100644 --- a/package.json +++ b/package.json @@ -92,7 +92,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.7.6", + "@mojaloop/central-services-shared": "18.8.0", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", @@ -132,7 +132,7 @@ "get-port": "5.1.1", "jsdoc": "4.0.3", "jsonpath": "1.1.1", - "nodemon": "3.1.4", + "nodemon": "3.1.5", "npm-check-updates": "17.1.1", "nyc": "17.0.0", "pre-commit": "1.2.2", diff --git a/src/handlers/timeouts/handler.js b/src/handlers/timeouts/handler.js index 88f6124ca..15e51df80 100644 --- a/src/handlers/timeouts/handler.js +++ b/src/handlers/timeouts/handler.js @@ -35,20 +35,29 @@ that actually holds the copyright for their contributions (see the */ const CronJob = require('cron').CronJob -const Config = require('../../lib/config') -const TimeoutService = require('../../domain/timeout') const Enum = require('@mojaloop/central-services-shared').Enum -const Kafka = require('@mojaloop/central-services-shared').Util.Kafka -const Producer = require('@mojaloop/central-services-stream').Util.Producer const Utility = require('@mojaloop/central-services-shared').Util +const Producer = require('@mojaloop/central-services-stream').Util.Producer const ErrorHandler = require('@mojaloop/central-services-error-handling') const EventSdk = require('@mojaloop/event-sdk') -const resourceVersions = require('@mojaloop/central-services-shared').Util.resourceVersions -const Logger = require('@mojaloop/central-services-logger') + +const Config = require('../../lib/config') +const TimeoutService = require('../../domain/timeout') +const { logger } = require('../../shared/logger') + +const { Kafka, resourceVersions } = Utility +const { Action, Type } = Enum.Events.Event + let timeoutJob let isRegistered let running = false +/** + * Processes timedOut transfers + * + * @param {TimedOutTransfer[]} transferTimeoutList + * @returns {Promise} + */ const _processTimedOutTransfers = async (transferTimeoutList) => { const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING) if (!Array.isArray(transferTimeoutList)) { @@ -56,58 +65,88 @@ const _processTimedOutTransfers = async (transferTimeoutList) => { { ...transferTimeoutList } ] } - for (let i = 0; i < transferTimeoutList.length; i++) { + + for (const TT of transferTimeoutList) { const span = EventSdk.Tracer.createSpan('cl_transfer_timeout') try { const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription) - const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(transferTimeoutList[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state) - const headers = Utility.Http.SwitchDefaultHeaders(transferTimeoutList[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Config.HUB_NAME, resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion) - const message = Utility.StreamingProtocol.createMessage(transferTimeoutList[i].transferId, transferTimeoutList[i].payeeFsp, transferTimeoutList[i].payerFsp, metadata, headers, fspiopError, { id: transferTimeoutList[i].transferId }, `application/vnd.interoperability.${Enum.Http.HeaderResources.TRANSFERS}+json;version=${resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion}`) - span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED)) + const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(TT.transferId, Enum.Kafka.Topics.NOTIFICATION, Action.TIMEOUT_RECEIVED, state) + const destination = TT.externalPayerName || TT.payerFsp + const source = TT.externalPayeeName || TT.payeeFsp + const headers = Utility.Http.SwitchDefaultHeaders(destination, Enum.Http.HeaderResources.TRANSFERS, Config.HUB_NAME, resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion) + const message = Utility.StreamingProtocol.createMessage(TT.transferId, destination, source, metadata, headers, fspiopError, { id: TT.transferId }, `application/vnd.interoperability.${Enum.Http.HeaderResources.TRANSFERS}+json;version=${resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion}`) + + span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Type.TRANSFER, Action.TIMEOUT_RECEIVED)) await span.audit({ state, metadata, headers, message }, EventSdk.AuditEventAction.start) - if (transferTimeoutList[i].bulkTransferId === null) { // regular transfer - if (transferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { - message.to = message.from + + if (TT.bulkTransferId === null) { // regular transfer + if (TT.transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { message.from = Config.HUB_NAME // event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED - await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span) - } else if (transferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { - message.metadata.event.type = Enum.Events.Event.Type.POSITION - message.metadata.event.action = Enum.Events.Event.Action.TIMEOUT_RESERVED + await Kafka.produceGeneralMessage( + Config.KAFKA_CONFIG, + Producer, + Enum.Kafka.Topics.NOTIFICATION, + Action.TIMEOUT_RECEIVED, + message, + state, + null, + span + ) + } else if (TT.transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { + message.metadata.event.type = Type.POSITION + message.metadata.event.action = Action.TIMEOUT_RESERVED // Key position timeouts with payer account id await Kafka.produceGeneralMessage( Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, - Enum.Events.Event.Action.TIMEOUT_RESERVED, + Action.TIMEOUT_RESERVED, message, state, - transferTimeoutList[i].effectedParticipantCurrencyId?.toString(), + TT.effectedParticipantCurrencyId?.toString(), span, Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.TIMEOUT_RESERVED ) } } else { // individual transfer from a bulk - if (transferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { - message.to = message.from + if (TT.transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { message.from = Config.HUB_NAME - message.metadata.event.type = Enum.Events.Event.Type.BULK_PROCESSING - message.metadata.event.action = Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED - await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.BULK_PROCESSING, Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, message, state, null, span) - } else if (transferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { - message.metadata.event.type = Enum.Events.Event.Type.POSITION - message.metadata.event.action = Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED + message.metadata.event.type = Type.BULK_PROCESSING + message.metadata.event.action = Action.BULK_TIMEOUT_RECEIVED + await Kafka.produceGeneralMessage( + Config.KAFKA_CONFIG, + Producer, + Enum.Kafka.Topics.BULK_PROCESSING, + Action.BULK_TIMEOUT_RECEIVED, + message, + state, + null, + span + ) + } else if (TT.transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { + message.metadata.event.type = Type.POSITION + message.metadata.event.action = Action.BULK_TIMEOUT_RESERVED // Key position timeouts with payer account id - await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED, message, state, transferTimeoutList[i].payerParticipantCurrencyId?.toString(), span) + await Kafka.produceGeneralMessage( + Config.KAFKA_CONFIG, + Producer, + Enum.Kafka.Topics.POSITION, + Action.BULK_TIMEOUT_RESERVED, + message, + state, + TT.payerParticipantCurrencyId?.toString(), + span + ) } } } catch (err) { - Logger.isErrorEnabled && Logger.error(err) + logger.error('error in _processTimedOutTransfers:', err) const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err) const state = new EventSdk.EventStateMetadata(EventSdk.EventStatusType.failed, fspiopError.apiErrorCode.code, fspiopError.apiErrorCode.message) await span.error(fspiopError, state) @@ -121,6 +160,12 @@ const _processTimedOutTransfers = async (transferTimeoutList) => { } } +/** + * Processes timedOut fxTransfers + * + * @param {TimedOutFxTransfer[]} fxTransferTimeoutList + * @returns {Promise} + */ const _processFxTimedOutTransfers = async (fxTransferTimeoutList) => { const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING) if (!Array.isArray(fxTransferTimeoutList)) { @@ -128,50 +173,55 @@ const _processFxTimedOutTransfers = async (fxTransferTimeoutList) => { { ...fxTransferTimeoutList } ] } - for (let i = 0; i < fxTransferTimeoutList.length; i++) { + for (const fTT of fxTransferTimeoutList) { const span = EventSdk.Tracer.createSpan('cl_fx_transfer_timeout') try { const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription) - const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(fxTransferTimeoutList[i].commitRequestId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state) - const headers = Utility.Http.SwitchDefaultHeaders(fxTransferTimeoutList[i].initiatingFsp, Enum.Http.HeaderResources.FX_TRANSFERS, Config.HUB_NAME, resourceVersions[Enum.Http.HeaderResources.FX_TRANSFERS].contentVersion) - const message = Utility.StreamingProtocol.createMessage(fxTransferTimeoutList[i].commitRequestId, fxTransferTimeoutList[i].counterPartyFsp, fxTransferTimeoutList[i].initiatingFsp, metadata, headers, fspiopError, { id: fxTransferTimeoutList[i].commitRequestId }, `application/vnd.interoperability.${Enum.Http.HeaderResources.FX_TRANSFERS}+json;version=${resourceVersions[Enum.Http.HeaderResources.FX_TRANSFERS].contentVersion}`) - span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.FX_TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED)) + const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(fTT.commitRequestId, Enum.Kafka.Topics.NOTIFICATION, Action.TIMEOUT_RECEIVED, state) + const destination = fTT.externalInitiatingFspName || fTT.initiatingFsp + const source = fTT.externalCounterPartyFspName || fTT.counterPartyFsp + const headers = Utility.Http.SwitchDefaultHeaders(destination, Enum.Http.HeaderResources.FX_TRANSFERS, Config.HUB_NAME, resourceVersions[Enum.Http.HeaderResources.FX_TRANSFERS].contentVersion) + const message = Utility.StreamingProtocol.createMessage(fTT.commitRequestId, destination, source, metadata, headers, fspiopError, { id: fTT.commitRequestId }, `application/vnd.interoperability.${Enum.Http.HeaderResources.FX_TRANSFERS}+json;version=${resourceVersions[Enum.Http.HeaderResources.FX_TRANSFERS].contentVersion}`) + + span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Type.FX_TRANSFER, Action.TIMEOUT_RECEIVED)) await span.audit({ state, metadata, headers, message }, EventSdk.AuditEventAction.start) - if (fxTransferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { - message.to = message.from + + if (fTT.transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { message.from = Config.HUB_NAME // event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED await Kafka.produceGeneralMessage( - Config.KAFKA_CONFIG, Producer, + Config.KAFKA_CONFIG, + Producer, Enum.Kafka.Topics.NOTIFICATION, - Enum.Events.Event.Action.FX_TIMEOUT_RESERVED, + Action.FX_TIMEOUT_RESERVED, message, state, null, span ) - } else if (fxTransferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { - message.metadata.event.type = Enum.Events.Event.Type.POSITION - message.metadata.event.action = Enum.Events.Event.Action.FX_TIMEOUT_RESERVED + } else if (fTT.transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { + message.metadata.event.type = Type.POSITION + message.metadata.event.action = Action.FX_TIMEOUT_RESERVED // Key position timeouts with payer account id await Kafka.produceGeneralMessage( - Config.KAFKA_CONFIG, Producer, + Config.KAFKA_CONFIG, + Producer, Enum.Kafka.Topics.POSITION, - Enum.Events.Event.Action.FX_TIMEOUT_RESERVED, + Action.FX_TIMEOUT_RESERVED, message, state, - fxTransferTimeoutList[i].effectedParticipantCurrencyId?.toString(), + fTT.effectedParticipantCurrencyId?.toString(), span, Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.FX_TIMEOUT_RESERVED ) } } catch (err) { - Logger.isErrorEnabled && Logger.error(err) + logger.error('error in _processFxTimedOutTransfers:', err) const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err) const state = new EventSdk.EventStateMetadata(EventSdk.EventStatusType.failed, fspiopError.apiErrorCode.code, fspiopError.apiErrorCode.message) await span.error(fspiopError, state) @@ -206,6 +256,7 @@ const timeout = async () => { const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0 const cleanup = await TimeoutService.cleanupTransferTimeout() const latestTransferStateChange = await TimeoutService.getLatestTransferStateChange() + const fxTimeoutSegment = await TimeoutService.getFxTimeoutSegment() const intervalMax = (latestTransferStateChange && parseInt(latestTransferStateChange.transferStateChangeId)) || 0 const fxIntervalMin = fxTimeoutSegment ? fxTimeoutSegment.value : 0 @@ -213,9 +264,11 @@ const timeout = async () => { const fxCleanup = await TimeoutService.cleanupFxTransferTimeout() const latestFxTransferStateChange = await TimeoutService.getLatestFxTransferStateChange() const fxIntervalMax = (latestFxTransferStateChange && parseInt(latestFxTransferStateChange.fxTransferStateChangeId)) || 0 + const { transferTimeoutList, fxTransferTimeoutList } = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax, fxSegmentId, fxIntervalMin, fxIntervalMax) transferTimeoutList && await _processTimedOutTransfers(transferTimeoutList) fxTransferTimeoutList && await _processFxTimedOutTransfers(fxTransferTimeoutList) + return { intervalMin, cleanup, @@ -227,7 +280,7 @@ const timeout = async () => { fxTransferTimeoutList } } catch (err) { - Logger.isErrorEnabled && Logger.error(err) + logger.error('error in timeout:', err) throw ErrorHandler.Factory.reformatFSPIOPError(err) } finally { running = false @@ -283,7 +336,7 @@ const registerTimeoutHandler = async () => { await timeoutJob.start() return true } catch (err) { - Logger.isErrorEnabled && Logger.error(err) + logger.error('error in registerTimeoutHandler:', err) throw ErrorHandler.Factory.reformatFSPIOPError(err) } } @@ -303,7 +356,7 @@ const registerAllHandlers = async () => { } return true } catch (err) { - Logger.isErrorEnabled && Logger.error(err) + logger.error('error in registerAllHandlers:', err) throw ErrorHandler.Factory.reformatFSPIOPError(err) } } diff --git a/src/handlers/transfers/FxFulfilService.js b/src/handlers/transfers/FxFulfilService.js index 0ca0eea0e..28cdf6227 100644 --- a/src/handlers/transfers/FxFulfilService.js +++ b/src/handlers/transfers/FxFulfilService.js @@ -52,9 +52,9 @@ class FxFulfilService { } async getFxTransferDetails(commitRequestId, functionality) { - const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId) + const fxTransfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId) - if (!transfer) { + if (!fxTransfer) { const fspiopError = fspiopErrorFactory.fxTransferNotFound() const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) const eventDetail = { @@ -72,8 +72,8 @@ class FxFulfilService { throw fspiopError } - this.log.debug('fxTransfer is found', { transfer }) - return transfer + this.log.debug('fxTransfer is found', { fxTransfer }) + return fxTransfer } async validateHeaders({ transfer, headers, payload }) { @@ -302,12 +302,13 @@ class FxFulfilService { const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) const eventDetail = { functionality: Type.POSITION, - action + action // FX_ABORT } this.log.warn('FX_ABORT case', { eventDetail, apiFSPIOPError }) await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, action, apiFSPIOPError) const cyrilResult = await this.cyril.processFxAbortMessage(transfer.commitRequestId) + // todo: add externalParticipantId to the message here? this.params.message.value.content.context = { ...this.params.message.value.content.context, diff --git a/src/handlers/transfers/createRemittanceEntity.js b/src/handlers/transfers/createRemittanceEntity.js index c520ce3c5..527c829b9 100644 --- a/src/handlers/transfers/createRemittanceEntity.js +++ b/src/handlers/transfers/createRemittanceEntity.js @@ -62,8 +62,8 @@ const createRemittanceEntity = (isFx) => { }, /** - * A determiningTransferCheckResult. * @typedef {Object} DeterminingTransferCheckResult + * * @property {boolean} determiningTransferExists - Indicates if the determining transfer exists. * @property {Array<{participantName, currencyId}>} participantCurrencyValidationList - List of validations for participant currencies. * @property {Object} [transferRecord] - Determining transfer for the FX transfer (optional). diff --git a/src/lib/proxyCache.js b/src/lib/proxyCache.js index 2413220c1..8c52ebfd0 100644 --- a/src/lib/proxyCache.js +++ b/src/lib/proxyCache.js @@ -35,6 +35,7 @@ const getCache = () => { /** * @typedef {Object} ProxyOrParticipant - An object containing the inScheme status, proxyId and FSP name + * * @property {boolean} inScheme - Is FSP in the scheme. * @property {string|null} proxyId - Proxy, associated with the FSP, if FSP is not in the scheme. * @property {string} name - FSP name. diff --git a/src/models/fxTransfer/fxTransfer.js b/src/models/fxTransfer/fxTransfer.js index 50cd56427..9aa037ee1 100644 --- a/src/models/fxTransfer/fxTransfer.js +++ b/src/models/fxTransfer/fxTransfer.js @@ -9,7 +9,6 @@ const { TABLE_NAMES } = require('../../shared/constants') const Db = require('../../lib/db') const participant = require('../participant/facade') const ParticipantCachedModel = require('../participant/participantCached') -const externalParticipantModel = require('../participant/externalParticipant') const TransferExtensionModel = require('./fxTransferExtension') const { TransferInternalState } = Enum.Transfers @@ -196,6 +195,7 @@ const getAllDetailsByCommitRequestIdForProxiedFxTransfer = async (commitRequestI return transferResult }) } catch (err) { + logger.warn('error in getAllDetailsByCommitRequestIdForProxiedFxTransfer', err) throw ErrorHandler.Factory.reformatFSPIOPError(err) } } @@ -272,8 +272,8 @@ const savePreparedRequest = async ( ledgerEntryTypeId: Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE } if (proxyObligation.isInitiatingFspProxy) { - initiatingParticipantRecord.externalParticipantId = await externalParticipantModel - .getIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId) + initiatingParticipantRecord.externalParticipantId = await participant + .getExternalParticipantIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId) } const counterPartyParticipantRecord1 = { @@ -286,8 +286,8 @@ const savePreparedRequest = async ( ledgerEntryTypeId: Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE } if (proxyObligation.isCounterPartyFspProxy) { - counterPartyParticipantRecord1.externalParticipantId = await externalParticipantModel - .getIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId) + counterPartyParticipantRecord1.externalParticipantId = await participant + .getExternalParticipantIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId) } let counterPartyParticipantRecord2 = null @@ -377,7 +377,6 @@ const savePreparedRequest = async ( } } -// todo: clarify this code const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopError) => { const histTimerSaveFulfilResponseEnd = Metrics.getHistogram( 'fx_model_transfer', @@ -562,10 +561,10 @@ module.exports = { getByDeterminingTransferId, getByIdLight, getAllDetailsByCommitRequestId, + getAllDetailsByCommitRequestIdForProxiedFxTransfer, getFxTransferParticipant, savePreparedRequest, saveFxFulfilResponse, saveFxTransfer, - getAllDetailsByCommitRequestIdForProxiedFxTransfer, updateFxPrepareReservedForwarded } diff --git a/src/models/participant/externalParticipant.js b/src/models/participant/externalParticipant.js index 2215212de..19494103b 100644 --- a/src/models/participant/externalParticipant.js +++ b/src/models/participant/externalParticipant.js @@ -51,10 +51,24 @@ const create = async ({ name, proxyId }) => { return result } catch (err) { log.error('error in create', err) + // If the cache is not up-to-date, then will get an error when inserting a record and that record already exists + // reload the cache at that point. + // todo: to implement above requirement, we need to detect duplication restriction error, and don't rethrow error throw ErrorHandler.Factory.reformatFSPIOPError(err) } } +// const getAll = async (options = {}) => { +// try { +// const result = await Db.from(TABLE).find({}, options) +// log.debug('getAll result:', { result }) +// return result +// } catch (err) { +// log.error('error in getAll:', err) +// throw ErrorHandler.Factory.reformatFSPIOPError(err) +// } +// } + const getOneBy = async (criteria, options) => { try { const result = await Db.from(TABLE).findOne(criteria, options) @@ -80,24 +94,6 @@ const getOneByNameCached = async (name, options = {}) => { return data } -const getIdByNameOrCreate = async ({ name, proxyId }) => { - try { - let dfsp = await getOneByNameCached(name) - if (!dfsp) { - await create({ name, proxyId }) - // todo: check if create returns id (to avoid getOneByNameCached call) - dfsp = await getOneByNameCached(name) - } - const id = dfsp?.[ID_FIELD] - log.verbose('getIdByNameOrCreate result:', { id, name }) - return id - } catch (err) { - log.child({ name, proxyId }).warn('error in getIdByNameOrCreate:', err) - return null - // todo: think, if we need to rethrow an error here? - } -} - const destroyBy = async (criteria) => { try { const result = await Db.from(TABLE).destroy(criteria) @@ -114,7 +110,6 @@ const destroyByName = async (name) => destroyBy({ name }) // todo: think, if we need update method module.exports = { create, - getIdByNameOrCreate, getOneByNameCached, getOneByName, getOneById, diff --git a/src/models/participant/facade.js b/src/models/participant/facade.js index c91d0a06f..7bf80fd8c 100644 --- a/src/models/participant/facade.js +++ b/src/models/participant/facade.js @@ -28,17 +28,20 @@ * @module src/models/participant/facade/ */ -const Db = require('../../lib/db') const Time = require('@mojaloop/central-services-shared').Util.Time +const { Enum } = require('@mojaloop/central-services-shared') const ErrorHandler = require('@mojaloop/central-services-error-handling') const Metrics = require('@mojaloop/central-services-metrics') + +const Db = require('../../lib/db') const Cache = require('../../lib/cache') const ParticipantModelCached = require('../../models/participant/participantCached') const ParticipantCurrencyModelCached = require('../../models/participant/participantCurrencyCached') const ParticipantLimitCached = require('../../models/participant/participantLimitCached') +const externalParticipant = require('../../models/participant/externalParticipant') const Config = require('../../lib/config') const SettlementModelModel = require('../settlement/settlementModel') -const { Enum } = require('@mojaloop/central-services-shared') +const { logger } = require('../../shared/logger') const getByNameAndCurrency = async (name, currencyId, ledgerAccountTypeId, isCurrencyActive) => { const histTimerParticipantGetByNameAndCurrencyEnd = Metrics.getHistogram( @@ -773,6 +776,32 @@ const getAllNonHubParticipantsWithCurrencies = async (trx) => { } } +const getExternalParticipantIdByNameOrCreate = async ({ name, proxyId }) => { + try { + let external = await externalParticipant.getOneByNameCached(name) + if (!external) { + const proxy = await ParticipantModelCached.getByName(proxyId) + if (!proxy) { + throw new Error(`Proxy participant not found: ${proxyId}`) + } + await externalParticipant.create({ + name, + proxyId: proxy.participantId + }) + // todo: - check if create returns id (to avoid getOneByNameCached call) + // - if isCreated === false, re-load all external participants cache + external = await externalParticipant.getOneByNameCached(name) + } + const id = external?.externalParticipantId + logger.verbose('getExternalParticipantIdByNameOrCreate result:', { id, name }) + return id + } catch (err) { + logger.child({ name, proxyId }).warn('error in getExternalParticipantIdByNameOrCreate:', err) + return null + // todo: think, if we need to rethrow an error here? + } +} + module.exports = { addHubAccountAndInitPosition, getByNameAndCurrency, @@ -789,5 +818,6 @@ module.exports = { getParticipantLimitsByParticipantId, getAllAccountsByNameAndCurrency, getLimitsForAllParticipants, - getAllNonHubParticipantsWithCurrencies + getAllNonHubParticipantsWithCurrencies, + getExternalParticipantIdByNameOrCreate } diff --git a/src/models/transfer/facade.js b/src/models/transfer/facade.js index 8b12e32ca..fed26a448 100644 --- a/src/models/transfer/facade.js +++ b/src/models/transfer/facade.js @@ -44,7 +44,6 @@ const Db = require('../../lib/db') const Config = require('../../lib/config') const ParticipantFacade = require('../participant/facade') const ParticipantCachedModel = require('../participant/participantCached') -const externalParticipantModel = require('../participant/externalParticipant') const TransferExtensionModel = require('./transferExtension') const TransferEventAction = Enum.Events.Event.Action @@ -483,7 +482,7 @@ const saveTransferPrepared = async (payload, stateReason = null, hasPassedValida let payerTransferParticipantRecord if (proxyObligation?.isInitiatingFspProxy) { - const externalParticipantId = await externalParticipantModel.getIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId) + const externalParticipantId = await ParticipantFacade.getExternalParticipantIdByNameOrCreate(proxyObligation.initiatingFspProxyOrParticipantId) // todo: think, what if externalParticipantId is null? payerTransferParticipantRecord = { transferId: payload.transferId, @@ -508,7 +507,7 @@ const saveTransferPrepared = async (payload, stateReason = null, hasPassedValida logger.debug('saveTransferPrepared participants:', { participants }) let payeeTransferParticipantRecord if (proxyObligation?.isCounterPartyFspProxy) { - const externalParticipantId = await externalParticipantModel.getIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId) + const externalParticipantId = await ParticipantFacade.getExternalParticipantIdByNameOrCreate(proxyObligation.counterPartyFspProxyOrParticipantId) // todo: think, what if externalParticipantId is null? payeeTransferParticipantRecord = { transferId: payload.transferId, @@ -772,7 +771,8 @@ const _getTransferTimeoutList = async (knex, transactionTimestamp) => { .select('tsc1.transferId') .max('tsc1.transferStateChangeId AS maxTransferStateChangeId') .innerJoin('transferTimeout AS tt1', 'tt1.transferId', 'tsc1.transferId') - .groupBy('tsc1.transferId').as('ts'), 'ts.transferId', 'tt.transferId' + .groupBy('tsc1.transferId') + .as('ts'), 'ts.transferId', 'tt.transferId' ) .innerJoin('transferStateChange AS tsc', 'tsc.transferStateChangeId', 'ts.maxTransferStateChangeId') .innerJoin('transferParticipant AS tp1', function () { @@ -780,11 +780,13 @@ const _getTransferTimeoutList = async (knex, transactionTimestamp) => { .andOn('tp1.transferParticipantRoleTypeId', Enum.Accounts.TransferParticipantRoleType.PAYER_DFSP) .andOn('tp1.ledgerEntryTypeId', Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE) }) + .leftJoin('externalParticipant AS ep1', 'ep1.externalParticipantId', 'tp1.externalParticipantId') .innerJoin('transferParticipant AS tp2', function () { this.on('tp2.transferId', 'tt.transferId') .andOn('tp2.transferParticipantRoleTypeId', Enum.Accounts.TransferParticipantRoleType.PAYEE_DFSP) .andOn('tp2.ledgerEntryTypeId', Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE) }) + .leftJoin('externalParticipant AS ep2', 'ep2.externalParticipantId', 'tp2.externalParticipantId') .innerJoin('participant AS p1', 'p1.participantId', 'tp1.participantId') .innerJoin('participant AS p2', 'p2.participantId', 'tp2.participantId') .innerJoin(knex('transferStateChange AS tsc2') @@ -797,9 +799,18 @@ const _getTransferTimeoutList = async (knex, transactionTimestamp) => { .leftJoin('bulkTransferAssociation AS bta', 'bta.transferId', 'tt.transferId') .where('tt.expirationDate', '<', transactionTimestamp) - .select('tt.*', 'tsc.transferStateId', 'tp1.participantCurrencyId AS payerParticipantCurrencyId', - 'p1.name AS payerFsp', 'p2.name AS payeeFsp', 'tp2.participantCurrencyId AS payeeParticipantCurrencyId', - 'bta.bulkTransferId', 'tpc.participantCurrencyId AS effectedParticipantCurrencyId') + .select( + 'tt.*', + 'tsc.transferStateId', + 'tp1.participantCurrencyId AS payerParticipantCurrencyId', + 'p1.name AS payerFsp', + 'p2.name AS payeeFsp', + 'tp2.participantCurrencyId AS payeeParticipantCurrencyId', + 'bta.bulkTransferId', + 'tpc.participantCurrencyId AS effectedParticipantCurrencyId', + 'ep1.name AS externalPayerName', + 'ep2.name AS externalPayeeName' + ) } const _getFxTransferTimeoutList = async (knex, transactionTimestamp) => { @@ -808,7 +819,8 @@ const _getFxTransferTimeoutList = async (knex, transactionTimestamp) => { .select('ftsc1.commitRequestId') .max('ftsc1.fxTransferStateChangeId AS maxFxTransferStateChangeId') .innerJoin('fxTransferTimeout AS ftt1', 'ftt1.commitRequestId', 'ftsc1.commitRequestId') - .groupBy('ftsc1.commitRequestId').as('fts'), 'fts.commitRequestId', 'ftt.commitRequestId' + .groupBy('ftsc1.commitRequestId') + .as('fts'), 'fts.commitRequestId', 'ftt.commitRequestId' ) .innerJoin('fxTransferStateChange AS ftsc', 'ftsc.fxTransferStateChangeId', 'fts.maxFxTransferStateChangeId') .innerJoin('fxTransferParticipant AS ftp1', function () { @@ -816,12 +828,14 @@ const _getFxTransferTimeoutList = async (knex, transactionTimestamp) => { .andOn('ftp1.transferParticipantRoleTypeId', Enum.Accounts.TransferParticipantRoleType.INITIATING_FSP) .andOn('ftp1.ledgerEntryTypeId', Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE) }) + .leftJoin('externalParticipant AS ep1', 'ep1.externalParticipantId', 'ftp1.externalParticipantId') .innerJoin('fxTransferParticipant AS ftp2', function () { this.on('ftp2.commitRequestId', 'ftt.commitRequestId') .andOn('ftp2.transferParticipantRoleTypeId', Enum.Accounts.TransferParticipantRoleType.COUNTER_PARTY_FSP) .andOn('ftp2.fxParticipantCurrencyTypeId', Enum.Fx.FxParticipantCurrencyType.TARGET) .andOn('ftp2.ledgerEntryTypeId', Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE) }) + .leftJoin('externalParticipant AS ep2', 'ep2.externalParticipantId', 'ftp2.externalParticipantId') .innerJoin('participant AS p1', 'p1.participantId', 'ftp1.participantId') .innerJoin('participant AS p2', 'p2.participantId', 'ftp2.participantId') .innerJoin(knex('fxTransferStateChange AS ftsc2') @@ -831,10 +845,62 @@ const _getFxTransferTimeoutList = async (knex, transactionTimestamp) => { .as('ftpc'), 'ftpc.commitRequestId', 'ftt.commitRequestId' ) .where('ftt.expirationDate', '<', transactionTimestamp) - .select('ftt.*', 'ftsc.transferStateId', 'ftp1.participantCurrencyId AS initiatingParticipantCurrencyId', - 'p1.name AS initiatingFsp', 'p2.name AS counterPartyFsp', 'ftp2.participantCurrencyId AS counterPartyParticipantCurrencyId', 'ftpc.participantCurrencyId AS effectedParticipantCurrencyId') + .select( + 'ftt.*', + 'ftsc.transferStateId', + 'ftp1.participantCurrencyId AS initiatingParticipantCurrencyId', + 'p1.name AS initiatingFsp', + 'p2.name AS counterPartyFsp', + 'ftp2.participantCurrencyId AS counterPartyParticipantCurrencyId', + 'ftpc.participantCurrencyId AS effectedParticipantCurrencyId', + 'ep1.name AS externalInitiatingFspName', + 'ep2.name AS externalCounterPartyFspName' + ) } +/** + * @typedef {Object} TimedOutTransfer + * + * @property {Integer} transferTimeoutId + * @property {String} transferId + * @property {Date} expirationDate + * @property {Date} createdDate + * @property {String} transferStateId + * @property {String} payerFsp + * @property {String} payeeFsp + * @property {Integer} payerParticipantCurrencyId + * @property {Integer} payeeParticipantCurrencyId + * @property {Integer} bulkTransferId + * @property {Integer} effectedParticipantCurrencyId + * @property {String} externalPayerName + * @property {String} externalPayeeName + */ + +/** + * @typedef {Object} TimedOutFxTransfer + * + * @property {Integer} fxTransferTimeoutId + * @property {String} commitRequestId + * @property {Date} expirationDate + * @property {Date} createdDate + * @property {String} transferStateId + * @property {String} initiatingFsp + * @property {String} counterPartyFsp + * @property {Integer} initiatingParticipantCurrencyId + * @property {Integer} counterPartyParticipantCurrencyId + * @property {Integer} effectedParticipantCurrencyId + * @property {String} externalInitiatingFspName + * @property {String} externalCounterPartyFspName + */ + +/** + * Returns the list of transfers/fxTransfers that have timed out + * + * @returns {Promise<{ + * transferTimeoutList: TimedOutTransfer, + * fxTransferTimeoutList: TimedOutFxTransfer + * }>} + */ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax, fxSegmentId, fxIntervalMin, fxIntervalMax) => { try { const transactionTimestamp = Time.getUTCString(new Date()) @@ -850,7 +916,8 @@ const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax, fxSegm .max('transferStateChangeId AS maxTransferStateChangeId') .where('transferStateChangeId', '>', intervalMin) .andWhere('transferStateChangeId', '<=', intervalMax) - .groupBy('transferId').as('ts'), 'ts.transferId', 't.transferId' + .groupBy('transferId') + .as('ts'), 'ts.transferId', 't.transferId' ) .innerJoin('transferStateChange AS tsc', 'tsc.transferStateChangeId', 'ts.maxTransferStateChangeId') .leftJoin('transferTimeout AS tt', 'tt.transferId', 't.transferId') diff --git a/test/fixtures.js b/test/fixtures.js index 44149e5d9..5a12b70ce 100644 --- a/test/fixtures.js +++ b/test/fixtures.js @@ -309,6 +309,31 @@ const mockExternalParticipantDto = ({ ...(createdDate && { createdDate }) }) +/** + * @returns {ProxyObligation} proxyObligation + */ +const mockProxyObligationDto = ({ + isFx = false, + payloadClone = transferDto(), // or fxTransferDto() + proxy1 = null, + proxy2 = null +} = {}) => ({ + isFx, + payloadClone, + isInitiatingFspProxy: !!proxy1, + isCounterPartyFspProxy: !!proxy2, + initiatingFspProxyOrParticipantId: { + inScheme: !proxy1, + proxyId: proxy1, + name: payloadClone.payerFsp || payloadClone.initiatingFsp + }, + counterPartyFspProxyOrParticipantId: { + inScheme: !proxy2, + proxyId: proxy2, + name: payloadClone.payeeFsp || payloadClone.counterPartyFsp + } +}) + module.exports = { ILP_PACKET, CONDITION, @@ -335,5 +360,6 @@ module.exports = { fxFulfilResponseDto, fxtGetAllDetailsByCommitRequestIdDto, watchListItemDto, - mockExternalParticipantDto + mockExternalParticipantDto, + mockProxyObligationDto } diff --git a/test/integration-override/handlers/transfers/fxAbort.test.js b/test/integration-override/handlers/transfers/fxAbort.test.js index a4975c46c..79c44ed7c 100644 --- a/test/integration-override/handlers/transfers/fxAbort.test.js +++ b/test/integration-override/handlers/transfers/fxAbort.test.js @@ -477,7 +477,7 @@ Test('Handlers test', async handlersTest => { }) }) - await handlersTest.test('When only tranfer is sent and followed by transfer abort', async abortTest => { + await handlersTest.test('When only transfer is sent and followed by transfer abort', async abortTest => { const td = await prepareFxTestData(testFxData) await abortTest.test('update transfer state to RESERVED by PREPARE request', async (test) => { diff --git a/test/integration-override/handlers/transfers/fxTimeout.test.js b/test/integration-override/handlers/transfers/fxTimeout.test.js index fbee6d783..35cf021b2 100644 --- a/test/integration-override/handlers/transfers/fxTimeout.test.js +++ b/test/integration-override/handlers/transfers/fxTimeout.test.js @@ -301,7 +301,7 @@ const prepareFxTestData = async (dataObj) => { } } -Test('Handlers test', async handlersTest => { +Test('fxTimeout Handler Tests -->', async fxTimeoutTest => { const startTime = new Date() await Db.connect(Config.DATABASE) await ParticipantCached.initialize() @@ -365,7 +365,7 @@ Test('Handlers test', async handlersTest => { } ]) - await handlersTest.test('Setup kafka consumer should', async registerAllHandlers => { + await fxTimeoutTest.test('Setup kafka consumer should', async registerAllHandlers => { await registerAllHandlers.test('start consumer', async (test) => { // Set up the testConsumer here await testConsumer.startListening() @@ -379,7 +379,7 @@ Test('Handlers test', async handlersTest => { }) }) - await handlersTest.test('fxTransferPrepare should', async fxTransferPrepare => { + await fxTimeoutTest.test('fxTransferPrepare should', async fxTransferPrepare => { await fxTransferPrepare.test('should handle payer initiated conversion fxTransfer', async (test) => { const td = await prepareFxTestData(testFxData) const prepareConfig = Utility.getKafkaConfig( @@ -413,7 +413,7 @@ Test('Handlers test', async handlersTest => { fxTransferPrepare.end() }) - await handlersTest.test('When only fxTransfer is sent, fxTimeout should', async timeoutTest => { + await fxTimeoutTest.test('When only fxTransfer is sent, fxTimeout should', async timeoutTest => { const expiration = new Date((new Date()).getTime() + (10 * 1000)) // 10 seconds const newTestFxData = { ...testFxData, @@ -560,7 +560,7 @@ Test('Handlers test', async handlersTest => { timeoutTest.end() }) - await handlersTest.test('When fxTransfer followed by a transfer are sent, fxTimeout should', async timeoutTest => { + await fxTimeoutTest.test('When fxTransfer followed by a transfer are sent, fxTimeout should', async timeoutTest => { const td = await prepareFxTestData(testFxData) // Modify expiration of only fxTransfer const expiration = new Date((new Date()).getTime() + (10 * 1000)) // 10 seconds @@ -764,7 +764,7 @@ Test('Handlers test', async handlersTest => { timeoutTest.end() }) - await handlersTest.test('teardown', async (assert) => { + await fxTimeoutTest.test('teardown', async (assert) => { try { await Handlers.timeouts.stop() await Cache.destroyCache() @@ -786,7 +786,7 @@ Test('Handlers test', async handlersTest => { assert.fail() assert.end() } finally { - handlersTest.end() + fxTimeoutTest.end() } }) }) diff --git a/test/integration-override/handlers/transfers/prepare/prepare-internals.test.js b/test/integration-override/handlers/transfers/prepare/prepare-internals.test.js new file mode 100644 index 000000000..5071b03d6 --- /dev/null +++ b/test/integration-override/handlers/transfers/prepare/prepare-internals.test.js @@ -0,0 +1,177 @@ +/***** + License + -------------- + Copyright © 2017 Bill & Melinda Gates Foundation + The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + + Contributors + -------------- + This is the official list of the Mojaloop project contributors for this file. + Names of the original copyright holders (individuals or organizations) + should be listed with a '*' in the first column. People who have + contributed from an organization can be listed under the organization + that actually holds the copyright for their contributions (see the + Gates Foundation organization for an example). Those individuals should have + their names indented and be marked with a '-'. Email address can be added + optionally within square brackets . + * Gates Foundation + - Name Surname + + * Eugen Klymniuk + -------------- + **********/ + +const { randomUUID } = require('node:crypto') +const Test = require('tape') + +const prepareHandler = require('#src/handlers/transfers/prepare') +const config = require('#src/lib/config') +const Db = require('#src/lib/db') +const proxyCache = require('#src/lib/proxyCache') +const Cache = require('#src/lib/cache') +const transferFacade = require('#src/models/transfer/facade') +const externalParticipant = require('#src/models/participant/externalParticipant') +const ParticipantCached = require('#src/models/participant/participantCached') +const ParticipantCurrencyCached = require('#src/models/participant/participantCurrencyCached') +const ParticipantLimitCached = require('#src/models/participant/participantLimitCached') +// const { logger } = require('#src/shared/logger/index') + +const participantHelper = require('#test/integration/helpers/participant') +const fixtures = require('#test/fixtures') +const { tryCatchEndTest } = require('#test/util/helpers') + +Test('Prepare Handler internals Tests -->', (prepareHandlerTest) => { + const initiatingFsp = `externalPayer-${Date.now()}` + const counterPartyFsp = `externalPayee-${Date.now()}` + const proxyId1 = `proxy1-${Date.now()}` + const proxyId2 = `proxy2-${Date.now()}` + + const curr1 = 'BWP' + // const curr2 = 'TZS'; + + const transferId = randomUUID() + + prepareHandlerTest.test('setup', tryCatchEndTest(async (t) => { + await Db.connect(config.DATABASE) + await proxyCache.connect() + await ParticipantCached.initialize() + await ParticipantCurrencyCached.initialize() + await ParticipantLimitCached.initialize() + await Cache.initCache() + + const [proxy1, proxy2] = await Promise.all([ + participantHelper.prepareData(proxyId1, curr1, null, false, true), + participantHelper.prepareData(proxyId2, curr1, null, false, true) + ]) + t.ok(proxy1, 'proxy1 is created') + t.ok(proxy2, 'proxy2 is created') + + await Promise.all([ + ParticipantCurrencyCached.update(proxy1.participantCurrencyId, true), + ParticipantCurrencyCached.update(proxy1.participantCurrencyId2, true) + ]) + t.pass('proxy1 currencies are activated') + + const [isPayerAdded, isPayeeAdded] = await Promise.all([ + proxyCache.getCache().addDfspIdToProxyMapping(initiatingFsp, proxyId1), + proxyCache.getCache().addDfspIdToProxyMapping(counterPartyFsp, proxyId2) + ]) + t.ok(isPayerAdded, 'payer is added to proxyCache') + t.ok(isPayeeAdded, 'payee is added to proxyCache') + + t.pass('setup is done') + })) + + prepareHandlerTest.test('should create proxyObligation for inter-scheme fxTransfer', tryCatchEndTest(async (t) => { + const payload = fixtures.fxTransferDto({ initiatingFsp, counterPartyFsp }) + const isFx = true + + const obligation = await prepareHandler.calculateProxyObligation({ + payload, + isFx, + params: {}, + functionality: 'functionality', + action: 'action' + }) + t.equals(obligation.isFx, isFx) + t.equals(obligation.initiatingFspProxyOrParticipantId.inScheme, false) + t.equals(obligation.initiatingFspProxyOrParticipantId.proxyId, proxyId1) + t.equals(obligation.initiatingFspProxyOrParticipantId.name, initiatingFsp) + t.equals(obligation.counterPartyFspProxyOrParticipantId.inScheme, false) + t.equals(obligation.counterPartyFspProxyOrParticipantId.proxyId, proxyId2) + t.equals(obligation.counterPartyFspProxyOrParticipantId.name, counterPartyFsp) + })) + + prepareHandlerTest.test('should save preparedRequest for inter-scheme transfer, and create external participants', tryCatchEndTest(async (t) => { + let [extPayer, extPayee] = await Promise.all([ + externalParticipant.getOneByNameCached(initiatingFsp), + externalParticipant.getOneByNameCached(counterPartyFsp) + ]) + t.equals(extPayer, null) + t.equals(extPayee, null) + + const isFx = false + const payload = fixtures.transferDto({ + transferId, + payerFsp: initiatingFsp, + payeeFsp: counterPartyFsp + }) + const proxyObligation = fixtures.mockProxyObligationDto({ + isFx, + payloadClone: payload, + proxy1: proxyId1, + proxy2: proxyId2 + }) + const determiningTransferCheckResult = { + determiningTransferExistsInTransferList: null, + watchListRecords: [], + participantCurrencyValidationList: [] + } + + await prepareHandler.checkDuplication({ + isFx, + payload, + ID: transferId, + location: {} + }) + await prepareHandler.savePreparedRequest({ + isFx, + payload, + validationPassed: true, + reasons: [], + functionality: 'functionality', + params: {}, + location: {}, + determiningTransferCheckResult, + proxyObligation + }) + + const dbTransfer = await transferFacade.getByIdLight(payload.transferId) + t.ok(dbTransfer, 'transfer is saved') + t.equals(dbTransfer.transferId, transferId, 'dbTransfer.transferId') + + ;[extPayer, extPayee] = await Promise.all([ + externalParticipant.getOneByNameCached(initiatingFsp), + externalParticipant.getOneByNameCached(counterPartyFsp) + ]) + t.ok(extPayer) + t.ok(extPayee) + + const [participant1] = await transferFacade.getTransferParticipant(proxyId1, transferId) + t.equals(participant1.externalParticipantId, extPayer.externalParticipantId) + t.equals(participant1.participantId, extPayer.proxyId) + })) + + prepareHandlerTest.test('teardown', tryCatchEndTest(async (t) => { + await Promise.all([ + Db.disconnect(), + proxyCache.disconnect(), + Cache.destroyCache() + ]) + t.pass('connections are closed') + })) + + prepareHandlerTest.end() +}) diff --git a/test/unit/domain/fx/cyril.test.js b/test/unit/domain/fx/cyril.test.js index 7fb61eb5b..b03161372 100644 --- a/test/unit/domain/fx/cyril.test.js +++ b/test/unit/domain/fx/cyril.test.js @@ -1097,7 +1097,21 @@ Test('Cyril', cyrilTest => { const result = await Cyril.processFxAbortMessage(payload.transferId) - test.deepEqual(result, { positionChanges: [{ isFxTransferStateChange: true, commitRequestId: '88622a75-5bde-4da4-a6cc-f4cd23b268c4', notifyTo: 'fx_dfsp1', participantCurrencyId: 1, amount: -433.88 }, { isFxTransferStateChange: false, transferId: 'c05c3f31-33b5-4e33-8bfd-7c3a2685fb6c', notifyTo: 'dfsp1', participantCurrencyId: 1, amount: -433.88 }] }) + test.deepEqual(result, { + positionChanges: [{ + isFxTransferStateChange: true, + commitRequestId: '88622a75-5bde-4da4-a6cc-f4cd23b268c4', + notifyTo: 'fx_dfsp1', + participantCurrencyId: 1, + amount: -433.88 + }, { + isFxTransferStateChange: false, + transferId: 'c05c3f31-33b5-4e33-8bfd-7c3a2685fb6c', + notifyTo: 'dfsp1', + participantCurrencyId: 1, + amount: -433.88 + }] + }) test.pass('Error not thrown') test.end() } catch (e) { diff --git a/test/unit/models/participant/externalParticipant.test.js b/test/unit/models/participant/externalParticipant.test.js index 8ba7dfb4b..c9c59c072 100644 --- a/test/unit/models/participant/externalParticipant.test.js +++ b/test/unit/models/participant/externalParticipant.test.js @@ -83,32 +83,6 @@ Test('externalParticipant Model Tests -->', (epmTest) => { t.ok(Db[EP_TABLE].findOne.notCalled, 'db.findOne is called') })) - epmTest.test('should get externalParticipant ID from db (no data in cache)', tryCatchEndTest(async (t) => { - const name = `extFsp-${Date.now()}` - const data = mockExternalParticipantDto({ name }) - Db[EP_TABLE].findOne.withArgs({ name }).resolves(data) - - const id = await model.getIdByNameOrCreate({ name }) - t.equal(id, data.externalParticipantId) - })) - - epmTest.test('should create externalParticipant, and get its id from db (if no data in db)', tryCatchEndTest(async (t) => { - const data = mockExternalParticipantDto() - const { name, proxyId } = data - const fspList = [] - Db[EP_TABLE].findOne = async json => (json.name === name && fspList[0]) - Db[EP_TABLE].insert = async json => { if (json.name === name && json.proxyId === proxyId) fspList.push(data) } - - const id = await model.getIdByNameOrCreate({ name, proxyId }) - t.equal(id, data.externalParticipantId) - })) - - epmTest.test('should return null in case of error inside getIdByNameOrCreate method', tryCatchEndTest(async (t) => { - Db[EP_TABLE].findOne.rejects(new Error('DB error')) - const id = await model.getIdByNameOrCreate(mockExternalParticipantDto()) - t.equal(id, null) - })) - epmTest.test('should get externalParticipant by id', tryCatchEndTest(async (t) => { const id = 'id123' const data = { name: 'extFsp', proxyId: '123' } diff --git a/test/unit/models/transfer/facade.test.js b/test/unit/models/transfer/facade.test.js index adc19e77d..0858e40e4 100644 --- a/test/unit/models/transfer/facade.test.js +++ b/test/unit/models/transfer/facade.test.js @@ -1464,193 +1464,6 @@ Test('Transfer facade', async (transferFacadeTest) => { } }) - await timeoutExpireReservedTest.test('perform timeout successfully', async test => { - try { - let segmentId - const intervalMin = 1 - const intervalMax = 10 - let fxSegmentId - const fxIntervalMin = 1 - const fxIntervalMax = 10 - const transferTimeoutListMock = 1 - const fxTransferTimeoutListMock = undefined - const expectedResult = { - transferTimeoutList: transferTimeoutListMock, - fxTransferTimeoutList: fxTransferTimeoutListMock - } - - const knexStub = sandbox.stub() - sandbox.stub(Db, 'getKnex').returns(knexStub) - const trxStub = sandbox.stub() - knexStub.transaction = sandbox.stub().callsArgWith(0, trxStub) - const context = sandbox.stub() - context.from = sandbox.stub().returns({ - innerJoin: sandbox.stub().returns({ - select: sandbox.stub(), - innerJoin: sandbox.stub().returns({ - leftJoin: sandbox.stub().returns({ - leftJoin: sandbox.stub().returns({ - whereNull: sandbox.stub().returns({ - whereIn: sandbox.stub().returns({ - select: sandbox.stub() - }) - }) - }), - whereNull: sandbox.stub().returns({ - whereIn: sandbox.stub().returns({ - select: sandbox.stub() - }) - }) - }), - where: sandbox.stub().returns({ - andWhere: sandbox.stub().returns({ - select: sandbox.stub() - }) - }), - select: sandbox.stub() - }), - where: sandbox.stub().returns({ - select: sandbox.stub() - }) - }) - }) - context.on = sandbox.stub().returns({ - andOn: sandbox.stub().returns({ - andOn: sandbox.stub().returns({ - andOn: sandbox.stub() - }) - }) - }) - knexStub.returns({ - select: sandbox.stub().returns({ - max: sandbox.stub().returns({ - where: sandbox.stub().returns({ - andWhere: sandbox.stub().returns({ - groupBy: sandbox.stub().returns({ - as: sandbox.stub() - }) - }) - }), - innerJoin: sandbox.stub().returns({ - groupBy: sandbox.stub().returns({ - as: sandbox.stub() - }) - }), - groupBy: sandbox.stub().returns({ - as: sandbox.stub() - }) - }), - innerJoin: sandbox.stub().returns({ - innerJoin: sandbox.stub().returns({ - where: sandbox.stub().returns({ - whereIn: sandbox.stub().returns({ - as: sandbox.stub() - }) - }), - as: sandbox.stub() - }), - whereRaw: sandbox.stub().returns({ - whereIn: sandbox.stub().returns({ - as: sandbox.stub() - }) - }) - }) - }), - transacting: sandbox.stub().returns({ - insert: sandbox.stub(), - where: sandbox.stub().returns({ - update: sandbox.stub() - }) - }), - innerJoin: sandbox.stub().returns({ - innerJoin: sandbox.stub().returns({ - innerJoin: sandbox.stub().callsArgOn(1, context).returns({ - innerJoin: sandbox.stub().callsArgOn(1, context).returns({ - innerJoin: sandbox.stub().returns({ - innerJoin: sandbox.stub().returns({ - innerJoin: sandbox.stub().returns({ - where: sandbox.stub().returns({ // This is for _getFxTransferTimeoutList - select: sandbox.stub() - }), - leftJoin: sandbox.stub().returns({ - where: sandbox.stub().returns({ - select: sandbox.stub().returns( - Promise.resolve(transferTimeoutListMock) - ) - }) - }), - innerJoin: sandbox.stub().returns({ - where: sandbox.stub().returns({ // This is for _getFxTransferTimeoutList - select: sandbox.stub() - }), - innerJoin: sandbox.stub().returns({ - where: sandbox.stub().returns({ // This is for _getFxTransferTimeoutList - select: sandbox.stub() - }), - leftJoin: sandbox.stub().returns({ - where: sandbox.stub().returns({ - select: sandbox.stub().returns( - Promise.resolve(transferTimeoutListMock) - ) - }) - }) - }), - leftJoin: sandbox.stub().returns({ - where: sandbox.stub().returns({ - select: sandbox.stub().returns( - Promise.resolve(transferTimeoutListMock) - ) - }) - }) - }) - }) - }) - }) - }) - }) - }) - }) - }) - knexStub.raw = sandbox.stub() - knexStub.from = sandbox.stub().returns({ - transacting: sandbox.stub().returns({ - insert: sandbox.stub().callsArgOn(0, context).returns({ - onConflict: sandbox.stub().returns({ - merge: sandbox.stub() - }) - }) - }) - }) - - let result - try { - segmentId = 0 - fxSegmentId = 0 - result = await TransferFacade.timeoutExpireReserved(segmentId, intervalMin, intervalMax, fxSegmentId, fxIntervalMin, fxIntervalMax) - test.equal(result.transferTimeoutList, expectedResult.transferTimeoutList, 'Expected transferTimeoutList returned.') - test.equal(result.fxTransferTimeoutList, expectedResult.fxTransferTimeoutList, 'Expected fxTransferTimeoutList returned.') - } catch (err) { - Logger.error(`timeoutExpireReserved failed with error - ${err}`) - test.fail() - } - try { - segmentId = 1 - fxSegmentId = 1 - await TransferFacade.timeoutExpireReserved(segmentId, intervalMin, intervalMax, intervalMax, fxSegmentId, fxIntervalMin, fxIntervalMax) - test.equal(result.transferTimeoutList, expectedResult.transferTimeoutList, 'Expected transferTimeoutList returned.') - test.equal(result.fxTransferTimeoutList, expectedResult.fxTransferTimeoutList, 'Expected fxTransferTimeoutList returned.') - } catch (err) { - Logger.error(`timeoutExpireReserved failed with error - ${err}`) - test.fail() - } - test.end() - } catch (err) { - Logger.error(`timeoutExpireReserved failed with error - ${err}`) - test.fail() - test.end() - } - }) - await timeoutExpireReservedTest.end() } catch (err) { Logger.error(`transferFacadeTest failed with error - ${err}`) diff --git a/test/util/helpers.js b/test/util/helpers.js index 19ebcc99d..da32ed8c5 100644 --- a/test/util/helpers.js +++ b/test/util/helpers.js @@ -184,8 +184,8 @@ const tryCatchEndTest = (testFn) => async (t) => { try { await testFn(t) } catch (err) { - logger.error(`error in test: "${t.name}"`, err) - t.fail(t.name) + logger.error(`error in test "${t.name}":`, err) + t.fail(`${t.name} failed due to error: ${err?.message}`) } t.end() }