Skip to content

Commit

Permalink
feat: implemented loop logic for bulk-issuance
Browse files Browse the repository at this point in the history
Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>
Signed-off-by: KulkarniShashank <shashank.kulkarni@ayanworks.com>
  • Loading branch information
tipusinghaw authored and KulkarniShashank committed Sep 11, 2024
1 parent fc1510f commit 01bbac1
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 78 deletions.
6 changes: 0 additions & 6 deletions apps/api-gateway/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ import * as redisStore from 'cache-manager-redis-store';
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT)
}
}),
BullModule.registerQueue({
name: 'bulk-issuance',
redis: {
port: parseInt(process.env.REDIS_PORT)
}
})
],
controllers: [AppController],
Expand Down
38 changes: 19 additions & 19 deletions apps/api-gateway/src/issuance/issuance.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,25 +249,25 @@ export class IssuanceController {
throw err;
}
}
);

const reqPayload: RequestPayload = {
credDefId: credentialDefinitionId,
filePath: `${process.env.PWD}/uploadedFiles/import/${newFilename}`,
fileName: newFilename
};
this.logger.log(`reqPayload::::::${JSON.stringify(reqPayload)}`);
const importCsvDetails = await this.issueCredentialService.importCsv(
reqPayload
);
const finalResponse: IResponseType = {
statusCode: HttpStatus.CREATED,
message: ResponseMessages.issuance.success.importCSV,
data: importCsvDetails.response
};
return res.status(HttpStatus.CREATED).json(finalResponse);

}
const reqPayload: RequestPayload = {
credDefId: credentialDefinitionId,
fileKey,
fileName:file?.originalname
};
this.logger.log(`reqPayload::::::${JSON.stringify(reqPayload)}`);
const importCsvDetails = await this.issueCredentialService.importCsv(
reqPayload
);
const finalResponse: IResponseType = {
statusCode: HttpStatus.CREATED,
message: ResponseMessages.issuance.success.importCSV,
data: importCsvDetails.response
};
return res.status(HttpStatus.CREATED).json(finalResponse);
}
} catch (error) {
throw new RpcException(error.response ? error.response : error);
}
}


Expand Down
2 changes: 1 addition & 1 deletion apps/issuance/interfaces/issuance.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export interface FileUpload {
name?: string;
upload_type?: string;
status?: string;
orgId?: number | string;
orgId?: string;
createDateTime?: Date;
lastChangedDateTime?: Date;
}
Expand Down
11 changes: 1 addition & 10 deletions apps/issuance/src/issuance.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,8 @@ import { AwsService } from '@credebl/aws';
]),
CommonModule,
CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }),
BullModule.forRoot({
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT)
}
}),
BullModule.registerQueue({
name: 'bulk-issuance',
redis: {
port: parseInt(process.env.REDIS_PORT)
}
name: 'bulk-issuance'
})
],
controllers: [IssuanceController],
Expand Down
2 changes: 1 addition & 1 deletion apps/issuance/src/issuance.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ export class BulkIssuanceProcessor {
`Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`
);

this.issuanceService.processIssuanceData(job.data);
await this.issuanceService.processIssuanceData(job.data);
}
}
39 changes: 20 additions & 19 deletions apps/issuance/src/issuance.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { PrismaService } from '@credebl/prisma-service';
// eslint-disable-next-line camelcase
import { agent_invitations, credentials, file_data, file_upload, org_agents, organisation, platform_config, shortening_url } from '@prisma/client';
import { ResponseMessages } from '@credebl/common/response-messages';
import { FileUpload, FileUploadData, PreviewRequest, SchemaDetails } from '../interfaces/issuance.interfaces';
import { FileUploadData, PreviewRequest, SchemaDetails } from '../interfaces/issuance.interfaces';
@Injectable()
export class IssuanceRepository {

Expand Down Expand Up @@ -198,13 +198,13 @@ export class IssuanceRepository {
}
}

