diff --git a/Dockerfile b/Dockerfile
index 50ece0c43..58e2332bf 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -16,7 +16,7 @@ FROM node:${NODE_VERSION} as builder
 WORKDIR /opt/app
 
 RUN apk --no-cache add git
-RUN apk add --no-cache -t build-dependencies make gcc g++ python3 libtool openssl-dev autoconf automake bash \
+RUN apk add --no-cache -t build-dependencies make gcc g++ python3 py3-setuptools libtool openssl-dev autoconf automake bash \
   && cd $(npm root -g)/npm \
   && npm install -g node-gyp
diff --git a/docker-compose.yml b/docker-compose.yml
index 9d64f4f10..62ed0ffeb 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -94,6 +94,21 @@ services:
       retries: 10
       start_period: 40s
       interval: 30s
+
+  redis:
+    image: redis:6.2.4-alpine
+    restart: "unless-stopped"
+    environment:
+      - ALLOW_EMPTY_PASSWORD=yes
+      - REDIS_PORT=6379
+      - REDIS_REPLICATION_MODE=master
+      - REDIS_TLS_ENABLED=no
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+    ports:
+      - "6379:6379"
+    networks:
+      - cl-mojaloop-net
 
   mockserver:
     image: jamesdbloom/mockserver
@@ -219,16 +234,3 @@ services:
       - cl-mojaloop-net
     environment:
       - KAFKA_BROKERS=kafka:29092
-
-  redis:
-    image: redis:6.2.4-alpine
-    restart: "unless-stopped"
-    environment:
-      - ALLOW_EMPTY_PASSWORD=yes
-      - REDIS_PORT=6379
-      - REDIS_REPLICATION_MODE=master
-      - REDIS_TLS_ENABLED=no
-    ports:
-      - "6379:6379"
-    networks:
-      - cl-mojaloop-net
diff --git a/docker/central-ledger/default.json b/docker/central-ledger/default.json
index a62fbb223..7fff2e5f4 100644
--- a/docker/central-ledger/default.json
+++ b/docker/central-ledger/default.json
@@ -86,7 +86,7 @@
       "enabled": true,
       "type": "redis",
       "proxyConfig": {
-        "host": "localhost",
+        "host": "redis",
         "port": 6379
       }
     },
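
Aside (not part of the diff): with the redis service now attached to cl-mojaloop-net and health-checked via redis-cli ping, the proxyConfig above resolves the cache by compose service name rather than localhost. A minimal connectivity sketch, assuming the node-redis client package; the URL simply mirrors the service name and port wired up above:

// probe.js - illustrative only; run from a container attached to cl-mojaloop-net
const { createClient } = require('redis')

async function probe () {
  const client = createClient({ url: 'redis://redis:6379' }) // host = compose service name
  await client.connect()
  console.log(await client.ping()) // 'PONG' - the same check the compose healthcheck runs
  await client.quit()
}

probe().catch(console.error)
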
diff --git a/src/handlers/positions/handlerBatch.js b/src/handlers/positions/handlerBatch.js
index 9186efd8f..f45801129 100644
--- a/src/handlers/positions/handlerBatch.js
+++ b/src/handlers/positions/handlerBatch.js
@@ -104,9 +104,20 @@ const positions = async (error, messages) => {
         binId
       })
 
+      const accountID = message.key.toString()
+
+      /**
+       * Interscheme accounting rule:
+       * - If the creditor and debtor are represented by the same proxy, the message key will be 0.
+       *   In such cases, we skip position changes.
+       */
+      if (accountID === '0') {
+        histTimerEnd({ success: true })
+        return span.finish()
+      }
+
       // Assign message to account-bin by accountID and child action-bin by action
       // (References to the messages to be stored in bins, no duplication of messages)
-      const accountID = message.key.toString()
       const action = message.value.metadata.event.action
       const accountBin = bins[accountID] || (bins[accountID] = {})
       const actionBin = accountBin[action] || (accountBin[action] = [])
@@ -129,54 +140,56 @@ const positions = async (error, messages) => {
       return span.audit(message, EventSdk.AuditEventAction.start)
     }))
 
-    // Start DB Transaction
-    const trx = await BatchPositionModel.startDbTransaction()
+    // Start DB Transaction if there are any bins to process
+    const trx = !!Object.keys(bins).length && await BatchPositionModel.startDbTransaction()
 
     try {
-      // Call Bin Processor with the list of account-bins and trx
-      const result = await BinProcessor.processBins(bins, trx)
+      if (trx) {
+        // Call Bin Processor with the list of account-bins and trx
+        const result = await BinProcessor.processBins(bins, trx)
 
-      // If Bin Processor processed bins successfully, commit Kafka offset
-      // Commit the offset of last message in the array
-      for (const message of Object.values(lastPerPartition)) {
-        const params = { message, kafkaTopic: message.topic, consumer: Consumer }
-        // We are using Kafka.proceed() to just commit the offset of the last message in the array
-        await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME })
-      }
+        // If Bin Processor processed bins successfully, commit Kafka offset
+        // Commit the offset of last message in the array
+        for (const message of Object.values(lastPerPartition)) {
+          const params = { message, kafkaTopic: message.topic, consumer: Consumer }
+          // We are using Kafka.proceed() to just commit the offset of the last message in the array
+          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME })
+        }
 
-      // Commit DB transaction
-      await trx.commit()
+        // Commit DB transaction
+        await trx.commit()
 
-      // Loop through results and produce notification messages and audit messages
-      await Promise.all(result.notifyMessages.map(item => {
-        // Produce notification message and audit message
-        const action = item.binItem.message?.value.metadata.event.action
-        const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
-        return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
-      }).concat(
-        // Loop through followup messages and produce position messages for further processing of the transfer
-        result.followupMessages.map(item => {
-          // Produce position message and audit message
+        // Loop through results and produce notification messages and audit messages
+        await Promise.all(result.notifyMessages.map(item => {
+          // Produce notification message and audit message
           const action = item.binItem.message?.value.metadata.event.action
           const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
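
The hunk above changes two things: messages keyed '0' (same-proxy creditor and debtor) exit before binning, and the DB transaction is only opened when at least one bin survives that filter. A condensed sketch of the resulting control flow, reusing the handler's own names (a paraphrase for clarity, not the handler itself):

async function binAndProcess (messages) {
  const bins = {}
  for (const message of messages) {
    const accountID = message.key.toString()
    if (accountID === '0') continue // interscheme rule: same proxy on both sides, no position change
    const action = message.value.metadata.event.action
    const accountBin = bins[accountID] || (bins[accountID] = {})
    const actionBin = accountBin[action] || (accountBin[action] = [])
    actionBin.push(message)
  }
  // falsy trx (no bins) skips processing entirely; the handler's catch block
  // uses trx?.rollback() so that case stays safe on error paths too
  const trx = !!Object.keys(bins).length && await BatchPositionModel.startDbTransaction()
  if (trx) {
    try {
      await BinProcessor.processBins(bins, trx)
      await trx.commit()
    } catch (err) {
      await trx.rollback()
      throw err
    }
  }
}
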
-          return Kafka.produceGeneralMessage(
-            Config.KAFKA_CONFIG,
-            Producer,
-            Enum.Events.Event.Type.POSITION,
-            action,
-            item.message,
-            eventStatus,
-            item.messageKey,
-            item.binItem.span,
-            Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
-          )
-        })
-      ))
+          return Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
+        }).concat(
+          // Loop through followup messages and produce position messages for further processing of the transfer
+          result.followupMessages.map(item => {
+            // Produce position message and audit message
+            const action = item.binItem.message?.value.metadata.event.action
+            const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
+            return Kafka.produceGeneralMessage(
+              Config.KAFKA_CONFIG,
+              Producer,
+              Enum.Events.Event.Type.POSITION,
+              action,
+              item.message,
+              eventStatus,
+              item.messageKey,
+              item.binItem.span,
+              Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
+            )
+          })
+        ))
+      }
 
       histTimerEnd({ success: true })
     } catch (err) {
       // If Bin Processor returns failure
       // - Rollback DB transaction
-      await trx.rollback()
+      await trx?.rollback()
 
       // - Audit Error for each message
       const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err)
diff --git a/src/lib/proxyCache.js b/src/lib/proxyCache.js
index 8780a1bff..5776f8ca6 100644
--- a/src/lib/proxyCache.js
+++ b/src/lib/proxyCache.js
@@ -1,24 +1,38 @@
 'use strict'
 
-const { createProxyCache } = require('@mojaloop/inter-scheme-proxy-cache-lib')
-const Config = require('./config.js')
+const { createProxyCache, STORAGE_TYPES } = require('@mojaloop/inter-scheme-proxy-cache-lib')
 const ParticipantService = require('../../src/domain/participant')
+const Config = require('./config.js')
 
 let proxyCache
 
+const init = async () => {
+  // enforce lazy connection for redis
+  const proxyConfig =
+    Config.PROXY_CACHE_CONFIG.type === STORAGE_TYPES.redis
+      ? { ...Config.PROXY_CACHE_CONFIG.proxyConfig, lazyConnect: true }
+      : Config.PROXY_CACHE_CONFIG.proxyConfig
+
+  proxyCache = Object.freeze(
+    createProxyCache(Config.PROXY_CACHE_CONFIG.type, proxyConfig)
+  )
+}
+
 const connect = async () => {
-  return getCache().connect()
+  return !proxyCache?.isConnected && getCache().connect()
 }
 
 const disconnect = async () => {
   return proxyCache?.isConnected && proxyCache.disconnect()
 }
 
+const reset = async () => {
+  await disconnect()
+  proxyCache = null
+}
+
 const getCache = () => {
   if (!proxyCache) {
-    proxyCache = Object.freeze(createProxyCache(
-      Config.PROXY_CACHE_CONFIG.type,
-      Config.PROXY_CACHE_CONFIG.proxyConfig
-    ))
+    init()
   }
   return proxyCache
 }
@@ -40,6 +54,7 @@ const checkSameCreditorDebtorProxy = async (debtorDfspId, creditorDfspId) => {
 }
 
 module.exports = {
+  reset, // for testing
   connect,
   disconnect,
   getCache,
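
Two behavioural notes on this file, then a usage sketch. lazyConnect is an ioredis option: the client object is constructed without dialing Redis until connect() is called, which is why createProxyCache() can now run safely at config-load time. And because getCache() memoizes a frozen instance, tests that mutate PROXY_CACHE_CONFIG need the new reset() to force re-initialisation. A sketch of that test-side usage (paths and the 'mysql' value are illustrative):

const ProxyCache = require('./src/lib/proxyCache')
const Config = require('./src/lib/config')

async function reinitWithNewConfig () {
  Config.PROXY_CACHE_CONFIG.type = 'mysql' // illustrative storage type
  await ProxyCache.reset()   // disconnects if connected, then drops the frozen instance
  await ProxyCache.connect() // getCache() re-runs init() against the new config
}
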
diff --git a/test/integration-override/handlers/positions/handlerBatch.test.js b/test/integration-override/handlers/positions/handlerBatch.test.js
index 460921646..d4edc26cf 100644
--- a/test/integration-override/handlers/positions/handlerBatch.test.js
+++ b/test/integration-override/handlers/positions/handlerBatch.test.js
@@ -31,7 +31,7 @@ const Config = require('#src/lib/config')
 const ProxyCache = require('#src/lib/proxyCache')
 const Db = require('@mojaloop/database-lib').Db
 const Cache = require('#src/lib/cache')
-const Producer = require('@mojaloop/central-services-stream').Util.Producer
+const { Producer, Consumer } = require('@mojaloop/central-services-stream').Util
 const Utility = require('@mojaloop/central-services-shared').Util.Kafka
 const Enum = require('@mojaloop/central-services-shared').Enum
 const ParticipantHelper = require('#test/integration/helpers/participant')
@@ -58,6 +58,7 @@ const SettlementModelCached = require('#src/models/settlement/settlementModelCac
 const Handlers = {
   index: require('#src/handlers/register'),
   positions: require('#src/handlers/positions/handler'),
+  positionsBatch: require('#src/handlers/positions/handlerBatch'),
   transfers: require('#src/handlers/transfers/handler'),
   timeouts: require('#src/handlers/timeouts/handler')
 }
@@ -984,8 +985,14 @@ Test('Handlers test', async handlersTest => {
       Enum.Kafka.Config.PRODUCER,
       TransferEventType.TRANSFER.toUpperCase(),
       TransferEventType.FULFIL.toUpperCase())
+    const positionConfig = Utility.getKafkaConfig(
+      Config.KAFKA_CONFIG,
+      Enum.Kafka.Config.PRODUCER,
+      TransferEventType.TRANSFER.toUpperCase(),
+      TransferEventType.POSITION.toUpperCase())
     prepareConfig.logger = Logger
     fulfilConfig.logger = Logger
+    positionConfig.logger = Logger
 
     await transferPositionPrepare.test('process batch of messages with mixed keys (accountIds) and update transfer state to RESERVED', async (test) => {
       // Construct test data for 10 transfers. Default object contains 10 transfers.
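
positionConfig follows the same pattern as the existing prepareConfig and fulfilConfig lookups. As far as central-services-shared's behaviour goes, getKafkaConfig resolves the nested block kafkaConfig[flow][functionality][action].config, so the call above is roughly equivalent to the following (an assumption about the config shape, not verified from this diff):

// rough equivalent of the Utility.getKafkaConfig call above
const positionConfig = Config.KAFKA_CONFIG.PRODUCER.TRANSFER.POSITION.config
positionConfig.logger = Logger
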
@@ -1687,6 +1694,63 @@ Test('Handlers test', async handlersTest => {
       test.end()
     })
 
+    await transferPositionPrepare.test('skip processing of prepare/commit message if messageKey is 0', async (test) => {
+      await Handlers.positionsBatch.registerPositionHandler()
+      const topicNameOverride = 'topic-transfer-position-batch'
+      const message = {
+        value: {
+          content: {},
+          from: 'payerFsp',
+          to: 'testFxp',
+          id: randomUUID(),
+          metadata: {
+            event: {
+              id: randomUUID(),
+              type: 'position',
+              action: 'prepare',
+              createdAt: new Date(),
+              state: { status: 'success', code: 0 }
+            },
+            type: 'application/json'
+          }
+        }
+      }
+      const params = {
+        message,
+        producer: Producer,
+        kafkaTopic: topicNameOverride,
+        consumer: Consumer,
+        decodedPayload: message.value,
+        span: null
+      }
+      const opts = {
+        consumerCommit: false,
+        eventDetail: { functionality: 'position', action: 'prepare' },
+        fromSwitch: false,
+        toDestination: 'payerFsp',
+        messageKey: '0',
+        topicNameOverride
+      }
+      await Utility.proceed(Config.KAFKA_CONFIG, params, opts)
+      await new Promise(resolve => setTimeout(resolve, 2000))
+
+      let notificationPrepareFiltered = []
+      try {
+        const notificationPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({
+          topicFilter: 'topic-notification-event',
+          action: 'prepare'
+        }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
+
+        notificationPrepareFiltered = notificationPrepare.filter((notification) => notification.to !== 'Hub')
+        test.fail('Error should have been thrown')
+      } catch (err) {
+        test.equal(notificationPrepareFiltered.length, 0, 'Notification Messages not received for transfer with accountId 0')
+      }
+
+      testConsumer.clearEvents()
+      test.end()
+    })
+
     await transferPositionPrepare.test('timeout should', async timeoutTest => {
       const td = await prepareTestData(testData)
 
@@ -1837,9 +1901,13 @@ Test('Handlers test', async handlersTest => {
     await Db.disconnect()
     assert.pass('database connection closed')
     await testConsumer.destroy() // this disconnects the consumers
-
-    await Producer.disconnect()
     await ProxyCache.disconnect()
+    await Producer.disconnect()
+    // Disconnect all consumers
+    await Promise.all(Consumer.getListOfTopics().map(async (topic) => {
+      Logger.info(`Disconnecting consumer for topic: ${topic}`)
+      return Consumer.getConsumer(topic).disconnect()
+    }))
 
     if (debug) {
       const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
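
The teardown change matters because central-services-stream keeps one rdkafka consumer per registered topic, and any consumer left connected keeps the Node process alive after the tape run finishes. The Promise.all above is equivalent to this simpler sequential form (getListOfTopics and getConsumer are the library's registry accessors, exactly as used in the hunk):

// sequential variant of the teardown loop above
for (const topic of Consumer.getListOfTopics()) {
  Logger.info(`Disconnecting consumer for topic: ${topic}`)
  await Consumer.getConsumer(topic).disconnect()
}
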
diff --git a/test/unit/handlers/positions/handlerBatch.test.js b/test/unit/handlers/positions/handlerBatch.test.js
index ffc344700..28d5e5f4c 100644
--- a/test/unit/handlers/positions/handlerBatch.test.js
+++ b/test/unit/handlers/positions/handlerBatch.test.js
@@ -54,6 +54,7 @@ const prepareMessageValue = {
     payload: {}
   }
 }
+
 const commitMessageValue = {
   metadata: {
     event: {
@@ -565,6 +566,37 @@ Test('Position handler', positionBatchHandlerTest => {
       }
     })
 
+    positionsTest.test('skip processing if message key is 0', async test => {
+      // Arrange
+      await Consumer.createHandler(topicName, config, command)
+      Kafka.transformGeneralTopicName.returns(topicName)
+      Kafka.getKafkaConfig.returns(config)
+      Kafka.proceed.returns(true)
+      BinProcessor.processBins.resolves({
+        notifyMessages: [],
+        followupMessages: []
+      })
+
+      const message = {
+        key: '0',
+        value: prepareMessageValue,
+        topic: topicName
+      }
+
+      // Act
+      try {
+        await allTransferHandlers.positions(null, [message])
+        test.ok(BatchPositionModel.startDbTransaction.notCalled, 'startDbTransaction should not be called')
+        test.ok(BinProcessor.processBins.notCalled, 'processBins should not be called')
+        test.ok(Kafka.proceed.notCalled, 'kafkaProceed should not be called')
+        test.end()
+      } catch (err) {
+        Logger.info(err)
+        test.fail('Error should not be thrown')
+        test.end()
+      }
+    })
+
     positionsTest.end()
   })
diff --git a/test/unit/handlers/transfers/prepare.test.js b/test/unit/handlers/transfers/prepare.test.js
index deef4d13e..ad316c666 100644
--- a/test/unit/handlers/transfers/prepare.test.js
+++ b/test/unit/handlers/transfers/prepare.test.js
@@ -54,7 +54,7 @@ const Config = require('../../../../src/lib/config')
 const fxTransferModel = require('../../../../src/models/fxTransfer')
 const fxDuplicateCheck = require('../../../../src/models/fxTransfer/duplicateCheck')
 const fxTransferStateChange = require('../../../../src/models/fxTransfer/stateChange')
-const ProxyCache = require('#src/lib/proxyCache')
+const ProxyCache = require('../../../../src/lib/proxyCache')
 
 const { Action } = Enum.Events.Event
diff --git a/test/unit/lib/proxyCache.test.js b/test/unit/lib/proxyCache.test.js
index d2f3dfc75..9b5db4eeb 100644
--- a/test/unit/lib/proxyCache.test.js
+++ b/test/unit/lib/proxyCache.test.js
@@ -2,8 +2,9 @@
 
 const Test = require('tapes')(require('tape'))
 const Sinon = require('sinon')
-const ParticipantService = require('../../../src/domain/participant')
 const Proxyquire = require('proxyquire')
+const ParticipantService = require('../../../src/domain/participant')
+const Config = require('../../../src/lib/config')
 
 const connectStub = Sinon.stub()
 const disconnectStub = Sinon.stub()
@@ -14,13 +15,14 @@ lookupProxyByDfspIdStub.withArgs('existingDfspId3').resolves('proxyId1')
 lookupProxyByDfspIdStub.withArgs('nonExistingDfspId1').resolves(null)
 lookupProxyByDfspIdStub.withArgs('nonExistingDfspId2').resolves(null)
 
+const createProxyCacheStub = Sinon.stub().returns({
+  connect: connectStub,
+  disconnect: disconnectStub,
+  lookupProxyByDfspId: lookupProxyByDfspIdStub
+})
 const ProxyCache = Proxyquire('../../../src/lib/proxyCache', {
   '@mojaloop/inter-scheme-proxy-cache-lib': {
-    createProxyCache: Sinon.stub().returns({
-      connect: connectStub,
-      disconnect: disconnectStub,
-      lookupProxyByDfspId: lookupProxyByDfspIdStub
-    })
+    createProxyCache: createProxyCacheStub
   }
 })
 
@@ -29,6 +31,8 @@ Test('Proxy Cache test', async (proxyCacheTest) => {
 
   proxyCacheTest.beforeEach(t => {
     sandbox = Sinon.createSandbox()
+    sandbox.stub(Config.PROXY_CACHE_CONFIG, 'type')
+    sandbox.stub(Config.PROXY_CACHE_CONFIG, 'proxyConfig')
     sandbox.stub(ParticipantService)
     t.end()
   })
@@ -39,9 +43,23 @@ Test('Proxy Cache test', async (proxyCacheTest) => {
   })
 
   await proxyCacheTest.test('connect', async (connectTest) => {
-    await connectTest.test('connect to cache', async (test) => {
+    await connectTest.test('connect to cache with lazyConnect', async (test) => {
+      await ProxyCache.connect()
+      test.ok(connectStub.calledOnce)
+      const secondArg = createProxyCacheStub.getCall(0).args[1]
+      test.ok(secondArg.lazyConnect)
+      test.end()
+    })
+
+    await connectTest.test('connect to cache with default config if not redis storage type', async (test) => {
+      await ProxyCache.reset()
+      connectStub.resetHistory()
+      createProxyCacheStub.resetHistory()
+      Config.PROXY_CACHE_CONFIG.type = 'mysql'
       await ProxyCache.connect()
-      Sinon.assert.calledOnce(connectStub)
+      test.ok(connectStub.calledOnce)
+      const secondArg = createProxyCacheStub.getCall(0).args[1]
+      test.ok(secondArg.lazyConnect === undefined)
       test.end()
     })
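
For context on where key '0' originates (the body of checkSameCreditorDebtorProxy is not shown in this diff, only its signature in the src/lib/proxyCache.js hunk): the prepare flow looks up each DFSP's proxy and, when both resolve to the same proxy, the position message is produced with key '0', which the batch handler now short-circuits. A plausible sketch of that check, consistent with the lookupProxyByDfspId stubs in the unit test above but not copied from the source:

// sketch only - inferred from the test stubs, not the actual implementation
const checkSameCreditorDebtorProxy = async (debtorDfspId, creditorDfspId) => {
  const [debtorProxyId, creditorProxyId] = await Promise.all([
    getCache().lookupProxyByDfspId(debtorDfspId),
    getCache().lookupProxyByDfspId(creditorDfspId)
  ])
  return !!debtorProxyId && debtorProxyId === creditorProxyId
}
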