Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate message queue type #10040

Merged
merged 5 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:

- name: Start worker
run: |
npx nx run twenty-server:worker:ci &
npx nx run twenty-server:worker &
echo "Worker started"

- name: Run Playwright tests
Expand Down
10 changes: 9 additions & 1 deletion .github/workflows/ci-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ jobs:
npx nx run twenty-server:database:init:prod
npx nx run twenty-server:database:migrate:prod
- name: Worker / Run
run: npx nx run twenty-server:worker:ci
run: |
timeout 30s npx nx run twenty-server:worker || exit_code=$?
if [ $exit_code -eq 124 ]; then
# If timeout was reached (exit code 124), consider it a success
exit 0
elif [ $exit_code -ne 0 ]; then
# If worker failed for other reasons, fail the build
exit $exit_code
fi
- name: Server / Check for Pending Migrations
run: |
METADATA_MIGRATION_OUTPUT=$(npx nx run twenty-server:typeorm migration:generate metadata-migration-check -d src/database/typeorm/metadata/metadata.datasource.ts || true)
Expand Down
2 changes: 0 additions & 2 deletions packages/twenty-docker/k8s/manifests/deployment-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ spec:
value: "false"
- name: STORAGE_TYPE
value: "local"
- name: "MESSAGE_QUEUE_TYPE"
value: "bull-mq"
- name: "ACCESS_TOKEN_EXPIRES_IN"
value: "7d"
- name: "LOGIN_TOKEN_EXPIRES_IN"
Expand Down
4 changes: 0 additions & 4 deletions packages/twenty-docker/k8s/manifests/deployment-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ spec:
value: "false" # it already runs on the server
- name: STORAGE_TYPE
value: "local"
- name: "MESSAGE_QUEUE_TYPE"
value: "bull-mq"
- name: "CACHE_STORAGE_TYPE"
value: "redis"
- name: "REDIS_URL"
value: "redis://twentycrm-redis.twentycrm.svc.cluster.local:6379"
- name: APP_SECRET
Expand Down
4 changes: 0 additions & 4 deletions packages/twenty-docker/k8s/terraform/deployment-server.tf
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ resource "kubernetes_deployment" "twentycrm_server" {
name = "STORAGE_TYPE"
value = "local"
}
env {
name = "MESSAGE_QUEUE_TYPE"
value = "bull-mq"
}
env {
name = "ACCESS_TOKEN_EXPIRES_IN"
value = "7d"
Expand Down
9 changes: 0 additions & 9 deletions packages/twenty-docker/k8s/terraform/deployment-worker.tf
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ resource "kubernetes_deployment" "twentycrm_worker" {
value = "postgres://twenty:${var.twentycrm_pgdb_admin_password}@${kubernetes_service.twentycrm_db.metadata.0.name}.${kubernetes_namespace.twentycrm.metadata.0.name}.svc.cluster.local/default"
}

env {
name = "CACHE_STORAGE_TYPE"
value = "redis"
}

env {
name = "REDIS_URL"
value = "redis://${kubernetes_service.twentycrm_redis.metadata.0.name}.${kubernetes_namespace.twentycrm.metadata.0.name}.svc.cluster.local:6379"
Expand All @@ -67,10 +62,6 @@ resource "kubernetes_deployment" "twentycrm_worker" {
name = "STORAGE_TYPE"
value = "local"
}
env {
name = "MESSAGE_QUEUE_TYPE"
value = "bull-mq"
}

env {
name = "APP_SECRET"
Expand Down
1 change: 0 additions & 1 deletion packages/twenty-server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ FRONT_PORT=3001
# SENTRY_DSN=https://xxx@xxx.ingest.sentry.io/xxx
# SENTRY_FRONT_DSN=https://xxx@xxx.ingest.sentry.io/xxx
# LOG_LEVELS=error,warn
# MESSAGE_QUEUE_TYPE=bull-mq
# DEMO_WORKSPACE_IDS=REPLACE_ME_WITH_A_RANDOM_UUID
# SERVER_URL=http://localhost:3000
# WORKSPACE_INACTIVE_DAYS_BEFORE_NOTIFICATION=30
Expand Down
2 changes: 0 additions & 2 deletions packages/twenty-server/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ EXCEPTION_HANDLER_DRIVER=console
SENTRY_DSN=https://ba869cb8fd72d5faeb6643560939cee0@o4505516959793152.ingest.sentry.io/4506660900306944
DEMO_WORKSPACE_IDS=63db4589-590f-42b3-bdf1-85268b3da02f,8de58f3f-7e86-4a0b-998d-b2cbe314ee3a,4d957b72-0b37-4bad-9468-8dc828ee082d,daa0b739-269e-49b6-9be5-5f0215941489,59c15f6a-909a-4495-9cf4-3ce1b0abbb6a,7202cc9d-92da-4b52-a323-d29d38cd3b4e,5f071b0d-646b-411a-94f1-5d9ba9d5c6ac,7bc10973-897b-4767-ab2f-35cdac3b2aec,4b3ba0be-2d29-4b1e-be66-8ac7eb65d000,edfb500d-cc4e-4f22-8e2b-f139a9758a68,eee459c9-9057-4459-ae0d-d51d14c01635,3dd2f505-0075-4217-ba33-fc4244aeaaa9,3d1a9165-3f3f-494e-a99d-f858eae95144,84db6ded-cfce-4aee-9160-6553b05c8143,96fb1540-269b-4d13-af21-2a8268eff8ca,b2463e69-d121-4ea5-80c9-bba82403e93e,5af30c15-867d-49ed-b939-d4856bed8514,b5677aa1-68fa-4818-aaaa-434a07ae2ed4,1ec7fa9a-d6bf-4fa2-a753-9a235d75ee3f,753a6fa2-df27-4c87-8c90-4da78fcb30dd,2138f2f2-bbe9-41df-b483-687a9075f94e,a885cfef-4636-4c3a-9788-1ff6e6b92df5,5458f7fb-9431-47a2-b7a0-32f31d115e23,6c09929f-11c3-4f92-9508-aa0e6b934d1e,57ae0a2c-7a4e-4c7d-8f43-68548e7f1206,cc7f0b85-6868-4c2d-85c5-3ce9977ea346,21871a7f-f067-45ea-989e-44339bb5ad07,c3efedab-84f5-4656-8297-55964b3d26cb,647dcdd1-4540-4003-9f58-fd84d4d759b7,fc5e6857-8d67-47b8-98f2-edeb0671e326,1ad8d72c-1826-40ed-8b44-d15a1d2aab70,eac6c90a-d25d-4c8c-a053-cfbc7cde0afb,023a70de-a85e-43fc-bbc6-757fbf6562f0,f3f0a7fb-1409-443b-8e39-4e58e628796e,62828804-97d4-40ec-82fa-2992a6ce4a81,af5441fe-b16f-4996-87f4-1a433ec53dd6,e8857860-f7b1-4478-9741-1eb9e7c11f2c,6bca9c44-c8c0-49f8-b0b5-1bb2ca7842b8,d50da092-09df-448f-84ea-3ebddfe1d9f6,9efd5d6d-db64-47d4-9ad3-5e4d8b65ff7f,6f089094-2dd2-4b0e-b5b7-8bb52b93ea8e,299b0822-68e9-4bfa-af35-da799012e80e,a3dd579c-93be-45a0-ad35-f518d8ed45dd,023b1b3e-4891-4061-aae0-f34368644f40,50174445-33c5-4482-bb8c-3ef6c511c8cd,9933c048-07a7-4735-9af2-940c2f9b6683,beadc568-3962-46bd-ad4d-06e23b37615b,0cdafc9f-d4c1-4576-837e-d7f6ec28643d,50bb24ce-1709-4928-a87b-d9d9e147a2ab,7690ed72-910d-4357-8e0e-17aa702b0b94,1ad0d69f-60fa-414f-bf79-4f94c2abba43,946d84a4-db4d-48cb-a5d3-03081b5c7e8e,1a080055-d2bf-4b14-8957-88a7d08769b8,ed343e38-e405-4fae-9486-27b09c98bdad,c8bdef75-a98c-4646-a372-3251340d2dea,87a8c6fa-f93e-4950-aff2-5f956ca1a6ba,604781ba-23c2-4220-a717-b5615431fcd9,31af6841-ad9f-4f28-a637-b5c5e6589447,cf067451-7b88-4ff2-a96d-3fc9c5d6fea0,26a8ad5e-29d9-4e7d-aa1f-e6221e8ea32a,fd14db29-e4df-44a7-9b3f-d00384458122,73b477a8-fcf4-4860-a685-65a0a79b8653,82e0f305-4c6c-4160-be1d-b0de834124e6,e38567ab-a6e2-4a94-99c5-a7db31c0aae8,faf3d6dc-66ff-4c1b-9658-f65a9cd9fcf1,6df6bb90-200e-4290-b73d-9bb374554229,2ff10cf4-a871-404a-9e7b-5ca7a232567e,06c614e2-0f36-4b72-8c82-59631680add2,5e508c81-3453-4185-ae8c-4c9b841f8c15,21b5c371-6010-4b1b-be67-7538eb877efb,54e61442-e291-4eea-8d49-7f11b5f85bd2,b6b7260a-4eea-40b0-9f7f-1dfd4c3cc7a8,e163fe76-30fb-44fb-b51a-50cc78745a21,4da672f2-29b4-4a98-b27c-b39a4aecc858,2fdb0601-c882-4aaf-ad49-ae17e530d47a,49525e1b-1b47-4545-a98c-0ba58778179f,f958ab32-b152-4004-9228-18148f7380f1,0ff5025a-62cd-4a10-a722-79f7cf360f01,642df445-e314-409a-a97d-64fc2aa2a15e,38b0dab5-d4fb-44f9-8cf9-bb35cf82e91d,62054133-f35a-4f64-a2ee-a31e48952835,536dbe8c-af33-4eab-a0a8-8d039a00db40,a04998ba-52c9-4538-b6d9-6d04408dbaf2,89016c7a-3d36-4619-a5c6-4f31795eebf7,7708b9a9-776c-46fc-94a4-dc28e7880958,5c92bc69-b328-4c66-a791-a05dbaf7a6f8,ad580a50-80b4-44be-9bc4-f2b57cd23207,36c0241c-891e-4b74-bd10-5e99df96bbc8,a96842ff-18be-4536-a23d-20d973d91621,0ea549b0-9558-4bdf-9944-5abc707c7660,0186c353-5ed2-4c94-b71a-fc0b48c90288,1508a165-2217-4911-b31c-1ea42a08f097,1731e392-dfdf-4fc4-863b-27ae62b0e374,0b245cea-96a6-4a3a-af6a-ef43496c239c,a844e208-7078-43a2-8bd0-86f31498cd3f,53d112b5-87f2-490b-a788-df1f4624f9ad,0d5794d4-3a52-482b-9a6a-f8185018bad1,df877aa6-231c-47fb-9be0-906e61677356,c56c6d1a-3418-49d2-82ce-bd9370668043,6e0b6f34-3cd0-4aa0-ae1f-25f5545dca68
MUTATION_MAXIMUM_RECORD_AFFECTED=100
MESSAGE_QUEUE_TYPE=bull-mq
CACHE_STORAGE_TYPE=redis

AUTH_GOOGLE_ENABLED=false
MESSAGING_PROVIDER_GMAIL_ENABLED=false
Expand Down
8 changes: 0 additions & 8 deletions packages/twenty-server/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,6 @@
"options": {
"cwd": "packages/twenty-server",
"command": "node dist/src/queue-worker/queue-worker.js"
},
"configurations": {
"ci": {
"env": {
"MESSAGE_QUEUE_TYPE": "sync",
"CACHE_STORAGE_TYPE": "memory"
}
}
}
},
"typeorm": {
Expand Down
4 changes: 2 additions & 2 deletions packages/twenty-server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import { GraphQLConfigModule } from 'src/engine/api/graphql/graphql-config/graph
import { GraphQLConfigService } from 'src/engine/api/graphql/graphql-config/graphql-config.service';
import { MetadataGraphQLApiModule } from 'src/engine/api/graphql/metadata-graphql-api.module';
import { RestApiModule } from 'src/engine/api/rest/rest-api.module';
import { MessageQueueDriverType } from 'src/engine/core-modules/message-queue/interfaces';
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module';
import { WorkspaceMetadataCacheModule } from 'src/engine/metadata-modules/workspace-metadata-cache/workspace-metadata-cache.module';
import { GraphQLHydrateRequestFromTokenMiddleware } from 'src/engine/middlewares/graphql-hydrate-request-from-token.middleware';
Expand Down Expand Up @@ -89,9 +87,11 @@ export class AppModule {
// Messaque Queue explorer only for sync driver
// Maybe we don't need to conditionaly register the explorer, because we're creating a jobs module
// that will expose classes that are only used in the queue worker
/*
if (process.env.MESSAGE_QUEUE_TYPE === MessageQueueDriverType.Sync) {
modules.push(MessageQueueModule.registerExplorer());
}
*/

return modules;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import { EnvironmentService } from 'src/engine/core-modules/environment/environm
export const cacheStorageModuleFactory = (
environmentService: EnvironmentService,
): CacheModuleOptions => {
const cacheStorageType = environmentService.get('CACHE_STORAGE_TYPE');
const cacheStorageType = CacheStorageType.Redis;
const cacheStorageTtl = environmentService.get('CACHE_STORAGE_TTL');
const cacheModuleOptions: CacheModuleOptions = {
isGlobal: true,
ttl: cacheStorageTtl * 1000,
};

switch (cacheStorageType) {
case CacheStorageType.Memory: {
/* case CacheStorageType.Memory: {
return cacheModuleOptions;
}
}*/
case CacheStorageType.Redis: {
const redisUrl = environmentService.get('REDIS_URL');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { SupportDriver } from 'src/engine/core-modules/environment/interfaces/su
import { LLMChatModelDriver } from 'src/engine/core-modules/llm-chat-model/interfaces/llm-chat-model.interface';
import { LLMTracingDriver } from 'src/engine/core-modules/llm-tracing/interfaces/llm-tracing.interface';

import { CacheStorageType } from 'src/engine/core-modules/cache-storage/types/cache-storage-type.enum';
import { CaptchaDriverType } from 'src/engine/core-modules/captcha/interfaces';
import { CastToBoolean } from 'src/engine/core-modules/environment/decorators/cast-to-boolean.decorator';
import { CastToLogLevelArray } from 'src/engine/core-modules/environment/decorators/cast-to-log-level-array.decorator';
Expand All @@ -37,7 +36,6 @@ import { EnvironmentVariablesSubGroup } from 'src/engine/core-modules/environmen
import { ExceptionHandlerDriver } from 'src/engine/core-modules/exception-handler/interfaces';
import { StorageDriverType } from 'src/engine/core-modules/file-storage/interfaces';
import { LoggerDriverType } from 'src/engine/core-modules/logger/interfaces';
import { MessageQueueDriverType } from 'src/engine/core-modules/message-queue/interfaces';
import { ServerlessDriverType } from 'src/engine/core-modules/serverless/serverless.interface';
import { assert } from 'src/utils/assert';

Expand Down Expand Up @@ -760,12 +758,6 @@ export class EnvironmentVariables {
@IsOptional()
PG_SSL_ALLOW_SELF_SIGNED = false;

@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.Cache,
description: 'Cache storage type',
})
CACHE_STORAGE_TYPE: CacheStorageType = CacheStorageType.Redis;

@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.Cache,
description: 'Cache storage TTL',
Expand All @@ -779,11 +771,6 @@ export class EnvironmentVariables {
description: 'Cache storage URL',
})
@IsOptional()
@ValidateIf(
(env) =>
env.CACHE_STORAGE_TYPE === CacheStorageType.Redis ||
env.MESSAGE_QUEUE_TYPE === MessageQueueDriverType.BullMQ,
)
@IsUrl({
protocols: ['redis'],
require_tld: false,
Expand Down Expand Up @@ -1005,12 +992,6 @@ export class EnvironmentVariables {
@ValidateIf((env) => env.MAX_NUMBER_OF_WORKSPACES_DELETED_PER_EXECUTION > 0)
MAX_NUMBER_OF_WORKSPACES_DELETED_PER_EXECUTION = 5;

@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.QueueConfig,
description: 'Queue driver type',
})
MESSAGE_QUEUE_TYPE: string = MessageQueueDriverType.BullMQ;

@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.QueueConfig,
description: 'Workflow execution throttle limit',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import {
BullMQDriverFactoryOptions,
MessageQueueDriverType,
MessageQueueModuleOptions,
PgBossDriverFactoryOptions,
SyncDriverFactoryOptions,
} from 'src/engine/core-modules/message-queue/interfaces';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';

Expand All @@ -17,9 +15,10 @@ export const messageQueueModuleFactory = async (
environmentService: EnvironmentService,
redisClientService: RedisClientService,
): Promise<MessageQueueModuleOptions> => {
const driverType = environmentService.get('MESSAGE_QUEUE_TYPE');
const driverType = MessageQueueDriverType.BullMQ;

switch (driverType) {
/*
case MessageQueueDriverType.Sync: {
return {
type: MessageQueueDriverType.Sync,
Expand All @@ -35,7 +34,7 @@ export const messageQueueModuleFactory = async (
connectionString,
},
} satisfies PgBossDriverFactoryOptions;
}
}*/
case MessageQueueDriverType.BullMQ: {
return {
type: MessageQueueDriverType.BullMQ,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import { Logger } from '@nestjs/common';

import { createClient } from 'redis';
import RedisStore from 'connect-redis';
import session from 'express-session';
import { createClient } from 'redis';

import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { CacheStorageType } from 'src/engine/core-modules/cache-storage/types/cache-storage-type.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';

export const getSessionStorageOptions = (
environmentService: EnvironmentService,
): session.SessionOptions => {
const cacheStorageType = environmentService.get('CACHE_STORAGE_TYPE');
const cacheStorageType = CacheStorageType.Redis;

const SERVER_URL = environmentService.get('SERVER_URL');

Expand All @@ -26,13 +24,13 @@ export const getSessionStorageOptions = (
};

switch (cacheStorageType) {
case CacheStorageType.Memory: {
/* case CacheStorageType.Memory: {
Logger.warn(
'Memory session storage is not recommended for production. Prefer Redis.',
);

return sessionStorage;
}
}*/
case CacheStorageType.Redis: {
const connectionString = environmentService.get('REDIS_URL');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ image: /images/user-guide/emails/emails_header.png
Queues facilitate async operations to be performed. They can be used for performing background tasks such as sending a welcome email on register.
Each use case will have its own queue class extended from `MessageQueueServiceBase`.

Currently, queue supports two drivers which can be configured by env variable `MESSAGE_QUEUE_TYPE`.
1. `bull-mq`: this is the default driver, which uses [bull-mq](https://bullmq.io/) under the hood.
2. `pg-boss`: this uses [pg-boss](https://github.com/timgit/pg-boss) under the hood.
Currently, we only support `bull-mq`[bull-mq](https://bullmq.io/) as the queue driver.

## Steps to create and use a new queue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ yarn command:prod cron:calendar:ongoing-stale
['FRONT_PROTOCOL', 'http', 'protocol of the frontend server. Could be `http` or `https`'],
['FRONT_PORT', '3001', 'Port of the frontend server.'],
['PORT', '3000', 'Port of the backend server'],
['CACHE_STORAGE_TYPE', 'redis', 'Cache type (memory, redis...)'],
['CACHE_STORAGE_TTL', '3600 * 24 * 7', 'Cache TTL in seconds']
]}></ArticleTable>

Expand Down Expand Up @@ -276,12 +275,6 @@ yarn command:prod cron:calendar:ongoing-stale
['SERVERLESS_LAMBDA_SECRET_ACCESS_KEY', '', 'Optional depending on the authentication method'],
]}></ArticleTable>

### Message Queue

<ArticleTable options={[
['MESSAGE_QUEUE_TYPE', 'bull-mq', "Queue driver: 'bull-mq'"],
]}></ArticleTable>

### Logging

<ArticleTable options={[
Expand Down
Loading