Skip to content

Commit

Permalink
fix: creates message exchanges for multiple queue bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
xndlnk committed May 31, 2019
1 parent f17e850 commit adb11d6
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe(ExchangesFromApiProducer.name, () => {
}).compile()
})

it('transforms', async() => {
it('creates message exchanges and flows for each queue binding', async() => {

const apiService = app.get<RabbitMqManagementApiService>(RabbitMqManagementApiService)
jest.spyOn(apiService, 'getQueues').mockImplementation(async() => testQueues)
Expand All @@ -33,18 +33,20 @@ describe(ExchangesFromApiProducer.name, () => {
expect(outputSystem).not.toBeNull()

expect(outputSystem.getMicroServices()).toHaveLength(1)
expect(outputSystem.getMessageExchanges()).toHaveLength(1)
expect(outputSystem.getMessageExchanges()).toHaveLength(2)

const receiverServiceName = 'receiver-service'
const sourceExchangeName = 'source-exchange'
expect(outputSystem.getMicroServices()[0].getName()).toEqual('receiver-service')
expect(outputSystem.getMessageExchanges()[0].getName()).toEqual('source-exchange-1')
expect(outputSystem.getMessageExchanges()[1].getName()).toEqual('source-exchange-2')

expect(outputSystem.getMicroServices()[0].getPayload().name).toEqual(receiverServiceName)
expect(outputSystem.getMessageExchanges()[0].getPayload().name).toEqual(sourceExchangeName)
expect(outputSystem.getAsyncEventFlows()).toHaveLength(2)

expect(outputSystem.getAsyncEventFlows()).toHaveLength(1)
expect(outputSystem.getAsyncEventFlows()[0].source.id).toEqual(outputSystem.getMessageExchanges()[0].id)
expect(outputSystem.getAsyncEventFlows()[0].target.id).toEqual(outputSystem.getMicroServices()[0].id)

expect(outputSystem.getAsyncEventFlows()[1].source.id).toEqual(outputSystem.getMessageExchanges()[1].id)
expect(outputSystem.getAsyncEventFlows()[1].target.id).toEqual(outputSystem.getMicroServices()[0].id)

verifyEachContentHasTransformer(outputSystem, ExchangesFromApiProducer.name)
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { ConfigService } from '../../config/Config.service'
import { System, AsyncEventFlow } from '../../model/ms'
import { RabbitMqManagementApiService } from '../api/api.service'

type Binding = {
exchange: string
queue: string
}

@Injectable()
export class ExchangesFromApiProducer {
private readonly logger = new Logger(ExchangesFromApiProducer.name)
Expand All @@ -15,12 +20,13 @@ export class ExchangesFromApiProducer {
) { }

public async transform(system: System): Promise<System> {
const queues = await this.getQueueNames()
const queueNames = await this.getQueueNames()

const bindingPromises = queues.map(queue => this.getBinding(queue))
const bindings = await Promise.all(bindingPromises)
const bindingsPromises: Promise<Binding[]>[] = queueNames.map(queue => this.getBindings(queue))
const bindings: Binding[][] = await Promise.all(bindingsPromises)
const allBindings: Binding[] = _.flatMap(bindings)

this.addEdgesFromBindings(system, bindings)
this.addEdgesFromBindings(system, allBindings)

return system
}
Expand All @@ -32,7 +38,7 @@ export class ExchangesFromApiProducer {
return queues.map(queue => { return queue.name })
}

private addEdgesFromBindings(system: System, bindings) {
private addEdgesFromBindings(system: System, bindings: Binding[]) {
bindings
.filter(binding => binding.exchange !== '')
.forEach(binding => {
Expand All @@ -49,17 +55,20 @@ export class ExchangesFromApiProducer {
return system
}

private async getBinding(queueName) {
private async getBindings(queueName): Promise<Binding[]> {
const bindingsData = await this.apiService.getBindings(queueName)

let binding = { 'exchange': '', 'queue': queueName }
const firstBindingHavingSource = bindingsData.find(element => { return element.source !== '' })
if (firstBindingHavingSource) {
binding = { 'exchange': firstBindingHavingSource.source, 'queue': queueName }
}
this.logger.log('found binding of queue ' + binding.queue + ' to exchange ' + binding.exchange)
const bindings: Binding[] = bindingsData.filter(element => element.source !== '')
.map(element => {
const binding = {
'exchange': element.source,
'queue': queueName
}
this.logger.log('found binding of queue ' + binding.queue + ' to exchange ' + binding.exchange)
return binding
})

return binding
return bindings
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
"destination_type": "queue"
},
{
"source": "source-exchange",
"source": "source-exchange-1",
"vhost": "/",
"destination": "receiver-service.routingKey.publish.update",
"destination_type": "queue"
},
{
"source": "source-exchange-2",
"vhost": "/",
"destination": "receiver-service.routingKey.publish.update",
"destination_type": "queue"
Expand Down

0 comments on commit adb11d6

Please sign in to comment.