Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: get fx transfer not working #1098

Merged
merged 16 commits into from
Sep 13, 2024
Merged
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.7.5",
"@mojaloop/central-services-shared": "18.7.6",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down
2 changes: 1 addition & 1 deletion src/domain/position/fx-fulfil.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ const processPositionFxFulfilBin = async (
'application/json'
)

transferStateId = Enum.Transfers.TransferState.COMMITTED
// No need to change the transfer state here for success case.

binItem.result = { success: true }
}
Expand Down
56 changes: 38 additions & 18 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -760,19 +760,21 @@ const getTransfer = async (error, messages) => {
} else {
message = messages
}
const action = message.value.metadata.event.action
const isFx = action === TransferEventAction.FX_GET
const contextFromMessage = EventSdk.Tracer.extractContextFromMessage(message.value)
const span = EventSdk.Tracer.createChildSpanFromContext('cl_transfer_get', contextFromMessage)
try {
await span.audit(message, EventSdk.AuditEventAction.start)
const metadata = message.value.metadata
const action = metadata.event.action
const transferId = message.value.content.uriParams.id
const transferIdOrCommitRequestId = message.value.content.uriParams.id
const kafkaTopic = message.topic
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, { method: `getTransfer:${action}` }))

const actionLetter = Enum.Events.ActionLetter.get
const params = { message, kafkaTopic, span, consumer: Consumer, producer: Producer }
const eventDetail = { functionality: TransferEventType.NOTIFICATION, action: TransferEventAction.GET }
const eventDetail = { functionality: TransferEventType.NOTIFICATION, action }

