Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csi-22): add prepare participant substitution #1065

Merged
merged 19 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,853 changes: 106 additions & 2,747 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.4",
"npm-check-updates": "16.14.20",
"npm-check-updates": "17.0.0",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
Expand Down
28 changes: 21 additions & 7 deletions src/domain/fx/cyril.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const checkIfDeterminingTransferExistsForTransferMessage = async (payload) => {
}
}

const checkIfDeterminingTransferExistsForFxTransferMessage = async (payload) => {
const checkIfDeterminingTransferExistsForFxTransferMessage = async (payload, proxyObligation) => {
// Does this determining transfer ID appear on the transfer list?
const transferRecord = await TransferModel.getById(payload.determiningTransferId)
const determiningTransferExistsInTransferList = (transferRecord !== null)
Expand All @@ -73,12 +73,17 @@ const checkIfDeterminingTransferExistsForFxTransferMessage = async (payload) =>
{
participantName: payload.counterPartyFsp,
currencyId: payload.sourceAmount.currency
},
{
participantName: payload.counterPartyFsp,
currencyId: payload.targetAmount.currency
}
]
// If a proxy is representing a FXP in a jurisdictional scenario,
// they would not hold a position account for the `targetAmount` currency
// for a /fxTransfer. So we skip adding this to accounts to be validated.
if (!proxyObligation.isCounterPartyFspProxy) {
participantCurrencyValidationList.push({
participantName: payload.counterPartyFsp,
currencyId: payload.targetAmount.currency
})
}
if (determiningTransferExistsInTransferList) {
// If there's a currency conversion which is not the first message, then it must be issued by the creditor party
participantCurrencyValidationList.push({
Expand All @@ -99,7 +104,7 @@ const checkIfDeterminingTransferExistsForFxTransferMessage = async (payload) =>
}
}

const getParticipantAndCurrencyForTransferMessage = async (payload, determiningTransferCheckResult) => {
const getParticipantAndCurrencyForTransferMessage = async (payload, determiningTransferCheckResult, proxyObligation) => {
const histTimerGetParticipantAndCurrencyForTransferMessage = Metrics.getHistogram(
'fx_domain_cyril_getParticipantAndCurrencyForTransferMessage',
'fx_domain_cyril_getParticipantAndCurrencyForTransferMessage - Metrics for fx cyril',
Expand All @@ -113,7 +118,16 @@ const getParticipantAndCurrencyForTransferMessage = async (payload, determiningT
// Get the FX request corresponding to this transaction ID
// TODO: Can't we just use the following query in the first place above to check if the determining transfer exists instead of using the watch list?
// const fxTransferRecord = await fxTransfer.getByDeterminingTransferId(payload.transferId)
const fxTransferRecord = await fxTransfer.getAllDetailsByCommitRequestId(determiningTransferCheckResult.watchListRecords[0].commitRequestId)
let fxTransferRecord
if (proxyObligation.isCounterPartyFspProxy) {
// If a proxy is representing a FXP in a jurisdictional scenario,
// they would not hold a position account for the `targetAmount` currency
// for a /fxTransfer. So we skip adding this to accounts to be validated.
fxTransferRecord = await fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(determiningTransferCheckResult.watchListRecords[0].commitRequestId)
} else {
fxTransferRecord = await fxTransfer.getAllDetailsByCommitRequestId(determiningTransferCheckResult.watchListRecords[0].commitRequestId)
}

// Liquidity check and reserve funds against FXP in FX target currency
participantName = fxTransferRecord.counterPartyFspName
currencyId = fxTransferRecord.targetCurrency
Expand Down
4 changes: 2 additions & 2 deletions src/domain/transfer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ const TransferErrorDuplicateCheckModel = require('../../models/transfer/transfer
const TransferError = require('../../models/transfer/transferError')
const TransferObjectTransform = require('./transform')

const prepare = async (payload, stateReason = null, hasPassedValidation = true, determiningTransferCheckResult) => {
const prepare = async (payload, stateReason = null, hasPassedValidation = true, determiningTransferCheckResult, proxyObligation) => {
const histTimerTransferServicePrepareEnd = Metrics.getHistogram(
'domain_transfer',
'prepare - Metrics for transfer domain',
['success', 'funcName']
).startTimer()
try {
const result = await TransferFacade.saveTransferPrepared(payload, stateReason, hasPassedValidation, determiningTransferCheckResult)
const result = await TransferFacade.saveTransferPrepared(payload, stateReason, hasPassedValidation, determiningTransferCheckResult, proxyObligation)
histTimerTransferServicePrepareEnd({ success: true, funcName: 'prepare' })
return result
} catch (err) {
Expand Down
32 changes: 25 additions & 7 deletions src/handlers/transfers/createRemittanceEntity.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,29 @@ const createRemittanceEntity = (isFx) => {
: TransferService.saveTransferDuplicateCheck(id, hash)
},

async savePreparedRequest (payload, reason, isValid, determiningTransferCheckResult) {
async savePreparedRequest (
payload,
reason,
isValid,
determiningTransferCheckResult,
proxyObligation
) {
// todo: add histoTimer and try/catch here
return isFx
? fxTransferModel.fxTransfer.savePreparedRequest(payload, reason, isValid, determiningTransferCheckResult)
: TransferService.prepare(payload, reason, isValid, determiningTransferCheckResult)
? fxTransferModel.fxTransfer.savePreparedRequest(
payload,
reason,
isValid,
determiningTransferCheckResult,
proxyObligation
)
: TransferService.prepare(
payload,
reason,
isValid,
determiningTransferCheckResult,
proxyObligation
)
},

async getByIdLight (id) {
Expand All @@ -31,16 +49,16 @@ const createRemittanceEntity = (isFx) => {
: TransferService.getByIdLight(id)
},

async checkIfDeterminingTransferExists (payload) {
async checkIfDeterminingTransferExists (payload, proxyObligation) {
return isFx
? cyril.checkIfDeterminingTransferExistsForFxTransferMessage(payload)
? cyril.checkIfDeterminingTransferExistsForFxTransferMessage(payload, proxyObligation)
: cyril.checkIfDeterminingTransferExistsForTransferMessage(payload)
},

async getPositionParticipant (payload, determiningTransferCheckResult) {
async getPositionParticipant (payload, determiningTransferCheckResult, proxyObligation) {
return isFx
? cyril.getParticipantAndCurrencyForFxTransferMessage(payload, determiningTransferCheckResult)
: cyril.getParticipantAndCurrencyForTransferMessage(payload, determiningTransferCheckResult)
: cyril.getParticipantAndCurrencyForTransferMessage(payload, determiningTransferCheckResult, proxyObligation)
},

async logTransferError (id, errorCode, errorDescription) {
Expand Down
172 changes: 154 additions & 18 deletions src/handlers/transfers/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const createRemittanceEntity = require('./createRemittanceEntity')
const Validator = require('./validator')
const dto = require('./dto')
const TransferService = require('#src/domain/transfer/index')
const ProxyCache = require('#src/lib/proxyCache')

const { Kafka, Comparators } = Util
const { TransferState } = Enum.Transfers
Expand All @@ -47,6 +48,7 @@ const { fspId } = Config.INSTRUMENTATION_METRICS_LABELS

const consumerCommit = true
const fromSwitch = true
const proxyEnabled = Config.PROXY_CACHE_CONFIG.enabled

const checkDuplication = async ({ payload, isFx, ID, location }) => {
const funcName = 'prepare_duplicateCheckComparator'
Expand Down Expand Up @@ -116,13 +118,29 @@ const processDuplication = async ({
return true
}

const savePreparedRequest = async ({ validationPassed, reasons, payload, isFx, functionality, params, location, determiningTransferCheckResult }) => {
const savePreparedRequest = async ({
validationPassed,
reasons,
payload,
isFx,
functionality,
params,
location,
determiningTransferCheckResult,
proxyObligation
}) => {
const logMessage = Util.breadcrumb(location, 'savePreparedRequest')
try {
logger.info(logMessage, { validationPassed, reasons })
const reason = validationPassed ? null : reasons.toString()
await createRemittanceEntity(isFx)
.savePreparedRequest(payload, reason, validationPassed, determiningTransferCheckResult)
.savePreparedRequest(
payload,
reason,
validationPassed,
determiningTransferCheckResult,
proxyObligation
)
} catch (err) {
logger.error(`${logMessage} error - ${err.message}`)
const fspiopError = reformatFSPIOPError(err, FSPIOPErrorCodes.INTERNAL_SERVER_ERROR)
Expand All @@ -137,27 +155,64 @@ const savePreparedRequest = async ({ validationPassed, reasons, payload, isFx, f
}
}

const definePositionParticipant = async ({ isFx, payload, determiningTransferCheckResult }) => {
const definePositionParticipant = async ({ isFx, payload, determiningTransferCheckResult, proxyObligation }) => {
console.log(determiningTransferCheckResult)
const cyrilResult = await createRemittanceEntity(isFx)
.getPositionParticipant(payload, determiningTransferCheckResult)
const account = await Participant.getAccountByNameAndCurrency(
cyrilResult.participantName,
cyrilResult.currencyId,
Enum.Accounts.LedgerAccountType.POSITION
)
.getPositionParticipant(payload, determiningTransferCheckResult, proxyObligation)
console.log(cyrilResult)
let messageKey
// On a proxied transfer prepare if there is a corresponding fx transfer `getPositionParticipant`
// should return the fxp's proxy as the participantName since the fxp proxy would be saved as the counterPartyFsp
// in the prior fx transfer prepare.
// Following interscheme rules, if the debtor(fxTransfer FXP) and the creditor(transfer payee) are
// represented by the same proxy, no position adjustment is needed.
let isSameProxy = false
// Only check transfers that have a related fxTransfer
if (determiningTransferCheckResult?.watchListRecords?.length > 0) {
const counterPartyParticipantFXPProxy = cyrilResult.participantName
console.log(counterPartyParticipantFXPProxy)
console.log(proxyObligation?.counterPartyFspProxyOrParticipantId?.proxyId)
isSameProxy = counterPartyParticipantFXPProxy && proxyObligation?.counterPartyFspProxyOrParticipantId?.proxyId
? counterPartyParticipantFXPProxy === proxyObligation.counterPartyFspProxyOrParticipantId.proxyId
: false
}
if (isSameProxy) {
messageKey = '0'
} else {
const participantName = cyrilResult.participantName
const account = await Participant.getAccountByNameAndCurrency(
participantName,
cyrilResult.currencyId,
Enum.Accounts.LedgerAccountType.POSITION
)
messageKey = account.participantCurrencyId.toString()
}

return {
messageKey: account.participantCurrencyId.toString(),
messageKey,
cyrilResult
}
}

const sendPositionPrepareMessage = async ({ isFx, payload, action, params, determiningTransferCheckResult }) => {
const sendPositionPrepareMessage = async ({
isFx,
payload,
action,
params,
determiningTransferCheckResult,
proxyObligation
}) => {
const eventDetail = {
functionality: Type.POSITION,
action
}
const { messageKey, cyrilResult } = await definePositionParticipant({ payload, isFx, determiningTransferCheckResult })

const { messageKey, cyrilResult } = await definePositionParticipant({
payload: proxyObligation.payloadClone,
isFx,
determiningTransferCheckResult,
proxyObligation
})

params.message.value.content.context = {
...params.message.value.content.context,
Expand Down Expand Up @@ -240,7 +295,7 @@ const prepare = async (error, messages) => {
producer: Producer
}

if (isForwarded) {
if (proxyEnabled && isForwarded) {
const transfer = await TransferService.getById(ID)
if (!transfer) {
const eventDetail = {
Expand Down Expand Up @@ -294,6 +349,66 @@ const prepare = async (error, messages) => {
return true
}

let initiatingFspProxyOrParticipantId
let counterPartyFspProxyOrParticipantId
const proxyObligation = {
isInitiatingFspProxy: false,
isCounterPartyFspProxy: false,
initiatingFspProxyOrParticipantId: null,
counterPartyFspProxyOrParticipantId: null,
isFx,
payloadClone: { ...payload }
}
if (proxyEnabled) {
const [initiatingFsp, counterPartyFsp] = isFx ? [payload.initiatingFsp, payload.counterPartyFsp] : [payload.payerFsp, payload.payeeFsp]
;[proxyObligation.initiatingFspProxyOrParticipantId, proxyObligation.counterPartyFspProxyOrParticipantId] = await Promise.all([
ProxyCache.getFSPProxy(initiatingFsp),
ProxyCache.getFSPProxy(counterPartyFsp)
])

proxyObligation.isInitiatingFspProxy = !proxyObligation.initiatingFspProxyOrParticipantId.inScheme &&
proxyObligation.initiatingFspProxyOrParticipantId.proxyId !== null
proxyObligation.isCounterPartyFspProxy = !proxyObligation.counterPartyFspProxyOrParticipantId.inScheme &&
proxyObligation.counterPartyFspProxyOrParticipantId.proxyId !== null

if (isFx) {
proxyObligation.payloadClone.initiatingFsp = !proxyObligation.initiatingFspProxyOrParticipantId?.inScheme &&
proxyObligation.initiatingFspProxyOrParticipantId?.proxyId
? proxyObligation.initiatingFspProxyOrParticipantId.proxyId
: payload.initiatingFsp
proxyObligation.payloadClone.counterPartyFsp = !proxyObligation.counterPartyFspProxyOrParticipantId?.inScheme &&
proxyObligation.counterPartyFspProxyOrParticipantId?.proxyId
? proxyObligation.counterPartyFspProxyOrParticipantId.proxyId
: payload.counterPartyFsp
} else {
proxyObligation.payloadClone.payerFsp = !proxyObligation.initiatingFspProxyOrParticipantId?.inScheme &&
proxyObligation.initiatingFspProxyOrParticipantId?.proxyId
? proxyObligation.initiatingFspProxyOrParticipantId.proxyId
: payload.payerFsp
proxyObligation.payloadClone.payeeFsp = !proxyObligation.counterPartyFspProxyOrParticipantId?.inScheme &&
proxyObligation.counterPartyFspProxyOrParticipantId?.proxyId
? proxyObligation.counterPartyFspProxyOrParticipantId.proxyId
: payload.payeeFsp
}

// If either debtor participant or creditor participant aren't in the scheme and have no proxy representative, then throw an error.
if ((proxyObligation.initiatingFspProxyOrParticipantId.inScheme === false && proxyObligation.initiatingFspProxyOrParticipantId.proxyId === null) ||
(proxyObligation.counterPartyFspProxyOrParticipantId.inScheme === false && proxyObligation.counterPartyFspProxyOrParticipantId.proxyId === null)) {
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.ID_NOT_FOUND,
`Payer proxy or payee proxy not found: initiatingFsp: ${initiatingFspProxyOrParticipantId} counterPartyFsp: ${counterPartyFspProxyOrParticipantId}`
).toApiErrorObject(Config.ERROR_HANDLING)
await Kafka.proceed(Config.KAFKA_CONFIG, params, {
consumerCommit,
fspiopError,
eventDetail: { functionality, action },
fromSwitch,
hubName: Config.HUB_NAME
})
throw fspiopError
}
}

const duplication = await checkDuplication({ payload, isFx, ID, location })
if (duplication.hasDuplicateId) {
const success = await processDuplication({
Expand All @@ -303,11 +418,29 @@ const prepare = async (error, messages) => {
return success
}

const determiningTransferCheckResult = await createRemittanceEntity(isFx).checkIfDeterminingTransferExists(payload)
const determiningTransferCheckResult = await createRemittanceEntity(isFx).checkIfDeterminingTransferExists(
proxyObligation.payloadClone,
proxyObligation
)

const { validationPassed, reasons } = await Validator.validatePrepare(
payload,
headers,
isFx,
determiningTransferCheckResult,
proxyObligation
)

const { validationPassed, reasons } = await Validator.validatePrepare(payload, headers, isFx, determiningTransferCheckResult)
await savePreparedRequest({
validationPassed, reasons, payload, isFx, functionality, params, location, determiningTransferCheckResult
validationPassed,
reasons,
payload,
isFx,
functionality,
params,
location,
determiningTransferCheckResult,
proxyObligation
})
if (!validationPassed) {
logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
Expand All @@ -317,7 +450,7 @@ const prepare = async (error, messages) => {
/**
* TODO: BULK-Handle at BulkProcessingHandler (not in scope of #967)
* HOWTO: For regular transfers this branch may be triggered by sending
* a tansfer in a currency not supported by either dfsp. Not sure if it
* a transfer in a currency not supported by either dfsp. Not sure if it
* will be triggered for bulk, because of the BulkPrepareHandler.
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, {
Expand All @@ -331,14 +464,17 @@ const prepare = async (error, messages) => {
}

logger.info(Util.breadcrumb(location, `positionTopic1--${actionLetter}7`))
const success = await sendPositionPrepareMessage({ isFx, payload, action, params, determiningTransferCheckResult })
const success = await sendPositionPrepareMessage({
isFx, payload, action, params, determiningTransferCheckResult, proxyObligation
})

histTimerEnd({ success, fspId })
return success
} catch (err) {
histTimerEnd({ success: false, fspId })
const fspiopError = reformatFSPIOPError(err)
logger.error(`${Util.breadcrumb(location)}::${err.message}--P0`)
logger.error(err.stack)
const state = new EventSdk.EventStateMetadata(EventSdk.EventStatusType.failed, fspiopError.apiErrorCode.code, fspiopError.apiErrorCode.message)
await span.error(fspiopError, state)
await span.finish(fspiopError.message, state)
Expand Down
Loading