Skip to content

Commit

Permalink
feat(mojaloop/#3844): added integration tests for fxFulfil flow (#1011)
Browse files Browse the repository at this point in the history
* feat(mojaloop/#3844): added corner cases impl. for FX; added unit-tests

* feat(mojaloop/#3844): added corner cases impl. for FX; added unit-tests

* feat(mojaloop/#3844): added fxTransferErrorDuplicateCheck table; moved fxFulfilt tests in a separare file

* feat(mojaloop/#3844): run tests with output

* feat(mojaloop/#3844): fixed unit-test on ci env

* feat(mojaloop/#3844): added unit-tests for FxFulfilService;  moved duplicateCheckComparator logic to service

* feat(mojaloop/#3844): reverted ci test-coverage

* feat(mojaloop/#3844): added license

* feat(mojaloop/#3844): moved checkErrorPayload to helpers

* feat(mojaloop/#3844): moved checkErrorPayload to helpers

* feat(mojaloop/#3844): updated from feat/fx-impl

* feat(mojaloop/#3844): added integration tests for fxFulfil flow

* feat(mojaloop/#3844): fixed  producer.disconnect() in int-tests

* feat(mojaloop/#3844): added test:int:transfers script

* feat(mojaloop/#3844): added duplicateCheck int test

* feat(mojaloop/#3844): small cleanup

* feat(mojaloop/#3844): added duplicate and fulfilment check int-tests

* feat(mojaloop/#3844): removed unneeded code

* feat(mojaloop/#3844): added testConsumer.clearEvents() for int-tests

* feat(mojaloop/#3844): skipped newly added int-test

* feat(mojaloop/#3844): updated validateFulfilCondition

* feat: unskip int-test

feat: unskip int-test

* feat(mojaloop/#3844): removed unneeded npm script

---------

Co-authored-by: Kevin Leyow <kleyow@gmail.com>
  • Loading branch information
geka-evk and kleyow authored Apr 29, 2024
1 parent 05c4ce9 commit afc4c5c
Show file tree
Hide file tree
Showing 15 changed files with 528 additions and 101 deletions.
37 changes: 27 additions & 10 deletions src/handlers/transfers/FxFulfilService.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>
* Eugen Klymniuk <eugen.klymniuk@infitx.com
* Eugen Klymniuk <eugen.klymniuk@infitx.com>
--------------
**********/

Expand Down Expand Up @@ -111,13 +111,14 @@ class FxFulfilService {

async getDuplicateCheckResult({ commitRequestId, payload, action }) {
const { duplicateCheck } = this.FxTransferModel
const isFxTransferError = action === Action.FX_ABORT

const getDuplicateFn = action === Action.FX_ABORT
const getDuplicateFn = isFxTransferError
? duplicateCheck.getFxTransferErrorDuplicateCheck
: duplicateCheck.getFxTransferDuplicateCheck
const saveHashFn = action === Action.FX_ABORT
: duplicateCheck.getFxTransferFulfilmentDuplicateCheck
const saveHashFn = isFxTransferError
? duplicateCheck.saveFxTransferErrorDuplicateCheck
: duplicateCheck.saveFxTransferDuplicateCheck
: duplicateCheck.saveFxTransferFulfilmentDuplicateCheck

return this.Comparators.duplicateCheckComparator(
commitRequestId,
Expand Down Expand Up @@ -212,17 +213,20 @@ class FxFulfilService {
})
throw fspiopError
}
this.log.debug('validateEventType is passed', { type, functionality })
}

async validateFulfilment(transfer, payload) {
if (payload.fulfilment && !this.Validator.validateFulfilCondition(payload.fulfilment, transfer.condition)) {
const isValid = this.validateFulfilCondition(payload.fulfilment, transfer.ilpCondition)

if (!isValid) {
const fspiopError = fspiopErrorFactory.fxInvalidFulfilment()
const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING)
const eventDetail = {
functionality: Type.POSITION,
action: Action.FX_ABORT_VALIDATION
}
this.log.warn('callbackErrorInvalidFulfilment', { eventDetail, apiFSPIOPError })
this.log.warn('callbackErrorInvalidFulfilment', { eventDetail, apiFSPIOPError, transfer, payload })
await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError)

await this.kafkaProceed({
Expand All @@ -233,9 +237,9 @@ class FxFulfilService {
})
throw fspiopError
}
this.log.info('fulfilmentCheck passed successfully')

return true
this.log.info('fulfilmentCheck passed successfully', { isValid })
return isValid
}

async validateTransferState(transfer, functionality) {
Expand All @@ -246,7 +250,7 @@ class FxFulfilService {
functionality,
action: Action.FX_RESERVE
}
this.log.warn('callbackErrorNonReservedState', { eventDetail, apiFSPIOPError })
this.log.warn('callbackErrorNonReservedState', { eventDetail, apiFSPIOPError, transfer })

await this.kafkaProceed({
consumerCommit,
Expand All @@ -256,6 +260,8 @@ class FxFulfilService {
})
throw fspiopError
}
this.log.debug('validateTransferState is passed')
return true
}

async validateExpirationDate(transfer, functionality) {
Expand Down Expand Up @@ -320,6 +326,17 @@ class FxFulfilService {
return this.Kafka.proceed(this.Config.KAFKA_CONFIG, this.params, kafkaOpts)
}

validateFulfilCondition(fulfilment, condition) {
try {
const isValid = fulfilment && this.Validator.validateFulfilCondition(fulfilment, condition)
this.log.debug('validateFulfilCondition result:', { isValid, fulfilment, condition })
return isValid
} catch (err) {
this.log.warn(`validateFulfilCondition error: ${err?.message}`, { fulfilment, condition })
return false
}
}

static decodeKafkaMessage(message) {
if (!message?.value) {
throw TypeError('Invalid message format!')
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>
* Eugen Klymniuk <eugen.klymniuk@infitx.com
* Eugen Klymniuk <eugen.klymniuk@infitx.com>
--------------
**********/

Expand Down
136 changes: 76 additions & 60 deletions src/models/fxTransfer/duplicateCheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,31 @@ const { TABLE_NAMES } = require('../../shared/constants')

const histName = 'model_fx_transfer'

/**
* @function GetTransferDuplicateCheck
*
* @async
* @description This retrieves the fxTransferDuplicateCheck table record if present
*
* @param {string} commitRequestId - the fxTransfer commitRequestId
*
* @returns {object} - Returns the record from fxTransferDuplicateCheck table, or throws an error if failed
*/
const getFxTransferDuplicateCheck = async (commitRequestId) => {
const table = TABLE_NAMES.fxTransferDuplicateCheck
const queryName = `${table}_getFxTransferDuplicateCheck`
const getOneByCommitRequestId = async ({ commitRequestId, table, queryName }) => {
const histTimerEnd = Metrics.getHistogram(
histName,
`${queryName} - Metrics for fxTransfer duplicate check model`,
['success', 'queryName']
).startTimer()
logger.debug(`get ${table}`, { commitRequestId })
logger.debug('get duplicate record', { commitRequestId, table, queryName })

try {
const result = await Db.from(table).findOne({ commitRequestId })
histTimerEnd({ success: true, queryName })
return result
} catch (err) {
histTimerEnd({ success: false, queryName })
throw new Error(err?.message)
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

/**
* @function SaveTransferDuplicateCheck
*
* @async
* @description This inserts a record into fxTransferDuplicateCheck table
*
* @param {string} commitRequestId - the fxTransfer commitRequestId
* @param {string} hash - the hash of the fxTransfer request payload
*
* @returns {integer} - Returns the database id of the inserted row, or throws an error if failed
*/
const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => {
const table = TABLE_NAMES.fxTransferDuplicateCheck
const queryName = `${table}_saveFxTransferDuplicateCheck`
const saveCommitRequestIdAndHash = async ({ commitRequestId, hash, table, queryName }) => {
const histTimerEnd = Metrics.getHistogram(
histName,
`${queryName} - Metrics for fxTransfer duplicate check model`,
['success', 'queryName']
).startTimer()
logger.debug(`save ${table}`, { commitRequestId, hash })
logger.debug('save duplicate record', { commitRequestId, hash, table })

try {
const result = await Db.from(table).insert({ commitRequestId, hash })
Expand All @@ -67,6 +42,39 @@ const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => {
}
}

/**
* @function GetTransferDuplicateCheck
*
* @async
* @description This retrieves the fxTransferDuplicateCheck table record if present
*
* @param {string} commitRequestId - the fxTransfer commitRequestId
*
* @returns {object} - Returns the record from fxTransferDuplicateCheck table, or throws an error if failed
*/
const getFxTransferDuplicateCheck = async (commitRequestId) => {
const table = TABLE_NAMES.fxTransferDuplicateCheck
const queryName = `${table}_getFxTransferDuplicateCheck`
return getOneByCommitRequestId({ commitRequestId, table, queryName })
}

/**
* @function SaveTransferDuplicateCheck
*
* @async
* @description This inserts a record into fxTransferDuplicateCheck table
*
* @param {string} commitRequestId - the fxTransfer commitRequestId
* @param {string} hash - the hash of the fxTransfer request payload
*
* @returns {integer} - Returns the database id of the inserted row, or throws an error if failed
*/
const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => {
const table = TABLE_NAMES.fxTransferDuplicateCheck
const queryName = `${table}_saveFxTransferDuplicateCheck`
return saveCommitRequestIdAndHash({ commitRequestId, hash, table, queryName })
}

/**
* @function getFxTransferErrorDuplicateCheck
*
Expand All @@ -80,21 +88,7 @@ const saveFxTransferDuplicateCheck = async (commitRequestId, hash) => {
const getFxTransferErrorDuplicateCheck = async (commitRequestId) => {
const table = TABLE_NAMES.fxTransferErrorDuplicateCheck
const queryName = `${table}_getFxTransferErrorDuplicateCheck`
const histTimerEnd = Metrics.getHistogram(
histName,
`${queryName} - Metrics for fxTransfer error duplicate check model`,
['success', 'queryName']
).startTimer()
logger.debug(`get ${table}`, { commitRequestId })

try {
const result = await Db.from(table).findOne({ commitRequestId })
histTimerEnd({ success: true, queryName })
return result
} catch (err) {
histTimerEnd({ success: false, queryName })
throw new Error(err?.message)
}
return getOneByCommitRequestId({ commitRequestId, table, queryName })
}

/**
Expand All @@ -111,27 +105,49 @@ const getFxTransferErrorDuplicateCheck = async (commitRequestId) => {
const saveFxTransferErrorDuplicateCheck = async (commitRequestId, hash) => {
const table = TABLE_NAMES.fxTransferErrorDuplicateCheck
const queryName = `${table}_saveFxTransferErrorDuplicateCheck`
const histTimerEnd = Metrics.getHistogram(
histName,
`${queryName} - Metrics for fxTransfer error duplicate check model`,
['success', 'queryName']
).startTimer()
logger.debug(`save ${table}`, { commitRequestId, hash })
return saveCommitRequestIdAndHash({ commitRequestId, hash, table, queryName })
}

try {
const result = await Db.from(table).insert({ commitRequestId, hash })
histTimerEnd({ success: true, queryName })
return result
} catch (err) {
histTimerEnd({ success: false, queryName })
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
/**
* @function getFxTransferFulfilmentDuplicateCheck
*
* @async
* @description This retrieves the fxTransferFulfilmentDuplicateCheck table record if present
*
* @param {string} commitRequestId - the fxTransfer commitRequestId
*
* @returns {object} - Returns the record from fxTransferFulfilmentDuplicateCheck table, or throws an error if failed
*/
const getFxTransferFulfilmentDuplicateCheck = async (commitRequestId) => {
const table = TABLE_NAMES.fxTransferFulfilmentDuplicateCheck
const queryName = `${table}_getFxTransferFulfilmentDuplicateCheck`
return getOneByCommitRequestId({ commitRequestId, table, queryName })
}

/**
* @function saveFxTransferFulfilmentDuplicateCheck
*
* @async
* @description This inserts a record into fxTransferFulfilmentDuplicateCheck table
*
* @param {string} commitRequestId - the fxTransfer commitRequestId
* @param {string} hash - the hash of the fxTransfer request payload
*
* @returns {integer} - Returns the database id of the inserted row, or throws an error if failed
*/
const saveFxTransferFulfilmentDuplicateCheck = async (commitRequestId, hash) => {
const table = TABLE_NAMES.fxTransferFulfilmentDuplicateCheck
const queryName = `${table}_saveFxTransferFulfilmentDuplicateCheck`
return saveCommitRequestIdAndHash({ commitRequestId, hash, table, queryName })
}

module.exports = {
getFxTransferDuplicateCheck,
saveFxTransferDuplicateCheck,

getFxTransferErrorDuplicateCheck,
saveFxTransferErrorDuplicateCheck
saveFxTransferErrorDuplicateCheck,

getFxTransferFulfilmentDuplicateCheck,
saveFxTransferFulfilmentDuplicateCheck
}
8 changes: 4 additions & 4 deletions src/models/fxTransfer/fxTransfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ const savePreparedRequest = async (payload, stateReason, hasPassedValidation) =>
getParticipant(payload.counterPartyFsp, payload.sourceAmount.currency),
getParticipant(payload.counterPartyFsp, payload.targetAmount.currency)
])
// todo: clarify, what we should do if no initiatingParticipant or counterParticipant found?

const fxTransferRecord = {
commitRequestId: payload.commitRequestId,
Expand Down Expand Up @@ -275,8 +276,7 @@ const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopErro
const errorDescription = fspiopError && fspiopError.errorInformation && fspiopError.errorInformation.errorDescription
// let extensionList
switch (action) {
// TODO: Need to check if these are relevant for FX transfers
// case TransferEventAction.COMMIT:
case TransferEventAction.FX_COMMIT:
case TransferEventAction.FX_RESERVE:
state = TransferInternalState.RECEIVED_FULFIL
// extensionList = payload && payload.extensionList
Expand All @@ -287,8 +287,8 @@ const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopErro
// extensionList = payload && payload.extensionList
isFulfilment = true
break
// TODO: Need to check if these are relevant for FX transfers
// case TransferEventAction.ABORT_VALIDATION:

case TransferEventAction.FX_ABORT_VALIDATION:
case TransferEventAction.FX_ABORT:
state = TransferInternalState.RECEIVED_ERROR
// extensionList = payload && payload.errorInformation && payload.errorInformation.extensionList
Expand Down
1 change: 1 addition & 0 deletions src/shared/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const TABLE_NAMES = Object.freeze({
fxTransfer: 'fxTransfer',
fxTransferDuplicateCheck: 'fxTransferDuplicateCheck',
fxTransferErrorDuplicateCheck: 'fxTransferErrorDuplicateCheck',
fxTransferFulfilmentDuplicateCheck: 'fxTransferFulfilmentDuplicateCheck',
fxTransferParticipant: 'fxTransferParticipant',
fxTransferStateChange: 'fxTransferStateChange',
fxWatchList: 'fxWatchList',
Expand Down
2 changes: 1 addition & 1 deletion src/shared/fspiopErrorFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>
* Eugen Klymniuk <eugen.klymniuk@infitx.com
* Eugen Klymniuk <eugen.klymniuk@infitx.com>
--------------
**********/

Expand Down
2 changes: 1 addition & 1 deletion src/shared/logger/Logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>
* Eugen Klymniuk <eugen.klymniuk@infitx.com
* Eugen Klymniuk <eugen.klymniuk@infitx.com>
--------------
**********/

Expand Down
Loading

0 comments on commit afc4c5c

Please sign in to comment.