Skip to content

Commit

Permalink
Merge pull request #1258 from qtomlinson/qt/wip
Browse files Browse the repository at this point in the history
Adjust logging and allow configuration of batch size for dequeueMultiple
  • Loading branch information
elrayle authored Dec 18, 2024
2 parents 0d72a12 + 6345358 commit 9cb1c84
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 16 deletions.
2 changes: 1 addition & 1 deletion providers/upgrade/azureQueueConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const defaultOptions = {
config.get('DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING') || config.get('HARVEST_AZBLOB_CONNECTION_STRING'),
queueName: config.get('DEFINITION_UPGRADE_QUEUE_NAME') || 'definitions-upgrade',
dequeueOptions: {
numOfMessages: 32,
numOfMessages: config.get('DEFINITION_UPGRADE_DEQUEUE_BATCH_SIZE') || 16,
visibilityTimeout: 10 * 60 // 10 min. The default value is 30 seconds.
}
}
Expand Down
6 changes: 3 additions & 3 deletions providers/upgrade/defUpgradeQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ class DefinitionQueueUpgrader extends DefinitionVersionChecker {
try {
const message = this._constructMessage(definition)
await this._upgrade.queue(message)
this.logger.debug('Queued for definition upgrade ', {
this.logger.info('Queued for definition upgrade ', {
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
} catch (error) {
//continue if queuing fails and requeue at the next request.
this.logger.error(`Error queuing for definition upgrade ${error.message}`, {
//continue if queueing fails and requeue at the next request.
this.logger.error(`Error queueing for definition upgrade ${error.message}`, {
error,
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
Expand Down
21 changes: 14 additions & 7 deletions providers/upgrade/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,20 @@ class DefinitionUpgrader {
}

async _upgradeIfNecessary(coordinates) {
const existing = await this._definitionService.getStored(coordinates)
let result = await this._defVersionChecker.validate(existing)
if (!result) {
await this._definitionService.computeStoreAndCurate(coordinates)
this.logger.info(`Handled definition update for ${coordinates.toString()}`)
} else {
this.logger.debug(`Skipped definition update for ${coordinates.toString()}`)
try {
const existing = await this._definitionService.getStored(coordinates)
let result = await this._defVersionChecker.validate(existing)
if (!result) {
await this._definitionService.computeStoreAndCurate(coordinates)
this.logger.info('Handled definition upgrade for %s', coordinates)
} else {
this.logger.debug('Skipped definition upgrade for %s', coordinates)
}
} catch (error) {
const context = `Error handling definition upgrade for ${coordinates.toString()}`
const newError = new Error(`${context}: ${error.message}`)
newError.stack = error.stack
throw newError
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion test/providers/upgrade/defUpgradeQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ const DefinitionQueueUpgrader = require('../../../providers/upgrade/defUpgradeQu
const MemoryQueue = require('../../../providers/upgrade/memoryQueueConfig')

describe('DefinitionQueueUpgrader', () => {
const logger = { debug: sinon.stub(), error: sinon.stub() }
let logger
beforeEach(() => {
logger = { debug: sinon.stub(), error: sinon.stub() }
})

describe('Unit tests', () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
Expand Down
86 changes: 82 additions & 4 deletions test/providers/upgrade/processTest.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
// SPDX-License-Identifier: MIT

const chaiAsPromised = require('chai-as-promised')
const chai = require('chai')
chai.use(chaiAsPromised)
const { expect } = require('chai')
const sinon = require('sinon')
const { QueueHandler, DefinitionUpgrader } = require('../../../providers/upgrade/process')
const EntityCoordinates = require('../../../lib/entityCoordinates')

describe('Definition Upgrade Queue Processing', () => {
let logger
Expand Down Expand Up @@ -80,7 +84,9 @@ describe('Definition Upgrade Queue Processing', () => {
})

describe('DefinitionUpgrader', () => {
const definition = Object.freeze({ coordinates: 'pypi/pypi/-/test/revision' })
const coordinates = 'pypi/pypi/-/test/revision'
const definition = Object.freeze({ coordinates: EntityCoordinates.fromString(coordinates) })
const message = Object.freeze({ data: { coordinates: definition.coordinates } })
let definitionService, versionChecker, upgrader

beforeEach(() => {
Expand All @@ -98,7 +104,7 @@ describe('Definition Upgrade Queue Processing', () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()

await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } })
await upgrader.processMessage(message)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
Expand All @@ -108,7 +114,7 @@ describe('Definition Upgrade Queue Processing', () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves(definition)

await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } })
await upgrader.processMessage(message)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
Expand All @@ -118,7 +124,7 @@ describe('Definition Upgrade Queue Processing', () => {
definitionService.getStored.resolves()
versionChecker.validate.resolves()

await upgrader.processMessage({ data: { coordinates: 'pypi/pypi/-/test/revision' } })
await upgrader.processMessage(message)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
Expand All @@ -130,5 +136,77 @@ describe('Definition Upgrade Queue Processing', () => {
expect(versionChecker.validate.notCalled).to.be.true
expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
})

it('handles exception by rethrowing with coordinates and the original error message', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()
definitionService.computeStoreAndCurate.rejects(new Error('test'))

await expect(upgrader.processMessage(message)).to.be.rejectedWith(Error, /pypi\/pypi\/-\/test\/revision: test/)
})
})

describe('Integration Test', () => {
const definition = Object.freeze({
coordinates: { type: 'pypi', provider: 'pypi', name: 'test', revision: 'revision' },
_meta: { schemaVersion: '0.0.1' }
})
const message = Object.freeze({ data: { ...definition } })

let queue, handler, definitionService, versionChecker
beforeEach(() => {
let definitionUpgrader
;({ definitionService, versionChecker, definitionUpgrader } = setupDefinitionUpgrader(logger))
queue = {
dequeueMultiple: sinon.stub().resolves([message]),
delete: sinon.stub().resolves()
}
handler = new QueueHandler(queue, logger, definitionUpgrader)
})

it('handles exception and logs the coordinates and the original error message', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()
definitionService.computeStoreAndCurate.rejects(new Error('test'))

await handler.work(true)
expect(queue.dequeueMultiple.calledOnce).to.be.true
expect(queue.delete.called).to.be.false
expect(logger.error.calledOnce).to.be.true
expect(logger.error.args[0][0].message).to.match(/pypi\/pypi\/-\/test\/revision: test/)
})

it('skips compute if a definition is up-to-date', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves(definition)

await handler.work(true)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.notCalled).to.be.true
expect(queue.delete.called).to.be.true
})

it('recomputes a definition, if a definition is not up-to-date', async () => {
definitionService.getStored.resolves(definition)
versionChecker.validate.resolves()
await handler.work(true)
expect(definitionService.getStored.calledOnce).to.be.true
expect(versionChecker.validate.calledOnce).to.be.true
expect(definitionService.computeStoreAndCurate.calledOnce).to.be.true
expect(queue.delete.called).to.be.true
})
})
})

function setupDefinitionUpgrader(logger) {
const definitionService = {
getStored: sinon.stub(),
computeStoreAndCurate: sinon.stub().resolves()
}
const versionChecker = {
validate: sinon.stub()
}
const definitionUpgrader = new DefinitionUpgrader(definitionService, logger, versionChecker)
return { definitionService, versionChecker, definitionUpgrader }
}

0 comments on commit 9cb1c84

Please sign in to comment.