Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#1334 patch request notif #751

Merged
merged 7 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ EVENT_SDK_VENDOR_PREFIX="mojaloop"
EVENT_SDK_TRACESTATE_HEADER_ENABLED="false"
EVENT_SDK_ASYNC_OVERRIDE_EVENTS="log, trace"
EVENT_SDK_TRACEID_PER_VENDOR="false"
CLEDG_HANDLERS_TIMEOUT_DISABLED="true"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this supposed to be here? that config is in default.json

8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "10.5.1",
"version": "10.5.2",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -86,7 +86,7 @@
"@mojaloop/central-services-health": "10.4.0",
"@mojaloop/central-services-logger": "10.4.0",
"@mojaloop/central-services-metrics": "9.5.0",
"@mojaloop/central-services-shared": "10.5.1",
"@mojaloop/central-services-shared": "10.5.2",
"@mojaloop/central-services-stream": "9.5.0",
"@mojaloop/event-sdk": "10.4.0",
"@mojaloop/forensic-logging-client": "8.3.0",
Expand Down
26 changes: 16 additions & 10 deletions src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
const Logger = require('@mojaloop/central-services-logger')
const EventSdk = require('@mojaloop/event-sdk')
const TransferService = require('../../domain/transfer')
const TransferObjectTransform = require('../../domain/transfer/transform')
const PositionService = require('../../domain/position')
const Utility = require('@mojaloop/central-services-shared').Util
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
Expand Down Expand Up @@ -115,15 +116,16 @@ const positions = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, { method: 'positions' }))

const actionLetter = action === Enum.Events.Event.Action.PREPARE ? Enum.Events.ActionLetter.prepare
: (action === Enum.Events.Event.Action.COMMIT ? Enum.Events.ActionLetter.commit
: (action === Enum.Events.Event.Action.REJECT ? Enum.Events.ActionLetter.reject
: (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort
: (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout
: (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare
: (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved
: (action === Enum.Events.Event.Action.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown))))))))
: (action === Enum.Events.Event.Action.RESERVE ? Enum.Events.ActionLetter.reserve
: (action === Enum.Events.Event.Action.COMMIT ? Enum.Events.ActionLetter.commit
: (action === Enum.Events.Event.Action.REJECT ? Enum.Events.ActionLetter.reject
: (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort
: (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout
: (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare
: (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved
: (action === Enum.Events.Event.Action.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort
: Enum.Events.ActionLetter.unknown)))))))))
const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer }
const eventDetail = { action }
if (![Enum.Events.Event.Action.BULK_PREPARE, Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED, Enum.Events.Event.Action.BULK_ABORT].includes(action)) {
Expand Down Expand Up @@ -156,7 +158,7 @@ const positions = async (error, messages) => {
throw fspiopError
}
}
} else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.BULK_COMMIT].includes(action)) {
} else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.COMMIT, Enum.Events.Event.Action.RESERVE, Enum.Events.Event.Action.BULK_COMMIT].includes(action)) {
Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, { path: 'commit' }))
const transferInfo = await TransferService.getTransferInfoToChangePosition(transferId, Enum.Accounts.TransferParticipantRoleType.PAYEE_DFSP, Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) {
Expand All @@ -172,6 +174,10 @@ const positions = async (error, messages) => {
transferStateId: Enum.Transfers.TransferState.COMMITTED
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
if (action === Enum.Events.Event.Action.RESERVE) {
const transfer = await TransferService.getById(transferInfo.transferId)
message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
}
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId, action })
return true
Expand Down
50 changes: 25 additions & 25 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,16 @@ const fulfil = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, { method: `fulfil:${action}` }))

