diff --git a/config/default.json b/config/default.json index a244a7b1f..93d34614a 100644 --- a/config/default.json +++ b/config/default.json @@ -91,7 +91,8 @@ "BULK_PREPARE": null, "COMMIT": null, "BULK_COMMIT": null, - "RESERVE": null + "RESERVE": null, + "TIMEOUT_RESERVED": null } }, "TOPIC_TEMPLATES": { diff --git a/package-lock.json b/package-lock.json index d33a1de38..defc34446 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,7 +20,7 @@ "@mojaloop/central-services-logger": "11.3.1", "@mojaloop/central-services-metrics": "12.0.8", "@mojaloop/central-services-shared": "18.4.0-snapshot.12", - "@mojaloop/central-services-stream": "11.2.5", + "@mojaloop/central-services-stream": "11.2.6", "@mojaloop/database-lib": "11.0.5", "@mojaloop/event-sdk": "14.0.2", "@mojaloop/ml-number": "11.2.4", @@ -36,7 +36,7 @@ "docdash": "2.0.2", "event-stream": "4.0.1", "five-bells-condition": "5.0.1", - "glob": "10.3.12", + "glob": "10.3.15", "hapi-auth-basic": "5.0.0", "hapi-auth-bearer-token": "8.0.0", "hapi-swagger": "17.2.1", @@ -49,7 +49,6 @@ "require-glob": "^4.1.0" }, "devDependencies": { - "async-retry": "1.3.3", "audit-ci": "^6.6.1", "get-port": "5.1.1", "jsdoc": "4.0.3", @@ -1754,9 +1753,9 @@ "integrity": "sha512-/c6rf4UJlmHlC9b5BaNvzAcFv7HZ2QHaV0D4/HNlBdvFnvQq8RI4kYdhyPCl7Xj+oWvTWQ8ujhqS53LIgAe6KQ==" }, "node_modules/@mojaloop/central-services-stream": { - "version": "11.2.5", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-stream/-/central-services-stream-11.2.5.tgz", - "integrity": "sha512-7OfOvXBtBOE2zBLhkIv5gR4BN72sdVEWDyit9uT01pu/1KjNstn3nopErBhjTo2ANgdB4Jx74UMhLlokwl24IQ==", + "version": "11.2.6", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-stream/-/central-services-stream-11.2.6.tgz", + "integrity": "sha512-U94lMqIIEqIjPACimOGzT9I98e7zP8oM2spbHznbc5kUDePjsookXi0xQ4H89OECEr4MoKwykDSTAuxUVtczjg==", "dependencies": { "async": "3.2.5", "async-exit-hook": "2.0.1", @@ -3008,15 +3007,6 @@ "node": ">=0.12.0" } }, - "node_modules/async-retry": { - "version": "1.3.3", - "resolved": "https://registry.npmjs.org/async-retry/-/async-retry-1.3.3.tgz", - "integrity": "sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw==", - "dev": true, - "dependencies": { - "retry": "0.13.1" - } - }, "node_modules/asynciterator.prototype": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/asynciterator.prototype/-/asynciterator.prototype-1.0.0.tgz", @@ -7241,21 +7231,21 @@ "dev": true }, "node_modules/glob": { - "version": "10.3.12", - "resolved": "https://registry.npmjs.org/glob/-/glob-10.3.12.tgz", - "integrity": "sha512-TCNv8vJ+xz4QiqTpfOJA7HvYv+tNIRHKfUWw/q+v2jdgN4ebz+KY9tGx5J4rHP0o84mNP+ApH66HRX8us3Khqg==", + "version": "10.3.15", + "resolved": "https://registry.npmjs.org/glob/-/glob-10.3.15.tgz", + "integrity": "sha512-0c6RlJt1TICLyvJYIApxb8GsXoai0KUP7AxKKAtsYXdgJR1mGEUa7DgwShbdk1nly0PYoZj01xd4hzbq3fsjpw==", "dependencies": { "foreground-child": "^3.1.0", "jackspeak": "^2.3.6", "minimatch": "^9.0.1", "minipass": "^7.0.4", - "path-scurry": "^1.10.2" + "path-scurry": "^1.11.0" }, "bin": { "glob": "dist/esm/bin.mjs" }, "engines": { - "node": ">=16 || 14 >=14.17" + "node": ">=16 || 14 >=14.18" }, "funding": { "url": "https://github.com/sponsors/isaacs" @@ -12603,24 +12593,24 @@ "integrity": "sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==" }, "node_modules/path-scurry": { - "version": "1.10.2", - "resolved": 
"https://registry.npmjs.org/path-scurry/-/path-scurry-1.10.2.tgz", - "integrity": "sha512-7xTavNy5RQXnsjANvVvMkEjvloOinkAjv/Z6Ildz9v2RinZ4SBKTWFOVRbaF8p0vpHnyjV/UwNDdKuUv6M5qcA==", + "version": "1.11.1", + "resolved": "https://registry.npmjs.org/path-scurry/-/path-scurry-1.11.1.tgz", + "integrity": "sha512-Xa4Nw17FS9ApQFJ9umLiJS4orGjm7ZzwUrwamcGQuHSzDyth9boKDaycYdDcZDuqYATXw4HFXgaqWTctW/v1HA==", "dependencies": { "lru-cache": "^10.2.0", "minipass": "^5.0.0 || ^6.0.2 || ^7.0.0" }, "engines": { - "node": ">=16 || 14 >=14.17" + "node": ">=16 || 14 >=14.18" }, "funding": { "url": "https://github.com/sponsors/isaacs" } }, "node_modules/path-scurry/node_modules/lru-cache": { - "version": "10.2.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.2.0.tgz", - "integrity": "sha512-2bIM8x+VAf6JT4bKAljS1qUWgMsqZRPGJS6FSahIMPVvctcNhyVp7AJu7quxOW9jwkryBReKZY5tY5JYv2n/7Q==", + "version": "10.2.2", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.2.2.tgz", + "integrity": "sha512-9hp3Vp2/hFQUiIwKo8XCeFVnrg8Pk3TYNPIR7tJADKi5YfcF7vEaK7avFHTlSy3kOKYaJQaalfEo6YuXdceBOQ==", "engines": { "node": "14 || >=16.14" } @@ -14083,15 +14073,6 @@ "through": "~2.3.4" } }, - "node_modules/retry": { - "version": "0.13.1", - "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", - "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", - "dev": true, - "engines": { - "node": ">= 4" - } - }, "node_modules/reusify": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", diff --git a/package.json b/package.json index 5f8955f96..ea9485fe6 100644 --- a/package.json +++ b/package.json @@ -92,7 +92,7 @@ "@mojaloop/central-services-logger": "11.3.1", "@mojaloop/central-services-metrics": "12.0.8", "@mojaloop/central-services-shared": "18.4.0-snapshot.12", - "@mojaloop/central-services-stream": "11.2.5", + "@mojaloop/central-services-stream": "11.2.6", "@mojaloop/database-lib": "11.0.5", "@mojaloop/event-sdk": "14.0.2", "@mojaloop/ml-number": "11.2.4", @@ -108,7 +108,7 @@ "docdash": "2.0.2", "event-stream": "4.0.1", "five-bells-condition": "5.0.1", - "glob": "10.3.12", + "glob": "10.3.15", "hapi-auth-basic": "5.0.0", "hapi-auth-bearer-token": "8.0.0", "hapi-swagger": "17.2.1", @@ -124,7 +124,6 @@ "mysql": "2.18.1" }, "devDependencies": { - "async-retry": "1.3.3", "audit-ci": "^6.6.1", "get-port": "5.1.1", "jsdoc": "4.0.3", diff --git a/src/domain/position/binProcessor.js b/src/domain/position/binProcessor.js index 7f3b2c67b..26924b457 100644 --- a/src/domain/position/binProcessor.js +++ b/src/domain/position/binProcessor.js @@ -37,6 +37,7 @@ const PositionPrepareDomain = require('./prepare') const PositionFxPrepareDomain = require('./fx-prepare') const PositionFulfilDomain = require('./fulfil') const PositionFxFulfilDomain = require('./fx-fulfil') +const PositionTimeoutReservedDomain = require('./timeout-reserved') const SettlementModelCached = require('../../models/settlement/settlementModelCached') const Enum = require('@mojaloop/central-services-shared').Enum const ErrorHandler = require('@mojaloop/central-services-error-handling') @@ -105,8 +106,15 @@ const processBins = async (bins, trx) => { array2.every((element) => array1.includes(element)) // If non-prepare/non-commit action found, log error // We need to remove this once we implement all the actions - if (!isSubset([Enum.Events.Event.Action.PREPARE, Enum.Events.Event.Action.FX_PREPARE, Enum.Events.Event.Action.COMMIT, 
Enum.Events.Event.Action.RESERVE, Enum.Events.Event.Action.FX_RESERVE], actions)) { - Logger.isErrorEnabled && Logger.error('Only prepare/fx-prepare/commit actions are allowed in a batch') + if (!isSubset([ + Enum.Events.Event.Action.PREPARE, + Enum.Events.Event.Action.FX_PREPARE, + Enum.Events.Event.Action.COMMIT, + Enum.Events.Event.Action.RESERVE, + Enum.Events.Event.Action.FX_RESERVE, + Enum.Events.Event.Action.TIMEOUT_RESERVED + ], actions)) { + Logger.isErrorEnabled && Logger.error('Only prepare/fx-prepare/commit/reserve/fx-reserve/timeout-reserved actions are allowed in a batch') } const settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value @@ -165,6 +173,24 @@ const processBins = async (bins, trx) => { notifyMessages = notifyMessages.concat(fulfilActionResult.notifyMessages) followupMessages = followupMessages.concat(fulfilActionResult.followupMessages) + // If timeout-reserved action found then call processPositionTimeoutReservedBin function + const timeoutReservedActionResult = await PositionTimeoutReservedDomain.processPositionTimeoutReservedBin( + accountBin[Enum.Events.Event.Action.TIMEOUT_RESERVED], + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + latestTransferInfoByTransferId + ) + + // Update accumulated values + accumulatedPositionValue = timeoutReservedActionResult.accumulatedPositionValue + accumulatedPositionReservedValue = timeoutReservedActionResult.accumulatedPositionReservedValue + accumulatedTransferStates = timeoutReservedActionResult.accumulatedTransferStates + // Append accumulated arrays + accumulatedTransferStateChanges = accumulatedTransferStateChanges.concat(timeoutReservedActionResult.accumulatedTransferStateChanges) + accumulatedPositionChanges = accumulatedPositionChanges.concat(timeoutReservedActionResult.accumulatedPositionChanges) + notifyMessages = notifyMessages.concat(timeoutReservedActionResult.notifyMessages) + // If prepare action found then call processPositionPrepareBin function const prepareActionResult = await PositionPrepareDomain.processPositionPrepareBin( accountBin.prepare, @@ -299,6 +325,8 @@ const _getTransferIdList = async (bins) => { } else if (action === Enum.Events.Event.Action.RESERVE) { transferIdList.push(item.message.value.content.uriParams.id) reservedActionTransferIdList.push(item.message.value.content.uriParams.id) + } else if (action === Enum.Events.Event.Action.TIMEOUT_RESERVED) { + transferIdList.push(item.message.value.content.uriParams.id) } else if (action === Enum.Events.Event.Action.FX_PREPARE) { commitRequestIdList.push(item.decodedPayload.commitRequestId) } else if (action === Enum.Events.Event.Action.FX_RESERVE) { diff --git a/src/domain/position/timeout-reserved.js b/src/domain/position/timeout-reserved.js new file mode 100644 index 000000000..d5bf17dfd --- /dev/null +++ b/src/domain/position/timeout-reserved.js @@ -0,0 +1,155 @@ +const { Enum } = require('@mojaloop/central-services-shared') +const ErrorHandler = require('@mojaloop/central-services-error-handling') +const Config = require('../../lib/config') +const Utility = require('@mojaloop/central-services-shared').Util +const MLNumber = require('@mojaloop/ml-number') +const Logger = require('@mojaloop/central-services-logger') + +/** + * @function processPositionTimeoutReservedBin + * + * @async + * @description This is the domain function to process a bin of timeout-reserved messages of a single participant account.
+ * + * @param {array} timeoutReservedBins - an array containing timeout-reserved action bins + * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing + * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency + * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. + * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, accumulatedPositionChanges, notifyMessages or throws an error if failed + */ +const processPositionTimeoutReservedBin = async ( + timeoutReservedBins, + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + transferInfoList +) => { + const transferStateChanges = [] + const participantPositionChanges = [] + const resultMessages = [] + const accumulatedTransferStatesCopy = Object.assign({}, accumulatedTransferStates) + let runningPosition = new MLNumber(accumulatedPositionValue) + // Position timeout-reserved action event messages are keyed with the payer account id. + // We need to revert the payer's position for the amount of the transfer. + // We need to notify the payee of the timeout. + if (timeoutReservedBins && timeoutReservedBins.length > 0) { + for (const binItem of timeoutReservedBins) { + Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::binItem: ${JSON.stringify(binItem.message.value)}`) + const transferId = binItem.message.value.content.uriParams.id + const payeeFsp = binItem.message.value.to + const payerFsp = binItem.message.value.from + + // If the transfer is not in `RESERVED_TIMEOUT`, a position timeout-reserved message was incorrectly published. + // i.e. Something has gone extremely wrong.
+ if (accumulatedTransferStates[transferId] !== Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { + throw ErrorHandler.Factory.createInternalServerFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR.message) + } else { + Logger.isDebugEnabled && Logger.debug(`accumulatedTransferStates: ${JSON.stringify(accumulatedTransferStates)}`) + + const transferAmount = transferInfoList[transferId].amount + + // Construct payee notification message + const resultMessage = _constructTimeoutReservedResultMessage( + binItem, + transferId, + payeeFsp, + payerFsp + ) + Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::resultMessage: ${JSON.stringify(resultMessage)}`) + + // Revert payer's position for the amount of the transfer + const { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } = + _handleParticipantPositionChange(runningPosition, transferAmount, transferId, accumulatedPositionReservedValue) + Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`) + runningPosition = updatedRunningPosition + binItem.result = { success: true } + participantPositionChanges.push(participantPositionChange) + transferStateChanges.push(transferStateChange) + accumulatedTransferStatesCopy[transferId] = transferStateId + resultMessages.push({ binItem, message: resultMessage }) + } + } + } + + return { + accumulatedPositionValue: runningPosition.toNumber(), + accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after timeout-reserved processing + accumulatedPositionReservedValue, // not used but kept for consistency + accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order + accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order + notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message} + } +} + +const _constructTimeoutReservedResultMessage = (binItem, transferId, payeeFsp, payerFsp) => { + // IMPORTANT: This singular message is taken by the ml-api-adapter and used to + // notify the payer and payee of the timeout. + // As long as the `to` and `from` message values are the payer and payee, + // and the action is `timeout-reserved`, the ml-api-adapter will notify both. + // Create a FSPIOPError object for timeout payee notification + const fspiopError = ErrorHandler.Factory.createFSPIOPError( + ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED, + null, + null, + null, + null + ).toApiErrorObject(Config.ERROR_HANDLING) + + const state = Utility.StreamingProtocol.createEventState( + Enum.Events.EventStatus.FAILURE.status, + fspiopError.errorInformation.errorCode, + fspiopError.errorInformation.errorDescription + ) + + // Create metadata for the message, associating the payee notification + // with the position event timeout-reserved action + const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent( + transferId, + Enum.Kafka.Topics.POSITION, + Enum.Events.Event.Action.TIMEOUT_RESERVED, + state + ) + const resultMessage = Utility.StreamingProtocol.createMessage( + transferId, + payeeFsp, + payerFsp, + metadata, + binItem.message.value.content.headers, // Headers don't really matter here. ml-api-adapter will ignore them and create their own.
+ fspiopError, + { id: transferId }, + 'application/json' + ) + + return resultMessage +} + +const _handleParticipantPositionChange = (runningPosition, transferAmount, transferId, accumulatedPositionReservedValue) => { + // NOTE: The transfer info amount is pulled from the payee records in a batch `SELECT` query. + // And will have a negative value. We add that value to the payer's position + // to revert the position for the amount of the transfer. + const transferStateId = Enum.Transfers.TransferInternalState.EXPIRED_RESERVED + // Revert payer's position for the amount of the transfer + const updatedRunningPosition = new MLNumber(runningPosition.add(transferAmount).toFixed(Config.AMOUNT.SCALE)) + Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::_handleParticipantPositionChange::updatedRunningPosition: ${updatedRunningPosition.toString()}`) + Logger.isDebugEnabled && Logger.debug(`processPositionTimeoutReservedBin::_handleParticipantPositionChange::transferAmount: ${transferAmount}`) + // Construct participant position change object + const participantPositionChange = { + transferId, // Need to delete this in bin processor while updating transferStateChangeId + transferStateChangeId: null, // Need to update this in bin processor while executing queries + value: updatedRunningPosition.toNumber(), + reservedValue: accumulatedPositionReservedValue + } + + // Construct transfer state change object + const transferStateChange = { + transferId, + transferStateId, + reason: ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message + } + return { participantPositionChange, transferStateChange, transferStateId, updatedRunningPosition } +} + +module.exports = { + processPositionTimeoutReservedBin +} diff --git a/src/handlers/positions/handler.js b/src/handlers/positions/handler.js index 21c678cc9..d32f7e135 100644 --- a/src/handlers/positions/handler.js +++ b/src/handlers/positions/handler.js @@ -234,7 +234,14 @@ const positions = async (error, messages) => { } await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange) const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED, null, null, null, payload.extensionList) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail }) + await Kafka.proceed( + Config.KAFKA_CONFIG, + params, + { + consumerCommit, + fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), + eventDetail + }) throw fspiopError } } else { diff --git a/src/handlers/timeouts/handler.js b/src/handlers/timeouts/handler.js index 0bd1b2e86..e9ff41dca 100644 --- a/src/handlers/timeouts/handler.js +++ b/src/handlers/timeouts/handler.js @@ -100,7 +100,17 @@ const timeout = async () => { message.metadata.event.type = Enum.Events.Event.Type.POSITION message.metadata.event.action = Enum.Events.Event.Action.TIMEOUT_RESERVED // Key position timeouts with payer account id - await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.TIMEOUT_RESERVED, message, state, result[i].payerParticipantCurrencyId?.toString(), span) + await Kafka.produceGeneralMessage( + Config.KAFKA_CONFIG, + Producer, + Enum.Kafka.Topics.POSITION, + Enum.Events.Event.Action.TIMEOUT_RESERVED, + message, + state, + result[i].payerParticipantCurrencyId?.toString(), + span, + 
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.TIMEOUT_RESERVED + ) } } else { // individual transfer from a bulk if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { diff --git a/test/integration-override/handlers/positions/handlerBatch.test.js b/test/integration-override/handlers/positions/handlerBatch.test.js index 29fed9ff2..5127592d7 100644 --- a/test/integration-override/handlers/positions/handlerBatch.test.js +++ b/test/integration-override/handlers/positions/handlerBatch.test.js @@ -1591,6 +1591,7 @@ Test('Handlers test', async handlersTest => { testConsumer.clearEvents() test.end() }) + await transferPositionPrepare.test('process batch of fx prepare/ fx reserve messages with mixed keys (accountIds) and update transfer state to COMMITTED', async (test) => { // Construct test data for 10 transfers. Default object contains 10 transfers. const td = await prepareTestData(testFxData) @@ -1684,6 +1685,147 @@ Test('Handlers test', async handlersTest => { testConsumer.clearEvents() test.end() }) + + await transferPositionPrepare.test('timeout should', async timeoutTest => { + const td = await prepareTestData(testData) + + await timeoutTest.test('update transfer state to RESERVED by PREPARE request', async (test) => { + // Produce prepare messages for transfersArray + for (const transfer of td.transfersArray) { + transfer.messageProtocolPrepare.content.payload.expiration = new Date((new Date()).getTime() + (5 * 1000)) // 5 seconds + await Producer.produceMessage(transfer.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) + } + await new Promise(resolve => setTimeout(resolve, 2500)) + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'prepare' + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + + // filter positionPrepare messages where destination is not Hub + const positionPrepareFiltered = positionPrepare.filter((notification) => notification.to !== 'Hub') + test.equal(positionPrepareFiltered.length, 10, 'Notification Messages received for all 10 transfers') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + const tests = async (totalTransferAmounts) => { + for (const value of Object.values(totalTransferAmounts)) { + const payerCurrentPosition = await ParticipantService.getPositionByParticipantCurrencyId(value.payer.participantCurrencyId) || {} + const payerInitialPosition = value.payer.payerLimitAndInitialPosition.participantPosition.value + const payerExpectedPosition = payerInitialPosition + value.totalTransferAmount + const payerPositionChange = await ParticipantService.getPositionChangeByParticipantPositionId(payerCurrentPosition.participantPositionId) || {} + test.equal(payerCurrentPosition.value, payerExpectedPosition, 'Payer position incremented by transfer amount and updated in participantPosition') + test.equal(payerPositionChange.value, payerCurrentPosition.value, 'Payer position change value inserted and matches the updated participantPosition value') + } + } + + try { + const totalTransferAmounts = {} + for (const tdTest of td.transfersArray) { + const transfer = await TransferService.getById(tdTest.messageProtocolPrepare.content.payload.transferId) || {} + if (transfer?.transferState !== TransferState.RESERVED) { + if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) + throw ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, + `#1 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail. TRANSFER STATE: ${transfer?.transferState}` + ) + } + totalTransferAmounts[tdTest.payer.participantCurrencyId] = { + payer: tdTest.payer, + totalTransferAmount: ( + (totalTransferAmounts[tdTest.payer.participantCurrencyId] && + totalTransferAmounts[tdTest.payer.participantCurrencyId].totalTransferAmount) || 0 + ) + tdTest.transferPayload.amount.amount + } + } + await tests(totalTransferAmounts) + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + testConsumer.clearEvents() + test.end() + }) + + await timeoutTest.test('update transfer after timeout with timeout status & error', async (test) => { + for (const tf of td.transfersArray) { + // Re-try function with conditions + const inspectTransferState = async () => { + try { + // Fetch Transfer record + const transfer = await TransferService.getById(tf.messageProtocolPrepare.content.payload.transferId) || {} + + // Check Transfer for correct state + if (transfer?.transferState === Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) { + // We have a Transfer with the correct state, lets check if we can get the TransferError record + try { + // Fetch the TransferError record + const transferError = await TransferService.getTransferErrorByTransferId(tf.messageProtocolPrepare.content.payload.transferId) + // TransferError record found, so lets return it + return { + transfer, + transferError + } + } catch (err) { + // NO TransferError record found, so lets return the transfer and the error + return { + transfer, + err + } + } + } else { + // NO Transfer with the correct state was found, so we return false + return false + } + } catch (err) { + // NO Transfer with the correct state was found, so we return false + Logger.error(err) + return false + } + } + const result = await wrapWithRetries( + inspectTransferState, + wrapWithRetriesConf.remainingRetries, + wrapWithRetriesConf.timeout + ) + + // Assert + if (result === false) { + test.fail(`Transfer['${tf.messageProtocolPrepare.content.payload.transferId}'].TransferState failed to transition to ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`) + } else { + test.equal(result.transfer && result.transfer?.transferState, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED, `Transfer['${tf.messageProtocolPrepare.content.payload.transferId}'].TransferState = ${Enum.Transfers.TransferInternalState.EXPIRED_RESERVED}`) + test.equal(result.transferError && result.transferError.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code, `Transfer['${tf.messageProtocolPrepare.content.payload.transferId}'].transferError.errorCode = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.code}`) + test.equal(result.transferError && result.transferError.errorDescription, ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message, `Transfer['${tf.messageProtocolPrepare.content.payload.transferId}'].transferError.errorDescription = ${ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message}`) + test.pass() + } + } + test.end() + }) + + await timeoutTest.test('position resets after a timeout', async (test) => { + // Arrange + for (const payer of td.payerList) { + const payerInitialPosition = payer.payerLimitAndInitialPosition.participantPosition.value + // Act + const payerPositionDidReset = async () => { + const payerCurrentPosition = await 
ParticipantService.getPositionByParticipantCurrencyId(payer.participantCurrencyId) + console.log(payerCurrentPosition) + return payerCurrentPosition.value === payerInitialPosition + } + // wait until we know the position reset, or throw after 5 tries + await wrapWithRetries(payerPositionDidReset, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + const payerCurrentPosition = await ParticipantService.getPositionByParticipantCurrencyId(payer.participantCurrencyId) || {} + + // Assert + test.equal(payerCurrentPosition.value, payerInitialPosition, 'Position resets after a timeout') + } + + test.end() + }) + + timeoutTest.end() + }) transferPositionPrepare.end() }) diff --git a/test/integration/handlers/transfers/handlers.test.js b/test/integration/handlers/transfers/handlers.test.js index a6d5d97de..6a95ce16f 100644 --- a/test/integration/handlers/transfers/handlers.test.js +++ b/test/integration/handlers/transfers/handlers.test.js @@ -27,7 +27,6 @@ const Test = require('tape') const { randomUUID } = require('crypto') -const retry = require('async-retry') const Logger = require('@mojaloop/central-services-logger') const Config = require('#src/lib/config') const Time = require('@mojaloop/central-services-shared').Util.Time @@ -1004,14 +1003,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#1 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1044,14 +1044,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferState.COMMITTED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#2 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1103,14 +1104,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#1 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. 
Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1141,14 +1143,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferState.COMMITTED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#2 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1179,14 +1182,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#3 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1218,14 +1222,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferInternalState.ABORTED_REJECTED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#4 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1257,14 +1262,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#5 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. 
Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1304,14 +1310,15 @@ Test('Handlers test', async handlersTest => { } try { - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferInternalState.ABORTED_ERROR) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `#6 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) + return null } - return tests() - }, retryOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) @@ -1366,20 +1373,15 @@ Test('Handlers test', async handlersTest => { } try { - const retryTimeoutOpts = { - retries: Number(retryOpts.retries) * 2, - minTimeout: retryOpts.minTimeout, - maxTimeout: retryOpts.maxTimeout - } - - await retry(async () => { // use bail(new Error('to break before max retries')) + await wrapWithRetries(async () => { const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} if (transfer?.transferState !== TransferState.RESERVED) { if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - throw new Error(`#7 Max retry count ${retryCount} reached after ${retryCount * retryDelay / 1000}s. Tests fail`) + return null } - return tests() - }, retryTimeoutOpts) + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + await tests() } catch (err) { Logger.error(err) test.fail(err.message) diff --git a/test/scripts/test-integration.sh b/test/scripts/test-integration.sh index 5224f3b73..3743ac66d 100644 --- a/test/scripts/test-integration.sh +++ b/test/scripts/test-integration.sh @@ -60,6 +60,8 @@ echo "Starting Service in the background" export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE='topic-transfer-position-batch' export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__COMMIT='topic-transfer-position-batch' export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__RESERVE='topic-transfer-position-batch' +export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__TIMEOUT_RESERVED='topic-transfer-position-batch' + npm start > ./test/results/cl-service-override.log & ## Store PID for cleanup echo $! > /tmp/int-test-service.pid @@ -69,6 +71,7 @@ echo $! 
> /tmp/int-test-handler.pid unset CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE unset CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__COMMIT unset CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__RESERVE +unset CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__TIMEOUT_RESERVED PID1=$(cat /tmp/int-test-service.pid) echo "Service started with Process ID=$PID1" diff --git a/test/unit/domain/position/binProcessor.test.js b/test/unit/domain/position/binProcessor.test.js index 16159cacd..9caf825b3 100644 --- a/test/unit/domain/position/binProcessor.test.js +++ b/test/unit/domain/position/binProcessor.test.js @@ -60,7 +60,7 @@ const prepareTransfers = [ ...prepareTransfersBin2 ] -const fulfillTransfers = [ +const fulfilTransfers = [ '4830fa00-0c2a-4de1-9640-5ad4e68f5f62', '33d42717-1dc9-4224-8c9b-45aab4fe6457', 'f33add51-38b1-4715-9876-83d8a08c485d', @@ -69,6 +69,10 @@ const fulfillTransfers = [ 'fe332218-07d6-4f00-8399-76671594697a' ] +const timeoutReservedTransfers = [ + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5' +] + Test('BinProcessor', async (binProcessorTest) => { let sandbox binProcessorTest.beforeEach(async test => { @@ -79,10 +83,13 @@ Test('BinProcessor', async (binProcessorTest) => { sandbox.stub(participantFacade) const prepareTransfersStates = Object.fromEntries(prepareTransfers.map((transferId) => [transferId, { transferStateChangeId: 1, transferStateId: Enum.Transfers.TransferInternalState.RECEIVED_PREPARE }])) - const fulfillTransfersStates = Object.fromEntries(fulfillTransfers.map((transferId) => [transferId, { transferStateChangeId: 1, transferStateId: Enum.Transfers.TransferInternalState.RECEIVED_FULFIL }])) + const fulfilTransfersStates = Object.fromEntries(fulfilTransfers.map((transferId) => [transferId, { transferStateChangeId: 1, transferStateId: Enum.Transfers.TransferInternalState.RECEIVED_FULFIL }])) + const timeoutReservedTransfersStates = Object.fromEntries(timeoutReservedTransfers.map((transferId) => [transferId, { transferStateChangeId: 1, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT }])) + BatchPositionModel.getLatestTransferStateChangesByTransferIdList.returns({ ...prepareTransfersStates, - ...fulfillTransfersStates + ...fulfilTransfersStates, + ...timeoutReservedTransfersStates }) BatchPositionModelCached.getParticipantCurrencyByIds.returns([ @@ -363,6 +370,9 @@ Test('BinProcessor', async (binProcessorTest) => { }, 'fe332218-07d6-4f00-8399-76671594697a': { amount: -2 + }, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { + amount: -50 } }) @@ -434,7 +444,7 @@ Test('BinProcessor', async (binProcessorTest) => { const result = await BinProcessor.processBins(sampleBins, trx) // Assert on result.notifyMessages - test.equal(result.notifyMessages.length, 13, 'processBins should return the expected number of notify messages') + test.equal(result.notifyMessages.length, 14, 'processBins should return the expected number of notify messages') // Assert on result.limitAlarms // test.equal(result.limitAlarms.length, 1, 'processBin should return the expected number of limit alarms') @@ -447,7 +457,7 @@ Test('BinProcessor', async (binProcessorTest) => { // Assert on DB update for position values of all accounts in each function call test.deepEqual(BatchPositionModel.updateParticipantPosition.getCalls().map(call => call.args), [ - [{}, 7, 0, 0], + [{}, 7, -50, 0], [{}, 15, 2, 0] ], 'updateParticipantPosition should be called with the expected arguments') @@ -479,6 +489,8 @@ Test('BinProcessor', async (binProcessorTest) => { 
sampleBinsDeepCopy[15].commit = [] sampleBinsDeepCopy[7].reserve = [] sampleBinsDeepCopy[15].reserve = [] + sampleBinsDeepCopy[7]['timeout-reserved'] = [] + sampleBinsDeepCopy[15]['timeout-reserved'] = [] const result = await BinProcessor.processBins(sampleBinsDeepCopy, trx) // Assert on result.notifyMessages @@ -526,6 +538,8 @@ Test('BinProcessor', async (binProcessorTest) => { sampleBinsDeepCopy[15].prepare = [] sampleBinsDeepCopy[7].reserve = [] sampleBinsDeepCopy[15].reserve = [] + sampleBinsDeepCopy[7]['timeout-reserved'] = [] + sampleBinsDeepCopy[15]['timeout-reserved'] = [] const result = await BinProcessor.processBins(sampleBinsDeepCopy, trx) // Assert on result.notifyMessages @@ -571,6 +585,8 @@ Test('BinProcessor', async (binProcessorTest) => { sampleBinsDeepCopy[15].prepare = [] sampleBinsDeepCopy[7].commit = [] sampleBinsDeepCopy[15].commit = [] + sampleBinsDeepCopy[7]['timeout-reserved'] = [] + sampleBinsDeepCopy[15]['timeout-reserved'] = [] const result = await BinProcessor.processBins(sampleBinsDeepCopy, trx) // Assert on result.notifyMessages @@ -595,6 +611,53 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) + prepareActionTest.test('processBins should handle timeout-reserved messages', async (test) => { + const sampleParticipantLimitReturnValues = [ + { + participantId: 2, + currencyId: 'USD', + participantLimitTypeId: 1, + value: 1000000 + }, + { + participantId: 3, + currencyId: 'USD', + participantLimitTypeId: 1, + value: 1000000 + } + ] + participantFacade.getParticipantLimitByParticipantCurrencyLimit.returns(sampleParticipantLimitReturnValues.shift()) + const sampleBinsDeepCopy = JSON.parse(JSON.stringify(sampleBins)) + sampleBinsDeepCopy[7].prepare = [] + sampleBinsDeepCopy[15].prepare = [] + sampleBinsDeepCopy[7].commit = [] + sampleBinsDeepCopy[15].commit = [] + sampleBinsDeepCopy[7].reserve = [] + sampleBinsDeepCopy[15].reserve = [] + const result = await BinProcessor.processBins(sampleBinsDeepCopy, trx) + + // Assert on result.notifyMessages + test.equal(result.notifyMessages.length, 1, 'processBins should return 1 message') + + // TODO: What if there are no position changes in a batch?
+ // Assert on number of function calls for DB update on position value + // test.ok(BatchPositionModel.updateParticipantPosition.notCalled, 'updateParticipantPosition should not be called') + + // TODO: Assert on number of function calls for DB bulk insert of transferStateChanges + // TODO: Assert on number of function calls for DB bulk insert of positionChanges + + // Assert on DB update for position values of all accounts in each function call + test.deepEqual(BatchPositionModel.updateParticipantPosition.getCalls().map(call => call.args), [ + [{}, 7, -50, 0], + [{}, 15, 0, 0] + ], 'updateParticipantPosition should be called with the expected arguments') + + // TODO: Assert on DB bulk insert of transferStateChanges in each function call + // TODO: Assert on DB bulk insert of positionChanges in each function call + + test.end() + }) + prepareActionTest.test('processBins should throw error if any accountId cannot be matched to atleast one participantCurrencyId', async (test) => { const sampleParticipantLimitReturnValues = [ { @@ -727,7 +790,7 @@ Test('BinProcessor', async (binProcessorTest) => { const result = await BinProcessor.processBins(sampleBins, trx) // Assert on result.notifyMessages - test.equal(result.notifyMessages.length, 13, 'processBins should return 13 messages') + test.equal(result.notifyMessages.length, 14, 'processBins should return 14 messages') // TODO: What if there are no position changes in a batch? // Assert on number of function calls for DB update on position value @@ -738,7 +801,7 @@ Test('BinProcessor', async (binProcessorTest) => { // Assert on DB update for position values of all accounts in each function call test.deepEqual(BatchPositionModel.updateParticipantPosition.getCalls().map(call => call.args), [ - [{}, 7, 0, 0], + [{}, 7, -50, 0], [{}, 15, 2, 0] ], 'updateParticipantPosition should be called with the expected arguments') @@ -771,6 +834,8 @@ Test('BinProcessor', async (binProcessorTest) => { delete sampleBinsDeepCopy[15].commit delete sampleBinsDeepCopy[7].reserve delete sampleBinsDeepCopy[15].reserve + delete sampleBinsDeepCopy[7]['timeout-reserved'] + delete sampleBinsDeepCopy[15]['timeout-reserved'] const result = await BinProcessor.processBins(sampleBinsDeepCopy, trx) // Assert on result.notifyMessages @@ -830,7 +895,7 @@ Test('BinProcessor', async (binProcessorTest) => { const spyCb = sandbox.spy() await BinProcessor.iterateThroughBins(sampleBins, spyCb) - test.equal(spyCb.callCount, 13, 'callback should be called 13 times') + test.equal(spyCb.callCount, 14, 'callback should be called 14 times') test.end() }) iterateThroughBinsTest.test('iterateThroughBins should call error callback function if callback function throws error', async (test) => { @@ -840,7 +905,7 @@ Test('BinProcessor', async (binProcessorTest) => { spyCb.onThirdCall().throws() await BinProcessor.iterateThroughBins(sampleBins, spyCb, errorCb) - test.equal(spyCb.callCount, 13, 'callback should be called 13 times') + test.equal(spyCb.callCount, 14, 'callback should be called 14 times') test.equal(errorCb.callCount, 2, 'error callback should be called 2 times') test.end() }) @@ -849,7 +914,7 @@ Test('BinProcessor', async (binProcessorTest) => { spyCb.onFirstCall().throws() await BinProcessor.iterateThroughBins(sampleBins, spyCb) - test.equal(spyCb.callCount, 13, 'callback should be called 13 times') + test.equal(spyCb.callCount, 14, 'callback should be called 14 times') test.end() }) iterateThroughBinsTest.end() diff --git a/test/unit/domain/position/sampleBins.js 
b/test/unit/domain/position/sampleBins.js index 30cc2811d..8037b21ec 100644 --- a/test/unit/domain/position/sampleBins.js +++ b/test/unit/domain/position/sampleBins.js @@ -668,6 +668,84 @@ module.exports = { }, span: {} } + ], + 'timeout-reserved': [ + { + message: { + value: { + from: 'payerFsp69185571', + to: 'payeeFsp69186326', + id: '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5', + content: { + uriParams: { + id: '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5' + }, + headers: { + accept: 'application/vnd.interoperability.transfers+json;version=1.0', + 'FSPIOP-Destination': 'payerFsp69185571', + 'Content-Type': 'application/vnd.interoperability.transfers+json;version=1.0', + date: 'Tue, 14 May 2024 00:13:15 GMT', + 'FSPIOP-Source': 'switch' + }, + payload: { + errorInformation: { + errorCode: '3303', + errorDescription: 'Transfer expired', + extensionList: { + extension: [ + { + key: 'cause', + value: 'FSPIOPError at Object.createFSPIOPError (/home/kleyow/mojaloop/central-ledger/node_modules/@mojaloop/central-services-error-handling/src/factory.js:198:12) at CronJob.timeout (/home/kleyow/moj...' + } + ] + } + } + } + }, + type: 'application/vnd.interoperability.transfers+json;version=1.0', + metadata: { + correlationId: '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5', + event: { + type: 'position', + action: 'timeout-reserved', + createdAt: '2024-05-14T00:13:15.092Z', + state: { + status: 'error', + code: '3303', + description: 'Transfer expired' + }, + id: '1ef2f45c-f7a4-4b67-a0fc-7164ed43f0f1' + }, + trace: { + service: 'cl_transfer_timeout', + traceId: 'de8e410463b73e45203fc916d68cf98c', + spanId: 'bb0abd2ea5fdfbbd', + startTimestamp: '2024-05-14T00:13:15.092Z', + tags: { + tracestate: 'acmevendor=eyJzcGFuSWQiOiJiYjBhYmQyZWE1ZmRmYmJkIn0=', + transactionType: 'transfer', + transactionAction: 'timeout-received', + source: 'switch', + destination: 'payerFsp69185571' + }, + tracestates: { + acmevendor: { + spanId: 'bb0abd2ea5fdfbbd' + } + } + }, + 'protocol.createdAt': 1715645595093 + } + }, + size: 3489, + key: 51, + topic: 'topic-transfer-position', + offset: 4073, + partition: 0, + timestamp: 1694175690401 + }, + span: {} + } ] }, 15: { diff --git a/test/unit/domain/position/timeout-reserved.test.js b/test/unit/domain/position/timeout-reserved.test.js new file mode 100644 index 000000000..7e87dd1f8 --- /dev/null +++ b/test/unit/domain/position/timeout-reserved.test.js @@ -0,0 +1,272 @@ +/***** + License + -------------- + Copyright © 2017 Bill & Melinda Gates Foundation + The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + Contributors + -------------- + This is the official list of the Mojaloop project contributors for this file. + Names of the original copyright holders (individuals or organizations) + should be listed with a '*' in the first column. People who have + contributed from an organization can be listed under the organization + that actually holds the copyright for their contributions (see the + Gates Foundation organization for an example). 
Those individuals should have + their names indented and be marked with a '-'. Email address can be added + optionally within square brackets . + * Gates Foundation + - Name Surname + + * Kevin Leyow + -------------- + ******/ + +'use strict' + +const Test = require('tapes')(require('tape')) +const { Enum } = require('@mojaloop/central-services-shared') +const Sinon = require('sinon') +const { processPositionTimeoutReservedBin } = require('../../../../src/domain/position/timeout-reserved') + +const timeoutMessage1 = { + value: { + from: 'perffsp1', + to: 'perffsp2', + id: 'd6a036a5-65a3-48af-a0c7-ee089c412ada', + content: { + uriParams: { + id: 'd6a036a5-65a3-48af-a0c7-ee089c412ada' + }, + headers: { + accept: 'application/vnd.interoperability.transfers+json;version=1.0', + 'fspiop-destination': 'perffsp2', + 'Content-Type': 'application/vnd.interoperability.transfers+json;version=1.0', + date: 'Tue, 14 May 2024 00:13:15 GMT', + 'fspiop-source': 'perffsp1' + }, + payload: { + errorInformation: { + errorCode: '3303', + errorDescription: 'Transfer expired', + extensionList: { + extension: [ + { + key: 'cause', + value: 'FSPIOPError at Object.createFSPIOPError...' + } + ] + } + } + } + }, + type: 'application/vnd.interoperability.transfers+json;version=1.0', + metadata: { + correlationId: 'd6a036a5-65a3-48af-a0c7-ee089c412ada', + event: { + type: 'position', + action: 'timeout-reserved', + createdAt: '2024-05-14T00:13:15.092Z', + state: { + status: 'error', + code: '3303', + description: 'Transfer expired' + }, + id: '1ef2f45c-f7a4-4b67-a0fc-7164ed43f0f1' + }, + trace: { + service: 'cl_transfer_timeout', + traceId: 'de8e410463b73e45203fc916d68cf98c', + spanId: 'bb0abd2ea5fdfbbd', + startTimestamp: '2024-05-14T00:13:15.092Z', + tags: { + tracestate: 'acmevendor=eyJzcGFuSWQiOiJiYjBhYmQyZWE1ZmRmYmJkIn0=', + transactionType: 'transfer', + transactionAction: 'timeout-received', + source: 'switch', + destination: 'perffsp1' + }, + tracestates: { + acmevendor: { + spanId: 'bb0abd2ea5fdfbbd' + } + } + }, + 'protocol.createdAt': 1715645595093 + } + }, + size: 3489, + key: 51, + topic: 'topic-transfer-position', + offset: 4073, + partition: 0, + timestamp: 1694175690401 +} +const timeoutMessage2 = { + value: { + from: 'perffsp1', + to: 'perffsp2', + id: '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5', + content: { + uriParams: { + id: '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5' + }, + headers: { + accept: 'application/vnd.interoperability.transfers+json;version=1.0', + 'fspiop-destination': 'perffsp2', + 'Content-Type': 'application/vnd.interoperability.transfers+json;version=1.0', + date: 'Tue, 14 May 2024 00:13:15 GMT', + 'fspiop-source': 'perffsp1' + }, + payload: { + errorInformation: { + errorCode: '3303', + errorDescription: 'Transfer expired', + extensionList: { + extension: [ + { + key: 'cause', + value: 'FSPIOPError at Object.createFSPIOPError...' 
+ } + ] + } + } + } + }, + type: 'application/vnd.interoperability.transfers+json;version=1.0', + metadata: { + correlationId: '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5', + event: { + type: 'position', + action: 'timeout-reserved', + createdAt: '2024-05-14T00:13:15.092Z', + state: { + status: 'error', + code: '3303', + description: 'Transfer expired' + }, + id: '1ef2f45c-f7a4-4b67-a0fc-7164ed43f0f1' + }, + trace: { + service: 'cl_transfer_timeout', + traceId: 'de8e410463b73e45203fc916d68cf98c', + spanId: 'bb0abd2ea5fdfbbd', + startTimestamp: '2024-05-14T00:13:15.092Z', + tags: { + tracestate: 'acmevendor=eyJzcGFuSWQiOiJiYjBhYmQyZWE1ZmRmYmJkIn0=', + transactionType: 'transfer', + transactionAction: 'timeout-received', + source: 'switch', + destination: 'perffsp1' + }, + tracestates: { + acmevendor: { + spanId: 'bb0abd2ea5fdfbbd' + } + } + }, + 'protocol.createdAt': 1715645595093 + } + }, + size: 3489, + key: 51, + topic: 'topic-transfer-position', + offset: 4073, + partition: 0, + timestamp: 1694175690401 +} + +const span = {} +const binItems = [{ + message: timeoutMessage1, + span, + decodedPayload: {} +}, +{ + message: timeoutMessage2, + span, + decodedPayload: {} +}] + +Test('timeout reserved domain', positionIndexTest => { + let sandbox + + positionIndexTest.beforeEach(t => { + sandbox = Sinon.createSandbox() + t.end() + }) + + positionIndexTest.afterEach(t => { + sandbox.restore() + t.end() + }) + + positionIndexTest.test('processPositionTimeoutReservedBin should', changeParticipantPositionTest => { + changeParticipantPositionTest.test('produce abort message for transfers not in the right transfer state', async (test) => { + try { + await processPositionTimeoutReservedBin( + binItems, + 0, // Accumulated position value + 0, + { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': 'INVALID_STATE', + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': 'INVALID_STATE' + }, + {} + ) + test.fail('Error not thrown') + } catch (e) { + test.pass('Error thrown') + } + test.end() + }) + + changeParticipantPositionTest.test('produce reserved messages/position changes for valid timeout messages', async (test) => { + const processedMessages = await processPositionTimeoutReservedBin( + binItems, + 0, // Accumulated position value + 0, + { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT + }, + { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { + amount: -10 + }, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { + amount: -5 + } + } + ) + test.equal(processedMessages.notifyMessages.length, 2) + + test.equal(processedMessages.accumulatedPositionChanges.length, 2) + + test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-destination'], timeoutMessage1.value.to) + test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], timeoutMessage1.value.from) + test.equal(processedMessages.notifyMessages[0].message.content.headers['content-type'], timeoutMessage1.value.content.headers['content-type']) + test.equal(processedMessages.accumulatedPositionChanges[0].value, -10) + test.equal(processedMessages.accumulatedTransferStates[timeoutMessage1.value.id], Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + + test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-destination'], timeoutMessage2.value.to) + test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], timeoutMessage1.value.from) + 
test.equal(processedMessages.notifyMessages[1].message.content.headers['content-type'], timeoutMessage2.value.content.headers['content-type']) + test.equal(processedMessages.accumulatedPositionChanges[1].value, -15) + test.equal(processedMessages.accumulatedTransferStates[timeoutMessage2.value.id], Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + + test.equal(processedMessages.accumulatedTransferStateChanges[0].transferId, timeoutMessage1.value.id) + test.equal(processedMessages.accumulatedTransferStateChanges[1].transferId, timeoutMessage2.value.id) + + test.equal(processedMessages.accumulatedTransferStateChanges[0].transferStateId, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.equal(processedMessages.accumulatedTransferStateChanges[1].transferStateId, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + + test.equal(processedMessages.accumulatedPositionValue, -15) + test.end() + }) + + changeParticipantPositionTest.end() + }) + + positionIndexTest.end() +})