Skip to content

Commit

Permalink
feat: bulk issuance implementation using nest queue (#732)
Browse files Browse the repository at this point in the history
* feat: implemented bulk issuance with nest queue

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* refactor: changed comment message

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

---------

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>
  • Loading branch information
tipusinghaw authored May 22, 2024
1 parent e3dfbd3 commit a5c5906
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 25 deletions.
9 changes: 1 addition & 8 deletions apps/api-gateway/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { UserModule } from './user/user.module';
import { ConnectionModule } from './connection/connection.module';
import { EcosystemModule } from './ecosystem/ecosystem.module';
import { getNatsOptions } from '@credebl/common/nats.config';
import { BullModule } from '@nestjs/bull';
import { CacheModule } from '@nestjs/cache-manager';
import * as redisStore from 'cache-manager-redis-store';
import { WebhookModule } from './webhook/webhook.module';
Expand Down Expand Up @@ -54,13 +53,7 @@ import { NotificationModule } from './notification/notification.module';
UtilitiesModule,
WebhookModule,
NotificationModule,
CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }),
BullModule.forRoot({
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT)
}
})
CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT })
],
controllers: [AppController],
providers: [AppService]
Expand Down
6 changes: 6 additions & 0 deletions apps/issuance/src/issuance.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import { AwsService } from '@credebl/aws';
]),
CommonModule,
CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }),
BullModule.forRoot({
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT)
}
}),
BullModule.registerQueue({
name: 'bulk-issuance'
})
Expand Down
4 changes: 2 additions & 2 deletions apps/issuance/src/issuance.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ export class BulkIssuanceProcessor {
@OnQueueActive()
onActive(job: Job): void {
this.logger.log(
`Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`
`Emitting job status${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`
);
}

@Process('issue-credential')
@Process()
async issueCredential(job: Job<unknown>):Promise<void> {
this.logger.log(
`Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`
Expand Down
38 changes: 23 additions & 15 deletions apps/issuance/src/issuance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1085,23 +1085,31 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
if (!respFile) {
throw new BadRequestException(ResponseMessages.issuance.error.fileData);
}
// ------------------------ Remove after debugging ---------------------------
const queueRunningStatus = await this.bulkIssuanceQueue.isReady();
this.logger.log(`respFile::::::`, respFile);
// eslint-disable-next-line no-console
console.log('queueRunningStatus:::::::::', queueRunningStatus);
// ------------------------ Remove after debugging ---------------------------

for (const element of respFile) {
try {
const payload = {
data: element.credential_data,
fileUploadId: element.fileUploadId,
clientId: clientDetails.clientId,
cacheId: requestId,
credentialDefinitionId: element.credDefId,
schemaLedgerId: element.schemaId,
isRetry: false,
orgId,
id: element.id,
isLastData: respFile.indexOf(element) === respFile.length - 1
};

await this.delay(500); // Wait for 0.5 secends
this.processIssuanceData(payload);
this.logger.log(`element log::::::`, element); //Remove after debugging
this.bulkIssuanceQueue.add(
{
data: element.credential_data,
fileUploadId: element.fileUploadId,
clientId: clientDetails.clientId,
cacheId: requestId,
credentialDefinitionId: element.credDefId,
schemaLedgerId: element.schemaId,
isRetry: false,
orgId,
id: element.id,
isLastData: respFile.indexOf(element) === respFile.length - 1
},
{ delay: 5000 }
);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}
Expand Down

0 comments on commit a5c5906

Please sign in to comment.