Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#272 Implement Fulfil Handler Consume (Success) #72

Merged
merged 14 commits into from
Jun 4, 2018
1 change: 1 addition & 0 deletions src/admin/auth/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ exports.plugin = {
server.auth.strategy('jwt-strategy', 'hapi-now-auth', {
verifyJWT: true,
keychain: [Config.ADMIN_SECRET],
// keychain: ['secret'],
validate: TokenStrategy.validate
})
}
Expand Down
32 changes: 0 additions & 32 deletions src/domain/transfer/commands/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
'use strict'

const Projection = require('../../../domain/transfer/projection')
const FeeProjection = require('../../../domain/fee/index')
const Query = require('../../../domain/transfer/queries')
// const State = require('../state')
const CryptoConditions = require('../../../crypto-conditions')
const Errors = require('../../../errors')

const prepare = async (transfer, stateReason = null, hasPassedValidation = true) => {
try {
Expand All @@ -15,32 +10,6 @@ const prepare = async (transfer, stateReason = null, hasPassedValidation = true)
}
}

const fulfil = async (fulfilment) => {
const record = {
payload: fulfilment,
timestamp: new Date()
}
const transfer = await Query.getById(fulfilment.id)
if (!transfer.executionCondition) {
throw new Errors.TransferNotConditionalError()
}
// if ((transfer.state === State.EXECUTED || transfer.state === State.SETTLED) && fulfilment === fulfilment.fulfilment) {
// return transfer
// }
if (new Date() > new Date(transfer.expirationDate)) {
throw new Errors.ExpiredTransferError()
}
// if (transfer.state !== State.PREPARED) {
// throw new Errors.InvalidModificationError(`Transfers in state ${transfer.state} may not be executed`)
// }
CryptoConditions.validateFulfillment(fulfilment.fulfilment, transfer.executionCondition)
await Projection.saveTransferExecuted({payload: record.payload, timestamp: record.timestamp})
await Projection.saveExecutedTransfer(record)
const fulfilledTransfer = await Query.getById(fulfilment.id)
await FeeProjection.generateFeeForTransfer(fulfilledTransfer)
return fulfilledTransfer
}

