Skip to content

Commit

Permalink
Merge pull request #27 from Project-OMOTES/WouterSpaak/issue11
Browse files Browse the repository at this point in the history
 Workflow definitions are not configured anymore on both sides but rather shared by the orchestrator on startup.
  • Loading branch information
WouterSpaak authored Oct 16, 2024
2 parents 7707074 + dc0337c commit 8f1a3c7
Show file tree
Hide file tree
Showing 22 changed files with 233 additions and 39 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
"@nestjs/common": "^10.0.2",
"@nestjs/core": "^10.0.2",
"@nestjs/platform-express": "^10.0.2",
"@omotes/proto": "^0.1.9",
"@omotes/proto": "^0.1.12",
"amqplib": "^0.10.3",
"axios": "^1.6.0",
"express": "^4.18.1",
Expand Down
2 changes: 1 addition & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"amqplib": "0.10.3",
"uuidv7": "0.6.3",
"tslib": "2.6.2",
"@omotes/proto": "^0.1.9",
"@omotes/proto": "^0.1.12",
"google-protobuf": "^3.21.2",
"influx": "^5.9.3"
},
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
export * from '@omotes/proto';

export * from './lib/Job';
export * from './lib/OmotesSDK';
export * from './lib/channel';
export * from './lib/handlers/ProgressHandler';
export * from './lib/handlers/ResultHandler';
export * from './lib/handlers/StatusHandler';
export * from './lib/Job';
export * from './lib/OmotesSDK';
export * from './lib/types';
23 changes: 23 additions & 0 deletions sdk/src/lib/Job.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,27 @@ describe('Job', () => {
expect(handler).toBeInstanceOf(ResultHandler);
});
});

describe('params and job reference', () => {
it('should correctly set params', () => {
job.setParams({ foo: 'bar', baz: 42 });
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const struct = getSubmissionFromJob(job).getParamsDict()!;
expect(struct.toJavaScript()).toEqual({ foo: 'bar', baz: 42 });
});

it('should correctly set job reference', () => {
job.setJobReference('job_reference');
expect(getSubmissionFromJob(job).getJobReference()).toBe('job_reference');
});

it('should allow chained method calls', () => {
expect(() => job.setParams({}).setJobReference('')).not.toThrow();
});
});
});

function getSubmissionFromJob(job: Job) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return (job as any).jobSubmission as JobSubmission;
}
21 changes: 18 additions & 3 deletions sdk/src/lib/Job.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { JobCancel, JobSubmission } from '@omotes/proto';
import { JobCancel, JobSubmission, Workflow } from '@omotes/proto';
import { Channel, Connection } from 'amqplib';
import { JavaScriptValue, Struct } from 'google-protobuf/google/protobuf/struct_pb';
import { from } from 'rxjs';
import { uuidv7 } from 'uuidv7';
import { getChannel } from './channel';
import { ProgressHandler } from './handlers/ProgressHandler';
import { ResultHandler } from './handlers/ResultHandler';
import { StatusHandler } from './handlers/StatusHandler';
import { getCancellationsQueue, getProgressQueue, getResultQueue, getStatusQueue, getSubmissionsQueue } from './queue';
import { JobTypeName } from './types';


export type ParamsDict = { [key: string]: JavaScriptValue };

