Skip to content

Commit

Permalink
feat: batch bulk issunace (#826)
Browse files Browse the repository at this point in the history
* feat: batch bulk issuance

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* stoped queue to remove issuance data

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* feat: added delay for 1 min

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* refactor: changed batch config

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* feat: changed delay time for batch

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* feat: added batch for retry

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* refactor: added common constant in libs

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* refactor: removed duplicate value from schema DTO

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

---------

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>
  • Loading branch information
tipusinghaw authored and KulkarniShashank committed Sep 6, 2024
1 parent 35ffa11 commit 912bcd0
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 69 deletions.
7 changes: 7 additions & 0 deletions apps/issuance/interfaces/issuance.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,10 @@ export interface IDeletedFileUploadRecords {
deleteFileDetails: Prisma.BatchPayload;
deleteFileUploadDetails: Prisma.BatchPayload;
}

export interface BulkPayloadDetails {
clientId: string,
orgId: string,
requestId?: string,
isRetry: boolean
}
10 changes: 8 additions & 2 deletions apps/issuance/src/issuance.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ export class IssuanceRepository {
const errorCount = await this.prisma.file_data.count({
where: {
fileUploadId,
isError: true
OR: [
{ isError: true },
{ status: false }
]
}
});

Expand Down Expand Up @@ -568,7 +571,10 @@ export class IssuanceRepository {
return this.prisma.file_data.findMany({
where: {
fileUploadId: fileId,
isError: true
OR: [
{ isError: true },
{ status: false }
]
}
});
} catch (error) {
Expand Down
202 changes: 135 additions & 67 deletions apps/issuance/src/issuance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CommonConstants } from '@credebl/common/common.constant';
import { ResponseMessages } from '@credebl/common/response-messages';
import { ClientProxy, RpcException } from '@nestjs/microservices';
import { map } from 'rxjs';
import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces';
import { BulkPayloadDetails, CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces';
import { IssuanceProcessState, OrgAgentType, PromiseResult, SchemaType, TemplateIdentifier} from '@credebl/enum/enum';
import * as QRCode from 'qrcode';
import { OutOfBandIssuance } from '../templates/out-of-band-issuance.template';
Expand All @@ -32,7 +32,7 @@ import { ICredentialOfferResponse, IDeletedIssuanceRecords, IIssuedCredential, I
import { OOBIssueCredentialDto } from 'apps/api-gateway/src/issuance/dtos/issuance.dto';
import { RecordType, agent_invitations, organisation, user } from '@prisma/client';
import { createOobJsonldIssuancePayload, validateEmail } from '@credebl/common/cast.helper';
import { sendEmail } from '@credebl/common/send-grid-helper-file';
// import { sendEmail } from '@credebl/common/send-grid-helper-file';
import * as pLimit from 'p-limit';
import { UserActivityRepository } from 'libs/user-activity/repositories';
import { validateW3CSchemaAttributes } from '../libs/helpers/attributes.validator';
Expand Down Expand Up @@ -803,7 +803,10 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
disposition: 'attachment'
}
];
const isEmailSent = await sendEmail(this.emailData);

const isEmailSent = true; // change this after testing on local
// const isEmailSent = await sendEmail(this.emailData);

this.logger.log(`isEmailSent ::: ${JSON.stringify(isEmailSent)}-${this.counter}`);
this.counter++;
if (!isEmailSent) {
Expand All @@ -823,6 +826,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
message: `${ResponseMessages.issuance.error.walletError} at position ${iterationNo}`,
error: `${errorStack?.error?.message} at position ${iterationNo}` })
);
throw error; // Check With other issuance flow
} else {
errors.push(
new RpcException({
Expand All @@ -831,6 +835,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
error: error?.response?.error
})
);
throw error; // Check With other issuance flow
}
}
}
Expand Down Expand Up @@ -1157,25 +1162,114 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
return new Promise(resolve => setTimeout(resolve, ms));
}

