From afc4c5c8ce75853416d90db20c02bff42b50ef7c Mon Sep 17 00:00:00 2001 From: Eugen Klymniuk Date: Mon, 29 Apr 2024 19:29:49 +0100 Subject: [PATCH] feat(mojaloop/#3844): added integration tests for fxFulfil flow (#1011) * feat(mojaloop/#3844): added corner cases impl. for FX; added unit-tests * feat(mojaloop/#3844): added corner cases impl. for FX; added unit-tests * feat(mojaloop/#3844): added fxTransferErrorDuplicateCheck table; moved fxFulfilt tests in a separare file * feat(mojaloop/#3844): run tests with output * feat(mojaloop/#3844): fixed unit-test on ci env * feat(mojaloop/#3844): added unit-tests for FxFulfilService; moved duplicateCheckComparator logic to service * feat(mojaloop/#3844): reverted ci test-coverage * feat(mojaloop/#3844): added license * feat(mojaloop/#3844): moved checkErrorPayload to helpers * feat(mojaloop/#3844): moved checkErrorPayload to helpers * feat(mojaloop/#3844): updated from feat/fx-impl * feat(mojaloop/#3844): added integration tests for fxFulfil flow * feat(mojaloop/#3844): fixed producer.disconnect() in int-tests * feat(mojaloop/#3844): added test:int:transfers script * feat(mojaloop/#3844): added duplicateCheck int test * feat(mojaloop/#3844): small cleanup * feat(mojaloop/#3844): added duplicate and fulfilment check int-tests * feat(mojaloop/#3844): removed unneeded code * feat(mojaloop/#3844): added testConsumer.clearEvents() for int-tests * feat(mojaloop/#3844): skipped newly added int-test * feat(mojaloop/#3844): updated validateFulfilCondition * feat: unskip int-test feat: unskip int-test * feat(mojaloop/#3844): removed unneeded npm script --------- Co-authored-by: Kevin Leyow --- src/handlers/transfers/FxFulfilService.js | 37 ++- src/handlers/transfers/prepare.js | 2 +- src/models/fxTransfer/duplicateCheck.js | 136 +++++---- src/models/fxTransfer/fxTransfer.js | 8 +- src/shared/constants.js | 1 + src/shared/fspiopErrorFactory.js | 2 +- src/shared/logger/Logger.js | 2 +- test/fixtures.js | 26 +- .../handlers/positions/handlerBatch.test.js | 2 + .../handlers/transfers/handlers.test.js | 1 + .../handlers/transfers/fxFulfil.test.js | 273 ++++++++++++++++++ .../handlers/transfers/handlers.test.js | 1 + .../integration/helpers/createTestConsumer.js | 57 ++++ test/integration/helpers/testConsumer.js | 9 +- .../transfers/FxFulfilService.test.js | 72 ++++- 15 files changed, 528 insertions(+), 101 deletions(-) create mode 100644 test/integration/handlers/transfers/fxFulfil.test.js create mode 100644 test/integration/helpers/createTestConsumer.js diff --git a/src/handlers/transfers/FxFulfilService.js b/src/handlers/transfers/FxFulfilService.js index 265b0a93a..4ac140783 100644 --- a/src/handlers/transfers/FxFulfilService.js +++ b/src/handlers/transfers/FxFulfilService.js @@ -18,7 +18,7 @@ * Gates Foundation - Name Surname - * Eugen Klymniuk -------------- **********/ @@ -111,13 +111,14 @@ class FxFulfilService { async getDuplicateCheckResult({ commitRequestId, payload, action }) { const { duplicateCheck } = this.FxTransferModel + const isFxTransferError = action === Action.FX_ABORT - const getDuplicateFn = action === Action.FX_ABORT + const getDuplicateFn = isFxTransferError ? duplicateCheck.getFxTransferErrorDuplicateCheck - : duplicateCheck.getFxTransferDuplicateCheck - const saveHashFn = action === Action.FX_ABORT + : duplicateCheck.getFxTransferFulfilmentDuplicateCheck + const saveHashFn = isFxTransferError ? duplicateCheck.saveFxTransferErrorDuplicateCheck - : duplicateCheck.saveFxTransferDuplicateCheck + : duplicateCheck.saveFxTransferFulfilmentDuplicateCheck return this.Comparators.duplicateCheckComparator( commitRequestId, @@ -212,17 +213,20 @@ class FxFulfilService { }) throw fspiopError } + this.log.debug('validateEventType is passed', { type, functionality }) } async validateFulfilment(transfer, payload) { - if (payload.fulfilment && !this.Validator.validateFulfilCondition(payload.fulfilment, transfer.condition)) { + const isValid = this.validateFulfilCondition(payload.fulfilment, transfer.ilpCondition) + + if (!isValid) { const fspiopError = fspiopErrorFactory.fxInvalidFulfilment() const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) const eventDetail = { functionality: Type.POSITION, action: Action.FX_ABORT_VALIDATION } - this.log.warn('callbackErrorInvalidFulfilment', { eventDetail, apiFSPIOPError }) + this.log.warn('callbackErrorInvalidFulfilment', { eventDetail, apiFSPIOPError, transfer, payload }) await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError) await this.kafkaProceed({ @@ -233,9 +237,9 @@ class FxFulfilService { }) throw fspiopError } - this.log.info('fulfilmentCheck passed successfully') - return true + this.log.info('fulfilmentCheck passed successfully', { isValid }) + return isValid } async validateTransferState(transfer, functionality) { @@ -246,7 +250,7 @@ class FxFulfilService { functionality, action: Action.FX_RESERVE } - this.log.warn('callbackErrorNonReservedState', { eventDetail, apiFSPIOPError }) + this.log.warn('callbackErrorNonReservedState', { eventDetail, apiFSPIOPError, transfer }) await this.kafkaProceed({ consumerCommit, @@ -256,6 +260,8 @@ class FxFulfilService { }) throw fspiopError } + this.log.debug('validateTransferState is passed') + return true } async validateExpirationDate(transfer, functionality) { @@ -320,6 +326,17 @@ class FxFulfilService { return this.Kafka.proceed(this.Config.KAFKA_CONFIG, this.params, kafkaOpts) } + validateFulfilCondition(fulfilment, condition) { + try { + const isValid = fulfilment && this.Validator.validateFulfilCondition(fulfilment, condition) + this.log.debug('validateFulfilCondition result:', { isValid, fulfilment, condition }) + return isValid + } catch (err) { + this.log.warn(`validateFulfilCondition error: ${err?.message}`, { fulfilment, condition }) + return false + } + } + static decodeKafkaMessage(message) { if (!message?.value) { throw TypeError('Invalid message format!') diff --git a/src/handlers/transfers/prepare.js b/src/handlers/transfers/prepare.js index 87ce4b54b..cb69b859e 100644 --- a/src/handlers/transfers/prepare.js +++ b/src/handlers/transfers/prepare.js @@ -18,7 +18,7 @@ * Gates Foundation - Name Surname - * Eugen Klymniuk -------------- **********/ diff --git a/src/models/fxTransfer/duplicateCheck.js b/src/models/fxTransfer/duplicateCheck.js index d1c86f746..aba6f3e58 100644 --- a/src/models/fxTransfer/duplicateCheck.js +++ b/src/models/fxTransfer/duplicateCheck.js @@ -6,25 +6,13 @@ const { TABLE_NAMES } = require('../../shared/constants') const histName = 'model_fx_transfer' -/** - * @function GetTransferDuplicateCheck - * - * @async - * @description This retrieves the fxTransferDuplicateCheck table record if present - * - * @param {string} commitRequestId - the fxTransfer commitRequestId - * - * @returns {object} - Returns the record from fxTransferDuplicateCheck table, or throws an error if failed - */ -const getFxTransferDuplicateCheck = async (commitRequestId) => { - const table = TABLE_NAMES.fxTransferDuplicateCheck - const queryName = `${table}_getFxTransferDuplicateCheck` +const getOneByCommitRequestId = async ({ commitRequestId, table, queryName }) => { const histTimerEnd = Metrics.getHistogram( histName, `${queryName} - Metrics for fxTransfer duplicate check model`, ['success', 'queryName'] ).startTimer() - logger.debug(`get ${table}`, { commitRequestId }) + logger.debug('get duplicate record', { commitRequestId, table, queryName }) try { const result = await Db.from(table).findOne({ commitRequestId }) @@ -32,30 +20,17 @@ const getFxTransferDuplicateCheck = async (commitRequestId) => { return result } catch (err) { histTimerEnd({ success: false, queryName }) - throw new Error(err?.message) + throw ErrorHandler.Factory.reformatFSPIOPError(err) } } -/** - * @function SaveTransferDuplicateCheck - * - * @async - * @description This inserts a record into fxTransferDuplicateCheck table - * - * @param {string} commitRequestId - the fxTransfer commitRequestId - * @param {string} hash - the hash of the fxTransfer request payload - * - * @returns {integer} - Returns the database id of the inserted row, or throws an error if failed - */ -const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => { - const table = TABLE_NAMES.fxTransferDuplicateCheck - const queryName = `${table}_saveFxTransferDuplicateCheck` +const saveCommitRequestIdAndHash = async ({ commitRequestId, hash, table, queryName }) => { const histTimerEnd = Metrics.getHistogram( histName, `${queryName} - Metrics for fxTransfer duplicate check model`, ['success', 'queryName'] ).startTimer() - logger.debug(`save ${table}`, { commitRequestId, hash }) + logger.debug('save duplicate record', { commitRequestId, hash, table }) try { const result = await Db.from(table).insert({ commitRequestId, hash }) @@ -67,6 +42,39 @@ const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => { } } +/** + * @function GetTransferDuplicateCheck + * + * @async + * @description This retrieves the fxTransferDuplicateCheck table record if present + * + * @param {string} commitRequestId - the fxTransfer commitRequestId + * + * @returns {object} - Returns the record from fxTransferDuplicateCheck table, or throws an error if failed + */ +const getFxTransferDuplicateCheck = async (commitRequestId) => { + const table = TABLE_NAMES.fxTransferDuplicateCheck + const queryName = `${table}_getFxTransferDuplicateCheck` + return getOneByCommitRequestId({ commitRequestId, table, queryName }) +} + +/** + * @function SaveTransferDuplicateCheck + * + * @async + * @description This inserts a record into fxTransferDuplicateCheck table + * + * @param {string} commitRequestId - the fxTransfer commitRequestId + * @param {string} hash - the hash of the fxTransfer request payload + * + * @returns {integer} - Returns the database id of the inserted row, or throws an error if failed + */ +const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => { + const table = TABLE_NAMES.fxTransferDuplicateCheck + const queryName = `${table}_saveFxTransferDuplicateCheck` + return saveCommitRequestIdAndHash({ commitRequestId, hash, table, queryName }) +} + /** * @function getFxTransferErrorDuplicateCheck * @@ -80,21 +88,7 @@ const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => { const getFxTransferErrorDuplicateCheck = async (commitRequestId) => { const table = TABLE_NAMES.fxTransferErrorDuplicateCheck const queryName = `${table}_getFxTransferErrorDuplicateCheck` - const histTimerEnd = Metrics.getHistogram( - histName, - `${queryName} - Metrics for fxTransfer error duplicate check model`, - ['success', 'queryName'] - ).startTimer() - logger.debug(`get ${table}`, { commitRequestId }) - - try { - const result = await Db.from(table).findOne({ commitRequestId }) - histTimerEnd({ success: true, queryName }) - return result - } catch (err) { - histTimerEnd({ success: false, queryName }) - throw new Error(err?.message) - } + return getOneByCommitRequestId({ commitRequestId, table, queryName }) } /** @@ -111,21 +105,40 @@ const getFxTransferErrorDuplicateCheck = async (commitRequestId) => { const saveFxTransferErrorDuplicateCheck = async (commitRequestId, hash) => { const table = TABLE_NAMES.fxTransferErrorDuplicateCheck const queryName = `${table}_saveFxTransferErrorDuplicateCheck` - const histTimerEnd = Metrics.getHistogram( - histName, - `${queryName} - Metrics for fxTransfer error duplicate check model`, - ['success', 'queryName'] - ).startTimer() - logger.debug(`save ${table}`, { commitRequestId, hash }) + return saveCommitRequestIdAndHash({ commitRequestId, hash, table, queryName }) +} - try { - const result = await Db.from(table).insert({ commitRequestId, hash }) - histTimerEnd({ success: true, queryName }) - return result - } catch (err) { - histTimerEnd({ success: false, queryName }) - throw ErrorHandler.Factory.reformatFSPIOPError(err) - } +/** + * @function getFxTransferFulfilmentDuplicateCheck + * + * @async + * @description This retrieves the fxTransferFulfilmentDuplicateCheck table record if present + * + * @param {string} commitRequestId - the fxTransfer commitRequestId + * + * @returns {object} - Returns the record from fxTransferFulfilmentDuplicateCheck table, or throws an error if failed + */ +const getFxTransferFulfilmentDuplicateCheck = async (commitRequestId) => { + const table = TABLE_NAMES.fxTransferFulfilmentDuplicateCheck + const queryName = `${table}_getFxTransferFulfilmentDuplicateCheck` + return getOneByCommitRequestId({ commitRequestId, table, queryName }) +} + +/** + * @function saveFxTransferFulfilmentDuplicateCheck + * + * @async + * @description This inserts a record into fxTransferFulfilmentDuplicateCheck table + * + * @param {string} commitRequestId - the fxTransfer commitRequestId + * @param {string} hash - the hash of the fxTransfer request payload + * + * @returns {integer} - Returns the database id of the inserted row, or throws an error if failed + */ +const saveFxTransferFulfilmentDuplicateCheck = async (commitRequestId, hash) => { + const table = TABLE_NAMES.fxTransferFulfilmentDuplicateCheck + const queryName = `${table}_saveFxTransferFulfilmentDuplicateCheck` + return saveCommitRequestIdAndHash({ commitRequestId, hash, table, queryName }) } module.exports = { @@ -133,5 +146,8 @@ module.exports = { saveFxTransferDuplicateCheck, getFxTransferErrorDuplicateCheck, - saveFxTransferErrorDuplicateCheck + saveFxTransferErrorDuplicateCheck, + + getFxTransferFulfilmentDuplicateCheck, + saveFxTransferFulfilmentDuplicateCheck } diff --git a/src/models/fxTransfer/fxTransfer.js b/src/models/fxTransfer/fxTransfer.js index 646047e63..b40c59766 100644 --- a/src/models/fxTransfer/fxTransfer.js +++ b/src/models/fxTransfer/fxTransfer.js @@ -147,6 +147,7 @@ const savePreparedRequest = async (payload, stateReason, hasPassedValidation) => getParticipant(payload.counterPartyFsp, payload.sourceAmount.currency), getParticipant(payload.counterPartyFsp, payload.targetAmount.currency) ]) + // todo: clarify, what we should do if no initiatingParticipant or counterParticipant found? const fxTransferRecord = { commitRequestId: payload.commitRequestId, @@ -275,8 +276,7 @@ const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopErro const errorDescription = fspiopError && fspiopError.errorInformation && fspiopError.errorInformation.errorDescription // let extensionList switch (action) { - // TODO: Need to check if these are relevant for FX transfers - // case TransferEventAction.COMMIT: + case TransferEventAction.FX_COMMIT: case TransferEventAction.FX_RESERVE: state = TransferInternalState.RECEIVED_FULFIL // extensionList = payload && payload.extensionList @@ -287,8 +287,8 @@ const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopErro // extensionList = payload && payload.extensionList isFulfilment = true break - // TODO: Need to check if these are relevant for FX transfers - // case TransferEventAction.ABORT_VALIDATION: + + case TransferEventAction.FX_ABORT_VALIDATION: case TransferEventAction.FX_ABORT: state = TransferInternalState.RECEIVED_ERROR // extensionList = payload && payload.errorInformation && payload.errorInformation.extensionList diff --git a/src/shared/constants.js b/src/shared/constants.js index 0052ac203..79967880e 100644 --- a/src/shared/constants.js +++ b/src/shared/constants.js @@ -4,6 +4,7 @@ const TABLE_NAMES = Object.freeze({ fxTransfer: 'fxTransfer', fxTransferDuplicateCheck: 'fxTransferDuplicateCheck', fxTransferErrorDuplicateCheck: 'fxTransferErrorDuplicateCheck', + fxTransferFulfilmentDuplicateCheck: 'fxTransferFulfilmentDuplicateCheck', fxTransferParticipant: 'fxTransferParticipant', fxTransferStateChange: 'fxTransferStateChange', fxWatchList: 'fxWatchList', diff --git a/src/shared/fspiopErrorFactory.js b/src/shared/fspiopErrorFactory.js index 721cbb0be..2e7ce3749 100644 --- a/src/shared/fspiopErrorFactory.js +++ b/src/shared/fspiopErrorFactory.js @@ -19,7 +19,7 @@ * Gates Foundation - Name Surname - * Eugen Klymniuk -------------- **********/ diff --git a/src/shared/logger/Logger.js b/src/shared/logger/Logger.js index 4d996c5ab..aaa9d5479 100644 --- a/src/shared/logger/Logger.js +++ b/src/shared/logger/Logger.js @@ -19,7 +19,7 @@ * Gates Foundation - Name Surname - * Eugen Klymniuk -------------- **********/ diff --git a/test/fixtures.js b/test/fixtures.js index 421eff709..dc3d55582 100644 --- a/test/fixtures.js +++ b/test/fixtures.js @@ -18,7 +18,7 @@ * Gates Foundation - Name Surname - * Eugen Klymniuk -------------- **********/ @@ -26,8 +26,8 @@ const { randomUUID } = require('node:crypto') const { Enum } = require('@mojaloop/central-services-shared') const ILP_PACKET = 'AYIBgQAAAAAAAASwNGxldmVsb25lLmRmc3AxLm1lci45T2RTOF81MDdqUUZERmZlakgyOVc4bXFmNEpLMHlGTFGCAUBQU0svMS4wCk5vbmNlOiB1SXlweUYzY3pYSXBFdzVVc05TYWh3CkVuY3J5cHRpb246IG5vbmUKUGF5bWVudC1JZDogMTMyMzZhM2ItOGZhOC00MTYzLTg0NDctNGMzZWQzZGE5OGE3CgpDb250ZW50LUxlbmd0aDogMTM1CkNvbnRlbnQtVHlwZTogYXBwbGljYXRpb24vanNvbgpTZW5kZXItSWRlbnRpZmllcjogOTI4MDYzOTEKCiJ7XCJmZWVcIjowLFwidHJhbnNmZXJDb2RlXCI6XCJpbnZvaWNlXCIsXCJkZWJpdE5hbWVcIjpcImFsaWNlIGNvb3BlclwiLFwiY3JlZGl0TmFtZVwiOlwibWVyIGNoYW50XCIsXCJkZWJpdElkZW50aWZpZXJcIjpcIjkyODA2MzkxXCJ9IgA' -const CONDITION = 'YlK5TZyhflbXaDRPtR5zhCu8FrbgvrQwwmzuH0iQ0AI' -const FULLFILMENT = 'oAKAAA' +const CONDITION = '8x04dj-RKEtfjStajaKXKJ5eL1mWm9iG2ltEKvEDOHc' +const FULFILMENT = 'uz0FAeutW6o8Mz7OmJh8ALX6mmsZCcIDOqtE01eo4uI' const DFSP1_ID = 'dfsp1' const DFSP2_ID = 'dfsp2' @@ -53,7 +53,7 @@ const extensionListDto = ({ }) const fulfilPayloadDto = ({ - fulfilment = FULLFILMENT, + fulfilment = FULFILMENT, transferState = 'RECEIVED', completedTimestamp = new Date().toISOString(), extensionList = extensionListDto() @@ -65,7 +65,7 @@ const fulfilPayloadDto = ({ }) const fxFulfilPayloadDto = ({ - fulfilment = FULLFILMENT, + fulfilment = FULFILMENT, conversionState = 'RECEIVED', completedTimestamp = new Date().toISOString(), extensionList = extensionListDto() @@ -95,13 +95,13 @@ const fulfilContentDto = ({ const fxFulfilContentDto = ({ payload = fxFulfilPayloadDto(), - fxTransferId = randomUUID(), + commitRequestId = randomUUID(), from = FXP_ID, to = DFSP1_ID } = {}) => ({ payload, uriParams: { - id: fxTransferId + id: commitRequestId }, headers: { 'fspiop-source': from, @@ -111,7 +111,7 @@ const fxFulfilContentDto = ({ }) const fulfilMetadataDto = ({ - id = randomUUID(), // todo: think, how it relates to other ids + id = randomUUID(), // think, how it relates to other ids type = 'fulfil', action = 'commit' } = {}) => ({ @@ -228,7 +228,8 @@ const fxTransferDto = ({ amountType = 'SEND', sourceAmount = amountDto({ currency: 'BWP', amount: '300.33' }), targetAmount = amountDto({ currency: 'TZS', amount: '48000' }), - condition = CONDITION + condition = CONDITION, + expiration = new Date(Date.now() + (24 * 60 * 60 * 1000)) } = {}) => ({ commitRequestId, determiningTransferId, @@ -237,7 +238,8 @@ const fxTransferDto = ({ amountType, sourceAmount, targetAmount, - condition + condition, + expiration }) const fxtGetAllDetailsByCommitRequestIdDto = ({ @@ -265,7 +267,7 @@ const fxtGetAllDetailsByCommitRequestIdDto = ({ counterPartyFspSourceParticipantCurrencyId: 33, transferState: Enum.Transfers.TransferState.RESERVED, transferStateEnumeration: 'RECEIVED', // or RECEIVED_FULFIL? - fulfilment: FULLFILMENT, + fulfilment: FULFILMENT, // todo: add other fields from getAllDetailsByCommitRequestId real response expirationDate: new Date(), createdDate: new Date() @@ -299,7 +301,7 @@ const watchListItemDto = ({ module.exports = { ILP_PACKET, CONDITION, - FULLFILMENT, + FULFILMENT, DFSP1_ID, DFSP2_ID, FXP_ID, diff --git a/test/integration-override/handlers/positions/handlerBatch.test.js b/test/integration-override/handlers/positions/handlerBatch.test.js index aec0609d3..d7f8352df 100644 --- a/test/integration-override/handlers/positions/handlerBatch.test.js +++ b/test/integration-override/handlers/positions/handlerBatch.test.js @@ -898,6 +898,8 @@ Test('Handlers test', async handlersTest => { await setupTests.test('start testConsumer', async (test) => { // Set up the testConsumer here await testConsumer.startListening() + await new Promise(resolve => setTimeout(resolve, 5_000)) + testConsumer.clearEvents() test.pass('done') test.end() diff --git a/test/integration-override/handlers/transfers/handlers.test.js b/test/integration-override/handlers/transfers/handlers.test.js index fd178227d..4869c82b0 100644 --- a/test/integration-override/handlers/transfers/handlers.test.js +++ b/test/integration-override/handlers/transfers/handlers.test.js @@ -329,6 +329,7 @@ Test('Handlers test', async handlersTest => { await testConsumer.startListening() // TODO: MIG - Disabling these handlers to test running the CL as a separate service independently. await new Promise(resolve => setTimeout(resolve, rebalanceDelay)) + testConsumer.clearEvents() test.pass('done') test.end() diff --git a/test/integration/handlers/transfers/fxFulfil.test.js b/test/integration/handlers/transfers/fxFulfil.test.js new file mode 100644 index 000000000..baabf5367 --- /dev/null +++ b/test/integration/handlers/transfers/fxFulfil.test.js @@ -0,0 +1,273 @@ +/***** + License + -------------- + Copyright © 2017 Bill & Melinda Gates Foundation + The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + + Contributors + -------------- + This is the official list of the Mojaloop project contributors for this file. + Names of the original copyright holders (individuals or organizations) + should be listed with a '*' in the first column. People who have + contributed from an organization can be listed under the organization + that actually holds the copyright for their contributions (see the + Gates Foundation organization for an example). Those individuals should have + their names indented and be marked with a '-'. Email address can be added + optionally within square brackets . + * Gates Foundation + - Name Surname + + * Eugen Klymniuk + -------------- + **********/ + +const Test = require('tape') +const { Db } = require('@mojaloop/database-lib') +const { Enum, Util } = require('@mojaloop/central-services-shared') +const { Producer } = require('@mojaloop/central-services-stream').Kafka + +const Config = require('#src/lib/config') +const Cache = require('#src/lib/cache') +const fspiopErrorFactory = require('#src/shared/fspiopErrorFactory') +const ParticipantCached = require('#src/models/participant/participantCached') +const ParticipantCurrencyCached = require('#src/models/participant/participantCurrencyCached') +const ParticipantLimitCached = require('#src/models/participant/participantLimitCached') +const fxTransferModel = require('#src/models/fxTransfer/index') +const prepare = require('#src/handlers/transfers/prepare') +const cyril = require('#src/domain/fx/cyril') +const Logger = require('#src/shared/logger/Logger') +const { TABLE_NAMES } = require('#src/shared/constants') + +const { checkErrorPayload, wrapWithRetries } = require('#test/util/helpers') +const createTestConsumer = require('#test/integration/helpers/createTestConsumer') +const ParticipantHelper = require('#test/integration/helpers/participant') +const HubAccountsHelper = require('#test/integration/helpers/hubAccounts') +const fixtures = require('#test/fixtures') + +const kafkaUtil = Util.Kafka +const { Action, Type } = Enum.Events.Event +const { TOPICS } = fixtures + +const storeFxTransferPreparePayload = async (fxTransfer, transferStateId = '', addToWatchList = true) => { + const { commitRequestId } = fxTransfer + const isFx = true + const log = new Logger({ commitRequestId }) + + const dupResult = await prepare.checkDuplication({ + payload: fxTransfer, + isFx, + ID: commitRequestId, + location: {} + }) + if (dupResult.hasDuplicateId) throw new Error('fxTransfer prepare Duplication Error') + + await prepare.savePreparedRequest({ + payload: fxTransfer, + isFx, + functionality: Type.NOTIFICATION, + params: {}, + validationPassed: true, + reasons: [], + location: {} + }) + + if (transferStateId) { + const knex = Db.getKnex() + await knex(TABLE_NAMES.fxTransferStateChange) + .update({ + transferStateId, + reason: 'fxFulfil int-test' + }) + .where({ commitRequestId }) + // https://github.com/mojaloop/central-ledger/blob/ad4dd53d6914628813aa30a1dcd3af2a55f12b0d/src/domain/position/fx-prepare.js#L187 + log.info('fxTransfer state is updated', { transferStateId }) + } + + if (addToWatchList) { + await cyril.getParticipantAndCurrencyForFxTransferMessage(fxTransfer) + log.info('fxTransfer is added to watchList', { fxTransfer }) + } +} + +Test('FxFulfil flow Integration Tests -->', async fxFulfilTest => { + await Db.connect(Config.DATABASE) + await Promise.all([ + Cache.initCache(), + ParticipantCached.initialize(), + ParticipantCurrencyCached.initialize(), + ParticipantLimitCached.initialize(), + HubAccountsHelper.prepareData() + ]) + + const dfspNamePrefix = 'dfsp_' + const fxpNamePrefix = 'fxp_' + const sourceAmount = fixtures.amountDto({ currency: 'USD', amount: 433.88 }) + const targetAmount = fixtures.amountDto({ currency: 'XXX', amount: 200.22 }) + + const [payer, fxp] = await Promise.all([ + ParticipantHelper.prepareData(dfspNamePrefix, sourceAmount.currency), + ParticipantHelper.prepareData(fxpNamePrefix, sourceAmount.currency) + ]) + const DFSP_1 = payer.participant.name + const FXP = fxp.participant.name + + const createFxFulfilKafkaMessage = ({ commitRequestId, fulfilment, action = Action.FX_RESERVE } = {}) => { + const content = fixtures.fxFulfilContentDto({ + commitRequestId, + payload: fixtures.fxFulfilPayloadDto({ fulfilment }), + from: FXP, + to: DFSP_1 + }) + const fxFulfilMessage = fixtures.fxFulfilKafkaMessageDto({ + content, + from: FXP, + to: DFSP_1, + metadata: fixtures.fulfilMetadataDto({ action }) + }) + return fxFulfilMessage.value + } + + const topicFxFulfilConfig = kafkaUtil.createGeneralTopicConf( + Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, + Type.TRANSFER, + Action.FULFIL + ) + const fxFulfilProducerConfig = kafkaUtil.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + Type.TRANSFER.toUpperCase(), + Action.FULFIL.toUpperCase() + ) + const producer = new Producer(fxFulfilProducerConfig) + await producer.connect() + const produceMessageToFxFulfilTopic = async (message) => producer.sendMessage(message, topicFxFulfilConfig) + + const testConsumer = createTestConsumer([ + { type: Type.NOTIFICATION, action: Action.EVENT }, + { type: Type.TRANSFER, action: Action.POSITION }, + { type: Type.TRANSFER, action: Action.FULFIL } + ]) + await testConsumer.startListening() + await new Promise(resolve => setTimeout(resolve, 5_000)) + testConsumer.clearEvents() + fxFulfilTest.pass('setup is done') + + fxFulfilTest.test('should publish a message to send error callback if fxTransfer does not exist', async (t) => { + const noFxTransferMessage = createFxFulfilKafkaMessage() + const isTriggered = await produceMessageToFxFulfilTopic(noFxTransferMessage) + t.ok(isTriggered, 'test is triggered') + + const messages = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: TOPICS.notificationEvent, + action: Action.FX_RESERVE, + valueToFilter: FXP + })) + t.ok(messages[0], 'Notification event message is sent') + t.equal(messages[0].value.id, noFxTransferMessage.id) + checkErrorPayload(t)(messages[0].value.content.payload, fspiopErrorFactory.fxTransferNotFound()) + t.end() + }) + + fxFulfilTest.test('should process fxFulfil message (happy path)', async (t) => { + const fxTransfer = fixtures.fxTransferDto({ + initiatingFsp: DFSP_1, + counterPartyFsp: FXP, + sourceAmount, + targetAmount + }) + const { commitRequestId } = fxTransfer + + await storeFxTransferPreparePayload(fxTransfer, Enum.Transfers.TransferState.RESERVED) + t.pass(`fxTransfer prepare is saved in DB: ${commitRequestId}`) + + const fxFulfilMessage = createFxFulfilKafkaMessage({ commitRequestId }) + const isTriggered = await produceMessageToFxFulfilTopic(fxFulfilMessage) + t.ok(isTriggered, 'test is triggered') + + const messages = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: TOPICS.transferPosition, + action: Action.FX_RESERVE + })) + t.ok(messages[0], `Message is sent to ${TOPICS.transferPosition}`) + const { from, to, content } = messages[0].value + t.equal(from, FXP) + t.equal(to, DFSP_1) + t.equal(content.payload.fulfilment, fxFulfilMessage.content.payload.fulfilment, 'fulfilment is correct') + t.end() + }) + + fxFulfilTest.test('should check duplicates, and detect modified request (hash is not the same)', async (t) => { + const fxTransfer = fixtures.fxTransferDto({ + initiatingFsp: DFSP_1, + counterPartyFsp: FXP, + sourceAmount, + targetAmount + }) + const { commitRequestId } = fxTransfer + + await storeFxTransferPreparePayload(fxTransfer, '', false) + await fxTransferModel.duplicateCheck.saveFxTransferFulfilmentDuplicateCheck(commitRequestId, 'wrongHash') + t.pass(`fxTransfer prepare and duplicateCheck are saved in DB: ${commitRequestId}`) + + const fxFulfilMessage = createFxFulfilKafkaMessage({ commitRequestId }) + const isTriggered = await produceMessageToFxFulfilTopic(fxFulfilMessage) + t.ok(isTriggered, 'test is triggered') + + const messages = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: TOPICS.transferPosition, + action: Action.FX_FULFIL_DUPLICATE + })) + t.ok(messages[0], `Message is sent to ${TOPICS.transferPosition}`) + const { from, to, content, metadata } = messages[0].value + t.equal(from, fixtures.SWITCH_ID) + t.equal(to, FXP) + t.equal(metadata.event.type, Type.NOTIFICATION) + checkErrorPayload(t)(content.payload, fspiopErrorFactory.noFxDuplicateHash()) + t.end() + }) + + fxFulfilTest.test('should detect invalid fulfilment', async (t) => { + const fxTransfer = fixtures.fxTransferDto({ + initiatingFsp: DFSP_1, + counterPartyFsp: FXP, + sourceAmount, + targetAmount + }) + const { commitRequestId } = fxTransfer + + await storeFxTransferPreparePayload(fxTransfer, Enum.Transfers.TransferState.RESERVED) + t.pass(`fxTransfer prepare is saved in DB: ${commitRequestId}`) + + const fulfilment = 'wrongFulfilment' + const fxFulfilMessage = createFxFulfilKafkaMessage({ commitRequestId, fulfilment }) + const isTriggered = await produceMessageToFxFulfilTopic(fxFulfilMessage) + t.ok(isTriggered, 'test is triggered') + + const messages = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: TOPICS.transferPosition, + action: Action.FX_ABORT_VALIDATION + })) + t.ok(messages[0], `Message is sent to ${TOPICS.transferPosition}`) + const { from, to, content } = messages[0].value + t.equal(from, FXP) + t.equal(to, DFSP_1) + checkErrorPayload(t)(content.payload, fspiopErrorFactory.fxInvalidFulfilment()) + t.end() + }) + + fxFulfilTest.test('teardown', async (t) => { + await Promise.all([ + Db.disconnect(), + Cache.destroyCache(), + producer.disconnect(), + testConsumer.destroy() + ]) + await new Promise(resolve => setTimeout(resolve, 5_000)) + t.pass('teardown is finished') + t.end() + }) + + fxFulfilTest.end() +}) diff --git a/test/integration/handlers/transfers/handlers.test.js b/test/integration/handlers/transfers/handlers.test.js index 0605f761a..821151c7d 100644 --- a/test/integration/handlers/transfers/handlers.test.js +++ b/test/integration/handlers/transfers/handlers.test.js @@ -532,6 +532,7 @@ Test('Handlers test', async handlersTest => { // TODO: MIG - Disabling these handlers to test running the CL as a separate service independently. await new Promise(resolve => setTimeout(resolve, rebalanceDelay)) + testConsumer.clearEvents() test.pass('done') test.end() diff --git a/test/integration/helpers/createTestConsumer.js b/test/integration/helpers/createTestConsumer.js new file mode 100644 index 000000000..5e1cde445 --- /dev/null +++ b/test/integration/helpers/createTestConsumer.js @@ -0,0 +1,57 @@ +/***** + License + -------------- + Copyright © 2017 Bill & Melinda Gates Foundation + The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + + Contributors + -------------- + This is the official list of the Mojaloop project contributors for this file. + Names of the original copyright holders (individuals or organizations) + should be listed with a '*' in the first column. People who have + contributed from an organization can be listed under the organization + that actually holds the copyright for their contributions (see the + Gates Foundation organization for an example). Those individuals should have + their names indented and be marked with a '-'. Email address can be added + optionally within square brackets . + * Gates Foundation + - Name Surname + + * Eugen Klymniuk + -------------- + **********/ + +const { Enum, Util } = require('@mojaloop/central-services-shared') +const Config = require('#src/lib/config') +const TestConsumer = require('./testConsumer') + +/** + * Creates a TestConsumer with handlers based on the specified types/actions configurations. + * + * @param {Array} typeActionList - An array of objects with 'type' and 'action' properties + * - `type` {string} - Represents the type parameter for the topic and configuration. + * - `action` {string} - Represents the action parameter for the topic and configuration. + * + * @returns {TestConsumer} An instance of TestConsumer configured with handlers derived from + */ +const createTestConsumer = (typeActionList) => { + const handlers = typeActionList.map(({ type, action }) => ({ + topicName: Util.Kafka.transformGeneralTopicName( + Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, + type, + action + ), + config: Util.Kafka.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.CONSUMER, + type.toUpperCase(), + action.toUpperCase() + ) + })) + + return new TestConsumer(handlers) +} + +module.exports = createTestConsumer diff --git a/test/integration/helpers/testConsumer.js b/test/integration/helpers/testConsumer.js index d154159d4..1db4e0508 100644 --- a/test/integration/helpers/testConsumer.js +++ b/test/integration/helpers/testConsumer.js @@ -27,8 +27,8 @@ ******/ 'use strict' -const Logger = require('@mojaloop/central-services-logger') const { uniqueId } = require('lodash') +const Logger = require('@mojaloop/central-services-logger') const Consumer = require('@mojaloop/central-services-stream').Kafka.Consumer /** @@ -55,12 +55,13 @@ class TestConsumer { config: handlerConfig.config } // Override the client and group ids: - handler.config.rdkafkaConf['client.id'] = 'testConsumer' + const id = uniqueId() + handler.config.rdkafkaConf['client.id'] = 'testConsumer' + id // Fix issue of consumers with different partition.assignment.strategy being assigned to the same group - handler.config.rdkafkaConf['group.id'] = 'testConsumerGroup' + uniqueId() + handler.config.rdkafkaConf['group.id'] = 'testConsumerGroup' + id delete handler.config.rdkafkaConf['partition.assignment.strategy'] - Logger.warn(`TestConsumer.startListening(): registering consumer with topicName: ${handler.topicName}`) + Logger.warn(`TestConsumer.startListening(): registering consumer with uniqueId ${id} - topicName: ${handler.topicName}`) const topics = [handler.topicName] const consumer = new Consumer(topics, handler.config) await consumer.connect() diff --git a/test/unit/handlers/transfers/FxFulfilService.test.js b/test/unit/handlers/transfers/FxFulfilService.test.js index 6c6ed7e65..b655fecf5 100644 --- a/test/unit/handlers/transfers/FxFulfilService.test.js +++ b/test/unit/handlers/transfers/FxFulfilService.test.js @@ -18,27 +18,32 @@ * Gates Foundation - Name Surname - * Eugen Klymniuk -------------- **********/ /* eslint-disable object-property-newline */ const Sinon = require('sinon') const Test = require('tapes')(require('tape')) +const { Db } = require('@mojaloop/database-lib') const { Enum, Util } = require('@mojaloop/central-services-shared') const { Consumer, Producer } = require('@mojaloop/central-services-stream').Util const FxFulfilService = require('../../../../src/handlers/transfers/FxFulfilService') +const fspiopErrorFactory = require('../../../../src/shared/fspiopErrorFactory') const Validator = require('../../../../src/handlers/transfers/validator') const FxTransferModel = require('../../../../src/models/fxTransfer') const Config = require('../../../../src/lib/config') +const { ERROR_MESSAGES } = require('../../../../src/shared/constants') const { Logger } = require('../../../../src/shared/logger') const fixtures = require('../../../fixtures') const mocks = require('./mocks') +const { checkErrorPayload } = require('#test/util/helpers') const { Kafka, Comparators, Hash } = Util const { Action } = Enum.Events.Event +const { TOPICS } = fixtures const log = new Logger() // const functionality = Type.NOTIFICATION @@ -46,6 +51,7 @@ const log = new Logger() Test('FxFulfilService Tests -->', fxFulfilTest => { let sandbox let span + let producer const createFxFulfilServiceWithTestData = (message) => { const { @@ -56,7 +62,7 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { kafkaTopic } = FxFulfilService.decodeKafkaMessage(message) - const kafkaParams = { + const params = { message, kafkaTopic, span, @@ -65,7 +71,7 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { producer: Producer } const service = new FxFulfilService({ - log, Config, Comparators, Validator, FxTransferModel, Kafka, kafkaParams + log, Config, Comparators, Validator, FxTransferModel, Kafka, params }) return { @@ -76,9 +82,12 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { fxFulfilTest.beforeEach(test => { sandbox = Sinon.createSandbox() + producer = sandbox.stub(Producer) + sandbox.stub(Consumer, 'isConsumerAutoCommitEnabled').returns(true) + sandbox.stub(Db) + sandbox.stub(FxTransferModel.fxTransfer) sandbox.stub(FxTransferModel.duplicateCheck) span = mocks.createTracerStub(sandbox).SpanStub - // producer = sandbox.stub(Producer) test.end() }) @@ -97,8 +106,8 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { commitRequestId, payload } = createFxFulfilServiceWithTestData(message) - FxTransferModel.duplicateCheck.getFxTransferDuplicateCheck.resolves({ hash: Hash.generateSha256(payload) }) - FxTransferModel.duplicateCheck.saveFxTransferDuplicateCheck.resolves() + FxTransferModel.duplicateCheck.getFxTransferFulfilmentDuplicateCheck.resolves({ hash: Hash.generateSha256(payload) }) + FxTransferModel.duplicateCheck.saveFxTransferFulfilmentDuplicateCheck.resolves() FxTransferModel.duplicateCheck.getFxTransferErrorDuplicateCheck.rejects(new Error('Should not be called')) FxTransferModel.duplicateCheck.saveFxTransferErrorDuplicateCheck.rejects(new Error('Should not be called')) @@ -117,8 +126,8 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { commitRequestId, payload } = createFxFulfilServiceWithTestData(message) - FxTransferModel.duplicateCheck.getFxTransferDuplicateCheck.rejects(new Error('Should not be called')) - FxTransferModel.duplicateCheck.saveFxTransferDuplicateCheck.rejects(new Error('Should not be called')) + FxTransferModel.duplicateCheck.getFxTransferFulfilmentDuplicateCheck.rejects(new Error('Should not be called')) + FxTransferModel.duplicateCheck.saveFxTransferFulfilmentDuplicateCheck.rejects(new Error('Should not be called')) FxTransferModel.duplicateCheck.getFxTransferErrorDuplicateCheck.resolves({ hash: Hash.generateSha256(payload) }) FxTransferModel.duplicateCheck.saveFxTransferErrorDuplicateCheck.resolves() @@ -131,5 +140,52 @@ Test('FxFulfilService Tests -->', fxFulfilTest => { methodTest.end() }) + fxFulfilTest.test('validateFulfilment Method Tests -->', methodTest => { + methodTest.test('should pass fulfilment validation', async t => { + const { service } = createFxFulfilServiceWithTestData(fixtures.fxFulfilKafkaMessageDto()) + const transfer = { + ilpCondition: fixtures.CONDITION, + counterPartyFspTargetParticipantCurrencyId: 123 + } + const payload = { fulfilment: fixtures.FULFILMENT } + + const isOk = await service.validateFulfilment(transfer, payload) + t.true(isOk) + t.end() + }) + + methodTest.test('should process wrong fulfilment', async t => { + Db.getKnex.resolves({ + transaction: sandbox.stub + }) + FxTransferModel.fxTransfer.saveFxFulfilResponse.restore() // to call real saveFxFulfilResponse impl. + + const { service } = createFxFulfilServiceWithTestData(fixtures.fxFulfilKafkaMessageDto()) + const transfer = { + ilpCondition: fixtures.CONDITION, + counterPartyFspTargetParticipantCurrencyId: 123 + } + const payload = { fulfilment: 'wrongFulfilment' } + + try { + await service.validateFulfilment(transfer, payload) + t.fail('Should throw fxInvalidFulfilment error') + } catch (err) { + t.equal(err.message, ERROR_MESSAGES.fxInvalidFulfilment) + t.ok(producer.produceMessage.calledOnce) + const [messageProtocol, topicConfig] = producer.produceMessage.lastCall.args + t.equal(topicConfig.topicName, TOPICS.transferPosition) + t.equal(topicConfig.key, String(transfer.counterPartyFspTargetParticipantCurrencyId)) + t.equal(messageProtocol.from, fixtures.FXP_ID) + t.equal(messageProtocol.to, fixtures.DFSP1_ID) + t.equal(messageProtocol.metadata.event.action, Action.FX_ABORT_VALIDATION) + checkErrorPayload(t)(messageProtocol.content.payload, fspiopErrorFactory.fxInvalidFulfilment()) + } + t.end() + }) + + methodTest.end() + }) + fxFulfilTest.end() })