Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch bulk issunace #826

Merged
merged 9 commits into from
Jul 5, 2024
7 changes: 7 additions & 0 deletions apps/issuance/interfaces/issuance.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,10 @@ export interface IDeletedFileUploadRecords {
deleteFileDetails: Prisma.BatchPayload;
deleteFileUploadDetails: Prisma.BatchPayload;
}

export interface BulkPayloadDetails {
clientId: string,
orgId: string,
requestId?: string,
isRetry: boolean
}
10 changes: 8 additions & 2 deletions apps/issuance/src/issuance.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ export class IssuanceRepository {
const errorCount = await this.prisma.file_data.count({
where: {
fileUploadId,
isError: true
OR: [
{ isError: true },
{ status: false }
]
}
});

Expand Down Expand Up @@ -568,7 +571,10 @@ export class IssuanceRepository {
return this.prisma.file_data.findMany({
where: {
fileUploadId: fileId,
isError: true
OR: [
{ isError: true },
{ status: false }
]
}
});
} catch (error) {
Expand Down
202 changes: 135 additions & 67 deletions apps/issuance/src/issuance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CommonConstants } from '@credebl/common/common.constant';
import { ResponseMessages } from '@credebl/common/response-messages';
import { ClientProxy, RpcException } from '@nestjs/microservices';
import { map } from 'rxjs';
import { CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces';
import { BulkPayloadDetails, CredentialOffer, FileUpload, FileUploadData, IAttributes, IBulkPayloadObject, IClientDetails, ICreateOfferResponse, ICredentialPayload, IIssuance, IIssueData, IPattern, IQueuePayload, ISchemaAttributes, ISendOfferNatsPayload, ImportFileDetails, IssueCredentialWebhookPayload, OutOfBandCredentialOfferPayload, PreviewRequest, SchemaDetails, SendEmailCredentialOffer, TemplateDetailsInterface } from '../interfaces/issuance.interfaces';
import { IssuanceProcessState, OrgAgentType, PromiseResult, SchemaType, TemplateIdentifier} from '@credebl/enum/enum';
import * as QRCode from 'qrcode';
import { OutOfBandIssuance } from '../templates/out-of-band-issuance.template';
Expand All @@ -32,7 +32,7 @@ import { ICredentialOfferResponse, IDeletedIssuanceRecords, IIssuedCredential, I
import { OOBIssueCredentialDto } from 'apps/api-gateway/src/issuance/dtos/issuance.dto';
import { RecordType, agent_invitations, organisation, user } from '@prisma/client';
import { createOobJsonldIssuancePayload, validateEmail } from '@credebl/common/cast.helper';
import { sendEmail } from '@credebl/common/send-grid-helper-file';
// import { sendEmail } from '@credebl/common/send-grid-helper-file';
import * as pLimit from 'p-limit';
import { UserActivityRepository } from 'libs/user-activity/repositories';
import { validateW3CSchemaAttributes } from '../libs/helpers/attributes.validator';
Expand Down Expand Up @@ -803,7 +803,10 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
disposition: 'attachment'
}
];
const isEmailSent = await sendEmail(this.emailData);

const isEmailSent = true; // change this after testing on local
// const isEmailSent = await sendEmail(this.emailData);

this.logger.log(`isEmailSent ::: ${JSON.stringify(isEmailSent)}-${this.counter}`);
this.counter++;
if (!isEmailSent) {
Expand All @@ -823,6 +826,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
message: `${ResponseMessages.issuance.error.walletError} at position ${iterationNo}`,
error: `${errorStack?.error?.message} at position ${iterationNo}` })
);
throw error; // Check With other issuance flow
} else {
errors.push(
new RpcException({
Expand All @@ -831,6 +835,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
error: error?.response?.error
})
);
throw error; // Check With other issuance flow
}
}
}
Expand Down Expand Up @@ -1157,25 +1162,114 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
return new Promise(resolve => setTimeout(resolve, ms));
}