async issueBulkCredential(requestId: string, orgId: string, clientDetails: IClientDetails, reqPayload: ImportFileDetails): Promise<string> {
/**
* Processes bulk payload in batches and adds jobs to the queue.
* @param bulkPayload
* @param clientDetails
* @param orgId
* @param requestId
*/

private async processInBatches(bulkPayload, bulkPayloadDetails: BulkPayloadDetails):Promise<void> {

const {clientId, isRetry, orgId, requestId} = bulkPayloadDetails;
const delay = (ms: number): Promise<void> => new Promise<void>((resolve) => setTimeout(resolve, ms));
const batchSize = CommonConstants.ISSUANCE_BATCH_SIZE; // initial 1000
const uniqueJobId = uuidv4();
const limit = pLimit(CommonConstants.ISSUANCE_MAX_CONCURRENT_OPERATIONS);

// Generator function to yield batches
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
function* createBatches(array, size) {
for (let i = 0; i < array.length; i += size) {
yield array.slice(i, i + size);
}
}

// Helper function to process a batch
const processBatch = async (batch, batchIndex): Promise<[]> => {
const queueJobsArray = batch.map((item) => ({
data: {
id: item.id,
jobId: uniqueJobId,
cacheId: requestId,
clientId,
referenceId: item.referenceId,
fileUploadId: item.fileUploadId,
schemaLedgerId: item.schemaId,
credentialDefinitionId: item.credDefId,
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type,
totalJobs: bulkPayload.length,
isRetry,
isLastData: false
}
}));

this.logger.log(`Processing batch ${batchIndex + 1} with ${batch.length} items.`);

// Execute the batched jobs with limited concurrency
await Promise.all(queueJobsArray.map(job => limit(() => job)));

return queueJobsArray;
};

let batchIndex = 0;

for (const batch of createBatches(bulkPayload, batchSize)) {
const resolvedBatchJobs = await processBatch(batch, batchIndex);

this.logger.log("Adding resolved jobs to the queue:", resolvedBatchJobs);
await this.bulkIssuanceQueue.addBulk(resolvedBatchJobs);

batchIndex++;

// Wait for 60 seconds before processing the next batch, if more batches are remaining
if ((batchIndex * batchSize) < bulkPayload.length) {
await delay(CommonConstants.ISSUANCE_BATCH_DELAY);
}
}
}

/**
* Handles bulk credential issuance.
* @param requestId - The request ID.
* @param orgId - The organization ID.
* @param clientDetails - Client details.
* @param reqPayload - Request payload containing file details.
* @returns A promise resolving to a success message.
*/
async issueBulkCredential(
requestId: string,
orgId: string,
clientDetails: IClientDetails,
reqPayload: ImportFileDetails
): Promise<string> {
if (!requestId) {
throw new BadRequestException(ResponseMessages.issuance.error.missingRequestId);
}

const fileUpload: FileUpload = {
lastChangedDateTime: null,
upload_type: '',
status: '',
orgId: '',
createDateTime: null
createDateTime: null,
name: '',
credentialType: ''
};

let csvFileDetail;

try {
let cachedData = await this.cacheManager.get(requestId);
if (!cachedData) {
throw new BadRequestException(ResponseMessages.issuance.error.cacheTimeOut);
}
//for demo UI

// For demo UI
if (cachedData && clientDetails?.isSelectiveIssuance) {
await this.cacheManager.del(requestId);
await this.uploadCSVTemplate(reqPayload, requestId);
Expand All @@ -1198,58 +1292,46 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
fileUpload.createDateTime = new Date();
fileUpload.name = parsedFileDetails.fileName;
fileUpload.credentialType = parsedFileDetails.credentialType;

csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId);

csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId);

const bulkPayloadObject: IBulkPayloadObject = {
parsedData,
parsedFileDetails,
userId: clientDetails.userId,
fileUploadId: csvFileDetail.id
};
const bulkPayloadObject: IBulkPayloadObject = {
parsedData,
parsedFileDetails,
userId: clientDetails.userId,
fileUploadId: csvFileDetail.id
};

const storeBulkPayload = await this._storeBulkPayloadInBatch(bulkPayloadObject);

if (!storeBulkPayload) {
throw new BadRequestException(ResponseMessages.issuance.error.storeBulkData);
}

const bulkpayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id);
if (!bulkpayload) {
// Process in batches
const bulkPayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id);
if (!bulkPayload) {
throw new BadRequestException(ResponseMessages.issuance.error.fileData);
}
const uniqueJobId = uuidv4();
const queueJobsArrayPromises = bulkpayload.map(async (item) => ({
data: {
id: item.id,
jobId: uniqueJobId,
cacheId: requestId,
clientId: clientDetails.clientId,
referenceId: item.referenceId,
fileUploadId: item.fileUploadId,
schemaLedgerId: item.schemaId,
credentialDefinitionId: item.credDefId,
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type,
totalJobs: bulkpayload.length,
isRetry: false,
isLastData: false
}
}));

const queueJobsArray = await Promise.all(queueJobsArrayPromises);
try {
await this.bulkIssuanceQueue.addBulk(queueJobsArray);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}
try {

const bulkPayloadDetails: BulkPayloadDetails = {
clientId: clientDetails.clientId,
orgId,
requestId,
isRetry: false
};

this.processInBatches(bulkPayload, bulkPayloadDetails);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}

return ResponseMessages.issuance.success.bulkProcess;
} catch (error) {
fileUpload.status = FileUploadStatus.interrupted;
this.logger.error(`error in issueBulkCredential : ${error}`);
this.logger.error(`Error in issueBulkCredential: ${error}`);
throw new RpcException(error.response);
} finally {
if (csvFileDetail !== undefined && csvFileDetail.id !== undefined) {
Expand All @@ -1271,28 +1353,14 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
const errorMessage = ResponseMessages.bulkIssuance.error.fileDetailsNotFound;
throw new BadRequestException(`${errorMessage}`);
}
const uniqueJobId = uuidv4();
const queueJobsArrayPromises = bulkpayloadRetry.map(async (item) => ({
data: {
id: item.id,
jobId: uniqueJobId,

try {
const bulkPayloadDetails: BulkPayloadDetails = {
clientId,
referenceId: item.referenceId,
fileUploadId: item.fileUploadId,
schemaLedgerId: item.schemaId,
credentialDefinitionId: item.credDefId,
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type,
totalJobs: bulkpayloadRetry.length,
isRetry: true,
isLastData: false
}
}));
const queueJobsArray = await Promise.all(queueJobsArrayPromises);
try {
await this.bulkIssuanceQueue.addBulk(queueJobsArray);
isRetry: true
};
this.processInBatches(bulkpayloadRetry, bulkPayloadDetails);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}
Expand Down Expand Up @@ -1330,7 +1398,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
fileUploadData.fileRow = JSON.stringify(jobDetails);
fileUploadData.isError = false;
fileUploadData.createDateTime = new Date();
fileUploadData.referenceId = jobDetails.credential_data.email_identifier;
fileUploadData.referenceId = jobDetails?.credential_data?.email_identifier;
fileUploadData.jobId = jobDetails.id;
const { orgId } = jobDetails;

Expand Down Expand Up @@ -1528,13 +1596,13 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
const {parsedFileDetails, parsedData, fileUploadId, userId} = bulkPayloadObject;

const limit = pLimit(CommonConstants.MAX_CONCURRENT_OPERATIONS);
const startTime = Date.now(); // Start timing the entire process
const startTime = Date.now();
const batches = await this.splitIntoBatches(parsedData, CommonConstants.BATCH_SIZE);
this.logger.log("Total number of batches:", batches.length);

for (const [index, batch] of batches.entries()) {

const batchStartTime = Date.now(); // Start timing the current batch
const batchStartTime = Date.now();

// Create an array of limited promises for the current batch
const saveFileDetailsPromises = batch.map(element => limit(() => {
Expand All @@ -1561,7 +1629,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
this.logger.log(`Batch ${index + 1} processed in ${(batchEndTime - batchStartTime)} milliseconds.`);
}

const endTime = Date.now(); // End timing the entire process
const endTime = Date.now();
this.logger.log(`Total processing time: ${(endTime - startTime)} milliseconds.`);
return true;
} catch (error) {
Expand Down
5 changes: 5 additions & 0 deletions libs/common/src/common.constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ CACHE_TTL_SECONDS = 604800,
// Bulk-issuance
BATCH_SIZE = 100,
MAX_CONCURRENT_OPERATIONS = 50,
ISSUANCE_BATCH_SIZE = 2000,
ISSUANCE_MAX_CONCURRENT_OPERATIONS = 1000,
ISSUANCE_BATCH_DELAY = 60000 //Intially 60000


// MICROSERVICES NAMES
API_GATEWAY_SERVICE = 'api-gateway',
ORGANIZATION_SERVICE = 'organization',
Expand Down

0 comments on commit 912bcd0

Please sign in to comment.