Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove-multitenacy #150

Merged
merged 4 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
export interface Job<ScheduleType extends string = string> {
tenant: string;

export interface Job<ScheduleType extends string = string> {
id: string;
queue: string;
payload: string;
Expand All @@ -20,7 +18,6 @@ export interface Job<ScheduleType extends string = string> {
}

export interface JobEnqueue<ScheduleType extends string = string> {
tenant: string;
id: string;
queue: string;
payload: string;
Expand Down
27 changes: 2 additions & 25 deletions src/activity/activity.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { Redis } from "ioredis";
import { Closable } from "../Closable";
import {
decodeRedisKey,
encodeRedisKey,
tenantToRedisPrefix,
} from "../encodeRedisKey";
import { decodeRedisKey, encodeRedisKey } from "../encodeRedisKey";
import { Job } from "../Job";
import { Producer } from "../producer/producer";

Expand Down Expand Up @@ -55,36 +51,31 @@ interface ScheduledEvent {

interface InvokedEvent {
type: "invoked";
tenant: string;
id: string;
queue: string;
}

interface RescheduledEvent {
type: "rescheduled";
tenant: string;
id: string;
queue: string;
runAt: Date;
}

interface DeletedEvent {
type: "deleted";
tenant: string;
id: string;
queue: string;
}

interface RequestedEvent {
type: "requested";
tenant: string;
id: string;
queue: string;
}

interface AcknowledgedEvent {
type: "acknowledged";
tenant: string;
id: string;
queue: string;
}
Expand All @@ -94,7 +85,6 @@ export class Activity<ScheduleType extends string> implements Closable {
private producer;

constructor(
public readonly tenant: string,
redisFactory: () => Redis,
private readonly onEvent: OnActivity,
options: SubscriptionOptions = {}
Expand All @@ -114,23 +104,13 @@ export class Activity<ScheduleType extends string> implements Closable {
options.id = encodeRedisKey(options.id);
}

this.redis.psubscribe(
`${tenantToRedisPrefix(tenant)}${options.queue ?? "*"}:${
options.id ?? "*"
}`
);
this.redis.psubscribe(`${options.queue ?? "*"}:${options.id ?? "*"}`);
}

private async handleMessage(channel: string, message: string) {
const [_type, ...args] = splitEvent(message, 9);
const type = _type as OnActivityEvent["type"];

let tenant = "";
if (channel.startsWith("{")) {
tenant = channel.slice(1, channel.indexOf("}"));
channel = channel.slice(channel.indexOf("}") + 1);
}

const channelParts = channel.split(":").map(decodeRedisKey);
if (channelParts.length !== 2) {
return;
Expand All @@ -152,7 +132,6 @@ export class Activity<ScheduleType extends string> implements Closable {
await this.onEvent({
type: "scheduled",
job: {
tenant,
id,
queue,
payload,
Expand All @@ -173,15 +152,13 @@ export class Activity<ScheduleType extends string> implements Closable {
const [runDate] = args;
await this.onEvent({
type: "rescheduled",
tenant,
id,
queue,
runAt: new Date(+runDate),
});
} else {
await this.onEvent({
type,
tenant,
id,
queue,
});
Expand Down
12 changes: 0 additions & 12 deletions src/encodeRedisKey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,3 @@ export function encodeRedisKey(decoded: string): string {
export function decodeRedisKey(encoded: string): string {
return encoded.replace(/%3A/g, ":");
}

export function tenantToRedisPrefix(tenant: string) {
if (tenant.includes("{") || tenant.includes("}")) {
throw new Error("tenant shall not include {}!");
}

if (tenant === "") {
return "";
}

return "{" + tenant + "}";
}
2 changes: 0 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ export default class Owl<ScheduleType extends string> {
}

public createActivity(
tenant: string,
onEvent: OnActivity,
options: SubscriptionOptions = {}
) {
return new Activity<ScheduleType>(
tenant,
this.redisFactory,
onEvent,
options
Expand Down
12 changes: 5 additions & 7 deletions src/producer/delete.lua
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
-- Checks if a specified job exists and deletes it.

local tenantPrefix = KEYS[1]

local jobId = ARGV[1]
local jobQueue = ARGV[2]

local FOUND_AND_DELETED = 0
local NOT_FOUND = 1

if redis.call("DEL", tenantPrefix .. "jobs:" .. jobQueue .. ":" .. jobId) == 0 then
if redis.call("DEL", "jobs:" .. jobQueue .. ":" .. jobId) == 0 then
return NOT_FOUND
end

redis.call("SREM", tenantPrefix .. "queues:" .. jobQueue, jobId)
redis.call("ZREM", tenantPrefix .. "queue", jobQueue .. ":" .. jobId)
redis.call("SREM", "queues:" .. jobQueue, jobId)
redis.call("ZREM", "queue", jobQueue .. ":" .. jobId)

redis.call("PUBLISH", tenantPrefix .. jobQueue .. ":" .. jobId, "deleted")
redis.call("PUBLISH", tenantPrefix .. "deleted", jobQueue .. ":" .. jobId)
redis.call("PUBLISH", jobQueue .. ":" .. jobId, "deleted")
redis.call("PUBLISH", "deleted", jobQueue .. ":" .. jobId)

return FOUND_AND_DELETED
8 changes: 3 additions & 5 deletions src/producer/invoke.lua
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
-- Checks if a specified job exists and invokes it immediately.

local tenantPrefix = KEYS[1]

local jobId = ARGV[1]
local jobQueue = ARGV[2]
local newRunAt = ARGV[3]

local FOUND_AND_INVOKED = 0
local NOT_FOUND = 1

local updatedJobs = redis.call("ZADD", tenantPrefix .. "queue", "XX", "CH", newRunAt, jobQueue .. ":" .. jobId)
local updatedJobs = redis.call("ZADD", "queue", "XX", "CH", newRunAt, jobQueue .. ":" .. jobId)

if updatedJobs == 0 then
return NOT_FOUND
end

redis.call("PUBLISH", tenantPrefix .. jobQueue .. ":" .. jobId, "invoked")
redis.call("PUBLISH", tenantPrefix .. "invoked", jobQueue .. ":" .. jobId)
redis.call("PUBLISH", jobQueue .. ":" .. jobId, "invoked")
redis.call("PUBLISH", "invoked", jobQueue .. ":" .. jobId)

return FOUND_AND_INVOKED
31 changes: 9 additions & 22 deletions src/producer/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { StaleChecker, StaleCheckerConfig } from "../shared/stale-checker";
import {
decodeRedisKey,
encodeRedisKey,
tenantToRedisPrefix,
} from "../encodeRedisKey";
import { defineLocalCommands } from "../redis-commands";
import type { Logger } from "pino";
Expand All @@ -15,7 +14,6 @@ import { ScheduleMap } from "..";
declare module "ioredis" {
interface Commands {
schedule(
tenantPrefix: string,
id: string,
queue: string,
payload: string,
Expand All @@ -28,10 +26,9 @@ declare module "ioredis" {
retryIntervals: string
): Promise<0 | 1>;

delete(tenantPrefix: string, id: string, queue: string): Promise<0 | 1>;
delete(id: string, queue: string): Promise<0 | 1>;

invoke(
tenantPrefix: string,
id: string,
queue: string,
newRunAt: number
Expand Down Expand Up @@ -88,7 +85,6 @@ export class Producer<ScheduleType extends string> implements Closable {
}

await this.redis.schedule(
tenantToRedisPrefix(job.tenant),
encodeRedisKey(job.id),
encodeRedisKey(job.queue),
job.payload,
Expand All @@ -103,7 +99,6 @@ export class Producer<ScheduleType extends string> implements Closable {
this.logger?.debug({ job }, "Producer: Enqueued");

return {
tenant: job.tenant,
id: job.id,
queue: job.queue,
count: 1,
Expand All @@ -116,13 +111,12 @@ export class Producer<ScheduleType extends string> implements Closable {
}

public async scanQueue(
tenant: string,
queue: string,
cursor: number = 0,
count = 100
): Promise<{ newCursor: number; jobs: Job<ScheduleType>[] }> {
const [newCursor, jobIds] = await this.redis.sscan(
tenantToRedisPrefix(tenant) + `queues:${encodeRedisKey(queue)}`,
`queues:${encodeRedisKey(queue)}`,
cursor,
"COUNT",
count
Expand All @@ -132,23 +126,21 @@ export class Producer<ScheduleType extends string> implements Closable {
newCursor: +newCursor,
jobs: (
await this.findJobs(
tenant,
jobIds.map(decodeRedisKey).map((id) => ({ id, queue }))
)
).filter((j) => !!j) as Job<ScheduleType>[],
};
}

public async scanQueuePattern(
tenant: string,
queuePattern: string,
cursor: number = 0,
count = 100
): Promise<{ newCursor: number; jobs: Job<ScheduleType>[] }> {
const [newCursor, jobIdKeys] = await this.redis.scan(
cursor,
"MATCH",
tenantToRedisPrefix(tenant) + `jobs:${encodeRedisKey(queuePattern)}:*`,
`jobs:${encodeRedisKey(queuePattern)}:*`,
"COUNT",
count
);
Expand All @@ -160,26 +152,25 @@ export class Producer<ScheduleType extends string> implements Closable {

return {
newCursor: +newCursor,
jobs: (await this.findJobs(tenant, jobIds)).filter(
jobs: (await this.findJobs(jobIds)).filter(
(j) => !!j
) as Job<ScheduleType>[],
};
}

public async findJobs(
tenant: string,
ids: { queue: string; id: string }[]
): Promise<(Job<ScheduleType> | null)[]> {
const pipeline = this.redis.pipeline();

for (const { queue, id } of ids) {
pipeline.hgetall(
`${tenantToRedisPrefix(tenant)}jobs:${encodeRedisKey(
`jobs:${encodeRedisKey(
queue
)}:${encodeRedisKey(id)}`
);
pipeline.zscore(
tenantToRedisPrefix(tenant) + "queue",
"queue",
`${encodeRedisKey(queue)}:${encodeRedisKey(id)}`
);
}
Expand Down Expand Up @@ -220,7 +211,6 @@ export class Producer<ScheduleType extends string> implements Closable {
const runAt = +zscoreResult;

jobResults.push({
tenant,
id,
queue,
payload,
Expand All @@ -242,17 +232,15 @@ export class Producer<ScheduleType extends string> implements Closable {
}

public async findById(
tenant: string,
queue: string,
id: string
): Promise<Job<ScheduleType> | null> {
const [job] = await this.findJobs(tenant, [{ id, queue }]);
const [job] = await this.findJobs([{ id, queue }]);
return job;
}

public async delete(tenant: string, queue: string, id: string) {
public async delete(queue: string, id: string) {
const result = await this.redis.delete(
tenantToRedisPrefix(tenant),
encodeRedisKey(id),
encodeRedisKey(queue)
);
Expand All @@ -265,9 +253,8 @@ export class Producer<ScheduleType extends string> implements Closable {
}
}

public async invoke(tenant: string, queue: string, id: string) {
public async invoke(queue: string, id: string) {
const result = await this.redis.invoke(
tenantToRedisPrefix(tenant),
encodeRedisKey(id),
encodeRedisKey(queue),
Date.now()
Expand Down
12 changes: 5 additions & 7 deletions src/producer/schedule.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
-- Adds the job's data to the job table and schedules it.

local tenantPrefix = KEYS[1]

local jobId = ARGV[1]
local jobQueue = ARGV[2]
local payload = ARGV[3]
Expand All @@ -16,7 +14,7 @@ local retryIntervals = ARGV[10] -- as JSON array
local SCHEDULED = 0
local ID_ALREADY_EXISTS = 1

local jobTableJobKey = tenantPrefix .. "jobs:" .. jobQueue .. ":" .. jobId
local jobTableJobKey = "jobs:" .. jobQueue .. ":" .. jobId

if not override then
if redis.call("EXISTS", jobTableJobKey) == 1 then
Expand All @@ -37,11 +35,11 @@ redis.call(
"retry", retryIntervals
)

redis.call("SADD", tenantPrefix .. "queues:" .. jobQueue, jobId)
redis.call("SADD", "queues:" .. jobQueue, jobId)

redis.call("ZADD", tenantPrefix .. "queue", scheduledExecutionDate, jobQueue .. ":" .. jobId)
redis.call("ZADD", "queue", scheduledExecutionDate, jobQueue .. ":" .. jobId)

redis.call("PUBLISH", tenantPrefix .. jobQueue .. ":" .. jobId, "scheduled" .. ":" .. scheduledExecutionDate .. ":" .. scheduleType .. ":" .. scheduleMeta .. ":" .. maximumExecutionTimes .. ":" .. exclusive .. ":" .. count .. ":" .. retryIntervals .. ":" .. payload)
redis.call("PUBLISH", tenantPrefix .. "scheduled", jobQueue .. ":" .. jobId)
redis.call("PUBLISH", jobQueue .. ":" .. jobId, "scheduled" .. ":" .. scheduledExecutionDate .. ":" .. scheduleType .. ":" .. scheduleMeta .. ":" .. maximumExecutionTimes .. ":" .. exclusive .. ":" .. count .. ":" .. retryIntervals .. ":" .. payload)
redis.call("PUBLISH", "scheduled", jobQueue .. ":" .. jobId)

return SCHEDULED
Loading