const actionLetter = action === TransferEventAction.COMMIT ? Enum.Events.ActionLetter.commit
: (action === TransferEventAction.REJECT ? Enum.Events.ActionLetter.reject
: (action === TransferEventAction.ABORT ? Enum.Events.ActionLetter.abort
: (action === TransferEventAction.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === TransferEventAction.BULK_ABORT ? Enum.Events.ActionLetter.bulkAbort
: (action === TransferEventAction.RESERVE ? Enum.Events.ActionLetter.reserve // TODO ADD ACTION LETTER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO still applicable?

: (action === TransferEventAction.REJECT ? Enum.Events.ActionLetter.reject
: (action === TransferEventAction.ABORT ? Enum.Events.ActionLetter.abort
: (action === TransferEventAction.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
rmothilal marked this conversation as resolved.
Show resolved Hide resolved
: Enum.Events.ActionLetter.unknown))))
const functionality = action === TransferEventAction.COMMIT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.REJECT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.ABORT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.BULK_COMMIT ? TransferEventType.BULK_PROCESSING
: (action === TransferEventAction.BULK_ABORT ? TransferEventType.BULK_PROCESSING
: (action === TransferEventAction.RESERVE ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.REJECT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.ABORT ? TransferEventType.NOTIFICATION
: (action === TransferEventAction.BULK_COMMIT ? TransferEventType.BULK_PROCESSING
rmothilal marked this conversation as resolved.
Show resolved Hide resolved
: Enum.Events.ActionLetter.unknown))))
// fulfil-specific declarations
const isTransferError = action === TransferEventAction.ABORT
Expand Down Expand Up @@ -344,22 +344,22 @@ const fulfil = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'handleResend'))
if (transferStateEnum === TransferState.COMMITTED || transferStateEnum === TransferState.ABORTED) {
message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
if (!isTransferError) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized2--${actionLetter}3`))
const eventDetail = { functionality, action: TransferEventAction.FULFIL_DUPLICATE }
/**
* HOWTO: During bulk fulfil use an individualTransfer from a previous bulk fulfil
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized3--${actionLetter}4`))
const eventDetail = { functionality, action: TransferEventAction.ABORT_DUPLICATE }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
const eventDetail = { functionality, action }
if (action !== TransferEventAction.RESERVE) {
if (!isTransferError) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized2--${actionLetter}3`))
eventDetail.action = TransferEventAction.FULFIL_DUPLICATE
/**
* HOWTO: During bulk fulfil use an individualTransfer from a previous bulk fulfil
*/
} else {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackFinilized3--${actionLetter}4`))
eventDetail.action = TransferEventAction.ABORT_DUPLICATE
}
}
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else if (transferStateEnum === TransferState.RECEIVED || transferStateEnum === TransferState.RESERVED) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `inProgress2--${actionLetter}5`))
/**
Expand Down Expand Up @@ -401,7 +401,7 @@ const fulfil = async (error, messages) => {
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
} else { // !hasDuplicateId
if (type === TransferEventType.FULFIL && [TransferEventAction.COMMIT, TransferEventAction.REJECT, TransferEventAction.ABORT, TransferEventAction.BULK_COMMIT, TransferEventAction.BULK_ABORT].includes(action)) {
if (type === TransferEventType.FULFIL && [TransferEventAction.COMMIT, TransferEventAction.RESERVE, TransferEventAction.REJECT, TransferEventAction.ABORT, TransferEventAction.BULK_COMMIT].includes(action)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You removed BULK_ABORT here

Util.breadcrumb(location, { path: 'validationCheck' })
if (payload.fulfilment && !Validator.validateFulfilCondition(payload.fulfilment, transfer.condition)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorInvalidFulfilment--${actionLetter}9`))
Expand Down Expand Up @@ -434,7 +434,7 @@ const fulfil = async (error, messages) => {
throw fspiopError
} else { // validations success
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, { path: 'validationPassed' }))
if ([TransferEventAction.COMMIT, TransferEventAction.BULK_COMMIT].includes(action)) {
if ([TransferEventAction.COMMIT, TransferEventAction.RESERVE, TransferEventAction.BULK_COMMIT].includes(action)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `positionTopic2--${actionLetter}12`))
await TransferService.handlePayeeResponse(transferId, payload, action)
const eventDetail = { functionality: TransferEventType.POSITION, action }
Expand Down
1 change: 1 addition & 0 deletions src/models/transfer/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ const savePayeeTransferResponse = async (transferId, payload, action, fspiopErro
switch (action) {
case TransferEventAction.COMMIT:
case TransferEventAction.BULK_COMMIT:
case TransferEventAction.RESERVE:
state = TransferInternalState.RECEIVED_FULFIL
extensionList = payload.extensionList
isFulfilment = true
Expand Down
24 changes: 24 additions & 0 deletions test/unit/handlers/transfers/handler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,30 @@ Test('Transfer handler', transferHandlerTest => {
test.end()
})

fulfilTest.test('produce message to position topic when validations pass and action is RESERVE', async (test) => {
const localfulfilMessages = MainUtil.clone(fulfilMessages)
localfulfilMessages[0].value.metadata.event.action = 'reserve'
await Consumer.createHandler(topicName, config, command)
Kafka.transformGeneralTopicName.returns(topicName)
TransferService.getById.returns(Promise.resolve({ condition: 'condition', payeeFsp: 'dfsp2', transferState: TransferState.RESERVED }))
ilp.update.returns(Promise.resolve())
Validator.validateFulfilCondition.returns(true)
localfulfilMessages[0].value.content.headers['fspiop-source'] = 'dfsp2'
localfulfilMessages[0].value.content.payload.fulfilment = 'condition'
Kafka.proceed.returns(true)

TransferService.getTransferDuplicateCheck.returns(Promise.resolve(null))
TransferService.saveTransferDuplicateCheck.returns(Promise.resolve(null))
Comparators.duplicateCheckComparator.withArgs(transfer.transferId, localfulfilMessages[0].value.content.payload).returns(Promise.resolve({
hasDuplicateId: false,
hasDuplicateHash: false
}))

const result = await allTransferHandlers.fulfil(null, localfulfilMessages)
test.equal(result, true)
test.end()
})

fulfilTest.test('produce message to position topic when BULK_COMMIT validations pass', async (test) => {
const localfulfilMessages = MainUtil.clone(fulfilMessages)
await Consumer.createHandler(topicName, config, command)
Expand Down