async saveFileUploadDetails(fileUploadPayload: FileUpload): Promise<file_upload> {
async saveFileUploadDetails(fileUploadPayload): Promise<file_upload> {
try {
const { name, orgId, status, upload_type } = fileUploadPayload;
const { name, status, upload_type, orgId } = fileUploadPayload;
return this.prisma.file_upload.create({
data: {
name,
orgId: `${orgId}`,
orgId: String(orgId),
status,
upload_type
}
Expand All @@ -216,18 +216,15 @@ export class IssuanceRepository {
}
}

async updateFileUploadDetails(fileId: string, fileUploadPayload: FileUpload): Promise<file_upload> {
async updateFileUploadDetails(fileId: string, fileUploadPayload): Promise<file_upload> {
try {
const { name, orgId, status, upload_type } = fileUploadPayload;
const { status } = fileUploadPayload;
return this.prisma.file_upload.update({
where: {
id: fileId
},
data: {
name,
orgId: `${orgId}`,
status,
upload_type
status
}
});

Expand Down Expand Up @@ -338,18 +335,22 @@ export class IssuanceRepository {
}
}

async getOrgAgentType(orgAgentId: string): Promise<string> {
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/explicit-function-return-type, @typescript-eslint/no-unused-vars
async saveFileDetails(fileData) {
try {
// const { fileUpload, isError, referenceId, error, detailError } = fileData;
// return this.prisma.file_data.create({
// data: {
// detailError,
// error,
// isError,
// referenceId,
// fileUploadId: fileUpload
// }
// });

const { agent } = await this.prisma.org_agents_type.findFirst({
where: {
id: orgAgentId
}
});

return agent;
} catch (error) {
this.logger.error(`[getOrgAgentType] - error: ${JSON.stringify(error)}`);
this.logger.error(`[saveFileUploadData] - error: ${JSON.stringify(error)}`);
throw error;
}
}
Expand Down
44 changes: 22 additions & 22 deletions apps/issuance/src/issuance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,9 @@ export class IssuanceService {

const parsedData = JSON.parse(cachedData as string).fileData.data;
const parsedPrimeDetails = JSON.parse(cachedData as string);

fileUpload.upload_type = FileUploadType.Issuance;
fileUpload.status = FileUploadStatus.started;
fileUpload.orgId = orgId;
fileUpload.orgId = String(orgId);
fileUpload.createDateTime = new Date();

if (parsedPrimeDetails && parsedPrimeDetails.fileName) {
Expand All @@ -753,29 +752,30 @@ export class IssuanceService {

respFileUpload = await this.issuanceRepository.saveFileUploadDetails(fileUpload);

this.logger.log(`respFileUpload----${JSON.stringify(respFileUpload)}`);
await parsedData.forEach(async (element, index) => {
this.logger.log(`element----${JSON.stringify(element)}`);
try {
this.bulkIssuanceQueue.add(
'issue-credential',
{
data: element,
fileUploadId: respFileUpload.id,
cacheId: requestId,
credentialDefinitionId: parsedPrimeDetails.credentialDefinitionId,
schemaLedgerId: parsedPrimeDetails.schemaLedgerId,
orgId,
isLastData: index === parsedData.length - 1
},
{ delay: 5000 }
);
} catch (error) {
this.logger.error('Error adding item to the queue:', error);
}

await parsedData.forEach(async (element) => {

await this.issuanceRepository.saveFileDetails(element);
});

// this.logger.log(`respFileUpload----${JSON.stringify(respFileUpload)}`);
// await parsedData.forEach(async (element, index) => {
// this.logger.log(`element11----${JSON.stringify(element)}`);
// const payload =
// {
// data: element,
// fileUploadId: respFileUpload.id,
// cacheId: requestId,
// credentialDefinitionId: parsedPrimeDetails.credentialDefinitionId,
// schemaLedgerId: parsedPrimeDetails.schemaLedgerId,
// orgId,
// isLastData: index === parsedData.length - 1
// };

// this.processIssuanceData(payload);

// });

return 'Process initiated for bulk issuance';
} catch (error) {
fileUpload.status = FileUploadStatus.interrupted;
Expand Down

0 comments on commit 01bbac1

Please sign in to comment.