async issueBulkCredential(requestId: string, orgId: string, clientDetails: IClientDetails, reqPayload: ImportFileDetails): Promise<string> {
/**
* Processes bulk payload in batches and adds jobs to the queue.
* @param bulkPayload
* @param clientDetails
* @param orgId
* @param requestId
*/

private async processInBatches(bulkPayload, bulkPayloadDetails: BulkPayloadDetails):Promise<void> {

const {clientId, isRetry, orgId, requestId} = bulkPayloadDetails;
const delay = (ms: number): Promise<void> => new Promise<void>((resolve) => setTimeout(resolve, ms));
const batchSize = CommonConstants.ISSUANCE_BATCH_SIZE; // initial 1000
const uniqueJobId = uuidv4();
const limit = pLimit(CommonConstants.ISSUANCE_MAX_CONCURRENT_OPERATIONS);

// Generator function to yield batches
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
function* createBatches(array, size) {
for (let i = 0; i < array.length; i += size) {
yield array.slice(i, i + size);
}
}

// Helper function to process a batch
const processBatch = async (batch, batchIndex): Promise<[]> => {
const queueJobsArray = batch.map((item) => ({
data: {
id: item.id,
jobId: uniqueJobId,
cacheId: requestId,
clientId,
referenceId: item.referenceId,
fileUploadId: item.fileUploadId,
schemaLedgerId: item.schemaId,
credentialDefinitionId: item.credDefId,
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type,
totalJobs: bulkPayload.length,
isRetry,
isLastData: false
}
}));

this.logger.log(`Processing batch ${batchIndex + 1} with ${batch.length} items.`);

// Execute the batched jobs with limited concurrency
await Promise.all(queueJobsArray.map(job => limit(() => job)));

return queueJobsArray;
};

let batchIndex = 0;

for (const batch of createBatches(bulkPayload, batchSize)) {
const resolvedBatchJobs = await processBatch(batch, batchIndex);

this.logger.log("Adding resolved jobs to the queue:", resolvedBatchJobs);
await this.bulkIssuanceQueue.addBulk(resolvedBatchJobs);

batchIndex++;

// Wait for 60 seconds before processing the next batch, if more batches are remaining
if ((batchIndex * batchSize) < bulkPayload.length) {
await delay(CommonConstants.ISSUANCE_BATCH_DELAY);
}
}
}

/**
* Handles bulk credential issuance.
* @param requestId - The request ID.
* @param orgId - The organization ID.
* @param clientDetails - Client details.
* @param reqPayload - Request payload containing file details.
* @returns A promise resolving to a success message.
*/
async issueBulkCredential(
requestId: string,
orgId: string,
clientDetails: IClientDetails,
reqPayload: ImportFileDetails
): Promise<string> {
if (!requestId) {
throw new BadRequestException(ResponseMessages.issuance.error.missingRequestId);
}

const fileUpload: FileUpload = {
lastChangedDateTime: null,
upload_type: '',
status: '',
orgId: '',
createDateTime: null
createDateTime: null,
name: '',
credentialType: ''
};

let csvFileDetail;

try {
let cachedData = await this.cacheManager.get(requestId);
if (!cachedData) {
throw new BadRequestException(ResponseMessages.issuance.error.cacheTimeOut);
}
//for demo UI

// For demo UI
if (cachedData && clientDetails?.isSelectiveIssuance) {
await this.cacheManager.del(requestId);
await this.uploadCSVTemplate(reqPayload, requestId);
Expand All @@ -1198,58 +1292,46 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
fileUpload.createDateTime = new Date();
fileUpload.name = parsedFileDetails.fileName;
fileUpload.credentialType = parsedFileDetails.credentialType;

csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId);

csvFileDetail = await this.issuanceRepository.saveFileUploadDetails(fileUpload, clientDetails.userId);

const bulkPayloadObject: IBulkPayloadObject = {
parsedData,
parsedFileDetails,
userId: clientDetails.userId,
fileUploadId: csvFileDetail.id
};
const bulkPayloadObject: IBulkPayloadObject = {
parsedData,
parsedFileDetails,
userId: clientDetails.userId,
fileUploadId: csvFileDetail.id
};

const storeBulkPayload = await this._storeBulkPayloadInBatch(bulkPayloadObject);

if (!storeBulkPayload) {
throw new BadRequestException(ResponseMessages.issuance.error.storeBulkData);
}

const bulkpayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id);
if (!bulkpayload) {
// Process in batches
const bulkPayload = await this.issuanceRepository.getFileDetails(csvFileDetail.id);
if (!bulkPayload) {
throw new BadRequestException(ResponseMessages.issuance.error.fileData);
}
const uniqueJobId = uuidv4();
const queueJobsArrayPromises = bulkpayload.map(async (item) => ({
data: {
id: item.id,
jobId: uniqueJobId,
cacheId: requestId,
clientId: clientDetails.clientId,
referenceId: item.referenceId,
fileUploadId: item.fileUploadId,
schemaLedgerId: item.schemaId,
credentialDefinitionId: item.credDefId,
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type,
totalJobs: bulkpayload.length,
isRetry: false,
isLastData: false
}
}));

const queueJobsArray = await Promise.all(queueJobsArrayPromises);
try {
await this.bulkIssuanceQueue.addBulk(queueJobsArray);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}
try {

const bulkPayloadDetails: BulkPayloadDetails = {
clientId: clientDetails.clientId,
orgId,
requestId,
isRetry: false
};

this.processInBatches(bulkPayload, bulkPayloadDetails);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}

return ResponseMessages.issuance.success.bulkProcess;
} catch (error) {
fileUpload.status = FileUploadStatus.interrupted;
this.logger.error(`error in issueBulkCredential : ${error}`);
this.logger.error(`Error in issueBulkCredential: ${error}`);
throw new RpcException(error.response);
} finally {
if (csvFileDetail !== undefined && csvFileDetail.id !== undefined) {
Expand All @@ -1271,28 +1353,14 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
const errorMessage = ResponseMessages.bulkIssuance.error.fileDetailsNotFound;
throw new BadRequestException(`${errorMessage}`);
}
const uniqueJobId = uuidv4();
const queueJobsArrayPromises = bulkpayloadRetry.map(async (item) => ({
data: {
id: item.id,
jobId: uniqueJobId,

try {
const bulkPayloadDetails: BulkPayloadDetails = {
clientId,
referenceId: item.referenceId,
fileUploadId: item.fileUploadId,
schemaLedgerId: item.schemaId,
credentialDefinitionId: item.credDefId,
status: item.status,
credential_data: item.credential_data,
orgId,
credentialType: item.credential_type,
totalJobs: bulkpayloadRetry.length,
isRetry: true,
isLastData: false
}
}));
const queueJobsArray = await Promise.all(queueJobsArrayPromises);
try {
await this.bulkIssuanceQueue.addBulk(queueJobsArray);
isRetry: true
};
this.processInBatches(bulkpayloadRetry, bulkPayloadDetails);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}
Expand Down Expand Up @@ -1330,7 +1398,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
fileUploadData.fileRow = JSON.stringify(jobDetails);
fileUploadData.isError = false;
fileUploadData.createDateTime = new Date();
fileUploadData.referenceId = jobDetails.credential_data.email_identifier;
fileUploadData.referenceId = jobDetails?.credential_data?.email_identifier;
fileUploadData.jobId = jobDetails.id;
const { orgId } = jobDetails;

Expand Down Expand Up @@ -1528,13 +1596,13 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
const {parsedFileDetails, parsedData, fileUploadId, userId} = bulkPayloadObject;

const limit = pLimit(CommonConstants.MAX_CONCURRENT_OPERATIONS);
const startTime = Date.now(); // Start timing the entire process
const startTime = Date.now();
const batches = await this.splitIntoBatches(parsedData, CommonConstants.BATCH_SIZE);
this.logger.log("Total number of batches:", batches.length);

for (const [index, batch] of batches.entries()) {

const batchStartTime = Date.now(); // Start timing the current batch
const batchStartTime = Date.now();

// Create an array of limited promises for the current batch
const saveFileDetailsPromises = batch.map(element => limit(() => {
Expand All @@ -1561,7 +1629,7 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
this.logger.log(`Batch ${index + 1} processed in ${(batchEndTime - batchStartTime)} milliseconds.`);
}

const endTime = Date.now(); // End timing the entire process
const endTime = Date.now();
this.logger.log(`Total processing time: ${(endTime - startTime)} milliseconds.`);
return true;
} catch (error) {
Expand Down
5 changes: 5 additions & 0 deletions libs/common/src/common.constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ CACHE_TTL_SECONDS = 604800,
// Bulk-issuance
BATCH_SIZE = 100,
MAX_CONCURRENT_OPERATIONS = 50,
ISSUANCE_BATCH_SIZE = 2000,
ISSUANCE_MAX_CONCURRENT_OPERATIONS = 1000,
ISSUANCE_BATCH_DELAY = 60000 //Intially 60000


// MICROSERVICES NAMES
API_GATEWAY_SERVICE = 'api-gateway',
ORGANIZATION_SERVICE = 'organization',
Expand Down