From da58d7c22242bab722bcae2fab409c3320e9debe Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Fri, 28 Jun 2024 16:39:42 +0530 Subject: [PATCH 1/8] feat: batch bulk issuance Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.service.ts | 165 +++++++++++++++++++------- 1 file changed, 122 insertions(+), 43 deletions(-) diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 166cd948c..00f16b356 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -32,7 +32,7 @@ import { ICredentialOfferResponse, IDeletedIssuanceRecords, IIssuedCredential, I import { OOBIssueCredentialDto } from 'apps/api-gateway/src/issuance/dtos/issuance.dto'; import { RecordType, agent_invitations, organisation, user } from '@prisma/client'; import { createOobJsonldIssuancePayload, validateEmail } from '@credebl/common/cast.helper'; -import { sendEmail } from '@credebl/common/send-grid-helper-file'; +// import { sendEmail } from '@credebl/common/send-grid-helper-file'; import * as pLimit from 'p-limit'; import { UserActivityRepository } from 'libs/user-activity/repositories'; import { validateW3CSchemaAttributes } from '../libs/helpers/attributes.validator'; @@ -803,7 +803,10 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO disposition: 'attachment' } ]; - const isEmailSent = await sendEmail(this.emailData); + + const isEmailSent = true; // change this after testing on local + // const isEmailSent = await sendEmail(this.emailData); + this.logger.log(`isEmailSent ::: ${JSON.stringify(isEmailSent)}-${this.counter}`); this.counter++; if (!isEmailSent) { @@ -1157,17 +1160,112 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO return new Promise(resolve => setTimeout(resolve, ms)); } - async issueBulkCredential(requestId: string, orgId: string, clientDetails: IClientDetails, reqPayload: ImportFileDetails): Promise { + /** + * Processes bulk payload in batches and adds jobs to the queue. + * @param bulkPayload - Array of bulk payload data. + * @param clientDetails - Client details. + * @param orgId - Organization ID. + * @param requestId - Request ID. + */ + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type + private async processInBatches(bulkPayload, clientDetails, orgId, requestId) { + + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type + const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms)); + const batchSize = 1000; // Define the batch size + const uniqueJobId = uuidv4(); // Generate a unique job ID for the entire process + const limit = pLimit(1000); // Limit concurrent batch processing, adjust based on system capacity + + // Generator function to yield batches + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type + function* createBatches(array, size) { + for (let i = 0; i < array.length; i += size) { + yield array.slice(i, i + size); + } + } + + // Helper function to process a batch + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type + const processBatch = async (batch, batchIndex) => { + const queueJobsArray = batch.map((item) => ({ + data: { + id: item.id, + jobId: uniqueJobId, + cacheId: requestId, + clientId: clientDetails.clientId, + referenceId: item.referenceId, + fileUploadId: item.fileUploadId, + schemaLedgerId: item.schemaId, + credentialDefinitionId: item.credDefId, + status: item.status, + credential_data: item.credential_data, + orgId, + credentialType: item.credential_type, + totalJobs: bulkPayload.length, + isRetry: false, + isLastData: false + } + })); + + this.logger.log(`Processing batch ${batchIndex + 1} with ${batch.length} items.`); + + // Execute the batched jobs with limited concurrency + await Promise.all(queueJobsArray.map(job => limit(() => job))); + + return queueJobsArray; + }; + + let batchIndex = 0; + + for (const batch of createBatches(bulkPayload, batchSize)) { + const resolvedBatchJobs = await processBatch(batch, batchIndex); + + // Add the resolved jobs to the queue and clear the batch from memory + this.logger.log("Adding resolved jobs to the queue:", resolvedBatchJobs); + await this.bulkIssuanceQueue.addBulk(resolvedBatchJobs); + + batchIndex++; + + // Wait for 10 seconds before processing the next batch, if more batches are remaining + if ((batchIndex * batchSize) < bulkPayload.length) { + await delay(20000); + } + + // Optionally, trigger garbage collection to free up memory + if (global.gc) { + global.gc(); + } + } + } + + /** + * Handles bulk credential issuance. + * @param requestId - The request ID. + * @param orgId - The organization ID. + * @param clientDetails - Client details. + * @param reqPayload - Request payload containing file details. + * @returns A promise resolving to a success message. + */ + async issueBulkCredential( + requestId: string, + orgId: string, + clientDetails: IClientDetails, + reqPayload: ImportFileDetails + ): Promise { if (!requestId) { throw new BadRequestException(ResponseMessages.issuance.error.missingRequestId); } + const fileUpload: FileUpload = { lastChangedDateTime: null, upload_type: '', status: '', orgId: '', - createDateTime: null + createDateTime: null, + name: '', + credentialType: '' }; + let csvFileDetail; try { @@ -1175,7 +1273,8 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO if (!cachedData) { throw new BadRequestException(ResponseMessages.issuance.error.cacheTimeOut); } - //for demo UI + + // For demo UI if (cachedData && clientDetails?.isSelectiveIssuance) { await this.cacheManager.del(requestId); await this.uploadCSVTemplate(reqPayload, requestId); @@ -1198,16 +1297,15 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO fileUpload.createDateTime = new Date(); fileUpload.name = parsedFileDetails.fileName; fileUpload.credentialType = parsedFileDetails.credentialType; - - csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId); + csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId); - const bulkPayloadObject: IBulkPayloadObject = { - parsedData, - parsedFileDetails, - userId: clientDetails.userId, - fileUploadId: csvFileDetail.id - }; + const bulkPayloadObject: IBulkPayloadObject = { + parsedData, + parsedFileDetails, + userId: clientDetails.userId, + fileUploadId: csvFileDetail.id + }; const storeBulkPayload = await this._storeBulkPayloadInBatch(bulkPayloadObject); @@ -1215,41 +1313,22 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO throw new BadRequestException(ResponseMessages.issuance.error.storeBulkData); } - const bulkpayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id); - if (!bulkpayload) { + // Process in batches + const bulkPayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id); + if (!bulkPayload) { throw new BadRequestException(ResponseMessages.issuance.error.fileData); } - const uniqueJobId = uuidv4(); - const queueJobsArrayPromises = bulkpayload.map(async (item) => ({ - data: { - id: item.id, - jobId: uniqueJobId, - cacheId: requestId, - clientId: clientDetails.clientId, - referenceId: item.referenceId, - fileUploadId: item.fileUploadId, - schemaLedgerId: item.schemaId, - credentialDefinitionId: item.credDefId, - status: item.status, - credential_data: item.credential_data, - orgId, - credentialType: item.credential_type, - totalJobs: bulkpayload.length, - isRetry: false, - isLastData: false - } - })); - const queueJobsArray = await Promise.all(queueJobsArrayPromises); - try { - await this.bulkIssuanceQueue.addBulk(queueJobsArray); - } catch (error) { - this.logger.error(`Error processing issuance data: ${error}`); - } + try { + await this.processInBatches(bulkPayload, clientDetails, orgId, requestId); + } catch (error) { + this.logger.error(`Error processing issuance data: ${error}`); + } + return ResponseMessages.issuance.success.bulkProcess; } catch (error) { fileUpload.status = FileUploadStatus.interrupted; - this.logger.error(`error in issueBulkCredential : ${error}`); + this.logger.error(`Error in issueBulkCredential: ${error}`); throw new RpcException(error.response); } finally { if (csvFileDetail !== undefined && csvFileDetail.id !== undefined) { @@ -1330,7 +1409,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO fileUploadData.fileRow = JSON.stringify(jobDetails); fileUploadData.isError = false; fileUploadData.createDateTime = new Date(); - fileUploadData.referenceId = jobDetails.credential_data.email_identifier; + fileUploadData.referenceId = jobDetails?.credential_data?.email_identifier; fileUploadData.jobId = jobDetails.id; const { orgId } = jobDetails; From 40a7fc554e308ad1549bd6f99bef981037d66c5f Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Fri, 28 Jun 2024 16:56:50 +0530 Subject: [PATCH 2/8] stoped queue to remove issuance data Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.processor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/issuance/src/issuance.processor.ts b/apps/issuance/src/issuance.processor.ts index be45d2a36..60dbf06f4 100644 --- a/apps/issuance/src/issuance.processor.ts +++ b/apps/issuance/src/issuance.processor.ts @@ -22,6 +22,6 @@ export class BulkIssuanceProcessor { `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...` ); - this.issuanceService.processIssuanceData(job.data); + // this.issuanceService.processIssuanceData(job.data); } } From 68318b7077816f49ec44a7458b20b542a2709000 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Mon, 1 Jul 2024 12:20:17 +0530 Subject: [PATCH 3/8] feat: added delay for 1 min Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.processor.ts | 2 +- apps/issuance/src/issuance.service.ts | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/issuance/src/issuance.processor.ts b/apps/issuance/src/issuance.processor.ts index 60dbf06f4..be45d2a36 100644 --- a/apps/issuance/src/issuance.processor.ts +++ b/apps/issuance/src/issuance.processor.ts @@ -22,6 +22,6 @@ export class BulkIssuanceProcessor { `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...` ); - // this.issuanceService.processIssuanceData(job.data); + this.issuanceService.processIssuanceData(job.data); } } diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 00f16b356..32c533748 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -1162,19 +1162,19 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO /** * Processes bulk payload in batches and adds jobs to the queue. - * @param bulkPayload - Array of bulk payload data. - * @param clientDetails - Client details. - * @param orgId - Organization ID. - * @param requestId - Request ID. + * @param bulkPayload + * @param clientDetails + * @param orgId + * @param requestId */ // eslint-disable-next-line @typescript-eslint/explicit-function-return-type private async processInBatches(bulkPayload, clientDetails, orgId, requestId) { // eslint-disable-next-line @typescript-eslint/explicit-function-return-type const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms)); - const batchSize = 1000; // Define the batch size - const uniqueJobId = uuidv4(); // Generate a unique job ID for the entire process - const limit = pLimit(1000); // Limit concurrent batch processing, adjust based on system capacity + const batchSize = 1000; + const uniqueJobId = uuidv4(); + const limit = pLimit(1000); //adjust based on system capacity // Generator function to yield batches // eslint-disable-next-line @typescript-eslint/explicit-function-return-type @@ -1226,9 +1226,9 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO batchIndex++; - // Wait for 10 seconds before processing the next batch, if more batches are remaining + // Wait for 60 seconds before processing the next batch, if more batches are remaining if ((batchIndex * batchSize) < bulkPayload.length) { - await delay(20000); + await delay(60000); } // Optionally, trigger garbage collection to free up memory From 5524cdab78eda04eff1865b79eabc4c87a80ba18 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Mon, 1 Jul 2024 19:56:28 +0530 Subject: [PATCH 4/8] refactor: changed batch config Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 32c533748..782f30b26 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -1172,7 +1172,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO // eslint-disable-next-line @typescript-eslint/explicit-function-return-type const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms)); - const batchSize = 1000; + const batchSize = 2000; // initial 1000 const uniqueJobId = uuidv4(); const limit = pLimit(1000); //adjust based on system capacity @@ -1228,7 +1228,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO // Wait for 60 seconds before processing the next batch, if more batches are remaining if ((batchIndex * batchSize) < bulkPayload.length) { - await delay(60000); + await delay(40000); //intially 60000 } // Optionally, trigger garbage collection to free up memory From c6eae780b66c3ea3e56d7a546d986435517272de Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Tue, 2 Jul 2024 15:04:18 +0530 Subject: [PATCH 5/8] feat: changed delay time for batch Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.repository.ts | 10 ++++++++-- apps/issuance/src/issuance.service.ts | 20 +++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/apps/issuance/src/issuance.repository.ts b/apps/issuance/src/issuance.repository.ts index 534dcb09f..e1507e5f0 100644 --- a/apps/issuance/src/issuance.repository.ts +++ b/apps/issuance/src/issuance.repository.ts @@ -353,7 +353,10 @@ export class IssuanceRepository { const errorCount = await this.prisma.file_data.count({ where: { fileUploadId, - isError: true + OR: [ + { isError: true }, + { status: false } + ] } }); @@ -568,7 +571,10 @@ export class IssuanceRepository { return this.prisma.file_data.findMany({ where: { fileUploadId: fileId, - isError: true + OR: [ + { isError: true }, + { status: false } + ] } }); } catch (error) { diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 782f30b26..4d7513f58 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -1167,11 +1167,11 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO * @param orgId * @param requestId */ - // eslint-disable-next-line @typescript-eslint/explicit-function-return-type - private async processInBatches(bulkPayload, clientDetails, orgId, requestId) { + + private async processInBatches(bulkPayload, clientDetails, orgId, requestId):Promise { + + const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); - // eslint-disable-next-line @typescript-eslint/explicit-function-return-type - const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms)); const batchSize = 2000; // initial 1000 const uniqueJobId = uuidv4(); const limit = pLimit(1000); //adjust based on system capacity @@ -1185,8 +1185,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO } // Helper function to process a batch - // eslint-disable-next-line @typescript-eslint/explicit-function-return-type - const processBatch = async (batch, batchIndex) => { + const processBatch = async (batch, batchIndex): Promise<[]> => { const queueJobsArray = batch.map((item) => ({ data: { id: item.id, @@ -1228,12 +1227,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO // Wait for 60 seconds before processing the next batch, if more batches are remaining if ((batchIndex * batchSize) < bulkPayload.length) { - await delay(40000); //intially 60000 - } - - // Optionally, trigger garbage collection to free up memory - if (global.gc) { - global.gc(); + await delay(60000); //intially 60000 } } } @@ -1320,7 +1314,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO } try { - await this.processInBatches(bulkPayload, clientDetails, orgId, requestId); + this.processInBatches(bulkPayload, clientDetails, orgId, requestId); } catch (error) { this.logger.error(`Error processing issuance data: ${error}`); } From d9e54685b2446c36301bbda35eeb8026b943ceef Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Wed, 3 Jul 2024 12:24:18 +0530 Subject: [PATCH 6/8] feat: added batch for retry Signed-off-by: tipusinghaw --- .../api-gateway/src/dtos/create-schema.dto.ts | 2 + .../interfaces/issuance.interfaces.ts | 7 +++ apps/issuance/src/issuance.service.ts | 48 +++++++++---------- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/apps/api-gateway/src/dtos/create-schema.dto.ts b/apps/api-gateway/src/dtos/create-schema.dto.ts index aa7ecde58..248e007de 100644 --- a/apps/api-gateway/src/dtos/create-schema.dto.ts +++ b/apps/api-gateway/src/dtos/create-schema.dto.ts @@ -115,6 +115,8 @@ export class CreateW3CSchemaDto { @ValidateNested({each: true}) @Type(() => W3CAttributeValue) @IsNotEmpty() + @ArrayMinSize(1) + @IsArray({ message: 'attributes must be an array' }) attributes: W3CAttributeValue []; @ApiProperty() diff --git a/apps/issuance/interfaces/issuance.interfaces.ts b/apps/issuance/interfaces/issuance.interfaces.ts index b9ce3ff5c..6b3d7c76a 100644 --- a/apps/issuance/interfaces/issuance.interfaces.ts +++ b/apps/issuance/interfaces/issuance.interfaces.ts @@ -345,3 +345,10 @@ export interface IDeletedFileUploadRecords { deleteFileDetails: Prisma.BatchPayload; deleteFileUploadDetails: Prisma.BatchPayload; } + +export interface BulkPayloadDetails { + clientId: string, + orgId: string, + requestId?: string, + isRetry: boolean +} diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 4d7513f58..17efe0469 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -9,7 +9,7 @@ import { CommonConstants } from '@credebl/common/common.constant'; import { ResponseMessages } from '@credebl/common/response-messages'; import { ClientProxy, RpcException } from '@nestjs/microservices'; import { map } from 'rxjs'; -import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces'; +import { BulkPayloadDetails, CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces'; import { IssuanceProcessState, OrgAgentType, PromiseResult, SchemaType, TemplateIdentifier} from '@credebl/enum/enum'; import * as QRCode from 'qrcode'; import { OutOfBandIssuance } from '../templates/out-of-band-issuance.template'; @@ -826,6 +826,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO message: `${ResponseMessages.issuance.error.walletError} at position ${iterationNo}`, error: `${errorStack?.error?.message} at position ${iterationNo}` }) ); + throw error; // Check With other issuance flow } else { errors.push( new RpcException({ @@ -834,6 +835,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO error: error?.response?.error }) ); + throw error; // Check With other issuance flow } } } @@ -1168,7 +1170,9 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO * @param requestId */ - private async processInBatches(bulkPayload, clientDetails, orgId, requestId):Promise { + private async processInBatches(bulkPayload, bulkPayloadDetails: BulkPayloadDetails):Promise { + + const {clientId, isRetry, orgId, requestId} = bulkPayloadDetails; const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); @@ -1191,7 +1195,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO id: item.id, jobId: uniqueJobId, cacheId: requestId, - clientId: clientDetails.clientId, + clientId, referenceId: item.referenceId, fileUploadId: item.fileUploadId, schemaLedgerId: item.schemaId, @@ -1201,7 +1205,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO orgId, credentialType: item.credential_type, totalJobs: bulkPayload.length, - isRetry: false, + isRetry, isLastData: false } })); @@ -1314,7 +1318,15 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO } try { - this.processInBatches(bulkPayload, clientDetails, orgId, requestId); + + const bulkPayloadDetails: BulkPayloadDetails = { + clientId: clientDetails.clientId, + orgId, + requestId, + isRetry: false + }; + + this.processInBatches(bulkPayload, bulkPayloadDetails); } catch (error) { this.logger.error(`Error processing issuance data: ${error}`); } @@ -1344,28 +1356,14 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO const errorMessage = ResponseMessages.bulkIssuance.error.fileDetailsNotFound; throw new BadRequestException(`${errorMessage}`); } - const uniqueJobId = uuidv4(); - const queueJobsArrayPromises = bulkpayloadRetry.map(async (item) => ({ - data: { - id: item.id, - jobId: uniqueJobId, + + try { + const bulkPayloadDetails: BulkPayloadDetails = { clientId, - referenceId: item.referenceId, - fileUploadId: item.fileUploadId, - schemaLedgerId: item.schemaId, - credentialDefinitionId: item.credDefId, - status: item.status, - credential_data: item.credential_data, orgId, - credentialType: item.credential_type, - totalJobs: bulkpayloadRetry.length, - isRetry: true, - isLastData: false - } - })); - const queueJobsArray = await Promise.all(queueJobsArrayPromises); - try { - await this.bulkIssuanceQueue.addBulk(queueJobsArray); + isRetry: true + }; + this.processInBatches(bulkpayloadRetry, bulkPayloadDetails); } catch (error) { this.logger.error(`Error processing issuance data: ${error}`); } From d12908a64458cdf963ce8683e1b732d29e65b9b4 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Fri, 5 Jul 2024 13:28:03 +0530 Subject: [PATCH 7/8] refactor: added common constant in libs Signed-off-by: tipusinghaw --- apps/issuance/src/issuance.service.ts | 19 ++++++++----------- libs/common/src/common.constant.ts | 5 +++++ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/apps/issuance/src/issuance.service.ts b/apps/issuance/src/issuance.service.ts index 17efe0469..33ee1667d 100644 --- a/apps/issuance/src/issuance.service.ts +++ b/apps/issuance/src/issuance.service.ts @@ -1172,13 +1172,11 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO private async processInBatches(bulkPayload, bulkPayloadDetails: BulkPayloadDetails):Promise { - const {clientId, isRetry, orgId, requestId} = bulkPayloadDetails; - - const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); - - const batchSize = 2000; // initial 1000 + const {clientId, isRetry, orgId, requestId} = bulkPayloadDetails; + const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); + const batchSize = CommonConstants.ISSUANCE_BATCH_SIZE; // initial 1000 const uniqueJobId = uuidv4(); - const limit = pLimit(1000); //adjust based on system capacity + const limit = pLimit(CommonConstants.ISSUANCE_MAX_CONCURRENT_OPERATIONS); // Generator function to yield batches // eslint-disable-next-line @typescript-eslint/explicit-function-return-type @@ -1223,7 +1221,6 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO for (const batch of createBatches(bulkPayload, batchSize)) { const resolvedBatchJobs = await processBatch(batch, batchIndex); - // Add the resolved jobs to the queue and clear the batch from memory this.logger.log("Adding resolved jobs to the queue:", resolvedBatchJobs); await this.bulkIssuanceQueue.addBulk(resolvedBatchJobs); @@ -1231,7 +1228,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO // Wait for 60 seconds before processing the next batch, if more batches are remaining if ((batchIndex * batchSize) < bulkPayload.length) { - await delay(60000); //intially 60000 + await delay(CommonConstants.ISSUANCE_BATCH_DELAY); } } } @@ -1599,13 +1596,13 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO const {parsedFileDetails, parsedData, fileUploadId, userId} = bulkPayloadObject; const limit = pLimit(CommonConstants.MAX_CONCURRENT_OPERATIONS); - const startTime = Date.now(); // Start timing the entire process + const startTime = Date.now(); const batches = await this.splitIntoBatches(parsedData, CommonConstants.BATCH_SIZE); this.logger.log("Total number of batches:", batches.length); for (const [index, batch] of batches.entries()) { - const batchStartTime = Date.now(); // Start timing the current batch + const batchStartTime = Date.now(); // Create an array of limited promises for the current batch const saveFileDetailsPromises = batch.map(element => limit(() => { @@ -1632,7 +1629,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO this.logger.log(`Batch ${index + 1} processed in ${(batchEndTime - batchStartTime)} milliseconds.`); } - const endTime = Date.now(); // End timing the entire process + const endTime = Date.now(); this.logger.log(`Total processing time: ${(endTime - startTime)} milliseconds.`); return true; } catch (error) { diff --git a/libs/common/src/common.constant.ts b/libs/common/src/common.constant.ts index d99333d36..e641f2922 100644 --- a/libs/common/src/common.constant.ts +++ b/libs/common/src/common.constant.ts @@ -328,6 +328,11 @@ CACHE_TTL_SECONDS = 604800, // Bulk-issuance BATCH_SIZE = 100, MAX_CONCURRENT_OPERATIONS = 50, +ISSUANCE_BATCH_SIZE = 2000, +ISSUANCE_MAX_CONCURRENT_OPERATIONS = 1000, +ISSUANCE_BATCH_DELAY = 60000 //Intially 60000 + + // MICROSERVICES NAMES API_GATEWAY_SERVICE = 'api-gateway', ORGANIZATION_SERVICE = 'organization', From 2da0528d1c75b95b8692d54122549a3c3b4f3327 Mon Sep 17 00:00:00 2001 From: tipusinghaw Date: Fri, 5 Jul 2024 13:33:29 +0530 Subject: [PATCH 8/8] refactor: removed duplicate value from schema DTO Signed-off-by: tipusinghaw --- apps/api-gateway/src/dtos/create-schema.dto.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/api-gateway/src/dtos/create-schema.dto.ts b/apps/api-gateway/src/dtos/create-schema.dto.ts index 4730f8d7a..4705dfe77 100644 --- a/apps/api-gateway/src/dtos/create-schema.dto.ts +++ b/apps/api-gateway/src/dtos/create-schema.dto.ts @@ -117,8 +117,6 @@ export class CreateW3CSchemaDto { @IsArray({ message: 'attributes must be an array' }) @ArrayMinSize(1) @IsNotEmpty() - @ArrayMinSize(1) - @IsArray({ message: 'attributes must be an array' }) attributes: W3CAttributeValue []; @ApiProperty()