Skip to content

Commit

Permalink
feat: initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 30, 2023
1 parent 0f26721 commit 4d248ef
Show file tree
Hide file tree
Showing 21 changed files with 1,020 additions and 0 deletions.
33 changes: 33 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: Tests

on:
push:
branches: [main]
pull_request:
branches: [main]

concurrency:
group: test-${{ github.ref }}
cancel-in-progress: true

permissions:
contents: read # to fetch code (actions/checkout)

jobs:
test:
runs-on: ubuntu-latest

name: testing node@${{ matrix.node-version }}, redis@${{ matrix.redis-version }}

strategy:
matrix:
node-version: [lts/*, lts/-1, lts/-2, current]
include:
- node-version: "lts/*"

steps:
- name: Checkout repository
uses: actions/checkout@v3 # v3
- uses: oven-sh/setup-bun@v1
- run: bun install
- run: bun test
64 changes: 64 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Runtime data
pids
*.pid
*.seed
*.pid.lock

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# nyc test coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# Bower dependency directory (https://bower.io/)
bower_components

# node-waf configuration
.lock-wscript

# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release

# Dependency directories
node_modules/
jspm_packages/

# TypeScript v1 declaration files
typings/

# Optional npm cache directory
.npm

# Optional eslint cache
.eslintcache

# Optional REPL history
.node_repl_history

# Output of 'npm pack'
*.tgz

# Yarn Integrity file
.yarn-integrity

# dotenv environment variables file
.env

# next.js build output
.next

dist
firestoreServiceAccountKey.json
21 changes: 21 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Configure ENV variables
# PORT
# REDIS_HOST,
# REDIS_PORT
# REDIS_PASSWORD
# REDIS_TLS
# AUTH_TOKENS // comma separated list of tokens

FROM oven/bun:latest

EXPOSE 8080

COPY package.json ./
COPY bun.lockb ./
COPY src ./src

RUN bun install

#CMD bun run ./src/index.ts
CMD cat package.json
HEALTHCHECK --interval=5s --timeout=5s --retries=3 CMD wget localhost:8080 -q -O - > /dev/null 2>&1
Binary file added bun.lockb
Binary file not shown.
15 changes: 15 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: '3'
services:
proxy:
build: .
ports:
- 8080:8080
environment:
PORT: 8080
REDIS_HOST: ${REDIS_HOST}
REDIS_PORT: ${REDIS_PORT}
REDIS_PASSWORD: ${REDIS_PASSWORD}
REDIS_TLS: ${REDIS_TLS}
AUTH_TOKENS: ${AUTH_TOKENS}
redis:
image: "redis:alpine"
39 changes: 39 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"name": "bullmq-proxy",
"version": "0.1.0",
"main": "index.js",
"repository": "git@github.com:taskforcesh/bullmq-proxy.git",
"author": "Manuel Astudillo <manuel@optimalbits.com>",
"license": "MIT",
"scripts": {
"dev": "bun run --watch src/index.ts",
"start": "bun src/index.ts",
"test": "mocha --require ts-node/register tests/**/*.ts",
"build:declaration": "tsc --emitDeclarationOnly"
},
"dependencies": {
"@sinclair/typebox": "^0.31.17",
"@taskforcesh/message-broker": "https://github.com/taskforcesh/message-broker.git#master",
"bullmq": "latest",
"ioredis": "^5.3.2"
},
"trustedDependencies": [
"@taskforcesh/message-broker"
],
"devDependencies": {
"@tsconfig/node-lts": "^18.12.5",
"@types/chai": "^4.3.6",
"@types/jest": "^29.5.5",
"@types/mocha": "^10.0.1",
"@types/node": "^12.0.0",
"@types/sinon": "^10.0.17",
"bun-types": "^1.0.3",
"chai": "^4.3.8",
"jest": "^29.7.0",
"mocha": "^10.2.0",
"sinon": "^16.0.0",
"ts-jest": "^29.1.1",
"ts-node": "^8.2.0",
"typescript": "^5.2.2"
}
}
73 changes: 73 additions & 0 deletions src/controllers/commands.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Define the types for the commands that can be sent to the queue controller.
* We use typebox so that we can validate the commands sent to the proxy and return
* proper errors to the client.
*
*/
import { Type, Static } from "@sinclair/typebox";

