diff --git a/providers/upgrade/azureQueueConfig.js b/providers/upgrade/azureQueueConfig.js index bc89f6e9..219ac3f3 100644 --- a/providers/upgrade/azureQueueConfig.js +++ b/providers/upgrade/azureQueueConfig.js @@ -9,7 +9,7 @@ const defaultOptions = { config.get('DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING') || config.get('HARVEST_AZBLOB_CONNECTION_STRING'), queueName: config.get('DEFINITION_UPGRADE_QUEUE_NAME') || 'definitions-upgrade', dequeueOptions: { - numOfMessages: 32, + numOfMessages: config.get('DEFINITION_UPGRADE_DEQUEUE_BATCH_SIZE') || 16, visibilityTimeout: 10 * 60 // 10 min. The default value is 30 seconds. } } diff --git a/providers/upgrade/defUpgradeQueue.js b/providers/upgrade/defUpgradeQueue.js index 533d4de7..e8d103a4 100644 --- a/providers/upgrade/defUpgradeQueue.js +++ b/providers/upgrade/defUpgradeQueue.js @@ -19,12 +19,12 @@ class DefinitionQueueUpgrader extends DefinitionVersionChecker { try { const message = this._constructMessage(definition) await this._upgrade.queue(message) - this.logger.debug('Queued for definition upgrade ', { + this.logger.info('Queued for definition upgrade ', { coordinates: DefinitionVersionChecker.getCoordinates(definition) }) } catch (error) { - //continue if queuing fails and requeue at the next request. - this.logger.error(`Error queuing for definition upgrade ${error.message}`, { + //continue if queueing fails and requeue at the next request. + this.logger.error(`Error queueing for definition upgrade ${error.message}`, { error, coordinates: DefinitionVersionChecker.getCoordinates(definition) }) diff --git a/providers/upgrade/process.js b/providers/upgrade/process.js index e2d717ad..7aa82f96 100644 --- a/providers/upgrade/process.js +++ b/providers/upgrade/process.js @@ -67,13 +67,20 @@ class DefinitionUpgrader { } async _upgradeIfNecessary(coordinates) { - const existing = await this._definitionService.getStored(coordinates) - let result = await this._defVersionChecker.validate(existing) - if (!result) { - await this._definitionService.computeStoreAndCurate(coordinates) - this.logger.info(`Handled definition update for ${coordinates.toString()}`) - } else { - this.logger.debug(`Skipped definition update for ${coordinates.toString()}`) + try { + const existing = await this._definitionService.getStored(coordinates) + let result = await this._defVersionChecker.validate(existing) + if (!result) { + await this._definitionService.computeStoreAndCurate(coordinates) + this.logger.info('Handled definition upgrade for %s', coordinates) + } else { + this.logger.debug('Skipped definition upgrade for %s', coordinates) + } + } catch (error) { + const context = `Error handling definition upgrade for ${coordinates.toString()}` + const newError = new Error(`${context}: ${error.message}`) + newError.stack = error.stack + throw newError } } } diff --git a/test/providers/upgrade/defUpgradeQueue.js b/test/providers/upgrade/defUpgradeQueue.js index b6c90423..8d286d19 100644 --- a/test/providers/upgrade/defUpgradeQueue.js +++ b/test/providers/upgrade/defUpgradeQueue.js @@ -10,7 +10,10 @@ const DefinitionQueueUpgrader = require('../../../providers/upgrade/defUpgradeQu const MemoryQueue = require('../../../providers/upgrade/memoryQueueConfig') describe('DefinitionQueueUpgrader', () => { - const logger = { debug: sinon.stub(), error: sinon.stub() } + let logger + beforeEach(() => { + logger = { debug: sinon.stub(), error: sinon.stub() } + }) describe('Unit tests', () => { const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } } diff --git a/test/providers/upgrade/processTest.js b/test/providers/upgrade/processTest.js index 3f5739a3..1ea7fe72 100644 --- a/test/providers/upgrade/processTest.js +++ b/test/providers/upgrade/processTest.js @@ -1,9 +1,13 @@ // (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license. // SPDX-License-Identifier: MIT +const chaiAsPromised = require('chai-as-promised') +const chai = require('chai') +chai.use(chaiAsPromised) const { expect } = require('chai') const sinon = require('sinon') const { QueueHandler, DefinitionUpgrader } = require('../../../providers/upgrade/process') +const EntityCoordinates = require('../../../lib/entityCoordinates') describe('Definition Upgrade Queue Processing', () => { let logger @@ -80,7 +84,9 @@ describe('Definition Upgrade Queue Processing', () => { }) describe('DefinitionUpgrader', () => { - const definition = Object.freeze({ coordinates: 'pypi/pypi/-/test/revision' }) + const coordinates = 'pypi/pypi/-/test/revision' + const definition = Object.freeze({ coordinates: EntityCoordinates.fromString(coordinates) }) + const message = Object.freeze({ data: { coordinates: definition.coordinates } }) let definitionService, versionChecker, upgrader beforeEach(() => { @@ -98,7 +104,7 @@ describe('Definition Upgrade Queue Processing', () => { definitionService.getStored.resolves(definition) versionChecker.validate.resolves() - await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } }) + await upgrader.processMessage(message) expect(definitionService.getStored.calledOnce).to.be.true expect(versionChecker.validate.calledOnce).to.be.true expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true @@ -108,7 +114,7 @@ describe('Definition Upgrade Queue Processing', () => { definitionService.getStored.resolves(definition) versionChecker.validate.resolves(definition) - await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } }) + await upgrader.processMessage(message) expect(definitionService.getStored.calledOnce).to.be.true expect(versionChecker.validate.calledOnce).to.be.true expect(definitionService.computeStoreAndCurate.notCalled).to.be.true @@ -118,7 +124,7 @@ describe('Definition Upgrade Queue Processing', () => { definitionService.getStored.resolves() versionChecker.validate.resolves() - await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } }) + await upgrader.processMessage(message) expect(definitionService.getStored.calledOnce).to.be.true expect(versionChecker.validate.calledOnce).to.be.true expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true @@ -130,5 +136,77 @@ describe('Definition Upgrade Queue Processing', () => { expect(versionChecker.validate.notCalled).to.be.true expect(definitionService.computeStoreAndCurate.notCalled).to.be.true }) + + it('handles exception by rethrowing with coordinates and the original error message', async () => { + definitionService.getStored.resolves(definition) + versionChecker.validate.resolves() + definitionService.computeStoreAndCurate.rejects(new Error('test')) + + await expect(upgrader.processMessage(message)).to.be.rejectedWith(Error, /pypi\/pypi\/-\/test\/revision: test/) + }) + }) + + describe('Integration Test', () => { + const definition = Object.freeze({ + coordinates: { type: 'pypi', provider: 'pypi', name: 'test', revision: 'revision' }, + _meta: { schemaVersion: '0.0.1' } + }) + const message = Object.freeze({ data: { ...definition } }) + + let queue, handler, definitionService, versionChecker + beforeEach(() => { + let definitionUpgrader + ;({ definitionService, versionChecker, definitionUpgrader } = setupDefinitionUpgrader(logger)) + queue = { + dequeueMultiple: sinon.stub().resolves([message]), + delete: sinon.stub().resolves() + } + handler = new QueueHandler(queue, logger, definitionUpgrader) + }) + + it('handles exception and logs the coordinates and the original error message', async () => { + definitionService.getStored.resolves(definition) + versionChecker.validate.resolves() + definitionService.computeStoreAndCurate.rejects(new Error('test')) + + await handler.work(true) + expect(queue.dequeueMultiple.calledOnce).to.be.true + expect(queue.delete.called).to.be.false + expect(logger.error.calledOnce).to.be.true + expect(logger.error.args[0][0].message).to.match(/pypi\/pypi\/-\/test\/revision: test/) + }) + + it('skips compute if a definition is up-to-date', async () => { + definitionService.getStored.resolves(definition) + versionChecker.validate.resolves(definition) + + await handler.work(true) + expect(definitionService.getStored.calledOnce).to.be.true + expect(versionChecker.validate.calledOnce).to.be.true + expect(definitionService.computeStoreAndCurate.notCalled).to.be.true + expect(queue.delete.called).to.be.true + }) + + it('recomputes a definition, if a definition is not up-to-date', async () => { + definitionService.getStored.resolves(definition) + versionChecker.validate.resolves() + await handler.work(true) + expect(definitionService.getStored.calledOnce).to.be.true + expect(versionChecker.validate.calledOnce).to.be.true + expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true + expect(queue.delete.called).to.be.true + }) }) }) + +function setupDefinitionUpgrader(logger) { + const definitionService = { + getStored: sinon.stub(), + computeStoreAndCurate: sinon.stub().resolves() + } + const versionChecker = { + validate: sinon.stub() + } + const definitionUpgrader = new DefinitionUpgrader(definitionService, logger, versionChecker) + return { definitionService, versionChecker, definitionUpgrader } +}