Util.breadcrumb(location, { path: 'validationFailed' })
if (!await Validator.validateParticipantByName(message.value.from)) {
Expand All @@ -781,24 +783,42 @@ const getTransfer = async (error, messages) => {
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
}
const transfer = await TransferService.getByIdLight(transferId)
if (!transfer) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorTransferNotFound--${actionLetter}3`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_ID_NOT_FOUND, 'Provided Transfer ID was not found on the server.')
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
if (!await Validator.validateParticipantTransferId(message.value.from, transferId)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorNotTransferParticipant--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.CLIENT_ERROR)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
if (isFx) {
const fxTransfer = await FxTransferModel.fxTransfer.getByIdLight(transferIdOrCommitRequestId)
if (!fxTransfer) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorTransferNotFound--${actionLetter}3`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_ID_NOT_FOUND, 'Provided commitRequest ID was not found on the server.')
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
if (!await Validator.validateParticipantForCommitRequestId(message.value.from, transferIdOrCommitRequestId)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorNotFxTransferParticipant--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.CLIENT_ERROR)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
Util.breadcrumb(location, { path: 'validationPassed' })
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackMessage--${actionLetter}4`))
message.value.content.payload = TransferObjectTransform.toFulfil(fxTransfer, true)
} else {
const transfer = await TransferService.getByIdLight(transferIdOrCommitRequestId)
if (!transfer) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorTransferNotFound--${actionLetter}3`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_ID_NOT_FOUND, 'Provided Transfer ID was not found on the server.')
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
if (!await Validator.validateParticipantTransferId(message.value.from, transferIdOrCommitRequestId)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorNotTransferParticipant--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.CLIENT_ERROR)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
Util.breadcrumb(location, { path: 'validationPassed' })
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackMessage--${actionLetter}4`))
message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
}

// ============================================================================================
Util.breadcrumb(location, { path: 'validationPassed' })
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackMessage--${actionLetter}4`))
message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch, hubName: Config.HUB_NAME })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ const processDuplication = async ({
const transfer = await createRemittanceEntity(isFx)
.getByIdLight(ID)

const finalizedState = [TransferState.COMMITTED, TransferState.ABORTED]
const finalizedState = [TransferState.COMMITTED, TransferState.ABORTED, TransferState.RESERVED]
const isFinalized =
finalizedState.includes(transfer?.transferStateEnumeration) ||
finalizedState.includes(transfer?.fxTransferStateEnumeration)
Expand Down
13 changes: 12 additions & 1 deletion src/handlers/transfers/validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const Decimal = require('decimal.js')
const Config = require('../../lib/config')
const Participant = require('../../domain/participant')
const Transfer = require('../../domain/transfer')
const FxTransferModel = require('../../models/fxTransfer')
const CryptoConditions = require('../../cryptoConditions')
const Crypto = require('crypto')
const base64url = require('base64url')
Expand Down Expand Up @@ -265,11 +266,21 @@ const validateParticipantTransferId = async function (participantName, transferI
return validationPassed
}

const validateParticipantForCommitRequestId = async function (participantName, commitRequestId) {
const fxTransferParticipants = await FxTransferModel.fxTransfer.getFxTransferParticipant(participantName, commitRequestId)
let validationPassed = false
if (Array.isArray(fxTransferParticipants) && fxTransferParticipants.length > 0) {
validationPassed = true
}
return validationPassed
}

module.exports = {
validatePrepare,
validateById,
validateFulfilCondition,
validateParticipantByName,
reasons,
validateParticipantTransferId
validateParticipantTransferId,
validateParticipantForCommitRequestId
}
24 changes: 23 additions & 1 deletion src/models/fxTransfer/fxTransfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const getByIdLight = async (id) => {
.where({ 'fxTransfer.commitRequestId': id })
.leftJoin('fxTransferStateChange AS tsc', 'tsc.commitRequestId', 'fxTransfer.commitRequestId')
.leftJoin('transferState AS ts', 'ts.transferStateId', 'tsc.transferStateId')
.leftJoin('fxTransferFulfilment AS tf', 'tf.commitRequestId', 'fxTransfer.commitRequestId')
.select(
'fxTransfer.*',
'tsc.fxTransferStateChangeId',
Expand All @@ -45,7 +46,8 @@ const getByIdLight = async (id) => {
'ts.description as fxTransferStateDescription',
'tsc.reason AS reason',
'tsc.createdDate AS completedTimestamp',
'fxTransfer.ilpCondition AS condition'
'fxTransfer.ilpCondition AS condition',
'tf.ilpFulfilment AS fulfilment'
)
.orderBy('tsc.fxTransferStateChangeId', 'desc')
.first()
Expand Down Expand Up @@ -521,11 +523,31 @@ const updateFxPrepareReservedForwarded = async function (commitRequestId) {
}
}

const getFxTransferParticipant = async (participantName, commitRequestId) => {
try {
return Db.from('participant').query(async (builder) => {
return builder
.where({
'ftp.commitRequestId': commitRequestId,
'participant.name': participantName,
'participant.isActive': 1
})
.innerJoin('fxTransferParticipant AS ftp', 'ftp.participantId', 'participant.participantId')
.select(
'ftp.*'
)
})
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

module.exports = {
getByCommitRequestId,
getByDeterminingTransferId,
getByIdLight,
getAllDetailsByCommitRequestId,
getFxTransferParticipant,
savePreparedRequest,
saveFxFulfilResponse,
saveFxTransfer,
Expand Down
10 changes: 5 additions & 5 deletions test/integration-override/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ Test('Handlers test', async handlersTest => {
test.end()
})

await transferPrepare.test('send fxTransfer information callback when fxTransfer is COMMITTED on duplicate request', async (test) => {
await transferPrepare.test('send fxTransfer information callback when fxTransfer is RESERVED on duplicate request', async (test) => {
const td = await prepareTestData(testData)
const prepareConfig = Utility.getKafkaConfig(
Config.KAFKA_CONFIG,
Expand Down Expand Up @@ -678,13 +678,13 @@ Test('Handlers test', async handlersTest => {

try {
const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {}
test.equal(fxTransfer?.fxTransferState, TransferInternalState.COMMITTED, 'FxTransfer state updated to COMMITTED')
test.equal(fxTransfer?.fxTransferState, TransferInternalState.RECEIVED_FULFIL_DEPENDENT, 'FxTransfer state updated to RECEIVED_FULFIL_DEPENDENT')
} catch (err) {
Logger.error(err)
test.fail(err.message)
}

// Resend fx-prepare after state is COMMITTED
// Resend fx-prepare after state is RESERVED
await new Promise(resolve => setTimeout(resolve, 2000))
await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig)

Expand Down Expand Up @@ -1162,7 +1162,7 @@ Test('Handlers test', async handlersTest => {
test.end()
})

await transferFxForwarded.test('should be able to transition from RESERVED_FORWARDED to COMMITED on fx-fulfil', async (test) => {
await transferFxForwarded.test('should be able to transition from RESERVED_FORWARDED to RECEIVED_FULFIL_DEPENDENT on fx-fulfil', async (test) => {
const td = await prepareTestData(testData)
const prepareConfig = Utility.getKafkaConfig(
Config.KAFKA_CONFIG,
Expand Down Expand Up @@ -1218,7 +1218,7 @@ Test('Handlers test', async handlersTest => {

try {
const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {}
test.equal(fxTransfer?.fxTransferState, TransferInternalState.COMMITTED, 'FxTransfer state updated to COMMITTED')
test.equal(fxTransfer?.fxTransferState, TransferInternalState.RECEIVED_FULFIL_DEPENDENT, 'FxTransfer state updated to RECEIVED_FULFIL_DEPENDENT')
} catch (err) {
Logger.error(err)
test.fail(err.message)
Expand Down
13 changes: 5 additions & 8 deletions test/unit/domain/position/fx-fulfil.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ Test('Fx Fulfil domain', processPositionFxFulfilBinTest => {
test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-destination'], fxTransferCallbackTestData1.message.value.content.headers['fspiop-destination'])
test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], fxTransferCallbackTestData1.message.value.content.headers['fspiop-source'])
test.equal(processedMessages.notifyMessages[0].message.content.headers['content-type'], fxTransferCallbackTestData1.message.value.content.headers['content-type'])
test.equal(processedMessages.accumulatedFxTransferStates[fxTransferCallbackTestData1.message.value.id], Enum.Transfers.TransferInternalState.COMMITTED)
test.equal(processedMessages.accumulatedFxTransferStates[fxTransferCallbackTestData1.message.value.id], Enum.Transfers.TransferInternalState.RECEIVED_FULFIL_DEPENDENT)

test.equal(processedMessages.notifyMessages[1].message.content.headers.accept, fxTransferCallbackTestData2.message.value.content.headers.accept)
test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-destination'], fxTransferCallbackTestData2.message.value.content.headers['fspiop-destination'])
test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], fxTransferCallbackTestData2.message.value.content.headers['fspiop-source'])
test.equal(processedMessages.notifyMessages[1].message.content.headers['content-type'], fxTransferCallbackTestData2.message.value.content.headers['content-type'])
test.equal(processedMessages.accumulatedFxTransferStates[fxTransferCallbackTestData2.message.value.id], Enum.Transfers.TransferInternalState.COMMITTED)
test.equal(processedMessages.accumulatedFxTransferStates[fxTransferCallbackTestData2.message.value.id], Enum.Transfers.TransferInternalState.RECEIVED_FULFIL_DEPENDENT)

test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, fxTransferCallbackTestData3.message.value.id)
test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, fxTransferCallbackTestData3.message.value.content.headers.accept)
Expand All @@ -186,12 +186,9 @@ Test('Fx Fulfil domain', processPositionFxFulfilBinTest => {
test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001')
test.equal(processedMessages.accumulatedFxTransferStates[fxTransferCallbackTestData3.message.value.id], Enum.Transfers.TransferInternalState.ABORTED_REJECTED)

test.equal(processedMessages.accumulatedFxTransferStateChanges.length, 3)
test.equal(processedMessages.accumulatedFxTransferStateChanges[0].commitRequestId, fxTransferCallbackTestData1.message.value.id)
test.equal(processedMessages.accumulatedFxTransferStateChanges[1].commitRequestId, fxTransferCallbackTestData2.message.value.id)
test.equal(processedMessages.accumulatedFxTransferStateChanges[0].transferStateId, Enum.Transfers.TransferInternalState.COMMITTED)
test.equal(processedMessages.accumulatedFxTransferStateChanges[1].transferStateId, Enum.Transfers.TransferInternalState.COMMITTED)
test.equal(processedMessages.accumulatedFxTransferStateChanges[2].transferStateId, Enum.Transfers.TransferInternalState.ABORTED_REJECTED)
test.equal(processedMessages.accumulatedFxTransferStateChanges.length, 1)
test.equal(processedMessages.accumulatedFxTransferStateChanges[0].commitRequestId, fxTransferCallbackTestData3.message.value.id)
test.equal(processedMessages.accumulatedFxTransferStateChanges[0].transferStateId, Enum.Transfers.TransferInternalState.ABORTED_REJECTED)

test.end()
})
Expand Down
26 changes: 26 additions & 0 deletions test/unit/handlers/transfers/validator.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const Test = require('tapes')(require('tape'))
const Sinon = require('sinon')
const Participant = require('../../../../src/domain/participant')
const Transfer = require('../../../../src/domain/transfer')
const FxTransferModel = require('../../../../src/models/fxTransfer')
const Validator = require('../../../../src/handlers/transfers/validator')
const CryptoConditions = require('../../../../src/cryptoConditions')
const Enum = require('@mojaloop/central-services-shared').Enum
Expand Down Expand Up @@ -82,6 +83,7 @@ Test('transfer validator', validatorTest => {
sandbox.stub(Participant)
sandbox.stub(CryptoConditions, 'validateCondition')
sandbox.stub(Transfer, 'getTransferParticipant')
sandbox.stub(FxTransferModel.fxTransfer, 'getFxTransferParticipant')
test.end()
})

Expand Down Expand Up @@ -341,5 +343,29 @@ Test('transfer validator', validatorTest => {
validateParticipantTransferIdTest.end()
})

validatorTest.test('validateParticipantForCommitRequestId should', validateParticipantForCommitRequestIdTest => {
validateParticipantForCommitRequestIdTest.test('validate the CommitRequestId belongs to the requesting fsp', async (test) => {
const participantName = 'fsp1'
const commitRequestId = '88416f4c-68a3-4819-b8e0-c23b27267cd5'
FxTransferModel.fxTransfer.getFxTransferParticipant.withArgs(participantName, commitRequestId).returns(Promise.resolve([1]))

const result = await Validator.validateParticipantForCommitRequestId(participantName, commitRequestId)
test.equal(result, true, 'results match')
test.end()
})

validateParticipantForCommitRequestIdTest.test('validate the CommitRequestId belongs to the requesting fsp return false for no match', async (test) => {
const participantName = 'fsp1'
const commitRequestId = '88416f4c-68a3-4819-b8e0-c23b27267cd5'
FxTransferModel.fxTransfer.getFxTransferParticipant.withArgs(participantName, commitRequestId).returns(Promise.resolve([]))

const result = await Validator.validateParticipantForCommitRequestId(participantName, commitRequestId)
test.equal(result, false, 'results match')
test.end()
})

validateParticipantForCommitRequestIdTest.end()
})

validatorTest.end()
})