enum QueueCommandTypes {
Add = "add",
Pause = "pause",
Resume = "resume",
Empty = "empty",
Clean = "clean",
Count = "count",
GetJobs = "getJobs",
GetJobsCount = "getJobsCount",
GetJobLogs = "getJobLogs",
RemoveRepeatable = "removeRepeatable",
JobsCommand = "jobs",
JobUpdate = "jobUpdate",
JobProgress = "jobProgress",
JobLog = "jobLog",
}

// add(name: NameType, data: DataType, opts?: JobsOptions): Promise<Job<DataType, ResultType, NameType>>;
const AddJobSchema = Type.Object({
fn: Type.Literal(QueueCommandTypes.Add),
args: Type.Tuple([Type.String(), Type.Any(), Type.Optional(Type.Any())]),
});

const PauseSchema = Type.Object({
fn: Type.Literal(QueueCommandTypes.Pause),
args: Type.Tuple([]),
});

const ResumeSchema = Type.Object({
fn: Type.Literal(QueueCommandTypes.Resume),
args: Type.Tuple([]),
});

const GetJobsSchema = Type.Object({
fn: Type.Literal(QueueCommandTypes.GetJobs),
args: Type.Tuple([
Type.Union([
Type.Literal("completed"),
Type.Literal("failed"),
Type.Literal("active"),
Type.Literal("delayed"),
Type.Literal("waiting"),
Type.Literal("waiting-children"),
Type.Literal("prioritized"),
]),
Type.Optional(Type.Number()), // start
Type.Optional(Type.Number()), // end
Type.Optional(Type.Boolean()), // asc
]),
});

const UpdateJobProgressSchema = Type.Object({
fn: Type.Literal(QueueCommandTypes.JobProgress),
args: Type.Tuple([Type.String(), Type.Union([Type.Number(), Type.Any()])]),
});

export const QueueSchema = Type.Union([
AddJobSchema,
PauseSchema,
ResumeSchema,
GetJobsSchema,
UpdateJobProgressSchema,
]);

export type QueueSchemaType = Static<typeof QueueSchema>;
5 changes: 5 additions & 0 deletions src/controllers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export * from './commands';
export * from './queue-events';
export * from './queue';
export * from './utils';
export * from './worker';
73 changes: 73 additions & 0 deletions src/controllers/queue-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { ServerWebSocket } from "bun";
import { MessageBroker } from "@taskforcesh/message-broker";
import { QueueEvents, ConnectionOptions, QueueEventsListener } from "bullmq";

import { WebSocketBehaviour } from "../interfaces/websocket-behaviour";
import { send } from "./utils";
import { log } from "../utils/log";

export interface QueueEventsWebSocketData {
connection: ConnectionOptions;
queueEvents: QueueEvents;
queueName: string;
events?: (keyof QueueEventsListener)[];
mb: MessageBroker<object>;
}

export const openQueueEvents = async (
ws: ServerWebSocket<QueueEventsWebSocketData>
) => {
const { connection, queueName } = ws.data;
const queueEvents = (ws.data.queueEvents = new QueueEvents(queueName, {
connection,
}));

const messageBroker = (ws.data.mb = new MessageBroker<object>(
async (msg: string | Buffer) => send(ws, msg)
));

const events = ws.data.events || [];
const cleanUps = [];

events.forEach((event) => {
const eventHandler = async (...args: any[]) => {
await messageBroker.sendData(
{
event,
args,
},
{ noack: true }
);
};
log(`Subscribing to event: ${event}, for queue: ${queueName}`);
queueEvents.on(event, eventHandler);
cleanUps.push(() => queueEvents.off(event, eventHandler));
});
};

export const QueueEventsController: WebSocketBehaviour = {
open: openQueueEvents,

message: async (
_ws: ServerWebSocket<QueueEventsWebSocketData>,
_message: string | Buffer
) => { },

drain: (_ws) => {
// console.log("WebSocket backpressure: " + ws.getBufferedAmount());
},

close: async (ws, code, message) => {
log(
`WebSocket closed for queue events (${ws.data.queueName}) with code ${code}${message ? `and message ${Buffer.from(
message
).toString()}` : ""}`
);

const { queueEvents } = ws.data;
if (queueEvents) {
ws.data.queueEvents = null;
await queueEvents.close();
}
},
};
Loading

0 comments on commit 4d248ef

Please sign in to comment.