diff --git a/sdk/src/lib/OmotesSDK.ts b/sdk/src/lib/OmotesSDK.ts index a31c918..7b1d670 100644 --- a/sdk/src/lib/OmotesSDK.ts +++ b/sdk/src/lib/OmotesSDK.ts @@ -29,7 +29,9 @@ export class OmotesSDK { port: this.options.rabbitMQPort, vhost: 'omotes', }); - this.workflows = await setupAvailableWorkflows(this.connection, this.options.id); + const { trigger, workflows } = await setupAvailableWorkflows(this.connection, this.options.id); + this.workflows = workflows; + trigger(); } public async createJob(type: Workflow.AsObject['typeName'], esdl: string) { diff --git a/sdk/src/lib/workflow.spec.ts b/sdk/src/lib/workflow.spec.ts index 276c006..f815a59 100644 --- a/sdk/src/lib/workflow.spec.ts +++ b/sdk/src/lib/workflow.spec.ts @@ -18,6 +18,6 @@ describe('setupAvailableWorkflows', () => { const assertCalls = connection.channel.assertQueue.mock.calls.map(([queueName]) => { return queueName; }); - expect(assertCalls).toEqual(['available_workflows.client_id', 'request_available_workflows']); + expect(assertCalls).toEqual(['available_workflows.client_id']); }) }); \ No newline at end of file diff --git a/sdk/src/lib/workflow.ts b/sdk/src/lib/workflow.ts index 8f16d02..40b79bf 100644 --- a/sdk/src/lib/workflow.ts +++ b/sdk/src/lib/workflow.ts @@ -1,18 +1,18 @@ +import { RequestAvailableWorkflows } from '@omotes/proto'; import { Connection } from 'amqplib'; import { from, map, shareReplay } from 'rxjs'; import { getChannel } from './channel'; import { AvailableWorkflowsHandler } from './handlers/AvailableWorkflowsHandler'; -import { RequestAvailableWorkflowsHandler } from './handlers/RequestAvailableWorkflowsHandler'; export async function setupAvailableWorkflows(connection: Connection, clientId: string) { const availableChannel$ = from(getChannel(connection, `available_workflows.${clientId}`, 'available_workflows')).pipe( map(({ channel }) => channel) ); - const { channel: requestChannel } = await getChannel(connection, 'request_available_workflows'); - - const requestHandler = new RequestAvailableWorkflowsHandler(); const workflowsHandler = new AvailableWorkflowsHandler(availableChannel$, clientId); - requestHandler.start(requestChannel); - return workflowsHandler.getWorkflows().pipe(shareReplay(1)); + const requestChannel = await connection.createChannel(); + return { + workflows: workflowsHandler.getWorkflows().pipe(shareReplay(1)), + trigger: () => requestChannel.sendToQueue('request_available_workflows', Buffer.from(new RequestAvailableWorkflows().serializeBinary())) + }; }