diff --git a/package-lock.json b/package-lock.json index 1ae37ec88..133874546 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@mojaloop/central-ledger", - "version": "10.5.1", + "version": "10.5.2", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -857,9 +857,9 @@ } }, "@mojaloop/central-services-shared": { - "version": "10.5.1", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-10.5.1.tgz", - "integrity": "sha512-o71foh8GiEaxYpcgz5gmOXyvteYI50ABOxzu3l6VorQUAowX8pzg4p0dBCSbCY36gqtfkrzCuDBXQK+rMbAW2A==", + "version": "10.5.2", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-10.5.2.tgz", + "integrity": "sha512-e8mLezEPFmUk3HTxxGnWn0IbHSHLuzgLukenB5iLYrbEc61340Y50peYH5Fmq/5KI8LVXFNVoUUm4GhOEdksuQ==", "requires": { "@hapi/catbox": "11.1.0", "@hapi/catbox-memory": "5.0.0", diff --git a/package.json b/package.json index f8f3c1a0b..70e501a63 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mojaloop/central-ledger", - "version": "10.5.1", + "version": "10.5.2", "description": "Central ledger hosted by a scheme to record and settle transfers", "license": "Apache-2.0", "author": "ModusBox", @@ -86,7 +86,7 @@ "@mojaloop/central-services-health": "10.4.0", "@mojaloop/central-services-logger": "10.4.0", "@mojaloop/central-services-metrics": "9.5.0", - "@mojaloop/central-services-shared": "10.5.1", + "@mojaloop/central-services-shared": "10.5.2", "@mojaloop/central-services-stream": "9.5.0", "@mojaloop/event-sdk": "10.4.0", "@mojaloop/forensic-logging-client": "8.3.0", diff --git a/src/handlers/positions/handler.js b/src/handlers/positions/handler.js index cde86aea2..13bb351a6 100644 --- a/src/handlers/positions/handler.js +++ b/src/handlers/positions/handler.js @@ -41,6 +41,7 @@ const Logger = require('@mojaloop/central-services-logger') const EventSdk = require('@mojaloop/event-sdk') const TransferService = require('../../domain/transfer') +const TransferObjectTransform = require('../../domain/transfer/transform') const PositionService = require('../../domain/position') const Utility = require('@mojaloop/central-services-shared').Util const Kafka = require('@mojaloop/central-services-shared').Util.Kafka @@ -115,15 +116,16 @@ const positions = async (error, messages) => { Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, { method: 'positions' })) const actionLetter = action === Enum.Events.Event.Action.PREPARE ? Enum.Events.ActionLetter.prepare - : (action === Enum.Events.Event.Action.COMMIT ? Enum.Events.ActionLetter.commit - : (action === Enum.Events.Event.Action.REJECT ? Enum.Events.ActionLetter.reject - : (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort - : (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout - : (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare - : (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit - : (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved - : (action === Enum.Events.Event.Action.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort - : Enum.Events.ActionLetter.unknown)))))))) + : (action === Enum.Events.Event.Action.RESERVE ? Enum.Events.ActionLetter.reserve + : (action === Enum.Events.Event.Action.COMMIT ? Enum.Events.ActionLetter.commit + : (action === Enum.Events.Event.Action.REJECT ? Enum.Events.ActionLetter.reject + : (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort + : (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout + : (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare + : (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit + : (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved + : (action === Enum.Events.Event.Action.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort + : Enum.Events.ActionLetter.unknown))))))))) const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer } const eventDetail = { action } if (![Enum.Events.Event.Action.BULK_PREPARE, Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED, Enum.Events.Event.Action.BULK_ABORT].includes(action)) { @@ -156,7 +158,7 @@ const positions = async (error, messages) => { throw fspiopError } } - } else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.BULK_COMMIT].includes(action)) { + } else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.RESERVE, Enum.Events.Event.Action.BULK_COMMIT].includes(action)) { Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, { path: 'commit' })) const transferInfo = await TransferService.getTransferInfoToChangePosition(transferId, Enum.Accounts.TransferParticipantRoleType.PAYEE_DFSP, Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE) if (transferInfo.transferStateId !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) { @@ -172,6 +174,10 @@ const positions = async (error, messages) => { transferStateId: Enum.Transfers.TransferState.COMMITTED } await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange) + if (action === Enum.Events.Event.Action.RESERVE) { + const transfer = await TransferService.getById(transferInfo.transferId) + message.value.content.payload = TransferObjectTransform.toFulfil(transfer) + } await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId, action }) return true diff --git a/src/handlers/transfers/handler.js b/src/handlers/transfers/handler.js index d9b38154e..682073485 100644 --- a/src/handlers/transfers/handler.js +++ b/src/handlers/transfers/handler.js @@ -274,17 +274,19 @@ const fulfil = async (error, messages) => { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, { method: `fulfil:${action}` })) const actionLetter = action === TransferEventAction.COMMIT ? Enum.Events.ActionLetter.commit - : (action === TransferEventAction.REJECT ? Enum.Events.ActionLetter.reject - : (action === TransferEventAction.ABORT ? Enum.Events.ActionLetter.abort - : (action === TransferEventAction.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit - : (action === TransferEventAction.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort - : Enum.Events.ActionLetter.unknown)))) + : (action === TransferEventAction.RESERVE ? Enum.Events.ActionLetter.reserve + : (action === TransferEventAction.REJECT ? Enum.Events.ActionLetter.reject + : (action === TransferEventAction.ABORT ? Enum.Events.ActionLetter.abort + : (action === TransferEventAction.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit + : (action === TransferEventAction.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort + : Enum.Events.ActionLetter.unknown))))) const functionality = action === TransferEventAction.COMMIT ? TransferEventType.NOTIFICATION - : (action === TransferEventAction.REJECT ? TransferEventType.NOTIFICATION - : (action === TransferEventAction.ABORT ? TransferEventType.NOTIFICATION - : (action === TransferEventAction.BULK_COMMIT ? TransferEventType.BULK_PROCESSING - : (action === TransferEventAction.BULK_ABORT ? TransferEventType.BULK_PROCESSING - : Enum.Events.ActionLetter.unknown)))) + : (action === TransferEventAction.RESERVE ? TransferEventType.NOTIFICATION + : (action === TransferEventAction.REJECT ? TransferEventType.NOTIFICATION + : (action === TransferEventAction.ABORT ? TransferEventType.NOTIFICATION + : (action === TransferEventAction.BULK_COMMIT ? TransferEventType.BULK_PROCESSING + : (action === TransferEventAction.BULK_ABORT ? TransferEventType.BULK_PROCESSING + : Enum.Events.ActionLetter.unknown))))) // fulfil-specific declarations const isTransferError = action === TransferEventAction.ABORT const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer } @@ -344,22 +346,22 @@ const fulfil = async (error, messages) => { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'handleResend')) if (transferStateEnum === TransferState.COMMITTED || transferStateEnum === TransferState.ABORTED) { message.value.content.payload = TransferObjectTransform.toFulfil(transfer) - if (!isTransferError) { - Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized2--${actionLetter}3`)) - const eventDetail = { functionality, action: TransferEventAction.FULFIL_DUPLICATE } - /** - * HOWTO: During bulk fulfil use an individualTransfer from a previous bulk fulfil - */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) - histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) - return true - } else { - Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized3--${actionLetter}4`)) - const eventDetail = { functionality, action: TransferEventAction.ABORT_DUPLICATE } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) - histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) - return true + const eventDetail = { functionality, action } + if (action !== TransferEventAction.RESERVE) { + if (!isTransferError) { + Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized2--${actionLetter}3`)) + eventDetail.action = TransferEventAction.FULFIL_DUPLICATE + /** + * HOWTO: During bulk fulfil use an individualTransfer from a previous bulk fulfil + */ + } else { + Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized3--${actionLetter}4`)) + eventDetail.action = TransferEventAction.ABORT_DUPLICATE + } } + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) + histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) + return true } else if (transferStateEnum === TransferState.RECEIVED || transferStateEnum === TransferState.RESERVED) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `inProgress2--${actionLetter}5`)) /** @@ -401,7 +403,7 @@ const fulfil = async (error, messages) => { await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) throw fspiopError } else { // !hasDuplicateId - if (type === TransferEventType.FULFIL && [TransferEventAction.COMMIT, TransferEventAction.REJECT, TransferEventAction.ABORT, TransferEventAction.BULK_COMMIT, TransferEventAction.BULK_ABORT].includes(action)) { + if (type === TransferEventType.FULFIL && [TransferEventAction.COMMIT, TransferEventAction.RESERVE, TransferEventAction.REJECT, TransferEventAction.ABORT, TransferEventAction.BULK_COMMIT, TransferEventAction.BULK_ABORT].includes(action)) { Util.breadcrumb(location, { path: 'validationCheck' }) if (payload.fulfilment && !Validator.validateFulfilCondition(payload.fulfilment, transfer.condition)) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInvalidFulfilment--${actionLetter}9`)) @@ -434,7 +436,7 @@ const fulfil = async (error, messages) => { throw fspiopError } else { // validations success Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, { path: 'validationPassed' })) - if ([TransferEventAction.COMMIT, TransferEventAction.BULK_COMMIT].includes(action)) { + if ([TransferEventAction.COMMIT, TransferEventAction.RESERVE, TransferEventAction.BULK_COMMIT].includes(action)) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `positionTopic2--${actionLetter}12`)) await TransferService.handlePayeeResponse(transferId, payload, action) const eventDetail = { functionality: TransferEventType.POSITION, action } diff --git a/src/models/transfer/facade.js b/src/models/transfer/facade.js index 5d74e73e7..34afb4879 100644 --- a/src/models/transfer/facade.js +++ b/src/models/transfer/facade.js @@ -266,6 +266,7 @@ const savePayeeTransferResponse = async (transferId, payload, action, fspiopErro switch (action) { case TransferEventAction.COMMIT: case TransferEventAction.BULK_COMMIT: + case TransferEventAction.RESERVE: state = TransferInternalState.RECEIVED_FULFIL extensionList = payload.extensionList isFulfilment = true diff --git a/test/unit/handlers/transfers/handler.test.js b/test/unit/handlers/transfers/handler.test.js index 4b56b5c1b..3125c6495 100644 --- a/test/unit/handlers/transfers/handler.test.js +++ b/test/unit/handlers/transfers/handler.test.js @@ -1110,6 +1110,30 @@ Test('Transfer handler', transferHandlerTest => { test.end() }) + fulfilTest.test('produce message to position topic when validations pass and action is RESERVE', async (test) => { + const localfulfilMessages = MainUtil.clone(fulfilMessages) + localfulfilMessages[0].value.metadata.event.action = 'reserve' + await Consumer.createHandler(topicName, config, command) + Kafka.transformGeneralTopicName.returns(topicName) + TransferService.getById.returns(Promise.resolve({ condition: 'condition', payeeFsp: 'dfsp2', transferState: TransferState.RESERVED })) + ilp.update.returns(Promise.resolve()) + Validator.validateFulfilCondition.returns(true) + localfulfilMessages[0].value.content.headers['fspiop-source'] = 'dfsp2' + localfulfilMessages[0].value.content.payload.fulfilment = 'condition' + Kafka.proceed.returns(true) + + TransferService.getTransferDuplicateCheck.returns(Promise.resolve(null)) + TransferService.saveTransferDuplicateCheck.returns(Promise.resolve(null)) + Comparators.duplicateCheckComparator.withArgs(transfer.transferId, localfulfilMessages[0].value.content.payload).returns(Promise.resolve({ + hasDuplicateId: false, + hasDuplicateHash: false + })) + + const result = await allTransferHandlers.fulfil(null, localfulfilMessages) + test.equal(result, true) + test.end() + }) + fulfilTest.test('produce message to position topic when BULK_COMMIT validations pass', async (test) => { const localfulfilMessages = MainUtil.clone(fulfilMessages) await Consumer.createHandler(topicName, config, command)