const reject = async (stateReason, transferId) => {
try {
return await Projection.saveTransferRejected(stateReason, transferId)
Expand All @@ -54,7 +23,6 @@ const settle = ({id, settlement_id}) => {
}

module.exports = {
fulfil,
prepare,
reject,
settle
Expand Down
57 changes: 0 additions & 57 deletions src/domain/transfer/ilp.js

This file was deleted.

2 changes: 1 addition & 1 deletion src/domain/transfer/models/transferStateChanges.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const getByTransferId = async (id) => {
// let transferStateChanges = await Db.transferStateChange.find({ transferId: id }, { order: 'changedDate desc' })
// return transferStateChanges[0]
return await Db.transferStateChange.query(async (builder) => {
let result = builder.select('transferStateChange.*').orderBy('changedDate', 'desc').first()
let result = builder.where({'transferStateChange.transferId': id}).select('transferStateChange.*').orderBy('transferStateChangeId', 'desc').first()
if (!result) throw new Error('no such transfer')
return result
})
Expand Down
24 changes: 12 additions & 12 deletions src/domain/transfer/models/transfers-read-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const getAll = async () => {
let transferResultList = await builder
.innerJoin('participant AS ca', 'transfer.payerParticipantId', 'ca.participantId')
.innerJoin('participant AS da', 'transfer.payeeParticipantId', 'da.participantId')
.innerJoin('transferStateChange AS tsc', 'transfer.transferId', 'tsc.transferId')
.innerJoin('transferState AS ts', 'tsc.transferStateId', 'tsc.transferStateId')
.innerJoin('ilp AS ilp', 'transfer.transferId', 'ilp.transferId')
.leftJoin('transferStateChange AS tsc', 'transfer.transferId', 'tsc.transferId')
.leftJoin('transferState AS ts', 'ts.transferStateId', 'tsc.transferStateId')
.leftJoin('ilp AS ilp', 'transfer.transferId', 'ilp.transferId')
.select(
'transfer.*',
'transfer.currencyId AS currency',
Expand All @@ -39,15 +39,14 @@ const getAll = async () => {
'ts.enumeration AS transferState',
'ilp.packet AS ilpPacket',
'ilp.condition AS condition',
'ilp.fulfilment AS fulfilment'
'ilp.fulfilment AS fulfilment',
'ilp.ilpId AS ilpId'
)
.orderBy('tsc.=transferStateChangeId', 'desc')

transferResultList = transferResultList.map(async transferResult => {
.orderBy('tsc.transferStateChangeId', 'desc')
for (let transferResult of transferResultList) {
transferResult.extensionList = await extensionModel.getByTransferId(transferResult.transferId)
transferResult.isTransferReadModel = true
return transferResult
})
}
return transferResultList
})
} catch (err) {
Expand Down Expand Up @@ -86,8 +85,8 @@ const getById = async (id) => {
return await Db.transfer.query(async (builder) => {
var transferResult = builder
.where({'transfer.transferId': id})
.leftJoin('participant AS ca', 'transfer.payerParticipantId', 'ca.participantId')
.leftJoin('participant AS da', 'transfer.payeeParticipantId', 'da.participantId')
.innerJoin('participant AS ca', 'transfer.payerParticipantId', 'ca.participantId')
.innerJoin('participant AS da', 'transfer.payeeParticipantId', 'da.participantId')
.leftJoin('transferStateChange AS tsc', 'transfer.transferId', 'tsc.transferId')
.leftJoin('ilp AS ilp', 'transfer.transferId', 'ilp.transferId')
.select(
Expand All @@ -99,7 +98,8 @@ const getById = async (id) => {
'tsc.changedDate AS completedTimestamp',
'ilp.packet AS ilpPacket',
'ilp.condition AS condition',
'ilp.fulfilment AS fulfilment'
'ilp.fulfilment AS fulfilment',
'ilp.ilpId AS ilpId'
)
.orderBy('tsc.transferStateChangeId', 'desc')
.first()
Expand Down
1 change: 1 addition & 0 deletions src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const positions = async (error, messages) => {
if (message.value.metadata.event.type === POSITION && message.value.metadata.event.action === PREPARE) {
await Projection.updateTransferState(payload, TransferState.RESERVED)
} else if (message.value.metadata.event.type === POSITION && message.value.metadata.event.action === COMMIT) {
payload.transferId = message.value.id
await Projection.updateTransferState(payload, TransferState.COMMITTED)
} else {
await consumer.commitMessageSync(message)
Expand Down
20 changes: 15 additions & 5 deletions src/handlers/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,24 @@ module.exports = [
description: 'Register notification Kafka consumer handler'
}
},
// this is for testing purposes so that we can produce transfers without the ML-API. will be removed later

// Following are for testing purposes so that we can produce transfers without the ML-API. To be removed later.
{
method: 'POST',
path: '/test/producer/transfer/prepare',
handler: testProducer.transferPrepare,
options: {
id: 'transferPrepareTestProducer',
description: 'Produces transfer prepare message to Kafka'
}
},
{
method: 'POST',
path: '/test/producer',
handler: testProducer.testProducer,
path: '/test/producer/transfer/fulfil',
handler: testProducer.transferFulfil,
options: {
id: 'testing',
description: 'testing'
id: 'transferFulfilTestProducer',
description: 'Produces transfer fulfil message to Kafka'
}
}
]
68 changes: 65 additions & 3 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,16 @@ const DAO = require('../lib/dao')
const Kafka = require('../lib/kafka')
const Validator = require('./validator')
const TransferQueries = require('../../domain/transfer/queries')
const TransferState = require('../../domain/transfer/state')
// const CryptoConditions = require('../../crypto-conditions')
const FiveBellsCondition = require('five-bells-condition')
const ilp = require('../../models/ilp')

const TRANSFER = 'transfer'
const PREPARE = 'prepare'
const FULFIL = 'fulfil'
const REJECT = 'reject'
const COMMIT = 'commit'

/**
* @method prepare
Expand Down Expand Up @@ -124,12 +129,67 @@ const prepare = async (error, messages) => {
}
}

const fulfil = async () => {

const fulfil = async (error, messages) => {
if (error) {
// Logger.error(error)
throw new Error()
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
Logger.info('FulfilHandler::fulfil')
const consumer = Kafka.Consumer.getConsumer(Utility.transformGeneralTopicName(TRANSFER, FULFIL))
const metadata = message.value.metadata
const transferId = message.value.id
const payload = message.value.content.payload
if (metadata.event.type === FULFIL && metadata.event.action === COMMIT) {
const existingTransfer = await TransferQueries.getById(transferId)
const fulfilmentCondition = FiveBellsCondition.fulfillmentToCondition(payload.fulfilment)
if (!existingTransfer) {
Logger.info('FulfilHandler::fulfil::validationFailed::notFound')
await consumer.commitMessageSync(message)
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.ENUMS.STATE.FAILURE)
return true
// } else if (CryptoConditions.validateFulfillment(payload.fulfilment, existingTransfer.condition)) { // TODO: when implemented
} else if (fulfilmentCondition !== existingTransfer.condition) { // TODO: FiveBellsCondition.fulfillmentToCondition always passes
Logger.info('FulfilHandler::fulfil::validationFailed::invalidFulfilment')
await consumer.commitMessageSync(message)
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.ENUMS.STATE.FAILURE)
return true
} else if (existingTransfer.transferState !== TransferState.RESERVED) {
Logger.info('FulfilHandler::fulfil::validationFailed::existingEntry')
await consumer.commitMessageSync(message)
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.ENUMS.STATE.FAILURE)
return true
} else { // validations success
Logger.info('FulfilHandler::fulfil::validationPassed')
await ilp.update({ilpId: existingTransfer.ilpId, fulfilment: payload.fulfilment})
await consumer.commitMessageSync(message)
await Utility.produceParticipantMessage(existingTransfer.payeeFsp, Utility.ENUMS.POSITION, PREPARE, message.value, Utility.ENUMS.STATE.SUCCESS)
return true
}
} else if (metadata.event.type === FULFIL && metadata.event.action === REJECT) {
throw new Error('Not implemented')
// TODO: fulfil reject flow {2.2.1.} to be implemented here
} else {
Logger.info('FulfilHandler::fulfil::invalidEventAction')
await consumer.commitMessageSync(message)
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.ENUMS.STATE.FAILURE)
return true
}
} catch (error) {
Logger.error(error)
throw error
}
}

const reject = async () => {

// TODO: delete method and use fulfil reject condition (see line 177)
throw new Error('Not implemented')
}
/**
* @method transfer
Expand Down Expand Up @@ -314,5 +374,7 @@ module.exports = {
registerRejectHandler,
registerAllHandlers,
prepare,
fulfil,
reject,
transfer
}
2 changes: 2 additions & 0 deletions testPI2/integration/domain/transfer/ilp.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
--------------
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this file not be deleted if the ilp file is removed/not used anymore

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valentin is figuring this out as part of another story and will clean it in subsequent merge.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok sure

******/

/*
'use strict'

const Test = require('tape')
Expand Down Expand Up @@ -220,3 +221,4 @@ Test('Ilp service tests', async (ilpTest) => {
}
})
})
*/
Loading