Skip to content

Commit

Permalink
Merge pull request #31 from Project-OMOTES/WouterSpaak/correctly-asse…
Browse files Browse the repository at this point in the history
…rt-workflows

Send message to queue correctly.
  • Loading branch information
WouterSpaak authored Nov 26, 2024
2 parents 9f02904 + bf66dd5 commit 636bdf6
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
4 changes: 3 additions & 1 deletion sdk/src/lib/OmotesSDK.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export class OmotesSDK {
port: this.options.rabbitMQPort,
vhost: 'omotes',
});
this.workflows = await setupAvailableWorkflows(this.connection, this.options.id);
const { trigger, workflows } = await setupAvailableWorkflows(this.connection, this.options.id);
this.workflows = workflows;
trigger();
}

public async createJob(type: Workflow.AsObject['typeName'], esdl: string) {
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/lib/workflow.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ describe('setupAvailableWorkflows', () => {
const assertCalls = connection.channel.assertQueue.mock.calls.map(([queueName]) => {
return queueName;
});
expect(assertCalls).toEqual(['available_workflows.client_id', 'request_available_workflows']);
expect(assertCalls).toEqual(['available_workflows.client_id']);
})
});
12 changes: 6 additions & 6 deletions sdk/src/lib/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { RequestAvailableWorkflows } from '@omotes/proto';
import { Connection } from 'amqplib';
import { from, map, shareReplay } from 'rxjs';
import { getChannel } from './channel';
import { AvailableWorkflowsHandler } from './handlers/AvailableWorkflowsHandler';
import { RequestAvailableWorkflowsHandler } from './handlers/RequestAvailableWorkflowsHandler';

export async function setupAvailableWorkflows(connection: Connection, clientId: string) {
const availableChannel$ = from(getChannel(connection, `available_workflows.${clientId}`, 'available_workflows')).pipe(
map(({ channel }) => channel)
);
const { channel: requestChannel } = await getChannel(connection, 'request_available_workflows');

const requestHandler = new RequestAvailableWorkflowsHandler();
const workflowsHandler = new AvailableWorkflowsHandler(availableChannel$, clientId);
requestHandler.start(requestChannel);
return workflowsHandler.getWorkflows().pipe(shareReplay(1));
const requestChannel = await connection.createChannel();
return {
workflows: workflowsHandler.getWorkflows().pipe(shareReplay(1)),
trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary()))
};
}

0 comments on commit 636bdf6

Please sign in to comment.