Skip to content

Commit

Permalink
fix: fx timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
vijayg10 committed May 23, 2024
1 parent 9dee638 commit 7575fca
Show file tree
Hide file tree
Showing 11 changed files with 684 additions and 242 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,15 @@ npm run migrate
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__COMMIT=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__RESERVE=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__TIMEOUT_RESERVED=topic-transfer-position-batch
npm start
```
- Additionally, run position batch handler in a new terminal
```
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__COMMIT=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__RESERVE=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__TIMEOUT_RESERVED=topic-transfer-position-batch
export CLEDG_HANDLERS__API__DISABLED=true
node src/handlers/index.js handler --positionbatch
```
Expand Down
7 changes: 4 additions & 3 deletions src/domain/timeout/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

const SegmentModel = require('../../models/misc/segment')
const TransferTimeoutModel = require('../../models/transfer/transferTimeout')
const FxTransferTimeoutModel = require('../../models/fxTransfer/fxTransferTimeout')
const TransferStateChangeModel = require('../../models/transfer/transferStateChange')
const FxTransferStateChangeModel = require('../../models/fxTransfer/stateChange')
const TransferFacade = require('../../models/transfer/facade')
Expand Down Expand Up @@ -60,7 +61,7 @@ const cleanupTransferTimeout = async () => {
}

const cleanupFxTransferTimeout = async () => {
const result = await TransferTimeoutModel.fxCleanup()
const result = await FxTransferTimeoutModel.cleanup()
return result
}

Expand All @@ -74,8 +75,8 @@ const getLatestFxTransferStateChange = async () => {
return result
}

const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
return TransferFacade.timeoutExpireReserved(segmentId, intervalMin, intervalMax)
const timeoutExpireReserved = async (segmentId, intervalMin, intervalMax, fxSegmentId, fxIntervalMin, fxIntervalMax) => {
return TransferFacade.timeoutExpireReserved(segmentId, intervalMin, intervalMax, fxSegmentId, fxIntervalMin, fxIntervalMax)
}

const fxTimeoutExpireReserved = async (segmentId, intervalMin, intervalMax) => {
Expand Down
114 changes: 14 additions & 100 deletions src/handlers/timeouts/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ const EventSdk = require('@mojaloop/event-sdk')
const resourceVersions = require('@mojaloop/central-services-shared').Util.resourceVersions
const Logger = require('@mojaloop/central-services-logger')
let timeoutJob
let fxTimeoutJob
let isRegistered
let isFxRegistered
let running = false
let fxRunning = false

const _processTimedOutTransfers = async (transferTimeoutList) => {
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING)
Expand Down Expand Up @@ -88,7 +85,7 @@ const _processTimedOutTransfers = async (transferTimeoutList) => {
Enum.Events.Event.Action.TIMEOUT_RESERVED,
message,
state,
result[i].payerParticipantCurrencyId?.toString(),
transferTimeoutList[i].effectedParticipantCurrencyId?.toString(),
span,
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.TIMEOUT_RESERVED
)
Expand Down Expand Up @@ -162,9 +159,11 @@ const _processFxTimedOutTransfers = async (fxTransferTimeoutList) => {
Config.KAFKA_CONFIG, Producer,
Enum.Kafka.Topics.POSITION,
Enum.Events.Event.Action.TIMEOUT_RESERVED,
// TODO: Enable the following line when the fx-timeout position is implemented
// Enum.Events.Event.Action.FX_TIMEOUT_RESERVED,
message,
state,
fxTransferTimeoutList[i].initiatingParticipantCurrencyId?.toString(),
fxTransferTimeoutList[i].effectedParticipantCurrencyId?.toString(),
span,
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.TIMEOUT_RESERVED
)
Expand Down Expand Up @@ -205,14 +204,23 @@ const timeout = async () => {
const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0
const cleanup = await TimeoutService.cleanupTransferTimeout()
const latestTransferStateChange = await TimeoutService.getLatestTransferStateChange()
const fxTimeoutSegment = await TimeoutService.getFxTimeoutSegment()
const intervalMax = (latestTransferStateChange && parseInt(latestTransferStateChange.transferStateChangeId)) || 0
const { transferTimeoutList, fxTransferTimeoutList } = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax)
const fxIntervalMin = fxTimeoutSegment ? fxTimeoutSegment.value : 0
const fxSegmentId = fxTimeoutSegment ? fxTimeoutSegment.segmentId : 0
const fxCleanup = await TimeoutService.cleanupFxTransferTimeout()
const latestFxTransferStateChange = await TimeoutService.getLatestFxTransferStateChange()
const fxIntervalMax = (latestFxTransferStateChange && parseInt(latestFxTransferStateChange.fxTransferStateChangeId)) || 0
const { transferTimeoutList, fxTransferTimeoutList } = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax, fxSegmentId, fxIntervalMin, fxIntervalMax)
transferTimeoutList && await _processTimedOutTransfers(transferTimeoutList)
fxTransferTimeoutList && await _processFxTimedOutTransfers(fxTransferTimeoutList)
return {
intervalMin,
cleanup,
intervalMax,
fxIntervalMin,
fxCleanup,
fxIntervalMax,
transferTimeoutList,
fxTransferTimeoutList
}
Expand All @@ -224,45 +232,6 @@ const timeout = async () => {
}
}

/**
* @function FxTransferTimeoutHandler
*
* @async
* @description This is the consumer callback function that gets registered to a cron job.
*
* ... called to validate/insert ...
*
* @param {error} error - error thrown if something fails within Cron
*
* @returns {boolean} - Returns a boolean: true if successful, or throws and error if failed
*/
const fxTimeout = async () => {
if (fxRunning) return
try {
fxRunning = true
const timeoutSegment = await TimeoutService.getFxTimeoutSegment()
const intervalMin = timeoutSegment ? timeoutSegment.value : 0
const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0
const cleanup = await TimeoutService.cleanupTransferTimeout()
const latestFxTransferStateChange = await TimeoutService.getLatestFxTransferStateChange()
const intervalMax = (latestFxTransferStateChange && parseInt(latestFxTransferStateChange.fxTransferStateChangeId)) || 0
const { transferTimeoutList, fxTransferTimeoutList } = await TimeoutService.fxTimeoutExpireReserved(segmentId, intervalMin, intervalMax)
fxTransferTimeoutList && await _processFxTimedOutTransfers(fxTransferTimeoutList)
transferTimeoutList && await _processTimedOutTransfers(transferTimeoutList)
return {
intervalMin,
cleanup,
intervalMax,
transferTimeoutList,
fxTransferTimeoutList
}
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
throw ErrorHandler.Factory.reformatFSPIOPError(err)
} finally {
fxRunning = false
}
}

/**
* @function isRunning
Expand All @@ -275,16 +244,6 @@ const isRunning = async () => {
return isRegistered
}

/**
* @function isFxRunning
*
* @description Function to determine if the fx-timeoutJob is running
*
* @returns {boolean} Returns true if the fx-timeoutJob is running
*/
const isFxRunning = async () => {
return isFxRegistered
}

/**
* @function stop
Expand All @@ -300,19 +259,6 @@ const stop = async () => {
}
}

/**
* @function stopFx
*
* @description Function to stop the fx-timeoutJob if running
*
* @returns {boolean} Returns true when the job is stopped
*/
const stopFx = async () => {
if (isFxRegistered) {
await fxTimeoutJob.stop()
isFxRegistered = undefined
}
}

/**
* @function RegisterTimeoutHandlers
Expand Down Expand Up @@ -343,34 +289,6 @@ const registerTimeoutHandler = async () => {
}
}

/**
* @function RegisterFxTimeoutHandlers
*
* @async
* @description Registers the fx-timeout handler by starting the timeoutJob cron
* @returns {boolean} - Returns a boolean: true if successful, or throws and error if failed
*/
const registerFxTimeoutHandler = async () => {
try {
if (isFxRegistered) {
await stopFx()
}

fxTimeoutJob = CronJob.from({
cronTime: Config.HANDLERS_TIMEOUT_TIMEXP,
onTick: fxTimeout,
start: false,
timeZone: Config.HANDLERS_TIMEOUT_TIMEZONE
})
isFxRegistered = true

await fxTimeoutJob.start()
return true
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

/**
* @function RegisterAllHandlers
Expand All @@ -384,7 +302,6 @@ const registerAllHandlers = async () => {
try {
if (!Config.HANDLERS_TIMEOUT_DISABLED) {
await registerTimeoutHandler()
await registerFxTimeoutHandler()
}
return true
} catch (err) {
Expand All @@ -395,11 +312,8 @@ const registerAllHandlers = async () => {

module.exports = {
timeout,
fxTimeout,
registerAllHandlers,
registerTimeoutHandler,
registerFxTimeoutHandler,
isRunning,
isFxRunning,
stop
}
53 changes: 53 additions & 0 deletions src/models/fxTransfer/fxTransferError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Contributors
--------------
This is the official list of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.
* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>
* Vijaya Kumar Guthi <vijaya.guthi@infitx.com>
--------------
******/

'use strict'

/**
* @module src/models/transfer/transferError/
*/

const Db = require('../../lib/db')
const Logger = require('@mojaloop/central-services-logger')

const getByCommitRequestId = async (id) => {
try {
const fxTransferError = await Db.from('fxTransferError').query(async (builder) => {
const result = builder
.where({ commitRequestId: id })
.select('*')
.first()
return result
})
fxTransferError.errorCode = fxTransferError.errorCode.toString()
return fxTransferError
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
throw err
}
}

module.exports = {
getByCommitRequestId
}
68 changes: 68 additions & 0 deletions src/models/fxTransfer/fxTransferTimeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Contributors
--------------
This is the official list of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.
* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>
* Vijaya Kumar Guthi <vijaya.guthi@infitx.com>
--------------
******/

'use strict'

const Db = require('../../lib/db')
const Logger = require('@mojaloop/central-services-logger')
const Enum = require('@mojaloop/central-services-shared').Enum
const TS = Enum.Transfers.TransferInternalState

const cleanup = async () => {
Logger.isDebugEnabled && Logger.debug('cleanup fxTransferTimeout')
try {
const knex = await Db.getKnex()

const ttIdList = await Db.from('fxTransferTimeout').query(async (builder) => {
const b = await builder
.whereIn('tsc.transferStateId', [`${TS.RECEIVED_FULFIL}`, `${TS.COMMITTED}`, `${TS.FAILED}`, `${TS.RESERVED_TIMEOUT}`,
`${TS.RECEIVED_REJECT}`, `${TS.EXPIRED_PREPARED}`, `${TS.EXPIRED_RESERVED}`, `${TS.ABORTED_REJECTED}`, `${TS.ABORTED_ERROR}`])
.innerJoin(
knex('fxTransferTimeout AS tt1')
.select('tsc1.commitRequestId')
.max('tsc1.fxTransferStateChangeId AS maxFxTransferStateChangeId')
.innerJoin('fxTransferStateChange AS tsc1', 'tsc1.commitRequestId', 'tt1.commitRequestId')
.groupBy('tsc1.commitRequestId').as('ts'), 'ts.commitRequestId', 'fxTransferTimeout.commitRequestId'
)
.innerJoin('fxTransferStateChange AS tsc', 'tsc.fxTransferStateChangeId', 'ts.maxFxTransferStateChangeId')
.select('fxTransferTimeout.fxTransferTimeoutId')
return b
})

await Db.from('fxTransferTimeout').query(async (builder) => {
const b = await builder
.whereIn('fxTransferTimeoutId', ttIdList.map(elem => elem.fxTransferTimeoutId))
.del()
return b
})
return ttIdList
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
throw err
}
}

module.exports = {
cleanup
}
6 changes: 5 additions & 1 deletion src/models/fxTransfer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ const duplicateCheck = require('./duplicateCheck')
const fxTransfer = require('./fxTransfer')
const stateChange = require('./stateChange')
const watchList = require('./watchList')
const fxTransferTimeout = require('./fxTransferTimeout')
const fxTransferError = require('./fxTransferError')

module.exports = {
duplicateCheck,
fxTransfer,
stateChange,
watchList
watchList,
fxTransferTimeout,
fxTransferError
}
Loading

0 comments on commit 7575fca

Please sign in to comment.