Skip to content

Commit

Permalink
feat(be): ⚡ let send mail work with queue
Browse files Browse the repository at this point in the history
  • Loading branch information
lehuygiang28 committed Aug 22, 2024
1 parent 354fd7b commit 501f7dd
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 36 deletions.
15 changes: 11 additions & 4 deletions apps/be/common/src/mail/mail.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import { Processor, WorkerHost } from '@nestjs/bullmq';
import { PinoLogger } from 'nestjs-pino';

import { BULLMQ_BG_JOB_QUEUE } from '~be/common/bullmq';
import { MailService } from '~be/common/mail';
import { MailService, NotifyStopTaskOptions } from '~be/common/mail';
import { ModuleRef } from '@nestjs/core';

export type MailJobName = 'sendEmailRegister' | 'sendEmailLogin';
export type MailJobName = 'sendEmailRegister' | 'sendEmailLogin' | 'notifyStopTask';

@Processor(BULLMQ_BG_JOB_QUEUE, {
useWorkerThreads: true,
Expand Down Expand Up @@ -38,12 +38,15 @@ export class MailProcessor extends WorkerHost implements OnModuleInit {
return this.sendEmailRegister(job);
case 'sendEmailLogin':
return this.sendEmailLogin(job);
case 'notifyStopTask': {
return this.notifyStopTask(job as Job<NotifyStopTaskOptions>);
}
default:
return;
}
}

async sendEmailRegister(job: Job<unknown, unknown, MailJobName>): Promise<unknown> {
private async sendEmailRegister(job: Job<unknown, unknown, MailJobName>): Promise<unknown> {
const { email, url } = job.data as { email: string; url: string };
return this.mailService.sendConfirmMail({
to: email,
Expand All @@ -53,7 +56,7 @@ export class MailProcessor extends WorkerHost implements OnModuleInit {
});
}

async sendEmailLogin(job: Job<unknown, unknown, MailJobName>): Promise<unknown> {
private async sendEmailLogin(job: Job<unknown, unknown, MailJobName>): Promise<unknown> {
const { email, url } = job.data as { email: string; url: string };
return this.mailService.sendLogin({
to: email,
Expand All @@ -62,4 +65,8 @@ export class MailProcessor extends WorkerHost implements OnModuleInit {
},
});
}

private async notifyStopTask(job: Job<NotifyStopTaskOptions>) {
return this.mailService.notifyStopTask(job.data);
}
}
16 changes: 10 additions & 6 deletions apps/be/common/src/mail/mail.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import { SENDGRID_TRANSPORT } from './mail.constant';

import { ErrorNotificationEnum } from '~be/app/tasks/enums';

export type NotifyStopTaskOptions = {
to: string;
data: {
url: string;
};
typeErr: ErrorNotificationEnum;
};

@Injectable()
export class MailService {
private readonly TRANSPORTERS: string[] = [];
Expand Down Expand Up @@ -157,11 +165,7 @@ export class MailService {
});
}

async notifyStopTask(
data: { to: string; mailData: { url: string } },
typeErr: ErrorNotificationEnum,
) {
const { to, mailData } = data;
async notifyStopTask({ to, data, typeErr }: NotifyStopTaskOptions) {
const template = 'stop-task-notify';

let [stopTaskTitle, text1, text2, btn1]: MaybeType<string>[] = await Promise.all([
Expand Down Expand Up @@ -196,7 +200,7 @@ export class MailService {
to,
subject: stopTaskTitle,
template,
context: { title: stopTaskTitle, url: mailData.url, text1, text2, btn1 },
context: { title: stopTaskTitle, url: data.url, text1, text2, btn1 },
});
}
}
99 changes: 75 additions & 24 deletions apps/be/src/app/tasks/processors/task.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Job, Queue } from 'bullmq';
import { type Timings } from '@szmarczak/http-timer';

import {
BULLMQ_BG_JOB_QUEUE,
BULLMQ_DISCORD_QUEUE,
BULLMQ_TASK_QUEUE,
InjectQueueDecorator,
Expand All @@ -17,7 +18,7 @@ import { CreateTaskLogDto, TaskLogJobName } from '~be/app/task-logs';
import { defaultHeaders } from '~be/common/axios';
import { isTrueSet, normalizeHeaders, isNullOrUndefined } from '~be/common/utils';
import { RedisService } from '~be/common/redis/services';
import { MailService } from '~be/common/mail';
import { MailJobName, NotifyStopTaskOptions } from '~be/common/mail';
import { UsersService } from '~be/app/users';
import { AllConfig } from '~be/app/config';

Expand All @@ -44,10 +45,11 @@ export class TaskProcessor extends WorkerHost implements OnModuleInit {
private readonly redisService: RedisService,
private readonly taskService: TasksService,
private readonly configService: ConfigService<AllConfig>,
private readonly mailService: MailService,
private readonly usersService: UsersService,
@InjectQueueDecorator(BULLMQ_DISCORD_QUEUE)
private readonly discordQueue: Queue<unknown, unknown, DiscordJobName>,
@InjectQueueDecorator(BULLMQ_BG_JOB_QUEUE)
private readonly bgQueue: Queue<unknown, unknown, MailJobName>,
) {
super();
this.logger = new Logger(TaskProcessor.name);
Expand Down Expand Up @@ -325,32 +327,56 @@ export class TaskProcessor extends WorkerHost implements OnModuleInit {

if (!isNullOrUndefined(discordOptions?.dmUserId) && discordOptions?.dmUserId != '') {
promises.push(
this.discordQueue.add('sendDirectMessage', {
dmUserId: discordOptions.dmUserId,
message: discordMessage,
} satisfies SendDirectMessage),
this.discordQueue.add(
'sendDirectMessage',
{
dmUserId: discordOptions.dmUserId,
message: discordMessage,
} satisfies SendDirectMessage,
{
removeOnComplete: true,
removeOnFail: true,
attempts: 10,
},
),
);
}

if (!isNullOrUndefined(discordOptions?.channelId) && discordOptions?.channelId != '') {
promises.push(
this.discordQueue.add('sendMessage', {
channelId: discordOptions.channelId,
message: discordMessage,
} satisfies SendMessage),
this.discordQueue.add(
'sendMessage',
{
channelId: discordOptions.channelId,
message: discordMessage,
} satisfies SendMessage,
{
removeOnComplete: true,
removeOnFail: true,
attempts: 10,
backoff: { type: 'exponential', delay: 3000 },
},
),
);
}

if (isTrueSet(options?.alert?.alertOn?.email)) {
promises.push(
this.mailService.notifyStopTask(
this.bgQueue.add(
'notifyStopTask',
{
to: (await this.usersService.findById(job.data.userId.toString())).email,
mailData: {
data: {
url: taskUrl,
},
typeErr: ErrorNotificationEnum.disableByTooManyFailures,
} satisfies NotifyStopTaskOptions,
{
removeOnComplete: true,
removeOnFail: true,
attempts: 10,
backoff: { type: 'exponential', delay: 3000 },
},
ErrorNotificationEnum.disableByTooManyFailures,
),
);
}
Expand Down Expand Up @@ -391,32 +417,57 @@ export class TaskProcessor extends WorkerHost implements OnModuleInit {

if (!isNullOrUndefined(discordOptions?.dmUserId) && discordOptions?.dmUserId != '') {
promises.push(
this.discordQueue.add('sendDirectMessage', {
dmUserId: discordOptions.dmUserId,
message: discordMessage,
} satisfies SendDirectMessage),
this.discordQueue.add(
'sendDirectMessage',
{
dmUserId: discordOptions.dmUserId,
message: discordMessage,
} satisfies SendDirectMessage,
{
removeOnComplete: true,
removeOnFail: true,
attempts: 10,
backoff: { type: 'exponential', delay: 3000 },
},
),
);
}

if (!isNullOrUndefined(discordOptions?.channelId) && discordOptions?.channelId != '') {
promises.push(
this.discordQueue.add('sendMessage', {
message: discordMessage,
channelId: discordOptions.channelId,
} satisfies SendMessage),
this.discordQueue.add(
'sendMessage',
{
message: discordMessage,
channelId: discordOptions.channelId,
} satisfies SendMessage,
{
removeOnComplete: true,
removeOnFail: true,
attempts: 10,
backoff: { type: 'exponential', delay: 3000 },
},
),
);
}

if (isTrueSet(options?.alert?.alertOn?.email)) {
promises.push(
this.mailService.notifyStopTask(
this.bgQueue.add(
'notifyStopTask',
{
to: (await this.usersService.findById(job.data.userId.toString())).email,
mailData: {
data: {
url: taskUrl,
},
typeErr: ErrorNotificationEnum.jobExecutionFailed,
} satisfies NotifyStopTaskOptions,
{
removeOnComplete: true,
removeOnFail: true,
attempts: 10,
backoff: { type: 'exponential', delay: 3000 },
},
ErrorNotificationEnum.jobExecutionFailed,
),
);
}
Expand Down
3 changes: 3 additions & 0 deletions apps/be/src/app/tasks/services/task-scheduling.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class TaskSchedulingService implements OnModuleInit {
jobId: task._id.toString(),
repeat: {
pattern: task.cron,
tz: task.timezone,
},
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -57,6 +58,7 @@ export class TaskSchedulingService implements OnModuleInit {
`fetch`,
{
pattern: task.cron,
tz: task.timezone,
},
taskIdString,
),
Expand All @@ -65,6 +67,7 @@ export class TaskSchedulingService implements OnModuleInit {
`fetch`,
{
pattern: cron,
tz: task.timezone,
},
taskIdString,
);
Expand Down
6 changes: 4 additions & 2 deletions apps/be/src/app/tasks/tasks.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import {
BULLMQ_TASK_LOG_QUEUE,
BULLMQ_CLEAR_TASK_QUEUE,
BULLMQ_RESTORE_TASK_FROM_DB_QUEUE,
BULLMQ_BG_JOB_QUEUE,
} from '~be/common/bullmq/bullmq.constant';
import { axiosConfig } from '~be/common/axios';
import { RedisModule } from '~be/common/redis';
import { MailModule } from '~be/common/mail';
import { UsersModule } from '~be/app/users';

import { Task, TaskSchema } from './schemas';
Expand All @@ -36,7 +36,6 @@ const importProviders = [
HttpModule.register(axiosConfig),
MongooseModule.forFeature([{ name: Task.name, schema: TaskSchema }]),
RedisModule,
MailModule,
UsersModule,
TaskLogsModule,
BullModule.registerQueue({
Expand All @@ -48,6 +47,9 @@ const importProviders = [
BullModule.registerQueue({
name: BULLMQ_CLEAR_TASK_QUEUE,
}),
BullModule.registerQueue({
name: BULLMQ_BG_JOB_QUEUE,
}),
ConditionalModule.registerWhen(
DiscordModule,
(env: NodeJS.ProcessEnv) =>
Expand Down

0 comments on commit 501f7dd

Please sign in to comment.