export class Job {
public readonly uuid = uuidv7();
private readonly jobSubmission = new JobSubmission();

constructor(
public readonly type: JobTypeName,
public readonly type: Workflow.AsObject['typeName'],
private readonly esdl: string,
private readonly conn: Connection,
private readonly channel: Channel,
Expand All @@ -27,12 +30,24 @@ export class Job {

public start() {
this.channel.sendToQueue(getSubmissionsQueue(), this.toBuffer(this.jobSubmission), { persistent: true });
return this;
}

public cancel() {
const cancel = new JobCancel();
cancel.setUuid(this.uuid);
this.channel.sendToQueue(getCancellationsQueue(), this.toBuffer(cancel), { persistent: true });
return this;
}

public setParams(params: ParamsDict) {
this.jobSubmission.setParamsDict(Struct.fromJavaScript(params));
return this;
}

public setJobReference(reference: string) {
this.jobSubmission.setJobReference(reference);
return this;
}

public getProgressHandler() {
Expand Down
12 changes: 9 additions & 3 deletions sdk/src/lib/OmotesSDK.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { Workflow } from '@omotes/proto';
import { Connection, connect } from 'amqplib';
import { Job } from './Job';
import { Observable } from 'rxjs';
import { getChannel } from './channel';
import { Job } from './Job';
import { getProfile } from './profiles';
import { getSubmissionsQueue } from './queue';
import { JobTypeName, OmotesSDKOptions } from './types';
import { OmotesSDKOptions } from './types';
import { setupAvailableWorkflows } from './workflow';

export class OmotesSDK {
private _connection: Connection | null = null;
public workflows!: Observable<Workflow.AsObject[]>;

private get connection() {
if (!this._connection) {
throw new Error(`OmotesSDK is not connected. Call connect() first.`);
Expand All @@ -24,9 +29,10 @@ export class OmotesSDK {
port: this.options.rabbitMQPort,
vhost: 'omotes',
});
this.workflows = await setupAvailableWorkflows(this.connection, this.options.id);
}

public async createJob(type: JobTypeName, esdl: string) {
public async createJob(type: Workflow.AsObject['typeName'], esdl: string) {
const queue = getSubmissionsQueue();
const { channel } = await getChannel(this.connection, queue);
const job = new Job(type, esdl, this.connection, channel);
Expand Down
6 changes: 6 additions & 0 deletions sdk/src/lib/channel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ describe('getChannel', () => {
expect(connection.channel.assertQueue).toHaveBeenCalledWith('queue', { durable: true });
expect(connection.channel.assertExchange).toHaveBeenCalledWith('omotes_exchange', 'direct');
});

it('should correctly amend routing key', async () => {
await getChannel(connection as unknown as Connection, 'queue', 'routingKey');
expect(connection.channel.assertQueue).toHaveBeenCalledWith('queue', { durable: true });
expect(connection.channel.bindQueue).toHaveBeenCalledWith('queue', 'exchange', 'routingKey');
});
});
4 changes: 2 additions & 2 deletions sdk/src/lib/channel.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Connection } from 'amqplib';

export async function getChannel(connection: Connection, queueName: string) {
export async function getChannel(connection: Connection, queueName: string, routingKey?: string) {
const channel = await connection.createChannel();
const exchange = await channel.assertExchange('omotes_exchange', 'direct');
const queue = await channel.assertQueue(queueName, { durable: true });
await channel.bindQueue(queue.queue, exchange.exchange, queueName);
await channel.bindQueue(queue.queue, exchange.exchange, routingKey ?? queueName);
return { channel, exchange };
}
48 changes: 48 additions & 0 deletions sdk/src/lib/handlers/AvailableWorkflowsHandler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { AvailableWorkflows, Workflow } from '@omotes/proto';
import { Channel } from 'amqplib';
import { of } from 'rxjs';
import { MockChannel } from '../../util/MockChannel.spec';
import { AvailableWorkflowsHandler } from './AvailableWorkflowsHandler';

describe('AvailableWorkflowsHandler', () => {
describe('#getWorkflows', () => {
let channel: MockChannel<AvailableWorkflows>;
let handler: AvailableWorkflowsHandler;

beforeEach(() => {
channel = new MockChannel();
handler = new AvailableWorkflowsHandler(
of(channel as unknown as Channel),
'client_id'
);
});

it('should report workflows', (done) => {
const { message, workflows } = getWorkflows();
handler.getWorkflows().subscribe((responses) => {
expect(responses).toEqual(workflows);
done();
});
channel.pushMessage(message);
});
});
});

function getWorkflows() {
const message = new AvailableWorkflows();
const workflows = [
{ typeName: 'workflow1', typeDescription: 'description1', parametersList: [] },
{ typeName: 'workflow2', typeDescription: 'description2', parametersList: [] },
];
const list = workflows.map(({ typeDescription, typeName }) => {
const w = new Workflow();
w.setTypeDescription(typeDescription);
w.setTypeName(typeName);
return w;
})
message.setWorkflowsList(list);
return {
workflows,
message
}
}
24 changes: 24 additions & 0 deletions sdk/src/lib/handlers/AvailableWorkflowsHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { AvailableWorkflows } from '@omotes/proto';
import { Channel } from 'amqplib';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { Handler } from './Handler';

export class AvailableWorkflowsHandler extends Handler {
protected override queue = this.getQueueName();
constructor(protected override readonly channel$: Observable<Channel>, private readonly clientId?: string) {
super(channel$);
}

public getWorkflows() {
return this.channelToRx().pipe(
map((message) => {
return AvailableWorkflows.deserializeBinary(message.content).toObject().workflowsList;
})
)
}

private getQueueName() {
return `available_workflows.${this.clientId}`;
}
}
11 changes: 3 additions & 8 deletions sdk/src/lib/handlers/Handler.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { JobSubmission } from '@omotes/proto';
import { Channel, Connection } from 'amqplib';
import { Channel } from 'amqplib';
import { of } from 'rxjs';
import { MockChannel, MockConnection } from '../../util/MockChannel.spec';
import { Job } from '../Job';
import { MockChannel } from '../../util/MockChannel.spec';
import { Handler } from './Handler';

class ConcreteHandler extends Handler {
Expand All @@ -14,15 +13,11 @@ class ConcreteHandler extends Handler {

describe('Handler', () => {
let handler: ConcreteHandler;
let job: Job;
let channel: MockChannel<JobSubmission>;
let connection: MockConnection;

beforeEach(() => {
channel = new MockChannel();
connection = new MockConnection();
job = new Job('grow_simulator', 'esdl', connection as unknown as Connection, channel as unknown as Channel);
handler = new ConcreteHandler(job, of(channel as unknown as Channel));
handler = new ConcreteHandler(of(channel as unknown as Channel));
});

it('should ack messages', () => {
Expand Down
3 changes: 1 addition & 2 deletions sdk/src/lib/handlers/Handler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Channel, ConsumeMessage } from 'amqplib';
import { Observable, Subject, switchMap, takeUntil } from 'rxjs';
import { Job } from '../Job';

export abstract class Handler {
protected readonly abstract queue: string;
protected readonly close$ = new Subject<void>();
constructor(protected readonly job: Job, protected readonly channel$: Observable<Channel>) { }
constructor(protected readonly channel$: Observable<Channel>) { }

protected channelToRx() {
return this.channel$.pipe(
Expand Down
10 changes: 10 additions & 0 deletions sdk/src/lib/handlers/JobHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Channel } from 'amqplib';
import { Observable } from 'rxjs';
import { Job } from '../Job';
import { Handler } from './Handler';

export abstract class JobHandler extends Handler {
constructor(protected readonly job: Job, protected override readonly channel$: Observable<Channel>) {
super(channel$);
}
}
4 changes: 2 additions & 2 deletions sdk/src/lib/handlers/ProgressHandler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { JobProgressUpdate } from '@omotes/proto';
import { map } from 'rxjs';
import { getProgressQueue } from '../queue';
import { Handler } from './Handler';
import { JobHandler } from './JobHandler';

export class ProgressHandler extends Handler {
export class ProgressHandler extends JobHandler {
protected override queue: string = getProgressQueue(this.job);

public getProgress() {
Expand Down
22 changes: 22 additions & 0 deletions sdk/src/lib/handlers/RequestAvailableWorkflowsHandler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { RequestAvailableWorkflows } from '@omotes/proto';
import { Channel } from 'amqplib';
import { MockChannel } from '../../util/MockChannel.spec';
import { RequestAvailableWorkflowsHandler } from './RequestAvailableWorkflowsHandler';

describe('RequestAvailableWorkflowsHandler', () => {
let channel: MockChannel<RequestAvailableWorkflows>;
let handler: RequestAvailableWorkflowsHandler;
beforeEach(() => {
channel = new MockChannel();
handler = new RequestAvailableWorkflowsHandler();
});

describe('#start', () => {
it('should send a message to the channel', () => {
handler.start(channel as unknown as Channel);
expect(channel.sendToQueue).toHaveBeenCalledWith('request_available_workflows', expect.any(Buffer), {
persistent: true
});
});
})
});
11 changes: 11 additions & 0 deletions sdk/src/lib/handlers/RequestAvailableWorkflowsHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { RequestAvailableWorkflows } from '@omotes/proto';
import { Channel } from 'amqplib';

export class RequestAvailableWorkflowsHandler {
private queue = 'request_available_workflows';

public start(channel: Channel) {
const message = new RequestAvailableWorkflows();
return channel.sendToQueue(this.queue, Buffer.from(message.serializeBinary()), { persistent: true });
}
}
4 changes: 2 additions & 2 deletions sdk/src/lib/handlers/ResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { JobResult } from '@omotes/proto';
import { first, map } from 'rxjs';
import { getResultQueue } from '../queue';
import { Handler } from './Handler';
import { JobHandler } from './JobHandler';

export class ResultHandler extends Handler {
export class ResultHandler extends JobHandler {
protected override queue: string = getResultQueue(this.job);

public getResult() {
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/lib/handlers/StatusHandler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { JobStatusUpdate } from '@omotes/proto';
import { Observable, map } from 'rxjs';
import { getStatusQueue } from '../queue';
import { Handler } from './Handler';
import { JobHandler } from './JobHandler';

export class StatusHandler extends Handler {
export class StatusHandler extends JobHandler {
protected override queue: string = getStatusQueue(this.job);

public getStatus(): Observable<JobStatusUpdate.JobStatusMap[keyof JobStatusUpdate.JobStatusMap]> {
Expand Down
Loading

0 comments on commit 8f1a3c7

Please sign in to comment.